Java

Spring 기반 Kafka Producer-Consumer + Java 알람 매니저 (Kafka Streams 모듈화)로 실시간 알람 기능 구현

circle.j 2025. 5. 26. 18:20

기존에는 카프카의 프로듀서와 컨슈머를 같은 서버에 두어 메시지를 전송 및 수신하였다. 

 

2025.05.15 - [Java] - Kafka 로컬 설치 및 Spring Boot에서 Kafka 연동하기

 

Kafka 로컬 설치 및 Spring Boot에서 Kafka 연동하기

출처 및 참고: https://docs.spring.io/spring-kafka/reference/quick-tour.html 1. 스프링부트 버전 확인인텔리제이 > File > Project Structure > Modules > Dependencies에서 스프링부트 버전을 확인한다. 스프링부트 버전: 3.4.3

this-circle-jeong.tistory.com

 


이번에는 Kafka Streams 기능을 통해 알람 기능을 추가하며, 해당 기능을 모듈화한다. 

 

💡  "모듈화한다"

✔️ 하나의 시스템 안에서 특정 역할(알람 처리) 을 담당하는 것을 독립적인 프로그램 또는 애플리케이션으로 분리해서 관리하는 것.

 

🔧 왜 굳이 나눠야 하는 걸까?

1) 서버 부하 분산

- 하나의 서버에서 처리하면 알람 로직이 무거워질수록 같이 느력짐

- 알람 분석, 저장, SSE 전송, DB 저장까지 하면 CPU 점유율도 올라가고 장애도 한 군데서 터짐 

 

2) 장애 전파 방지 (격리 효과)

3) 독립적인 유지 보수 

항목 모듈화 전 모듈화 후
구조 웹 + 알람 판단 + DB 저장 다 처리 웹은 Producer + Consumer / 알람 판단은 별도 프로그램
위험 한 곳에 로직 몰림 → 서버 과부하/장애 시 전체 영향 역할 분리로 안정성 ↑
유지보수 전체 애플리케이션 수정/배포 기능별로 부분 배포 가능
확장성 변경 어려움 기능 단위 확장/이식 쉬움

 


단계별 시나리오는 다음과 같다.

 

✅ [1] 시스템 초기 실행 시

  1. 웹 서버가 최초 실행됨
  2. 웹 서버는 DB에서 알람 발생 조건 목록을 조회
  3. 해당 조건들을 Kafka의 alarm-condition 토픽에 전송
  4. 알람 매니저는 alarm-condition 토픽을 구독하여 조건 목록을 메모리에 보관

 

✅ [2] 실시간 이벤트 처리 흐름

  1. 웹에서 이벤트 발생 (예: 로그인 실패, 리소스 초과 등)
  2. 해당 이벤트 메시지를 Kafka의 event-topic에 전송
  3. 알람 매니저가 event-topic을 구독하여 메시지 수신
  4. 알람 매니저는 미리 보관한 조건과 비교하여:
    • 조건 충족 시 → Kafka alarm-topic으로 전송
    • 조건 불충족 시 → Kafka event-log-topic으로 전송
  5. 웹 서버는 alarm-topic과 event-log-topic을 각각 구독하여:
    • alarm-topic: SSE 알림 전송 + 알람 DB 저장
    • event-log-topic: 이벤트 로그 DB 저장

 

✅ [3] 알람 발생 조건 변경 시

  1. 관리자가  웹에서 알람 발생 조건을 수정
  2. 웹 서버는 수정된 조건을 다시 Kafka의 alarm-condition 토픽으로 전송
  3. 알람 매니저는 alarm-condition 토픽을 통해 조건 갱신하고 메모리에 업데이트

 


알람 매니저 모듈화는 일반 자바 코드(Spring X)로 작성하기 위해 프로젝트를 생성한다. 

(알람 매니저가 스프링 기반일 경우 또 다른 톰캣 서버를 띄어야 하고 설정도 복잡해지기 때문에 자바 애플리케이션으로 실행시킨다.)

알람 매니저 

1. 새 프로젝트 만들기

  • File → New → Project
  • 왼쪽에서 Java 선택
  • Build system: Gradle 또는 Maven (편한 거 선택)
  • Java SDK: 설치된 JDK 선택 (예: Java 17)
  • 모듈 이름: alarm-manager 

 

 

