본문 바로가기
ETC/Infra

[Kafka] 기본 개념 및 Embedded Kafka를 통한 Kafka 테스트

by 은z 2024. 11. 27.

상황

 

아파치 카프카는 대용량 실시간 메시징 시스템으로, 대량의 데이터를 안정적이고 빠르게 전송하고 처리하는 데 사용된다. 

기존 프로젝트에서 메소드 내에 특정 로직에서 오류가 발생하면 전체 프로세스가 롤백되는 문제가 있었다.

그래서 그 특정 로직(로깅, pdf생성, 메시지 전송)만 따로 분리하여 카프카를 이용하여 처리하고자 한다.

 

※참고로 이 포스팅은 카프카 서버가 구축되어있고, 기본적인 카프카 세팅이 완료되었다고 가정하고 작성한 것입니다.

 

[개발 환경]

- JDK 17

- Springboot 3.1.3

- Kotlin 1.8.22

- JUnit5

- IDE : IntelliJ

- OS: mac m1


간단 요약

개발에 들어가기 앞서, 그전에 간단하게 이론적인 부분을 정리해보자.

  1. 데이터를 주고받는 굉장히 많은 어플리케이션과 DB가 있지만 이 많은 데이터 파이프라인을 관리하기란 어렵다.
  2. 파이프라인의 중간 통로로 카프카를 사용해서 데이터를 카프카에 넣고 데이터를 카프카로부터 꺼내가도록 한다.
  3. 이때, 카프카는 topic, parition, offset 을 이용해서 데이터(메시지)를 관리한다.
Topic: 카프카로 들어오는 데이터(메시지) 스트림을 카테고리화한다. 대략적으로 생각하면 동일한 카테고리의 데이터가 저장되는 박스라 보면 된다. 이곳에 같은 카테고리 데이터를 저장하고, 해당 카테고리를 찾는 사람은 이곳에 접근해 데이터를 이용할 수 있다.
Partition: 토픽을 나눈 단위. 병렬 처리에 큰 이점을 제공한다.
Offset: 파티션 내의 각 메시지의 고유 위치로 0부터 시작해 메시지가 토픽에 올 때마다 증가한다. 즉, 일종의 Index라 볼 수 있다. 이 오프셋을 활용해 메시지를 이용하려는 사람(컨슈머)는 어디부터 데이터를 처리할지 계산할 수 있어 데이터를 빠뜨리지 않고 처리할 수 있게 해준다.

 

그럼 카프카를 통해 데이터(메시지)를 어떻게 보내고 처리할까?

아래 사진을 살펴보면, Producer, Broker, Consumer 로 구성되어 있는 것을 볼 수 있다.

프로듀서 (Producer)
Kafka에 메시지를 발행하는 client이다. 특정 토픽으로 메시지를 보낸다.
브로커 (Broker)
브로커가 일반적으로 Kafka라고 불리는 시스템이라고 보면 된다.
Kafka cluster를 구성하는 서버로 데이터(메시지)를 저장하고 처리한다. 브로커는 클라이언트(프로듀서)에게서 메시지를 받아 저장하며, 필요한 클라이언트(컨슈머)에게 메시지를 전달하는 역할을 한다.
컨슈머 (Consumer)
Kafka의 메시지를 소비하는 client이다. 지속적으로 메시지를 받아 처리한다.

카프카 브로커는 프로듀서와 컨슈머 사이에서 메시지를 중계한다 (출처 : https://mapr.com/ebooks/streaming-architecture/chapter-04-apache-kafka-overview.html)

 


적용

✔️1. 라이브러리 추가

각자 상황에 맞게 버전이나 라이브러리명을 잘 확인하고 추가해야 한다. 

kafka test를 위한 의존성도 추가해주었다.

 

📌build.gradle.kts

implementation("org.springframework.kafka:spring-kafka")

testImplementation("org.springframework.kafka:spring-kafka-test") //for kafka test

 

✔️2. properties

 

📌application.yaml

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092

 

 

✔️3. config 설정

참고로, 이번에는 프로듀서와 컨슈머를 같은 어플리케이션 내에서 구현할 거라서 config 파일을 하나로 뒀다.

(물론 필요에 따라 각각 분리해도 무방하다.)

 

📌KafkaConfig.kt

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import java.io.Serializable

@Configuration
class KafkaConfig(
    @Value("\${spring.kafka.producer.bootstrap-servers}")
    private val bootstrapServers: String,
    ) {

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, String> {
        val factory = DefaultKafkaProducerFactory<String, String>(producerConfigs())
        return KafkaTemplate(factory)
    }

    // Producer 관련
    fun producerConfigs(): Map<String, Serializable> =
        mapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
//            TODO :: 파티션이 여러개일 떄 사용 필요
//            ProducerConfig.PARTITIONER_CLASS_CONFIG to CustomPartitioner::class.java
        )

    // Consumer 관련
    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()

//        factory.setConcurrency(2) // Consumer Process Thread Count
        factory.consumerFactory = DefaultKafkaConsumerFactory(getConfig())
        factory.containerProperties.pollTimeout = 500

        return factory

    }

    private fun getConfig(): Map<String, Any> =
        mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",    // 마지막 읽은 부분부터 Read
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java.name,
            //ConsumerConfig.GROUP_ID_CONFIG to "group-id-sample",
        )
}

 

