각진 세상에 둥근 춤을 추자

Kafka 로컬 설치 및 Spring Boot에서 Kafka 연동하기 본문

Java

Kafka 로컬 설치 및 Spring Boot에서 Kafka 연동하기

circle.j 2025. 5. 15. 16:35

출처 및 참고: https://docs.spring.io/spring-kafka/reference/quick-tour.html

 

1. 스프링부트 버전 확인

인텔리제이 > File > Project Structure > Modules > Dependencies에서 스프링부트 버전을 확인한다. 

스프링부트 버전: 3.4.3

 

스프링부트 3.4.X의 경우 카프카 3.3.X 버전을 사용하는 게 좋다. 

 

 

2. Maven에서 spring-kafka  추가하기

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>3.3.2</version>
</dependency>

 

 

3. application.properties에 카프카 설정 작성

# ===========================
# Kafka 브로커 주소
# ===========================
spring.kafka.bootstrap-servers=localhost:9092

# ===========================
# Kafka Consumer 설정
# ===========================
# Kafka Consumer가 속할 Consumer Group ID
# 같은 그룹 ID를 가진 Consumer끼리는 메시지를 나눠서 처리함
spring.kafka.consumer.group-id=test-group

# 토픽에 처음 연결할 때 데이터를 어디서부터 읽을지 지정
# earliest: 가능한 가장 처음부터 읽기 / latest: 가장 최근 메시지부터
spring.kafka.consumer.auto-offset-reset=earliest

# 메시지의 key를 역직렬화할 때 사용할 클래스
# 일반적으로 문자열 키를 사용할 경우 StringDeserializer 사용
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# 메시지의 value를 역직렬화할 때 사용할 클래스
# 메시지 값도 문자열일 경우 StringDeserializer 사용
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# ===========================
# Kafka Producer 설정
# ===========================
# 메시지의 key를 직렬화할 때 사용할 클래스 (일반적으로 문자열 키를 보낼 때 사용)
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

# 메시지의 value를 직렬화할 때 사용할 클래스 (문자열 메시지를 보낼 때 사용)
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# ===========================
# Kafka Streams 설정
# ===========================
# Kafka Streams 애플리케이션 ID (반드시 고유해야 함/이 값은 Kafka 내부의 상태 저장 이름 등에 사용됨)
spring.kafka.streams.application-id=my-streams-app

# Kafka Streams에서 상태를 커밋하는 주기 (ms 단위) (1초마다 상태 저장)
spring.kafka.streams.properties.commit.interval.ms=1000

 

 

 

4. 각각의 카프카 파일을 생성한다.

(1) KafkaProducerConfig.java:카프카 토픽에 메시지를 전송하는 설정 클래스 

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    // ProducerFactory: Kafka 메시지 전송을 위한 프로듀서 팩토리
    public ProducerFactory<String,String> producerFactory() {
        Map<String, Object> configProps  = new HashMap<String, Object>();

        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");                 // Kafka 서버 주소
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);        // Kafka 메시지의 키를 문자열로 직렬화
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);      // Kafka 메시지의 값을 문자열로 직렬화

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    // KafkaTemplate: Kafka에 메시지를 보낼 때 사용하는 핵심 객체
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

 

(2) KafkaProducer.java: 카프카 토픽에 메시지를 전송하는 프로듀서 구현체 

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private static final String TOPIC1 = "kafka_topic1";
    private static final String TOPIC2 = "kafka_topic2";
    private static final String TOPIC3 = "kafka_topic3";

    public void sendMessageTo(String topic, String message) {
        kafkaTemplate.send(topic, message);
        System.out.println("토픽(" + topic + ")에 메시지(" + message + ") 전송 완료...");
    }


}

 

(3) KafkaConsumerConfig.java: 카프카 토픽에 메시지를 받는 설정 클래스

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka 브로커
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "cplug-consumer-group");    // Consumer Group ID (같은 group-id를 공유하는 컨슈머끼리는 메시지를 나눠가짐)
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(props);
    }

    // kafkaListenerContainerFactory()
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

 

(4) KafkaConsumer: 카프카 토픽에 메시지를 받는 컨슈머 구현체 

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

// Kafaka Topic(=Message) 를 Consume(=Recv) 하는 구현체
@Service
public class KafkaConsumer {

    @KafkaListener(topics = "kafka_topic1", groupId = "cplug-consumer-group")
    public void consume1(String message) {
        System.out.println("[토픽1] Kafka 메시지 수신: " + message);
        // 여기에 알람 UI 처리, 로그 저장 등
    }

    @KafkaListener(topics = "kafka_topic2", groupId = "cplug-consumer-group")
    public void consume2(String message) {
        System.out.println("[토픽2] Kafka 메시지 수신: " + message);
        // 여기에 알람 UI 처리, 로그 저장 등
    }

}

 

 

 

5. Kafka 설치하기 