이후 프로젝트에 구조가 자동으로 생성되지 않았다면 구조를 직접 수동으로 생성한다.

 

 

2. pom.xml 작성하기 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>kr.co.cplug</groupId>
    <artifactId>alarm-manager</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
    </properties>

    <dependencies>

        <!-- Kafka Streams -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>3.6.1</version>
        </dependency>

        <!-- Slf4j for logging -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.36</version>
        </dependency>

        <!-- Jackson for JSON parsing -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.18.2</version>
        </dependency>

    </dependencies>


</project>

 

pom.xml 작성 후 측 상단에서 "Load Maven Project"를 클릭한다.

만약 버튼이 없다면 우측 클릭 > Add as Maven Project

 

3. 애플리케이션 실행 함수 작성하기

다음과 같은 과정으로 기능을 구현한다.

 

(1) Kafka Streams 설정을 위한 Properties 생성 

// 1. Kafka Streams 설정을 위한 Properties 생성
Properties props = new Properties();

// application.id
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "alarm-manager-app");

// bootstrap.servers
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");


//  key/value serde 설정 등
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

// (선택) 에러 로그 보기 쉽게 하기 위해 record 캐싱 끔
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");

// (선택) commit 주기 짧게
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

 

설정 키 설명
APPLICATION_ID_CONFIG Kafka Streams 앱 식별자 (state store 이름 등에 사용됨)
BOOTSTRAP_SERVERS_CONFIG Kafka 브로커 주소
DEFAULT_KEY_SERDE_CLASS_CONFIG 메시지 Key 직렬화/역직렬화 방식
CACHE_MAX_BYTES_BUFFERING_CONFIG 캐시 비활성화 (실시간 디버깅에 도움됨)
COMMIT_INTERVAL_MS_CONFIG 오프셋 커밋 주기 설정 (기본 30초 → 1초로 단축)

 

(+) 알람 발생 조건 AlarmCondition.java 파일 생성

 

public class AlarmCondition {

    public String metricKey;
    public double thresholdValue;
    public String operator;
    public int severity;
    public int category;

    public AlarmCondition(String metricKey, double thresholdValue,
                          String operator, int severity, int category) {
        this.metricKey = metricKey;
        this.thresholdValue = thresholdValue;
        this.operator = operator;
        this.severity = severity;
        this.category = category;
    }

}

 

각 파라미터 값은 알람 발생 조건 DB에 맞게 작성한다.

 

(2) 알람 발생 조건을 저장할 Map 초기화

// 2. 알람 발생 조건을 저장할 Map 초기화
// key: metric_key (예: login_fail)
// value: 조건 객체 (threshold, operator, severity, category 등)
Map<String, AlarmCondition> alarmConditions = new ConcurrentHashMap<>();

Kafka Streams가 멀티스레드 환경이니 동시성 문제를 방지하기 위해 ConcurrentHashMap을 사용한다. 

 

(3) StreamBuilder 생성 

// 3. StreamsBuilder 생성
StreamsBuilder builder = new StreamsBuilder();

 

(4) alarm-condition 토픽 구독 (조건 업데이트 수신)

// 4. alarm-condition 토픽 구독 (조건 업데이트 수신)
// alarm-condition 메시지를 받아서 JSON 파싱
ObjectMapper objectMapper = new ObjectMapper();
KStream<String, String> conditionStream = builder.stream("alarm-condition");

// Map<String, AlarmCondition> 형태로 조건 저장 (기존 조건 갱신 포함)
conditionStream
        .peek((key, value) -> System.out.println("[조건 수신] key: " + key + ", value: " + value))
        .foreach((key, value) -> {
            try {
                // 메시지를 Map으로 파싱
                Map<String, Object> conditionData = objectMapper.readValue(value, Map.class);

                String metricKey = (String) conditionData.get("metric_key");
                double threshold = Double.parseDouble(conditionData.get("threshold_value").toString());
                String operator = (String) conditionData.get("condition_operator");
                int severity = Integer.parseInt(conditionData.get("severity").toString());
                int category = Integer.parseInt(conditionData.get("category").toString());

                // Map에 조건 저장 또는 갱신
                alarmConditions.put(metricKey, new AlarmCondition(metricKey, threshold, operator, severity, category));
                System.out.println("[조건 갱신] metric_key: " + metricKey);

            } catch (Exception e) {
                System.out.println("alarm-condition 파싱 실패: " + e.getMessage());
            }
        });

 

