Kafka란?
대용량 데이터 스트리밍을 처리하는 분산형 메시지 브로커이다.
실시간 데이터 스트리밍을 위해 게시-구독(pub-sub) 모델을 사용하며 높은 처리량과 확장성을 제공하는 분산 이벤트 스트리밍 플랫폼이다.
예를 들어 Youtube의 구독을 하면 알람 시스템처럼 이벤트를 발생하면 실시간 데이터 스트리밍을 지원한다.
또한 쿠팡, 인터파크 등 장바구니를 담을때 DB를 사용하지 않고 실시간으로 처리할 수 있으며,
Kafka는 단순한 메시지 브로커를 넘어 데이터 처리 파이프라인을 구성하는 데 최적화된 솔루션이다.
Kafka는 이 글을 읽고 있는 상당수가 아는 LinkedIn에서 개발되었으며 Apache Software Foundation에서 오픈소스로 사용할 수 있다.
Apache Kafka
Apache Kafka: A Distributed Streaming Platform.
kafka.apache.org
자세한 특징은 아래 확인!
1. 높은 처리량 (High Throughput)대량의 메시지를 초당 수백만 개 이상 빠르게 처리 가능
배치(batch) 방식 대신 스트리밍 방식으로 실시간 데이터 처리
2. 확장성 (Scalability)대량의 메시지를 초당 수백만 개 이상 빠르게 처리 가능배치(batch) 방식 대신 스트리밍 방식으로 실시간 데이터 처리
3. 내구성 및 데이터 영속성 (Durability & Persistence)디스크에 로그 형태로 메시지 저장 (데이터 손실 방지)메시지는 설정된 보존 기간 동안 유지
4. 분산형 아키텍처 (Distributed Architecture)여러 개의 브로커(Broker)로 이루어진 클러스터 구조파티션을 활용하여 병렬 처리 가능
5. 메시지 순서 보장 (Message Ordering)같은 파티션 내에서는 메시지 순서 보장여러 파티션에 걸쳐 있을 경우 순서는 보장되지 않음
6. 장애 복구 및 안정성 (Fault Tolerance)Replication(복제) 기능 제공 → 장애 발생 시 데이터 손실 최소화리더(Leader)와 팔로워(Follower) 구조로 자동 복구7. 유연한 데이터 소비 모델게시-구독(pub-sub) 모델과 Point-to-Point(Direct Read) 모델 지원같은 데이터를 여러 Consumer Group에서 독립적으로 처리 가능
아래 프로젝트에서는 게시-구독 모델을 사용.
구현
먼저 새 프로젝트를 만들고, 종속성에 Kafka를 추가한다.
Lombok과 devtools 그리고 spring web도 포함하였다.
Intellij에서 프로젝트를 생성할 때 추가하거나,
`build.gradle` 에 아래와 같이 추가해준다.
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.apache.kafka:kafka-streams'
implementation 'org.springframework.kafka:spring-kafka'
compileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
그다음 `application.yml` 파일에 아래와 같이 작성한다.
spring:
application:
name: kafka-demo
kafka:
# Kafka 서버 주소 및 포트 -> 브로커 주소
bootstrap-servers: localhost:9092
producer:
# Kafka 메시지의 Key를 문자열 직렬화
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# Kafka 메시지의 Value를 문자열 직렬화
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 메시지를 읽을 때, Consumer의 오프셋이 없거나(초기 실행 시) 유효하지 않을 경우 어디서부터 읽을지 결정
# earliest : 토픽의 가장 처음부터 메시지를 읽음, latest: 가장 마지막(새로운) 메시지부터 읽음
auto-offset-reset: earliest
# Kafka Consumer Group ID, 커슈머 그룹 ID
group-id: test-group
listener:
# Kafka 토픽이 없어도 애플리케이션 실행 유지 false => 계속 실행
missing-topics-fatal: false
여기서 `port:9092` 는 kafka의 시그니처 포트이며 kafka에서는 주로 직렬화된 데이터(문자열)로 통신하게 된다.
또한 consumer에서 가장 처음부터 메시지를 읽도록 설정할 것이며, consumer에서도 그룹을 만들 수 있어 test-group으로 명시해 줬다.
음.. 혹시 포트번호가 겹칠 수도 있기 때문에 포트번호를 8070으로 설정해 놓겠다.
server:
port: 8070
Docker 연동
먼저 도커가 설치되지 않았으면 설치한다.
그다음 최상위 폴더에 `docker-compose.yml` 을 만들고 아래와 같이 입력해 준다.
해당 설정파일은 Docker와 연동하기 위함이며 주석을 통해 설명을 기재해 놓았다.
version: "3.7"
services:
# 클러스터 상태 관리, 리더 선출
zookeeper:
image: "confluentinc/cp-zookeeper:latest"
environment:
# Zookeeper가 클라이언트(예: Kafka Broker)와 통신하는 포트
ZOOKEEPER_CLIENT_PORT: 2181
# 메세지 브로커, 메세지 저장하고 관리
kafka:
image: "confluentinc/cp-kafka:latest"
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
# Kafka 브로커(서버)마다 고유한 ID를 설정
# 여러 개의 Kafka 브로커를 실행할 경우, 각 브로커마다 다른 ID를 설정
# 브로커 ID로 1로 설정
KAFKA_BROKER_ID: 1
# Kafka가 Zookeeper와 연결할 주소를 설정
# Docker Compose 네트워크 내에서 Zookeeper 컨테이너를 찾는 주소
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
# Kafka가 클라이언트(Producer, Consumer)와 통신할 수 있도록 외부에 공개하는 주소
# PLAINTEXT: 암호화되지 않은 통신 방식 (기본 프로토콜)
# localhost:9092: Kafka가 클라이언트에게 노출하는 호스트와 포트
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
# Consumer의 오프셋(Offset)을 저장하는 전용 토픽을 사용
# Offset을 저장하는 토픽의 복제본(replica) 개수를 설정
# 값이 1이면 단일 브로커에서만 오프셋을 저장하므로, 브로커 장애 시 데이터 손실 위험이 있음.
# 클러스터 환경에서는 최소 2 또는 3 이상으로 설정하는 것이 좋음
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
그 후 해당파일 위치로 터미널을 열고,
`docker-compose up -d` 를 실행한다.
아래와 같이 설치가 진행되고 완료된 이후에는 docker desktop에서 컨테이너를 확인하면
프로젝트 이름으로 새롭게 생성된 것을 알 수 있다.
들어가면 정상적으로 설치된 모습을 볼 수 있다.
그 후에 kafka-demo-kafka-1로 접속 후 Exec에 들어간다.
이 창에서 아래와 같은 명령어를 통해 kafka의 토픽 목록과 상세정보를 볼 수 있다.
# 토픽 목록 확인
kafka-topics --list --bootstrap-server localhost:9092
# 특정 토픽의 상세 정보
kafka-topics --describe --topic test-topic --bootstrap-server localhost:9092
# consumer의 메세지 수신 대기 -> 송신 후 출력
kafka-console-consumer --topic test-topic --from-beginning --bootstrap-server localhost:9092
위에 설명했듯이, kafka는 `Producer` 와 `Consumer` 로 작동한다. `broker` 는 도커 컨테이너 내부에 구현하였다.
Producer, consumer Get 구현
main 디렉터리 안에 아래와 같이 파일을 생성한다.
#KafkaController.java
package com.example.kafkademo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("kafka")
public class KafkaController {
@Autowired
private KafkaProducer producer;
@GetMapping("/send")
public String sendMessage(@RequestParam("message") String message) {
producer.sendMsg("test-topic", message);
return "Message send Successfully";
}
}
우선 틀을 먼저 잡아보자. controller에서 아래와 같이 입력한다.
Web을 사용하기 때문에 url로 통신할 것이며 먼저 Get 방식을 구현하겠다.
그럼 이제 producer에 sendMsg를 구현해 보자.
package com.example.kafkademo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
* 트리거에 의해서 이벤트 발생(메세지 전송) -> 브로커(kafka)
*/
@Service
public class KafkaProducer {
// 브로커에서 메세지를 전송하는 객체 필요 -> DI
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMsg(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
producer를 만들었으면 해당 메시지를 받을 consumer도 구현하자.
여기서 `@kafkaListener` 어노테이션은 지정된 토픽을 구독하고 해당 토픽에서 수신된 메시지를 처리한다.
package com.example.kafkademo;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "test-topic", groupId = "test-group")
public void listen(String message) {
System.out.println("Producer message : " + message);
}
}
docker-compose에서 설정대로 브로커에서 미리 세팅한 test-topic과 test-group을 지정해 주면 된다.
Postman 테스트
이제 확인을 위해 Postman에서 테스트한다.
해당 Get 요청을 보내면 성공했다는 메시지가 리턴 받게 되고,
Consumer에서도 숫자, 한글, 영어, 특수문자 등등 깨지지 않고 잘 받는다.
Producer, consumer Post 구현
이번엔 Post를 구현해 보자.
`OrderDto.java` 파일을 생성해 주고 간단하게 Id와 메시지를 담도록 작성하였다.
package com.example.kafkademo;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class OrderDto {
private String orderId;
private String Msg;
public OrderDto(String orderId, String Msg) {
this.orderId = orderId;
this.Msg = Msg;
}
}
먼저 Controller다.
@PostMapping("/send")
public String sendMessage(@RequestBody OrderDto orderDto) {
try{
producer.sendMsg("test-topic2",orderDto);
} catch (Exception e) {}
return "Message send Successfully" + orderDto;
}
`OrderDto` 를 이용하여 `@RequestBody` 를 통해 메시지가 전달되며 , get과 구별을 위해 `test-topic2` 로 설정한다.
그다음 Producer에서
@Autowired
private ObjectMapper objectMapper;
```
기존 코드
```
public void sendMsg(String topic, OrderDto orderDto) throws JsonProcessingException {
kafkaTemplate.send(topic, objectMapper.writeValueAsString(orderDto));
}
}
이렇게 추가해 준다. `ObjectMapper` 를 통해 OrderDto 객체를 JSON 문자열로 직렬화하여 전송한다.
다음은 Consumer이다.
@KafkaListener(topics = "test-topic2", groupId = "test-group")
public void listen2(String message) {
try {
OrderDto orderDto = objectMapper.readValue(message, OrderDto.class);
System.out.println("프로듀서 메세지 " + orderDto.getMsg());
}catch (Exception e){}
}
앞서 Controller에서 토픽을 test-topic2로 지정하였으므로 맞춰주고 `readValue` 를 위해 try-catch 문을 사용한다.
여기서 `readValue` 는 `ObjectMapper` 에서 직렬화한 문자열을 역직렬화하는 데 사용된다.
Postman 테스트
해당 Url로 Dto에서 설정한 `orderId` , `msg` 를 Post요청으로 보낼 수 있다.
리턴값으로 해당 값이 잘 전달되었다. 그럼 comsumer에서도 확인해 보면
아래와 같이 잘 출력해 준다.
그럼 이제 위에 설명한 Docker로 가서 명령어를 통해 확인해 보자.
# consumer의 메세지 수신 대기 -> 송신 후 출력
kafka-console-consumer --topic test-topic2 --from-beginning --bootstrap-server localhost:9092
`test-topic2` 토픽을 구독하여 처음순서대로 읽도록 명령어를 작성하면
아래와 같이 Json 형식으로 실시간으로 송신 후 출력되는 것을 볼 수 있다.
해당 프로젝트에 자세한 파일은 아래 Github 주소에서 볼 수 있다.
https://github.com/iseungho/kafka-demo
GitHub - iseungho/kafka-demo
Contribute to iseungho/kafka-demo development by creating an account on GitHub.
github.com
마무리
사실 Kafka는 이렇게 한 프로젝트에 국한되지 않고 여러 프로젝트에서 데이터를 유기적으로 연결하며 실시간 처리가 필요한 시스템에서 강력한 역할을 한다.
이번 테스트는 그 가능성을 엿보는 작은 프로젝트이지만, 앞으로 더 복잡하고 다양한 환경에서 Kafka를 사용하여 익숙해지도록 노력할 것이다.
'SK 루키즈 > Cloud' 카테고리의 다른 글
[Rookies 개발 2기] Nginx 404 에러 해결 (0) | 2025.02.25 |
---|---|
[Rookies 개발 2기] AWS S3 버킷 정책 오류 설정 (2) | 2025.02.23 |
[Rookies 개발 2기] AWS 서버리스 개념과 ECS와 Lambda (0) | 2025.01.24 |
[Rookies 개발 2기] MSA와 EurekaServer 세팅 (0) | 2025.01.24 |
[Rookies 개발 2기] AWS S3 에 Spring 업로드 처리 (0) | 2025.01.23 |