✔️4. Producer 

import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Service

@Service
class KafkaService(
    private val kafkaTemplate: KafkaTemplate<String, String>,
) {

    fun send(topic: String, data: String) {
        println("Produce Kafka - ${topic}")
        kafkaTemplate.send(topic, data);
    }

}

 

✔️5. Consumer

import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.handler.annotation.Header
import org.springframework.messaging.handler.annotation.Payload
import org.springframework.stereotype.Component

@Component
class KafkaListener(
    private val objectMapper: ObjectMapper,
) {

    //group id : 동일한 topic에 대해 여러 컨슈머가 메시지를 처리할 때 사용하는 식별자
    @KafkaListener(topics = ["message"], groupId = "message.group")
    fun messageReceive(
        @Payload message: String,
    ) {
        println("receive message: ${message}")

        // 역직렬화
        val request = try {
            objectMapper.readValue(message, KafkaMessage::class.java)
        } catch (e: JsonParseException) {
            print("Not Supported Data Content")
            throw e
        }

        //여기서 서비스 로직 호출
        //~~~~~
    }
}

 

 

👀 여기서 잠깐!

Kafka 통신 시에 직렬화, 역직렬화를 해야하는 이유 ?

직렬화객체나 데이터를 바이트 배열로 변환하는 과정

  • Kafka 메시지는 TCP/IP 네트워크를 통해 전송
  • 네트워크는 바이트 형태의 데이터만 이해할 수 있기 때문에 데이터를 직렬화해야 한다.

 

역직렬화바이트 배열을 원래의 객체나 데이터로 변환하는 과정

  • Kafka 컨슈머는 브로커로부터 받은 메시지를 이해하기 위해 역직렬화가 필요하다.

 


적용 (테스트 코드)

이제 테스트 코드를 작성해보자.

참고로 mac 에서 테스트 파일을 만드는 단축키는 테스트하고 싶은 클래스명을 클릭한 뒤, Command + Shift + T 로 생성

Create New Test 누르면 테스트 파일을 Create 할수 있는 설정이 나온다.
이런 구조로 생김

 

 

✔️Test 작성

EmbeddedKafka 어노테이션을 클래스 레벨에 선언함으로써 해당 테스트는 Embedded Kafka가 셋팅된 환경에서 테스트를 하게 되는 것이다.

 

📌KafkaConnectTest.kt

import com.fasterxml.jackson.databind.ObjectMapper
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired

import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.test.context.EmbeddedKafka
import org.springframework.kafka.test.utils.KafkaTestUtils
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit

@SpringBootTest
@EmbeddedKafka(topics = ["test-topic"])
class KafkaConnectTest {

    // 참고로 JUnit5는 생성자 매개변수 관리를 스프링 컨테이너가 아닌 Jupiter에서 담당
    // 때문에 @Autowired를 명시적으로 선언해주어야 Jupiter가 스프링컨테이너에게 빈 주입 요청!
    @Autowired
    private lateinit var kafkaTemplate: KafkaTemplate<String, String>

    private val messageQueue = LinkedBlockingQueue<String>()

    // Consumer
    @KafkaListener(topics = ["test-topic"], groupId = "test-topic-group")
    fun consume(message: String) {
        messageQueue.offer(message)
    }

    @Test
    fun `test Kafka connection`() {
        // Producer가 메시지를 보냄
        val testMessage = "Kafka!"
        kafkaTemplate.send("test-topic", testMessage)

        // Consumer가 메시지를 받는지 확인
        val receivedMessage = messageQueue.poll(5, TimeUnit.SECONDS) // 5초 대기
        Assertions.assertEquals(testMessage, receivedMessage) // 메시지가 동일한지 검증
    }


}

댓글