Spring/SpringBoot

[SpringBoot] Kafka로 분산형 데이터 처리하기 (4) - SpringBoot에 Kafka Producer 사용 환경 설정

J4J 2024. 9. 17. 11:48
300x250
반응형

안녕하세요. J4J입니다.

 

이번 포스팅은 kafka로 분산형 데이터 처리하기 네 번쨰인 spring boot에 kafka producer 사용 환경 설정하는 방법에 대해 적어보는 시간을 가져보려고 합니다.

 

 

 

이전 글

 

[SpringBoot] Kafka로 분산형 데이터 처리하기 (1) - Kafka란 무엇인가?

[SpringBoot] Kafka로 분산형 데이터 처리하기 (2) - Virtual Box에서 Kafka 설치하기

[SpringBoot] Kafka로 분산형 데이터 처리하기 (3) - Kafka에서 사용하는 명령어

 

 

반응형

 

 

Kafka Producer 란 ?

 

이전 글들에서 적어두 것처럼 kafka producer는 kafka의 partition에 저장되는 레코드를 전달하는 주체를 의미합니다.

 

즉, consumer에 전달하게 될 데이터를 kafka에 저장해두기 위한 명령을 수행하는 곳입니다.

 

 

 

spring boot를 이용하여 kafka producer에 관련된 설정을 하게 되면 일반적으로 api와 같이 비즈니스를 제공해주는 repository와 동일한 곳에 설정될 확률이 높습니다.

 

왜냐하면 client로부터 요청 받은 비즈니스 처리를 수행하게 되면 kafka에 전달해야 될 데이터들을 수집할 수 있기 때문입니다.

 

 

 

그리고 kafka producer는 spring 외에도 다양한 곳에 설정될 수 있습니다.

 

kafka에서 기능 사용을 위해 지원해주고 있는 javascript, python 등을 포함하여 spring과 동일한 구조로 설정해 볼 수 있습니다.

 

또한 connector를 등록하여 datasource에서도 producer의 역할을 수행할 수 있습니다.

 

 

 

이처럼 kafka producer는 spring에서만 한정되어 있는 기능이 아닙니다.

 

kafka와 연결되고자 하는 곳에서는 kafka 측의 지원되는 기능만 있다면 어디서든 활용해볼 수 있습니다.

 

 

 

 

SpringBoot에서 Kafka Producer 사용 환경 설정 방법

 

[ 1. dependency 설정 ]

 

// build.gradle

dependencies {
    // kafka
    implementation 'org.springframework.kafka:spring-kafka'
}

 

 

 

[ 2. properties 설정 ]

 

kafka producer 동작을 위한 properties 설정은 application.yml 파일에 해볼 수 있습니다.

 

해당 방법 외에도 클래스 파일로써 설정이 필요하다면 커스텀하여 @Configuration으로 등록하는 방법도 존재합니다.

 

설정할 때 연결되는 kafka cluster 서버는 이전 글에서 확인할 수 있는 virtual box로 연결될 수 있도록 설정하겠습니다.

 

// application.yml

spring:
  kafka:
    # kafka producer 설절
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # key 직렬화 방법 설정
      value-serializer: org.apache.kafka.common.serialization.StringSerializer # value 직렬화 방법 설정
      bootstrap-servers: ubuntu:9092 # kafka cluster server (이곳에서는 virtual box 연결)

 

 

 

 

[ 3. 레코드 전송 ]

 

spring의 비즈니스 로직을 통해 전달할 수 있는 레코드 정보를 전송할 때는 kafka에서 제공해주는 kafka template을 활용해주시면 됩니다.

 

kafka template에 담겨 있는 더 많은 정보는 이따가 확인해보도록 하고, 간단하게 레코드 전송을 하는 코드를 구성하면 다음과 같습니다.

 

package com.jforj.kafkaproducer.controller;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Builder;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequiredArgsConstructor
public class KafkaController {
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;