(5) event-topic 구독 (이벤트 수신)

// 5. event-topic 구독 (이벤트 수신)
KStream<String, String> eventStream = builder.stream("event-topic");

eventStream
        .peek((key, value) -> System.out.println("[이벤트 수신] key: " + key + ", value: " + value))
        .mapValues(value -> {
            try {
                Map<String, Object> eventData = objectMapper.readValue(value, Map.class);
                String metricKey = (String) eventData.get("type");
                int count = Integer.parseInt(eventData.get("count").toString());

                // 조건 비교
                if(alarmConditions.containsKey(metricKey)){
                    AlarmCondition condition = alarmConditions.get(metricKey);
                    if (compare(count, condition.thresholdValue, condition.operator)) {
                        // 조건 충족 시 → 추가 필드 붙여서 알람 전송
                        eventData.put("severity", condition.severity);
                        eventData.put("category", condition.category);
                        System.out.println("[알람 판단됨] metric_key: " + metricKey + ", count: " + count);
                        return objectMapper.writeValueAsString(eventData);  // JSON 문자열로 다시 변환
                    }
                }

            } catch (Exception e) {
                System.out.println("event-message 파싱 실패: " + e.getMessage());
            }
            return null;
        });

 

compare 함수이다.

private static boolean compare(int count, double threshold, String operator) {
        return switch (operator) {
            case ">" -> count > threshold;
            case ">=" -> count >= threshold;
            case "<" -> count < threshold;
            case "<=" -> count <= threshold;
            case "==" -> count == threshold;
            case "!=" -> count != threshold;
            default -> false;
        };
    }

 

(6) 조건 비교 결과에 따라 분기

((5)번에 이어 작성)

 // 5. event-topic 구독 (이벤트 수신)
KStream<String, String> eventStream = builder.stream("event-topic");

eventStream
        .peek((key, value) -> System.out.println("[이벤트 수신] key: " + key + ", value: " + value))
        .mapValues(value -> {
            try {
                Map<String, Object> eventData = objectMapper.readValue(value, Map.class);
                String metricKey = (String) eventData.get("type");
                int count = Integer.parseInt(eventData.get("count").toString());

                // 조건 비교
                if(alarmConditions.containsKey(metricKey)){
                    AlarmCondition condition = alarmConditions.get(metricKey);
                    if (compare(count, condition.thresholdValue, condition.operator)) {
                        // 조건 충족 시 → 추가 필드 붙여서 알람 전송
                        eventData.put("severity", condition.severity);
                        eventData.put("category", condition.category);
                        System.out.println("[알람 판단됨] metric_key: " + metricKey + ", count: " + count);
                        return objectMapper.writeValueAsString(eventData);  // JSON 문자열로 다시 변환
                    }
                }

            } catch (Exception e) {
                System.out.println("event-message 파싱 실패: " + e.getMessage());
            }
            return null;
        })
        // 6. 조건 비교 결과에 따라 분기 조건 만족 시 → alarm-topic으로 전송
        .filter((key, value) -> value != null)
        .peek((key, value) -> log.warn("알람 전송 예정 → {}", value))
        .to("alarm-topic"); // alarm-topic으로 전송

// 조건 불만족 시: 그대로 event-log-topic으로 전송
eventStream
        .filter((key, value) -> {
            try {
                Map<String, Object> eventData = objectMapper.readValue(value, Map.class);

                String metricKey = (String) eventData.get("type");
                int count = Integer.parseInt(eventData.get("count").toString());

                if (alarmConditions.containsKey(metricKey)) {
                    AlarmCondition condition = alarmConditions.get(metricKey);
                    return !compare(count, condition.thresholdValue, condition.operator);
                }
            } catch (Exception e) {
                return true;    // 파싱 실패 메시지는 로그로 보냄
            }
            return true;        // 조건 없음도 로그로 보냄
        })
        .peek((key, value) -> log.warn("로그 전송 예정 → {}", value))
        .to("event-log-topic"); // event-log-topic으로 전송

 

