Spring 기반 Kafka Producer-Consumer + Java 알람 매니저 (Kafka Streams 모듈화)로 실시간 알람 기능 구현
기존에는 카프카의 프로듀서와 컨슈머를 같은 서버에 두어 메시지를 전송 및 수신하였다.
2025.05.15 - [Java] - Kafka 로컬 설치 및 Spring Boot에서 Kafka 연동하기
Kafka 로컬 설치 및 Spring Boot에서 Kafka 연동하기
출처 및 참고: https://docs.spring.io/spring-kafka/reference/quick-tour.html 1. 스프링부트 버전 확인인텔리제이 > File > Project Structure > Modules > Dependencies에서 스프링부트 버전을 확인한다. 스프링부트 버전: 3.4.3
this-circle-jeong.tistory.com
이번에는 Kafka Streams 기능을 통해 알람 기능을 추가하며, 해당 기능을 모듈화한다.
💡 "모듈화한다"
✔️ 하나의 시스템 안에서 특정 역할(알람 처리) 을 담당하는 것을 독립적인 프로그램 또는 애플리케이션으로 분리해서 관리하는 것.
🔧 왜 굳이 나눠야 하는 걸까?
1) 서버 부하 분산
- 하나의 서버에서 처리하면 알람 로직이 무거워질수록 같이 느력짐
- 알람 분석, 저장, SSE 전송, DB 저장까지 하면 CPU 점유율도 올라가고 장애도 한 군데서 터짐
2) 장애 전파 방지 (격리 효과)
3) 독립적인 유지 보수
항목 | 모듈화 전 | 모듈화 후 |
구조 | 웹 + 알람 판단 + DB 저장 다 처리 | 웹은 Producer + Consumer / 알람 판단은 별도 프로그램 |
위험 | 한 곳에 로직 몰림 → 서버 과부하/장애 시 전체 영향 | 역할 분리로 안정성 ↑ |
유지보수 | 전체 애플리케이션 수정/배포 | 기능별로 부분 배포 가능 |
확장성 | 변경 어려움 | 기능 단위 확장/이식 쉬움 |
단계별 시나리오는 다음과 같다.
✅ [1] 시스템 초기 실행 시
- 웹 서버가 최초 실행됨
- 웹 서버는 DB에서 알람 발생 조건 목록을 조회
- 해당 조건들을 Kafka의 alarm-condition 토픽에 전송
- 알람 매니저는 alarm-condition 토픽을 구독하여 조건 목록을 메모리에 보관
✅ [2] 실시간 이벤트 처리 흐름
- 웹에서 이벤트 발생 (예: 로그인 실패, 리소스 초과 등)
- 해당 이벤트 메시지를 Kafka의 event-topic에 전송
- 알람 매니저가 event-topic을 구독하여 메시지 수신
- 알람 매니저는 미리 보관한 조건과 비교하여:
- 조건 충족 시 → Kafka alarm-topic으로 전송
- 조건 불충족 시 → Kafka event-log-topic으로 전송
- 웹 서버는 alarm-topic과 event-log-topic을 각각 구독하여:
- alarm-topic: SSE 알림 전송 + 알람 DB 저장
- event-log-topic: 이벤트 로그 DB 저장
✅ [3] 알람 발생 조건 변경 시
- 관리자가 웹에서 알람 발생 조건을 수정
- 웹 서버는 수정된 조건을 다시 Kafka의 alarm-condition 토픽으로 전송
- 알람 매니저는 alarm-condition 토픽을 통해 조건 갱신하고 메모리에 업데이트
알람 매니저 모듈화는 일반 자바 코드(Spring X)로 작성하기 위해 프로젝트를 생성한다.
(알람 매니저가 스프링 기반일 경우 또 다른 톰캣 서버를 띄어야 하고 설정도 복잡해지기 때문에 자바 애플리케이션으로 실행시킨다.)
알람 매니저
1. 새 프로젝트 만들기
- File → New → Project
- 왼쪽에서 Java 선택
- Build system: Gradle 또는 Maven (편한 거 선택)
- Java SDK: 설치된 JDK 선택 (예: Java 17)
- 모듈 이름: alarm-manager
이후 프로젝트에 구조가 자동으로 생성되지 않았다면 구조를 직접 수동으로 생성한다.
2. pom.xml 작성하기
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>kr.co.cplug</groupId>
<artifactId>alarm-manager</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>
<!-- Kafka Streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.6.1</version>
</dependency>
<!-- Slf4j for logging -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.36</version>
</dependency>
<!-- Jackson for JSON parsing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.18.2</version>
</dependency>
</dependencies>
</project>
pom.xml 작성 후 측 상단에서 "Load Maven Project"를 클릭한다.
만약 버튼이 없다면 우측 클릭 > Add as Maven Project
3. 애플리케이션 실행 함수 작성하기
다음과 같은 과정으로 기능을 구현한다.
(1) Kafka Streams 설정을 위한 Properties 생성
// 1. Kafka Streams 설정을 위한 Properties 생성
Properties props = new Properties();
// application.id
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "alarm-manager-app");
// bootstrap.servers
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
// key/value serde 설정 등
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// (선택) 에러 로그 보기 쉽게 하기 위해 record 캐싱 끔
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
// (선택) commit 주기 짧게
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
설정 키 | 설명 |
APPLICATION_ID_CONFIG | Kafka Streams 앱 식별자 (state store 이름 등에 사용됨) |
BOOTSTRAP_SERVERS_CONFIG | Kafka 브로커 주소 |
DEFAULT_KEY_SERDE_CLASS_CONFIG | 메시지 Key 직렬화/역직렬화 방식 |
CACHE_MAX_BYTES_BUFFERING_CONFIG | 캐시 비활성화 (실시간 디버깅에 도움됨) |
COMMIT_INTERVAL_MS_CONFIG | 오프셋 커밋 주기 설정 (기본 30초 → 1초로 단축) |
(+) 알람 발생 조건 AlarmCondition.java 파일 생성
public class AlarmCondition {
public String metricKey;
public double thresholdValue;
public String operator;
public int severity;
public int category;
public AlarmCondition(String metricKey, double thresholdValue,
String operator, int severity, int category) {
this.metricKey = metricKey;
this.thresholdValue = thresholdValue;
this.operator = operator;
this.severity = severity;
this.category = category;
}
}
각 파라미터 값은 알람 발생 조건 DB에 맞게 작성한다.
(2) 알람 발생 조건을 저장할 Map 초기화
// 2. 알람 발생 조건을 저장할 Map 초기화
// key: metric_key (예: login_fail)
// value: 조건 객체 (threshold, operator, severity, category 등)
Map<String, AlarmCondition> alarmConditions = new ConcurrentHashMap<>();
Kafka Streams가 멀티스레드 환경이니 동시성 문제를 방지하기 위해 ConcurrentHashMap을 사용한다.
(3) StreamBuilder 생성
// 3. StreamsBuilder 생성
StreamsBuilder builder = new StreamsBuilder();
(4) alarm-condition 토픽 구독 (조건 업데이트 수신)
// 4. alarm-condition 토픽 구독 (조건 업데이트 수신)
// alarm-condition 메시지를 받아서 JSON 파싱
ObjectMapper objectMapper = new ObjectMapper();
KStream<String, String> conditionStream = builder.stream("alarm-condition");
// Map<String, AlarmCondition> 형태로 조건 저장 (기존 조건 갱신 포함)
conditionStream
.peek((key, value) -> System.out.println("[조건 수신] key: " + key + ", value: " + value))
.foreach((key, value) -> {
try {
// 메시지를 Map으로 파싱
Map<String, Object> conditionData = objectMapper.readValue(value, Map.class);
String metricKey = (String) conditionData.get("metric_key");
double threshold = Double.parseDouble(conditionData.get("threshold_value").toString());
String operator = (String) conditionData.get("condition_operator");
int severity = Integer.parseInt(conditionData.get("severity").toString());
int category = Integer.parseInt(conditionData.get("category").toString());
// Map에 조건 저장 또는 갱신
alarmConditions.put(metricKey, new AlarmCondition(metricKey, threshold, operator, severity, category));
System.out.println("[조건 갱신] metric_key: " + metricKey);
} catch (Exception e) {
System.out.println("alarm-condition 파싱 실패: " + e.getMessage());
}
});
(5) event-topic 구독 (이벤트 수신)
// 5. event-topic 구독 (이벤트 수신)
KStream<String, String> eventStream = builder.stream("event-topic");
eventStream
.peek((key, value) -> System.out.println("[이벤트 수신] key: " + key + ", value: " + value))
.mapValues(value -> {
try {
Map<String, Object> eventData = objectMapper.readValue(value, Map.class);
String metricKey = (String) eventData.get("type");
int count = Integer.parseInt(eventData.get("count").toString());
// 조건 비교
if(alarmConditions.containsKey(metricKey)){
AlarmCondition condition = alarmConditions.get(metricKey);
if (compare(count, condition.thresholdValue, condition.operator)) {
// 조건 충족 시 → 추가 필드 붙여서 알람 전송
eventData.put("severity", condition.severity);
eventData.put("category", condition.category);
System.out.println("[알람 판단됨] metric_key: " + metricKey + ", count: " + count);
return objectMapper.writeValueAsString(eventData); // JSON 문자열로 다시 변환
}
}
} catch (Exception e) {
System.out.println("event-message 파싱 실패: " + e.getMessage());
}
return null;
});
compare 함수이다.
private static boolean compare(int count, double threshold, String operator) {
return switch (operator) {
case ">" -> count > threshold;
case ">=" -> count >= threshold;
case "<" -> count < threshold;
case "<=" -> count <= threshold;
case "==" -> count == threshold;
case "!=" -> count != threshold;
default -> false;
};
}
(6) 조건 비교 결과에 따라 분기
((5)번에 이어 작성)
// 5. event-topic 구독 (이벤트 수신)
KStream<String, String> eventStream = builder.stream("event-topic");
eventStream
.peek((key, value) -> System.out.println("[이벤트 수신] key: " + key + ", value: " + value))
.mapValues(value -> {
try {
Map<String, Object> eventData = objectMapper.readValue(value, Map.class);
String metricKey = (String) eventData.get("type");
int count = Integer.parseInt(eventData.get("count").toString());
// 조건 비교
if(alarmConditions.containsKey(metricKey)){
AlarmCondition condition = alarmConditions.get(metricKey);
if (compare(count, condition.thresholdValue, condition.operator)) {
// 조건 충족 시 → 추가 필드 붙여서 알람 전송
eventData.put("severity", condition.severity);
eventData.put("category", condition.category);
System.out.println("[알람 판단됨] metric_key: " + metricKey + ", count: " + count);
return objectMapper.writeValueAsString(eventData); // JSON 문자열로 다시 변환
}
}
} catch (Exception e) {
System.out.println("event-message 파싱 실패: " + e.getMessage());
}
return null;
})
// 6. 조건 비교 결과에 따라 분기 조건 만족 시 → alarm-topic으로 전송
.filter((key, value) -> value != null)
.peek((key, value) -> log.warn("알람 전송 예정 → {}", value))
.to("alarm-topic"); // alarm-topic으로 전송
// 조건 불만족 시: 그대로 event-log-topic으로 전송
eventStream
.filter((key, value) -> {
try {
Map<String, Object> eventData = objectMapper.readValue(value, Map.class);
String metricKey = (String) eventData.get("type");
int count = Integer.parseInt(eventData.get("count").toString());
if (alarmConditions.containsKey(metricKey)) {
AlarmCondition condition = alarmConditions.get(metricKey);
return !compare(count, condition.thresholdValue, condition.operator);
}
} catch (Exception e) {
return true; // 파싱 실패 메시지는 로그로 보냄
}
return true; // 조건 없음도 로그로 보냄
})
.peek((key, value) -> log.warn("로그 전송 예정 → {}", value))
.to("event-log-topic"); // event-log-topic으로 전송
(7) KafkaStreams topology 빌드 및 시작
// 7. KafkaStreams topology 빌드 및 시작
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
log.warn("AlarmManagerApp 실행 시작됨!");
(8) shutdown hook 등록 (애플리케이션 종료 시 graceful shutdown)
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.warn("AlarmManagerApp 종료 중... KafkaStreams 종료 중...");
streams.close();
}));
AlarmManager.java 전체 코드
package kr.co.cplug;
import com.fasterxml.jackson.databind.ObjectMapper;
import kr.co.cplug.alarmmanager.AlarmCondition;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class AlarmManagerApp {
public static void main(String[] args) {
// 1. Kafka Streams 설정을 위한 Properties 생성
Properties props = new Properties();
// application.id
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "alarm-manager-app");
// bootstrap.servers
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
// key/value serde 설정 등
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// (선택) 에러 로그 보기 쉽게 하기 위해 record 캐싱 끔
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
// (선택) commit 주기 짧게
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
// 2. 알람 발생 조건을 저장할 Map 초기화
Map<String, AlarmCondition> alarmConditions = new ConcurrentHashMap<>();
// 3. StreamsBuilder 생성
StreamsBuilder builder = new StreamsBuilder();
// 4. alarm-condition 토픽 구독 (조건 업데이트 수신)
// alarm-condition 메시지를 받아서 JSON 파싱
ObjectMapper objectMapper = new ObjectMapper();
KStream<String, String> conditionStream = builder.stream("alarm-condition");
// Map<String, AlarmCondition> 형태로 조건 저장 (기존 조건 갱신 포함)
conditionStream
.peek((key, value) -> log.warn("[조건 수신] key: " + key + ", value: " + value))
.foreach((key, value) -> {
try {
// 메시지를 Map으로 파싱
Map<String, Object> conditionData = objectMapper.readValue(value, Map.class);
String metricKey = (String) conditionData.get("metric_key");
double threshold = Double.parseDouble(conditionData.get("threshold_value").toString());
String operator = (String) conditionData.get("condition_operator");
int severity = Integer.parseInt(conditionData.get("severity").toString());
int category = Integer.parseInt(conditionData.get("category").toString());
// Map에 조건 저장 또는 갱신
alarmConditions.put(metricKey, new AlarmCondition(metricKey, threshold, operator, severity, category));
log.warn("[조건 갱신] metric_key: " + metricKey);
} catch (Exception e) {
log.warn("alarm-condition 파싱 실패: " + e.getMessage());
}
});
// 5. event-topic 구독 (이벤트 수신)
KStream<String, String> eventStream = builder.stream("event-topic");
eventStream
.peek((key, value) -> log.warn("[이벤트 수신] key: " + key + ", value: " + value))
.mapValues(value -> {
try {
Map<String, Object> eventData = objectMapper.readValue(value, Map.class);
String metricKey = (String) eventData.get("type");
int count = Integer.parseInt(eventData.get("count").toString());
// 조건 비교
if(alarmConditions.containsKey(metricKey)){
AlarmCondition condition = alarmConditions.get(metricKey);
if (compare(count, condition.thresholdValue, condition.operator)) {
// 조건 충족 시 → 추가 필드 붙여서 알람 전송
eventData.put("severity", condition.severity);
eventData.put("category", condition.category);
log.warn("[알람 판단됨] metric_key: " + metricKey + ", count: " + count);
return objectMapper.writeValueAsString(eventData); // JSON 문자열로 다시 변환
}
}
} catch (Exception e) {
log.warn("event-message 파싱 실패: " + e.getMessage());
}
return null;
})
// 6. 조건 비교 결과에 따라 분기 조건 만족 시 → alarm-topic으로 전송
.filter((key, value) -> value != null)
.peek((key, value) -> log.warn("알람 전송 예정 → {}", value))
.to("alarm-topic"); // alarm-topic으로 전송
// 조건 불만족 시: 그대로 event-log-topic으로 전송
eventStream
.filter((key, value) -> {
try {
Map<String, Object> eventData = objectMapper.readValue(value, Map.class);
String metricKey = (String) eventData.get("type");
int count = Integer.parseInt(eventData.get("count").toString());
if (alarmConditions.containsKey(metricKey)) {
AlarmCondition condition = alarmConditions.get(metricKey);
return !compare(count, condition.thresholdValue, condition.operator);
}
} catch (Exception e) {
return true; // 파싱 실패 메시지는 로그로 보냄
}
return true; // 조건 없음도 로그로 보냄
})
.peek((key, value) -> log.warn("로그 전송 예정 → {}", value))
.to("event-log-topic"); // event-log-topic으로 전송
// 7. KafkaStreams topology 빌드 및 시작
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
log.warn("AlarmManagerApp 실행 시작됨!");
// 8. shutdown hook 등록 (애플리케이션 종료 시 graceful shutdown)
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.warn("AlarmManagerApp 종료 중... KafkaStreams 종료 중...");
streams.close();
}));
}
private static boolean compare(int count, double threshold, String operator) {
return switch (operator) {
case ">" -> count > threshold;
case ">=" -> count >= threshold;
case "<" -> count < threshold;
case "<=" -> count <= threshold;
case "==" -> count == threshold;
case "!=" -> count != threshold;
default -> false;
};
}
}
알람 매니저에 대한 코드를 작성했다.
이제 다시 흐름도를 살펴보면
웹 서버 최초 시작 시 - 알람 발생 조건 전송
DB에서 알람 발생 조건을 조회하여 알람 매니저에게 전송해야 한다. (topic: alarm-condition)
↓ 코드 확인
웹 서버에 ScheduledTask.java 파일을 생성하여 서버 초기화 작업을 수행하는 스케줄러 메서드를 작성한다.
@Slf4j
@Component
@RequiredArgsConstructor
public class ScheduledTask {
private final LicenseRead licenseRead;
private final NetworkMgmtService networkMgmtService;
private final SystemSettingService systemSettingService;
private final AuditLogService auditLogService;
private final TestService testService;
private final AlarmDao alarmDao;
private final KafkaProducer kafkaProducer;
boolean testSystem = true;
private boolean initialized = false;
private Date testedTimer = null;
/**
* 서버 초기화 작업을 수행하는 스케줄러 메서드
* - cron: "* * * * * *" => 매초마다 실행되지만, 실제 초기화는 한 번만 수행됨 (initialized 플래그 사용)
*/
@Scheduled(cron = "* * * * * *")
public void initializeServer()
throws NoSuchPaddingException, IllegalBlockSizeException, IOException, NoSuchAlgorithmException, InvalidKeySpecException, BadPaddingException, InvalidKeyException {
// 이미 초기화되었으면 더 이상 실행하지 않음
if (initialized) {
return;
}
// 알람 발생 조건 DB 조회 및 TOPIC(alarm-condition) 전송
List<Map<String,Object>> conditions = alarmDao.getAlarmCondition();
for(Map<String,Object> condition : conditions) {
String message = OBJECT_MAPPER.writeValueAsString(condition);
kafkaProducer.sendMessageTo("alarm-condition", "init", message);
}
// 서버 초기화 완료 플래그 설정
initialized = true;
}
}
관리자가 웹에서 알람 발생 조건 변경 시 - 알람 발생 조건 전송
수정하고자 하는 알람 발생 조건을 DB에 업데이트 후 알람 매니저에게 전송해야 한다. (topic: alarm-condition)
↓ 코드 확인
관리자가 웹에서 알람 발생 조건 변경 요청을 한 경우, DB에 반영 성공 시 토픽에 전송한다.