    @PostMapping("/send")
    public ResponseEntity<Object> send(@RequestBody RequestDto requestDto) throws JsonProcessingException {
        Record record =
                Record
                        .builder()
                        .name(requestDto.getName())
                        .height(requestDto.getHeight())
                        .build();

        // kafka cluster에 record send
        kafkaTemplate.send(requestDto.getTopic(), objectMapper.writeValueAsString(record));

        return ResponseEntity.ok(record);
    }

    @Getter
    private static class RequestDto {
        private String topic;
        private String name;
        private Long height;
    }

    @Builder
    @Getter
    @ToString
    private static class Record {
        private String name;
        private Long height;
    }
}

 

 

 

[ 4. 테스트 ]

 

위와 같이 구성된 api를 호출해보겠습니다.

 

api를 요청하면 다음과 같이 비즈니스 로직이 처리되며 kafka cluster에 전달되고, consumer의 입장에서는 virtual box를 통해 데이터가 전달되는 것을 확인할 수 있습니다.

 

api 요청 포스트맨

 

virtual box consumer 로그

 

 

 

 

Kafka Template

 

kafka template에 대한 소개는 공식 문서에서 보다 더 자세한 내용을 확인할 수 있습니다.

 

공식 문서 대신 간단하게 주요 내용에 대해서만 소개해보도록 하겠습니다.

 

 

 

kafka template은 kafka에서 제공해주는 기능 중 하나로 kafka cluster와 연결된 브로커 서버에 레코드를 전달해주기 위한 목적으로 사용됩니다.

 

kafka template이 제공해주는 특징은 다음과 같이 있습니다.

 

  • kafka 브로커 서버에 레코드 전달
  • generic을 이용한 타입 정의를 통해 타입 안정성 제공
  • 비동기 및 동기 방식 모두 사용 가능한 환경 제공

 

 

 

그리고 kafka template은 기본적으로 위에서 설정했던 것처럼 kafka와 관련된 spring 설정을 기반으로 동작되는 구조입니다.

 

그래서 특정한 상황이 아니라면 kafka template을 이용하여 메세지를 전달하는 것이 가장 바람직하며, 커스텀을 하더라도 kafka template의 기능을 기반으로 커스텀하여 사용하는 것을 권장드립니다.

 

 

 

 

kafka template의 대표 기능은 send 메서드가 될 수 있습니다.

 

send 메서드는 메세지를 전달할 때 원하는 구조에 맞춰 데이터를 전송할 수 있도록 도와줍니다.

 

공식 문서에서도 볼 수 있듯이 send 메서드는 다음과 같이 다양한 구조로 오버로딩이 되어 있습니다.

 

일반적으로는 topic, key, data를 파라미터로 소유하고 있는 메서드를 많이 사용할 것으로 보이지만, 각자의 서비스에서 제공해주고자 하는 상황에 맞춰 기능을 추가적으로 활용해보시면 될 것 같습니다.

 

send(String topic, Integer partition, Long timestamp, K key, V data)

send(String topic, Integer partition, K key, V data)

send(String topic, K key, V data)

send(String topic, V data)

send(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)

send(Message<?> message)

 

 

 

마지막으로 kafka template의 return 값은 java에서 제공해주고 있는 CompletableFuture 클래스 입니다.

 

위에서 얘기했던 동기 / 비동기 처리의 기능들은 해당 클래스에서 제공해주는 메서드를 기반으로 동작되기 때문에 kafka에 데이터를 전달한 이후에 어떤 추가 동작이 필요하신 분들은 해당 클래스에 대해 확인해보시면 많은 도움이 될 것 같습니다.

 

 

 

 

 

 

 

이상으로 kafka로 분산형 데이터 처리하기 네 번쨰인 spring boot에 kafka producer 사용 환경 설정에 대해 간단하게 알아보는 시간이었습니다.

 

읽어주셔서 감사합니다.

 

 

 

728x90
반응형