(7) KafkaStreams topology 빌드 및 시작

// 7. KafkaStreams topology 빌드 및 시작
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);

streams.start();
log.warn("AlarmManagerApp 실행 시작됨!");

 

(8) shutdown hook 등록 (애플리케이션 종료 시 graceful shutdown)

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    log.warn("AlarmManagerApp 종료 중... KafkaStreams 종료 중...");
    streams.close();
}));

 

 

AlarmManager.java 전체 코드

package kr.co.cplug;

import com.fasterxml.jackson.databind.ObjectMapper;
import kr.co.cplug.alarmmanager.AlarmCondition;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class AlarmManagerApp {

    public static void main(String[] args) {

        // 1. Kafka Streams 설정을 위한 Properties 생성
        Properties props = new Properties();

        // application.id
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "alarm-manager-app");

        // bootstrap.servers
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");

        //  key/value serde 설정 등
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // (선택) 에러 로그 보기 쉽게 하기 위해 record 캐싱 끔
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");

        // (선택) commit 주기 짧게
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

        // 2. 알람 발생 조건을 저장할 Map 초기화
        Map<String, AlarmCondition> alarmConditions = new ConcurrentHashMap<>();

        // 3. StreamsBuilder 생성
        StreamsBuilder builder = new StreamsBuilder();


        // 4. alarm-condition 토픽 구독 (조건 업데이트 수신)
        // alarm-condition 메시지를 받아서 JSON 파싱
        ObjectMapper objectMapper = new ObjectMapper();
        KStream<String, String> conditionStream = builder.stream("alarm-condition");

        // Map<String, AlarmCondition> 형태로 조건 저장 (기존 조건 갱신 포함)
        conditionStream
                .peek((key, value) -> log.warn("[조건 수신] key: " + key + ", value: " + value))
                .foreach((key, value) -> {
                    try {
                        // 메시지를 Map으로 파싱
                        Map<String, Object> conditionData = objectMapper.readValue(value, Map.class);

                        String metricKey = (String) conditionData.get("metric_key");
                        double threshold = Double.parseDouble(conditionData.get("threshold_value").toString());
                        String operator = (String) conditionData.get("condition_operator");
                        int severity = Integer.parseInt(conditionData.get("severity").toString());
                        int category = Integer.parseInt(conditionData.get("category").toString());

                        // Map에 조건 저장 또는 갱신
                        alarmConditions.put(metricKey, new AlarmCondition(metricKey, threshold, operator, severity, category));
                        log.warn("[조건 갱신] metric_key: " + metricKey);

                    } catch (Exception e) {
                        log.warn("alarm-condition 파싱 실패: " + e.getMessage());
                    }
                });



        // 5. event-topic 구독 (이벤트 수신)
        KStream<String, String> eventStream = builder.stream("event-topic");

        eventStream
                .peek((key, value) -> log.warn("[이벤트 수신] key: " + key + ", value: " + value))
                .mapValues(value -> {
                    try {
                        Map<String, Object> eventData = objectMapper.readValue(value, Map.class);
                        String metricKey = (String) eventData.get("type");
                        int count = Integer.parseInt(eventData.get("count").toString());

                        // 조건 비교
                        if(alarmConditions.containsKey(metricKey)){
                            AlarmCondition condition = alarmConditions.get(metricKey);
                            if (compare(count, condition.thresholdValue, condition.operator)) {
                                // 조건 충족 시 → 추가 필드 붙여서 알람 전송
                                eventData.put("severity", condition.severity);
                                eventData.put("category", condition.category);
                                log.warn("[알람 판단됨] metric_key: " + metricKey + ", count: " + count);
                                return objectMapper.writeValueAsString(eventData);  // JSON 문자열로 다시 변환
                            }
                        }

                    } catch (Exception e) {
                        log.warn("event-message 파싱 실패: " + e.getMessage());
                    }
                    return null;
                })
                // 6. 조건 비교 결과에 따라 분기 조건 만족 시 → alarm-topic으로 전송
                .filter((key, value) -> value != null)
                .peek((key, value) -> log.warn("알람 전송 예정 → {}", value))
                .to("alarm-topic"); // alarm-topic으로 전송

        // 조건 불만족 시: 그대로 event-log-topic으로 전송
        eventStream
                .filter((key, value) -> {
                    try {
                        Map<String, Object> eventData = objectMapper.readValue(value, Map.class);

                        String metricKey = (String) eventData.get("type");
                        int count = Integer.parseInt(eventData.get("count").toString());

                        if (alarmConditions.containsKey(metricKey)) {
                            AlarmCondition condition = alarmConditions.get(metricKey);
                            return !compare(count, condition.thresholdValue, condition.operator);
                        }
                    } catch (Exception e) {
                        return true;    // 파싱 실패 메시지는 로그로 보냄
                    }
                    return true;        // 조건 없음도 로그로 보냄
                })
                .peek((key, value) -> log.warn("로그 전송 예정 → {}", value))
                .to("event-log-topic"); // event-log-topic으로 전송


        // 7. KafkaStreams topology 빌드 및 시작
        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, props);

        streams.start();
        log.warn("AlarmManagerApp 실행 시작됨!");

        // 8. shutdown hook 등록 (애플리케이션 종료 시 graceful shutdown)
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.warn("AlarmManagerApp 종료 중... KafkaStreams 종료 중...");
            streams.close();
        }));


    }

    private static boolean compare(int count, double threshold, String operator) {
        return switch (operator) {
            case ">" -> count > threshold;
            case ">=" -> count >= threshold;
            case "<" -> count < threshold;
            case "<=" -> count <= threshold;
            case "==" -> count == threshold;
            case "!=" -> count != threshold;
            default -> false;
        };
    }


}

 