https://kafka.apache.org/downloads

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

 

압축해제 

 

 

 

 

6. Zookeeper 실행하기 

명령 프롬프트 실행 > 압축해제 한 파일이 있는 경로로 이동 

(* PowerShell 아님)

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

(*우선 테스트를 위해 로컬에서 실행 -> 해당 명령프롬프트를 닫으면 Zookepper 꺼짐)

(*실제 운영: 도커/백그라운드 실행 등 방식 사용)

 

7. Kafka Broker 실행하기 

bin\windows\kafka-server-start.bat config\server.properties

(*우선 테스트를 위해 로컬에서 실행 -> 해당 명령프롬프트를 닫으면 Kafka Broker 꺼짐)

(*실제 운영: 도커/백그라운드 실행 등 방식 사용)

 

 

8. Kafka 테스트하기 

간단하게 메시지를 전송할 컨트롤러를 생성한다.

import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequiredArgsConstructor
public class KafkaController {

    private final KafkaProducer producer;
    private final KafkaProducer kafkaProducer;

    @GetMapping("/kafka/send")
    public String sendMessage(@RequestParam String topic, @RequestParam String message) {
        kafkaProducer.sendMessageTo(topic, message);
        return "전송됨 → 토픽: " + topic + ", 메시지: " + message;
    }

}

 

포스트맨으로 메시지를 임의로 보낸다. 

 

 

 

9. 그외 카프카 명령어 

압축해제한 파일이 있는 경로에서 명령어를 실행한다. 

(1) 토픽 목록 보기

# localhost:9092 본인이 설정한 카프카 주소
bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --list

 

(2) 특정 토픽 상세 정보 보기 

bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic kafka_topic1

Topic 토픽 이름 (kafka_topic1)
TopicId 내부적으로 Kafka가 이 토픽에 부여한 UUID (토픽 유일 식별자)
PartitionCount 총 파티션 개수 → 1개
ReplicationFactor 복제본 개수 → 1개 (복제 안 함)
Configs 토픽에 설정된 추가 속성 (없음)
Partition 파티션 번호 (0번)
Leader 이 파티션의 리더 브로커 ID → 0번 브로커
Replicas 이 파티션의 모든 복제본 리스트 → 0번 브로커만 있음
Isr In-Sync Replica: 현재 정상 동기화 중인 복제본 리스트 → 0번 브로커만 있음

 

(3) 현재 메시지를 소비 중인 컨슈머 그룹 상태 보기 

bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group cplug-consumer-group

GROUP 컨슈머 그룹 이름 → cplug-consumer-group
TOPIC 구독 중인 토픽 이름 (kafka_topic1, kafka_topic2)
PARTITION 각 토픽의 파티션 번호 (지금은 전부 0번 파티션만 있음)
CURRENT-OFFSET 이 컨슈머가 가장 마지막으로 커밋한 오프셋
LOG-END-OFFSET 해당 파티션에서 가장 마지막 메시지의 오프셋
LAG 메시지 밀림 정도 → LOG-END - CURRENT
CONSUMER-ID 컨슈머 ID (자동 생성된 UUID 포함)
HOST 해당 컨슈머가 연결된 IP 주소
CLIENT-ID 애플리케이션에서 설정한 클라이언트 ID (기본값이면 consumer-...)

 

kafka_topic1 1 1 0 정상적으로 소비 완료
kafka_topic2 0 0 0 아직 소비할 메시지가 없음

 

(4) 컨슈머 그룹 목록 전체 보기 

bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --list

 

 

 

10. 카프카 상태 대시보드로 확인하기 

 

명령어 GUI가 아닌 대시보드로 카프카의 상태를 확인할 수도 있다.

https://github.com/provectus/kafka-ui/releases

 

Releases · provectus/kafka-ui

Open-Source Web UI for Apache Kafka Management. Contribute to provectus/kafka-ui development by creating an account on GitHub.

github.com

 

최신 릴리스에서 kafka-ui-api.jar 파일을 다운로드한다.

 

 

 

다운로드한 파일이 있는 폴더 내에 config.yml 파일을 생성한다.

server:
  port: 8081

kafka:
  clusters:
    - name: local
      bootstrapServers: localhost:9092

- server port: 카프카 UI 포트 (웹 브라우저 접속용)

- kafka clusters localhost port: 카프카 포트 (메시지 송수신용 카프카 브로커 주소)

 

다운로드한 jar 파일과 생성한 yml 파일이 있는 경로에서 cmd를 실행시키고 아래 명령어를 입력한다. 

java -jar kafka-ui-api-v0.7.2.jar --spring.config.additional-location=./config.yml

 

yml 파일에서 설정한 포트 번호로 웹 브라우저에 접속한다. 

 

왼쪽 메뉴 Topics > 토픽 선택 > Message 에서 어떤 메시지가 전송됐는지 볼 수 있다.