각진 세상에 둥근 춤을 추자
Kafka 로컬 설치 및 Spring Boot에서 Kafka 연동하기 본문
출처 및 참고: https://docs.spring.io/spring-kafka/reference/quick-tour.html
1. 스프링부트 버전 확인
인텔리제이 > File > Project Structure > Modules > Dependencies에서 스프링부트 버전을 확인한다.
스프링부트 버전: 3.4.3
스프링부트 3.4.X의 경우 카프카 3.3.X 버전을 사용하는 게 좋다.
2. Maven에서 spring-kafka 추가하기
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.3.2</version>
</dependency>
3. application.properties에 카프카 설정 작성
# ===========================
# Kafka 브로커 주소
# ===========================
spring.kafka.bootstrap-servers=localhost:9092
# ===========================
# Kafka Consumer 설정
# ===========================
# Kafka Consumer가 속할 Consumer Group ID
# 같은 그룹 ID를 가진 Consumer끼리는 메시지를 나눠서 처리함
spring.kafka.consumer.group-id=test-group
# 토픽에 처음 연결할 때 데이터를 어디서부터 읽을지 지정
# earliest: 가능한 가장 처음부터 읽기 / latest: 가장 최근 메시지부터
spring.kafka.consumer.auto-offset-reset=earliest
# 메시지의 key를 역직렬화할 때 사용할 클래스
# 일반적으로 문자열 키를 사용할 경우 StringDeserializer 사용
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 메시지의 value를 역직렬화할 때 사용할 클래스
# 메시지 값도 문자열일 경우 StringDeserializer 사용
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# ===========================
# Kafka Producer 설정
# ===========================
# 메시지의 key를 직렬화할 때 사용할 클래스 (일반적으로 문자열 키를 보낼 때 사용)
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# 메시지의 value를 직렬화할 때 사용할 클래스 (문자열 메시지를 보낼 때 사용)
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# ===========================
# Kafka Streams 설정
# ===========================
# Kafka Streams 애플리케이션 ID (반드시 고유해야 함/이 값은 Kafka 내부의 상태 저장 이름 등에 사용됨)
spring.kafka.streams.application-id=my-streams-app
# Kafka Streams에서 상태를 커밋하는 주기 (ms 단위) (1초마다 상태 저장)
spring.kafka.streams.properties.commit.interval.ms=1000
4. 각각의 카프카 파일을 생성한다.
(1) KafkaProducerConfig.java:카프카 토픽에 메시지를 전송하는 설정 클래스
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
// ProducerFactory: Kafka 메시지 전송을 위한 프로듀서 팩토리
public ProducerFactory<String,String> producerFactory() {
Map<String, Object> configProps = new HashMap<String, Object>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka 서버 주소
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // Kafka 메시지의 키를 문자열로 직렬화
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // Kafka 메시지의 값을 문자열로 직렬화
return new DefaultKafkaProducerFactory<>(configProps);
}
// KafkaTemplate: Kafka에 메시지를 보낼 때 사용하는 핵심 객체
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
(2) KafkaProducer.java: 카프카 토픽에 메시지를 전송하는 프로듀서 구현체
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private static final String TOPIC1 = "kafka_topic1";
private static final String TOPIC2 = "kafka_topic2";
private static final String TOPIC3 = "kafka_topic3";
public void sendMessageTo(String topic, String message) {
kafkaTemplate.send(topic, message);
System.out.println("토픽(" + topic + ")에 메시지(" + message + ") 전송 완료...");
}
}
(3) KafkaConsumerConfig.java: 카프카 토픽에 메시지를 받는 설정 클래스
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka 브로커
props.put(ConsumerConfig.GROUP_ID_CONFIG, "cplug-consumer-group"); // Consumer Group ID (같은 group-id를 공유하는 컨슈머끼리는 메시지를 나눠가짐)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
// kafkaListenerContainerFactory()
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
(4) KafkaConsumer: 카프카 토픽에 메시지를 받는 컨슈머 구현체
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
// Kafaka Topic(=Message) 를 Consume(=Recv) 하는 구현체
@Service
public class KafkaConsumer {
@KafkaListener(topics = "kafka_topic1", groupId = "cplug-consumer-group")
public void consume1(String message) {
System.out.println("[토픽1] Kafka 메시지 수신: " + message);
// 여기에 알람 UI 처리, 로그 저장 등
}
@KafkaListener(topics = "kafka_topic2", groupId = "cplug-consumer-group")
public void consume2(String message) {
System.out.println("[토픽2] Kafka 메시지 수신: " + message);
// 여기에 알람 UI 처리, 로그 저장 등
}
}
5. Kafka 설치하기
https://kafka.apache.org/downloads
Apache Kafka
Apache Kafka: A Distributed Streaming Platform.
kafka.apache.org
압축해제
6. Zookeeper 실행하기
명령 프롬프트 실행 > 압축해제 한 파일이 있는 경로로 이동
(* PowerShell 아님)
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
(*우선 테스트를 위해 로컬에서 실행 -> 해당 명령프롬프트를 닫으면 Zookepper 꺼짐)
(*실제 운영: 도커/백그라운드 실행 등 방식 사용)
7. Kafka Broker 실행하기
bin\windows\kafka-server-start.bat config\server.properties
(*우선 테스트를 위해 로컬에서 실행 -> 해당 명령프롬프트를 닫으면 Kafka Broker 꺼짐)
(*실제 운영: 도커/백그라운드 실행 등 방식 사용)
8. Kafka 테스트하기
간단하게 메시지를 전송할 컨트롤러를 생성한다.
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequiredArgsConstructor
public class KafkaController {
private final KafkaProducer producer;
private final KafkaProducer kafkaProducer;
@GetMapping("/kafka/send")
public String sendMessage(@RequestParam String topic, @RequestParam String message) {
kafkaProducer.sendMessageTo(topic, message);
return "전송됨 → 토픽: " + topic + ", 메시지: " + message;
}
}
포스트맨으로 메시지를 임의로 보낸다.
9. 그외 카프카 명령어
압축해제한 파일이 있는 경로에서 명령어를 실행한다.
(1) 토픽 목록 보기
# localhost:9092 본인이 설정한 카프카 주소
bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --list
(2) 특정 토픽 상세 정보 보기
bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic kafka_topic1
Topic | 토픽 이름 (kafka_topic1) |
TopicId | 내부적으로 Kafka가 이 토픽에 부여한 UUID (토픽 유일 식별자) |
PartitionCount | 총 파티션 개수 → 1개 |
ReplicationFactor | 복제본 개수 → 1개 (복제 안 함) |
Configs | 토픽에 설정된 추가 속성 (없음) |
Partition | 파티션 번호 (0번) |
Leader | 이 파티션의 리더 브로커 ID → 0번 브로커 |
Replicas | 이 파티션의 모든 복제본 리스트 → 0번 브로커만 있음 |
Isr | In-Sync Replica: 현재 정상 동기화 중인 복제본 리스트 → 0번 브로커만 있음 |
(3) 현재 메시지를 소비 중인 컨슈머 그룹 상태 보기
bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group cplug-consumer-group
GROUP | 컨슈머 그룹 이름 → cplug-consumer-group |
TOPIC | 구독 중인 토픽 이름 (kafka_topic1, kafka_topic2) |
PARTITION | 각 토픽의 파티션 번호 (지금은 전부 0번 파티션만 있음) |
CURRENT-OFFSET | 이 컨슈머가 가장 마지막으로 커밋한 오프셋 |
LOG-END-OFFSET | 해당 파티션에서 가장 마지막 메시지의 오프셋 |
LAG | 메시지 밀림 정도 → LOG-END - CURRENT |
CONSUMER-ID | 컨슈머 ID (자동 생성된 UUID 포함) |
HOST | 해당 컨슈머가 연결된 IP 주소 |
CLIENT-ID | 애플리케이션에서 설정한 클라이언트 ID (기본값이면 consumer-...) |
kafka_topic1 | 1 | 1 | 0 | ✅ 정상적으로 소비 완료 |
kafka_topic2 | 0 | 0 | 0 | ✅ 아직 소비할 메시지가 없음 |
(4) 컨슈머 그룹 목록 전체 보기
bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --list
10. 카프카 상태 대시보드로 확인하기
명령어 GUI가 아닌 대시보드로 카프카의 상태를 확인할 수도 있다.
https://github.com/provectus/kafka-ui/releases
Releases · provectus/kafka-ui
Open-Source Web UI for Apache Kafka Management. Contribute to provectus/kafka-ui development by creating an account on GitHub.
github.com
최신 릴리스에서 kafka-ui-api.jar 파일을 다운로드한다.
다운로드한 파일이 있는 폴더 내에 config.yml 파일을 생성한다.
server:
port: 8081
kafka:
clusters:
- name: local
bootstrapServers: localhost:9092
- server port: 카프카 UI 포트 (웹 브라우저 접속용)
- kafka clusters localhost port: 카프카 포트 (메시지 송수신용 카프카 브로커 주소)
다운로드한 jar 파일과 생성한 yml 파일이 있는 경로에서 cmd를 실행시키고 아래 명령어를 입력한다.
java -jar kafka-ui-api-v0.7.2.jar --spring.config.additional-location=./config.yml
yml 파일에서 설정한 포트 번호로 웹 브라우저에 접속한다.
왼쪽 메뉴 Topics > 토픽 선택 > Message 에서 어떤 메시지가 전송됐는지 볼 수 있다.
'Java' 카테고리의 다른 글
Kafka 단일 브로커 환경에서 고TPS 처리 구현 (0) | 2025.05.27 |
---|---|
Spring 기반 Kafka Producer-Consumer + Java 알람 매니저 (Kafka Streams 모듈화)로 실시간 알람 기능 구현 (0) | 2025.05.26 |
DDD (Domain-Driven Design) 란? (0) | 2024.11.26 |
[CBR] CBR (Case-Based Reasoning) 방식과 응용 (0) | 2024.11.12 |
[Kafka] 카프카(Kafka) 기본 개념 (0) | 2024.11.12 |