알람 매니저에 대한 코드를 작성했다. 

 

이제 다시 흐름도를 살펴보면 

웹 서버 최초 시작 시 - 알람 발생 조건 전송  

DB에서 알람 발생 조건을 조회하여 알람 매니저에게 전송해야 한다. (topic: alarm-condition)

 

↓ 코드 확인

웹 서버에 ScheduledTask.java 파일을 생성하여 서버 초기화 작업을 수행하는 스케줄러 메서드를 작성한다.

더보기
@Slf4j
@Component
@RequiredArgsConstructor
public class ScheduledTask {

    private final LicenseRead licenseRead;
    private final NetworkMgmtService networkMgmtService;
    private final SystemSettingService systemSettingService;
    private final AuditLogService auditLogService;
    private final TestService testService;
    private final AlarmDao alarmDao;
    private final KafkaProducer kafkaProducer;

    boolean testSystem = true;
    private boolean initialized = false;
    private Date testedTimer = null;

    /**
     * 서버 초기화 작업을 수행하는 스케줄러 메서드
     * - cron: "* * * * * *" => 매초마다 실행되지만, 실제 초기화는 한 번만 수행됨 (initialized 플래그 사용)
     */
    @Scheduled(cron = "* * * * * *")
    public void initializeServer()
            throws NoSuchPaddingException, IllegalBlockSizeException, IOException, NoSuchAlgorithmException, InvalidKeySpecException, BadPaddingException, InvalidKeyException {

        // 이미 초기화되었으면 더 이상 실행하지 않음
        if (initialized) {
            return;
        }

        // 알람 발생 조건 DB 조회 및 TOPIC(alarm-condition) 전송
        List<Map<String,Object>> conditions = alarmDao.getAlarmCondition();
        for(Map<String,Object> condition : conditions) {
            String message = OBJECT_MAPPER.writeValueAsString(condition);
            kafkaProducer.sendMessageTo("alarm-condition", "init", message);
        }

        // 서버 초기화 완료 플래그 설정
        initialized = true;
    }


}

 

 

관리자가 웹에서 알람 발생 조건 변경 시 - 알람 발생 조건 전송  

수정하고자 하는 알람 발생 조건을 DB에 업데이트 후  알람 매니저에게 전송해야 한다. (topic: alarm-condition)

 

  코드 확인

관리자가 웹에서 알람 발생 조건 변경 요청을 한 경우, DB에 반영 성공 시 토픽에 전송한다. 

더보기

기존에 사용 중이던 알람 발생 조건 변경 요청 코드 (AlarmService.java)

String message = OBJECT_MAPPER.writeValueAsString(param);
kafkaProducer.sendMessageTo("alarm-condition", param.get("metric_key").toString(), message);

 

 

웹에서 이벤트 발생 시 데이터 전송 