기존에 사용 중이던 알람 발생 조건 변경 요청 코드 (AlarmService.java)
String message = OBJECT_MAPPER.writeValueAsString(param);
kafkaProducer.sendMessageTo("alarm-condition", param.get("metric_key").toString(), message);
웹에서 이벤트 발생 시 데이터 전송
웹에서 발생한 데이터를 카프카에 전송한다. (topic: event-topic)
알람매니저는 event-topic을 구독하여 데이터를 수신 후 알람 발생 조건과 비교하여 알람 발생 유무를 판단한다.
알람 발생 조건 만족 시 alarm-topic으로 해당 데이터를 전송하고 웹 서버는 해당 토픽을 구독하여 알람을 발생시킨다.
알람 발생 조건 불만족 시 event-log-topic으로 해당 데이터를 전송하고 웹 서버는 해당 토픽을 구독하여 DB에 저장한다.
먼저 웹 서버 최초 시작 시
Kafka Producer와 Kafka Producer Config는 저번 게시글의 코드와 동일하다.
// Kafaka Topic(=Message) 를 Produce(=Send) 구현체
@Service
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private static final String event = "event-topic"; // 웹 서버에서 발생한 이벤트
private static final String alarm = "alarm-topic"; // 발생한 알람
private static final String log = "event-log-topic"; // 알람이 아닌 이벤트
private static final String condition = "alarm-condition"; // 알람 발생 조건
// 토픽 전송
public void sendMessageTo(String topic, String key, String message) {
kafkaTemplate.send(topic, key, message);
}
}
// Kafaka Topic(=Message)를 Produce(=Send) 하는 설정 클래스
@Configuration
public class KafkaProducerConfig {
// ProducerFactory: Kafka 메시지 전송을 위한 프로듀서 팩토리
public ProducerFactory<String,String> producerFactory() {
Map<String, Object> configProps = new HashMap<String, Object>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093"); // Kafka 서버 주소 (로컬 PC에서 구현 시)
//configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); // Kafka 서버 주소 (이후 도커에서 구현 시)
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // Kafka 메시지의 키를 문자열로 직렬화
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // Kafka 메시지의 값을 문자열로 직렬화
return new DefaultKafkaProducerFactory<>(configProps);
}
// KafkaTemplate: Kafka에 메시지를 보낼 때 사용하는 핵심 객체
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
웹 서버 애플리케이션 실행 시 (참고로 자바 파일 코드도 실행시켜야 한다)
알람 발생 조건이 alarm-condition 토픽으로 전송이 되고, 알람 매니저는 해당 토픽을 구독하여 메시지를 수신받는다.
이후 웹에서 발생한 이벤트 데이터를 Kafka의 event-topic으로 전송한다.
기능 구현 테스트를 위해 알람 발생 조건에 해당하는 값을 포스트맨으로 전송한다.
@RestController
@RequiredArgsConstructor
public class KafkaController {
private final KafkaProducer producer;
private final KafkaProducer kafkaProducer;
@GetMapping("/kafka/send")
public String sendMessage(@RequestParam String topic, @RequestParam String key,@RequestParam String message) {
kafkaProducer.sendMessageTo(topic, key, message);
return "전송됨 → 토픽: " + topic + ", key: " + key + ", 메시지: " + message;
}
}
http://localhost:8080/kafka/send?topic=event-topic&key=1&message=%7B%22type%22%3A%22login_fail%22%2C%22count%22%3A6%2C%22target%22%3A%22user01%22%2C%22time%22%3A%222025-05-22T18%3A15%3A00%22%7D
전송을 하면 알람 매니저에 다음과 같이 로그가 찍힌다.
해당 값이 알람 발생 조건에 해당하므로 알람 매니저는 해당 내용을 alarm-topic으로 전송한다.
UI 확인 결과 alarm-topic에 해당 내용이 저장되었음을 알 수 있다.
웹 서버에서 토픽을 구독하는 Consumer, Consumer Config 코드도 이전과 동일하다.
// Kafaka Topic(=Message) 를 Consume(=Recv) 하는 구현체
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaConsumer {
private final SseService sseService;
private final AlarmService alarmService;
@KafkaListener(topics = "event-topic", groupId = "cplug-consumer-group")
public void consumeEvnet(String message) {
log.warn("[event] Kafka 메시지 수신: " + message);
// 여기에 알람 UI 처리, 로그 저장 등
}
@KafkaListener(topics = "alarm-topic", groupId = "cplug-consumer-group")
public void consumeAlarm(@Header(KafkaHeaders.RECEIVED_KEY) String key, String message) {
log.warn("[alarm] Kafka key: " + key); // 1 (orch) -> network_type
log.warn("[alarm] Kafka message: " + message); // {"type":"login_fail","count":6,"target":"user01","time":"2025-05-22T18:15:00","severity":3,"category":1}
// SSE: 알람 UI 처리
Map<String, Object> payload = new HashMap<>();
try {
ObjectMapper mapper = new ObjectMapper();
payload = mapper.readValue(message, Map.class); // map -> {"type":"login_fail","count":6,"target":"user01","time":"2025-05-22T18:15:00"}
String target = (String) payload.get("target");
sseService.send(target, message);
} catch (Exception e) {
log.warn("알람 메시지 처리 실패: {}", message, e);
}
log.warn("key: " + key);
log.warn("networkType class1: " + key.getClass());
int networkType = Integer.parseInt(key);
payload.put("network_type", networkType);
log.warn("payload: {}", payload);
// {type=login_fail, count=6, target=user01, time=2025-05-22T18:15:00, severity=3, category=1, network_type=1}
// DB 저장
int createAlarm = alarmService.createAlarm(payload);
log.warn("알람 DB 저장 결과: " + createAlarm);
}
}
// Kafaka Topic(=Message) 를 Consume(=Recv) 하는 설정 클래스
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093"); // Kafka 브로커
//props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); // Kafka 브로커
props.put(ConsumerConfig.GROUP_ID_CONFIG, "cplug-consumer-group"); // Consumer Group ID (같은 group-id를 공유하는 컨슈머끼리는 메시지를 나눠가짐)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
// kafkaListenerContainerFactory()
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
alarm-topic을 구독하고 있는 웹 서버는 메시지를 수신받는다.
수신받은 알람 데이터를 토대로 SSE 연결 (위 로그에서는 SSE 연결이 기존에 되지 않음) 및
DB 저장이 됨을 확인할 수 있다.
SSE 연결은 프론트에서 위 기능을 실행하기 전에 SSE 연결에 대한 요청을 전송하면 된다.
@RestController
@RequiredArgsConstructor
@RequestMapping("/alarm")
public class AlarmSseController {
private final SseService sseService;
// 프론트에서는 /alarm/subscribe?clientId=user01 로 접속해서 실시간 알람 수신
@GetMapping("/subscribe")
public SseEmitter subscribe(@RequestParam String clientId){
return sseService.subscribe(clientId);
}
}
@Slf4j
@Service
public class SseService {
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
// 사용자 ID에 대한 SSE(Server-Sent Events) 연결을 생성하고, 이 연결을 통해 실시간으로 서버와 클라이언트 간에 데이터 스트림이 설정된다.
public SseEmitter subscribe(String clientId) {
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
emitters.put(clientId, emitter);
emitter.onCompletion(() -> emitters.remove(clientId));
emitter.onTimeout(() -> emitters.remove(clientId));
log.warn("SSE 구독 연결: {}", clientId);
return emitter;
}
// 알람 전송
public void send(String clientId, String message) {
SseEmitter emitter = emitters.get(clientId);
if (emitter != null) {
try {
emitter.send(SseEmitter.event()
.name("alarm")
.data(message));
log.warn("SSE 전송 완료 → {}", message);
} catch (IOException e) {
log.warn("SSE 전송 실패 → {}", message, e);
emitters.remove(clientId);
}
} else {
log.warn("연결된 SSE 없음: {}", clientId);
}
}
}
SSE 연결 요청은 일회성 연결인 포스트맨이 아닌 웹 브라우저를 통해 요청할 수 있도록 한다.
http://localhost:8080/alarm/subscribe?clientId=user01
이후 웹 이벤트 데이터 전송 시 (event-topic) SSE 연결이 된 프론트에 다음과 같이 데이터가 표시된다.
이후, 프론트에서 해당 데이터를 정제하여 알람 UI로 표시하면 된다.