웹에서 발생한 데이터를 카프카에 전송한다. (topic: event-topic)

알람매니저는 event-topic을 구독하여 데이터를 수신 후 알람 발생 조건과 비교하여 알람 발생 유무를 판단한다.

알람 발생 조건 만족 시 alarm-topic으로 해당 데이터를 전송하고 웹 서버는 해당 토픽을 구독하여 알람을 발생시킨다.

알람 발생 조건 불만족 시 event-log-topic으로 해당 데이터를 전송하고 웹 서버는 해당 토픽을 구독하여 DB에 저장한다.

 

 

먼저 웹 서버 최초 시작 시

Kafka Producer와 Kafka Producer Config는 저번 게시글의 코드와 동일하다.

// Kafaka Topic(=Message) 를 Produce(=Send) 구현체
@Service
@RequiredArgsConstructor
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    private static final String event = "event-topic";                  // 웹 서버에서 발생한 이벤트
    private static final String alarm = "alarm-topic";                  // 발생한 알람
    private static final String log = "event-log-topic";                // 알람이 아닌 이벤트
    private static final String condition = "alarm-condition";          // 알람 발생 조건

    // 토픽 전송
    public void sendMessageTo(String topic, String key, String message) {
        kafkaTemplate.send(topic, key, message);
    }

}

 

// Kafaka Topic(=Message)를 Produce(=Send) 하는 설정 클래스
@Configuration
public class KafkaProducerConfig {

    // ProducerFactory: Kafka 메시지 전송을 위한 프로듀서 팩토리
    public ProducerFactory<String,String> producerFactory() {
        Map<String, Object> configProps  = new HashMap<String, Object>();

        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");                 // Kafka 서버 주소 (로컬 PC에서 구현 시)
        //configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");                   // Kafka 서버 주소 (이후 도커에서 구현 시)
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);        // Kafka 메시지의 키를 문자열로 직렬화
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);      // Kafka 메시지의 값을 문자열로 직렬화

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    // KafkaTemplate: Kafka에 메시지를 보낼 때 사용하는 핵심 객체
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

 

 

웹 서버 애플리케이션 실행 시  (참고로 자바 파일 코드도 실행시켜야 한다)

알람 발생 조건이 alarm-condition 토픽으로 전송이 되고, 알람 매니저는 해당 토픽을 구독하여 메시지를 수신받는다.

 

이후 웹에서 발생한 이벤트 데이터를 Kafka의 event-topic으로 전송한다.

기능 구현 테스트를 위해 알람 발생 조건에 해당하는 값을 포스트맨으로 전송한다. 

@RestController
@RequiredArgsConstructor
public class KafkaController {

    private final KafkaProducer producer;
    private final KafkaProducer kafkaProducer;

    @GetMapping("/kafka/send")
    public String sendMessage(@RequestParam String topic, @RequestParam String key,@RequestParam String message) {
        kafkaProducer.sendMessageTo(topic, key, message);
        return "전송됨 → 토픽: " + topic + ", key: " + key + ", 메시지: " + message;
    }

}
http://localhost:8080/kafka/send?topic=event-topic&key=1&message=%7B%22type%22%3A%22login_fail%22%2C%22count%22%3A6%2C%22target%22%3A%22user01%22%2C%22time%22%3A%222025-05-22T18%3A15%3A00%22%7D

 

전송을 하면 알람 매니저에 다음과 같이 로그가 찍힌다. 

 

해당 값이 알람 발생 조건에 해당하므로 알람 매니저는 해당 내용을 alarm-topic으로 전송한다. 

 


UI 확인 결과 alarm-topic에 해당 내용이 저장되었음을 알 수 있다.

 

 

 

웹 서버에서 토픽을 구독하는 Consumer, Consumer Config 코드도 이전과 동일하다. 

// Kafaka Topic(=Message) 를 Consume(=Recv) 하는 구현체
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaConsumer {

    private final SseService sseService;
    private final AlarmService alarmService;

    @KafkaListener(topics = "event-topic", groupId = "cplug-consumer-group")
    public void consumeEvnet(String message) {
        log.warn("[event] Kafka 메시지 수신: " + message);
        // 여기에 알람 UI 처리, 로그 저장 등
    }

    @KafkaListener(topics = "alarm-topic", groupId = "cplug-consumer-group")
    public void consumeAlarm(@Header(KafkaHeaders.RECEIVED_KEY) String key, String message) {
        log.warn("[alarm] Kafka key: " + key);              // 1 (orch) -> network_type
        log.warn("[alarm] Kafka message: " + message);      // {"type":"login_fail","count":6,"target":"user01","time":"2025-05-22T18:15:00","severity":3,"category":1}

        // SSE: 알람 UI 처리
        Map<String, Object> payload = new HashMap<>();

        try {
            ObjectMapper mapper = new ObjectMapper();
            payload = mapper.readValue(message, Map.class);     // map -> {"type":"login_fail","count":6,"target":"user01","time":"2025-05-22T18:15:00"}
            String target = (String) payload.get("target");

            sseService.send(target, message);

        } catch (Exception e) {
            log.warn("알람 메시지 처리 실패: {}", message, e);

        }

        log.warn("key: " + key);
        log.warn("networkType class1: " + key.getClass());

        int networkType = Integer.parseInt(key);
        payload.put("network_type", networkType);

        log.warn("payload: {}", payload);
        // {type=login_fail, count=6, target=user01, time=2025-05-22T18:15:00, severity=3, category=1, network_type=1}

        // DB 저장
        int createAlarm = alarmService.createAlarm(payload);
        log.warn("알람 DB 저장 결과: " + createAlarm);
    }


}

 

// Kafaka Topic(=Message) 를 Consume(=Recv) 하는 설정 클래스
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093"); // Kafka 브로커
        //props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); // Kafka 브로커
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "cplug-consumer-group");    // Consumer Group ID (같은 group-id를 공유하는 컨슈머끼리는 메시지를 나눠가짐)
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(props);
    }

    // kafkaListenerContainerFactory()
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

 

alarm-topic을 구독하고 있는 웹 서버는 메시지를 수신받는다. 

 

 

수신받은 알람 데이터를 토대로 SSE 연결 (위 로그에서는 SSE 연결이 기존에 되지 않음) 및 

DB 저장이 됨을 확인할 수 있다. 

 

 

SSE 연결은 프론트에서 위 기능을 실행하기 전에 SSE 연결에 대한 요청을 전송하면 된다. 

@RestController
@RequiredArgsConstructor
@RequestMapping("/alarm")
public class AlarmSseController {

    private final SseService sseService;

    // 프론트에서는 /alarm/subscribe?clientId=user01 로 접속해서 실시간 알람 수신
    @GetMapping("/subscribe")
    public SseEmitter subscribe(@RequestParam String clientId){
        return sseService.subscribe(clientId);
    }

}
@Slf4j
@Service
public class SseService {

    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();

    // 사용자 ID에 대한 SSE(Server-Sent Events) 연결을 생성하고, 이 연결을 통해 실시간으로 서버와 클라이언트 간에 데이터 스트림이 설정된다.
    public SseEmitter subscribe(String clientId) {
        SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
        emitters.put(clientId, emitter);

        emitter.onCompletion(() -> emitters.remove(clientId));
        emitter.onTimeout(() -> emitters.remove(clientId));

        log.warn("SSE 구독 연결: {}", clientId);
        return emitter;
    }

    // 알람 전송
    public void send(String clientId, String message) {
        SseEmitter emitter = emitters.get(clientId);
        if (emitter != null) {
            try {
                emitter.send(SseEmitter.event()
                        .name("alarm")
                        .data(message));
                log.warn("SSE 전송 완료 → {}", message);
            } catch (IOException e) {
                log.warn("SSE 전송 실패 → {}", message, e);
                emitters.remove(clientId);
            }
        } else {
            log.warn("연결된 SSE 없음: {}", clientId);
        }
    }
}

 

SSE 연결 요청은 일회성 연결인 포스트맨이 아닌 웹 브라우저를 통해 요청할 수 있도록 한다.

http://localhost:8080/alarm/subscribe?clientId=user01

 

이후 웹 이벤트 데이터 전송 시 (event-topic) SSE 연결이 된 프론트에 다음과 같이 데이터가 표시된다.

 

 

이후, 프론트에서 해당 데이터를 정제하여 알람 UI로 표시하면 된다.