안녕하세요. J4J입니다.
이번 포스팅은 kafka로 분산형 데이터 처리하기 세 번째인 kafka에서 사용하는 명령어에 대해 적어보는 시간을 가져보려고 합니다.
이전 글
[SpringBoot] Kafka로 분산형 데이터 처리하기 (1) - Kafka란 무엇인가?
[SpringBoot] Kafka로 분산형 데이터 처리하기 (2) - Virtual Box에서 Kafka 설치하기
들어가기에 앞서
kafka에서 사용되는 모든 명령어는 kafka가 설치되어 있는 경로를 기반으로 실행할 수 있습니다.
이전 글을 통해 kafka를 설치하신 분들이라면 다음 명령어를 통해 kafka 설치 경로로 이동 후 명령어를 사용해 주시면 됩니다.
$ cd /usr/local/kafka
Kafka 브로커 (Broker) 명령어
이전 글에서 확인할 수 있는 것처럼 브로커는 kafka를 구성하는 노드 서버들을 의미합니다.
브로커는 kafka 동작을 위한 기본 설정들을 담고 있기도 하고 브로커가 동작되고 있어야 kafka가 올바르게 처리될 수 있습니다.
[ 1. 브로커 설정 값 출력 ]
$ bin/kafka-broker-api-versions.sh --bootstrap-server <broker_host>:<broker_port>
ex)
$ bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
- broker_host→ 토픽을 생성하려는 브로커 서버 호스트
- broker_port → 토픽을 생성하려는 브로커 서버 포트
Kafka 토픽 (Topic) 명령어
토픽은 kafka에서 데이터를 분류하는 기본 단위를 의미합니다.
RDB와 비교를 해보면 테이블과 같은 역할을 한다고 생각해 주시면 됩니다.
kafka에서 토픽과 관련된 명령어는 테이블 처리와 유사하게 생성/조회/삭제 등의 작업들이 존재합니다.
[ 1. 토픽 생성 ]
$ bin/kafka-topics.sh --bootstrap-server <broker_host>:<broker_port> --create [--replication-factor <replication_factor>] [--partitions <num_partitions>] --topic <topic_name>
ex)
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 1 --topic testTopic
- broker_host→ 토픽을 생성하려는 브로커 서버 호스트
- broker_port → 토픽을 생성하려는 브로커 서버 포트
- replication_factor → 토픽 복제 개수 (각 브로커 서버에 복제)
- num_partitions → 토픽이 가지는 파티션 갯수
- topic_name → 토픽의 이름
[ 2. 토픽 목록 조회 ]
$ bin/kafka-topics.sh --bootstrap-server <broker_host>:<broker_port> --list
ex)
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
- broker_host→ 토픽을 생성하려는 브로커 서버 호스트
- broker_port → 토픽을 생성하려는 브로커 서버 포트
[ 3. 토픽 상세 조회 ]
$ bin/kafka-topics.sh --bootstrap-server <broker_host>:<broker_port> --describe --topic <topic_name>
ex)
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic testTopic
- broker_host→ 토픽을 생성하려는 브로커 서버 호스트
- broker_port → 토픽을 생성하려는 브로커 서버 포트
- topic_name → 토픽의 이름
[ 4. 파티션 개수 변경 ]
참고 사항으로 파티션의 개수는 증가만 가능하고 감소는 kafka에서 지원하지 않습니다.
$ bin/kafka-topics.sh --bootstrap-server <broker_host>:<broker_port> --alter --partitions <num_partitions> --topic <topic_name>
ex)
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --partitions 2 --topic testTopic
- broker_host→ 토픽을 생성하려는 브로커 서버 호스트
- broker_port → 토픽을 생성하려는 브로커 서버 포트
- num_partitions → 토픽이 가지는 파티션 개수
- topic_name → 토픽의 이름
[ 5. 토픽 삭제 ]
$ bin/kafka-topics.sh --bootstrap-server <broker_host>:<broker_port> --delete --topic <topic_name>
ex)
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic testTopic
- broker_host→ 토픽을 생성하려는 브로커 서버 호스트
- broker_port → 토픽을 생성하려는 브로커 서버 포트
- topic_name → 토픽의 이름
Kafka 프로듀서 (Producer) 명령어
프로듀서는 kafka의 레코드를 전달하는 주체를 의미합니다.
즉, kafka의 토픽 내부에 데이터가 적재될 수 있도록 도와주는 것이 프로듀서이며 데이터 적재를 위한 명령어들이 존재합니다.
[ 1. 기본 메시지 전달 ]
기본 메시지를 전달하는 방법은 아래의 명령어를 입력한 뒤 전달하고자 하는 명령어를 문자열로 입력해 주시면 됩니다.
입력된 문자의 개수만큼 토픽으로 전달됩니다.
$ bin/kafka-console-producer.sh --bootstrap-sevrer <broker_host>:<broker_port> --topic <topic_name>
ex)
$ bin/kafka-console-producer.sh --bootstrap-sevrer localhost:9092 --topic testTopic
- broker_host→ 토픽을 생성하려는 브로커 서버 호스트
- broker_port → 토픽을 생성하려는 브로커 서버 포트
- topic_name → 토픽의 이름
[ 2. key/value 메시지 전달 ]
토픽에 데이터를 전달할 때 key 값을 설정할 수 있습니다.
key값을 설정하는 경우 동일한 key 정보에 대해 동일한 파티션으로 데이터가 전달될 수 있도록 도와줍니다.
$ bin/kafka-console-producer.sh --bootstrap-sevrer <broker_host>:<broker_port> --topic <topic_name> --property "parse.key=true" --property "key.separator=<separator>"
ex)
$ bin/kafka-console-producer.sh --bootstrap-sevrer localhost:9092 --topic testTopic --property "parse.key=true" --property "key.separator=:"
- broker_host→ 토픽을 생성하려는 브로커 서버 호스트
- broker_port → 토픽을 생성하려는 브로커 서버 포트
- topic_name → 토픽의 이름
- separator → key/value를 구분하기 위한 구분자
Kafka 컨슈머 (Consumer) 명령어
컨슈머는 kafka의 레코드를 전달받는 주체를 의미합니다.
즉, 프로듀서에 의해 kafka의 토픽 내부에 데이터가 적재된 것을 가져와 비즈니스 로직에 사용될 수 있도록 도와주는 것이 컨슈머이며 데이터 수신을 위한 명령어들이 존재합니다.
[ 1. 기본적인 메시지 수신 ]
기본적인 메세지 수신 방식은 토픽에 적재되어 있는 메세지 가장 마지막부터 읽기 시작합니다.
그래서 컨슈머가 동작되지 않고 있을 때 메시지가 전달되었더라도 컨슈머가 동작된 이후부터 전달된 메시지만 읽는 특징을 가집니다.
$ bin/kafka-console-consumer.sh --bootstrap-sevrer <broker_host>:<broker_port> --topic <topic_name>
ex)
$ bin/kafka-console-consumer.sh --bootstrap-sevrer localhost:9092 --topic testTopic
- broker_host→ 토픽을 생성하려는 브로커 서버 호스트
- broker_port → 토픽을 생성하려는 브로커 서버 포트
- topic_name → 토픽의 이름
[ 2. 가장 처음부터 메시지 수신 ]
토픽에 저장되어 있는 메시지의 가장 처음부터 읽을 수 있는 방식도 존재합니다.
다만 주의할 점은 한 번에 너무 많은 데이터를 읽어오기에 트래픽이 몰리는 현상이 발생될 수 있습니다.
$ bin/kafka-console-consumer.sh --bootstrap-sevrer <broker_host>:<broker_port> --topic <topic_name> --from-beginning
ex)
$ bin/kafka-console-consumer.sh --bootstrap-sevrer localhost:9092 --topic testTopic --from-beginning
- broker_host→ 토픽을 생성하려는 브로커 서버 호스트
- broker_port → 토픽을 생성하려는 브로커 서버 포트
- topic_name → 토픽의 이름
[ 3. key/value 메시지 수신 ]
기본적으로 컨슈머가 메시지를 확인할 땐 value 값만 확인합니다.
하지만 프로듀서가 key 정보를 함께 전달할 때 key 값에 대해 확인이 필요하면 다음과 같이 확인해 볼 수 있습니다.
$ bin/kafka-console-consumer.sh --bootstrap-sevrer <broker_host>:<broker_port> --topic <topic_name> --property print.key=true --property key.separator=<separator>
ex)
$ bin/kafka-console-consumer.sh --bootstrap-sevrer localhost:9092 --topic testTopic --property print.key=true --property key.separator=:
- broker_host→ 토픽을 생성하려는 브로커 서버 호스트
- broker_port → 토픽을 생성하려는 브로커 서버 포트
- topic_name → 토픽의 이름
- separator → key/value를 구분하기 위한 구분자
[ 4. 파티션 / 오프셋 메시지 수신 ]
위에서 key 값을 확인하는 것처럼 메시지를 전달받을 때 파티션 정보와 오프셋 정보를 함께 확인할 수도 있습니다.
$ bin/kafka-console-consumer.sh --bootstrap-sevrer <broker_host>:<broker_port> --topic <topic_name> --property print.partition=true --property print.offset=true
ex)
$ bin/kafka-console-consumer.sh --bootstrap-sevrer localhost:9092 --topic testTopic --property print.partition=true --property print.offset=true
- broker_host→ 토픽을 생성하려는 브로커 서버 호스트
- broker_port → 토픽을 생성하려는 브로커 서버 포트
- topic_name → 토픽의 이름
[ 5. 메시지 수 제한하여 수신 ]
메시지를 읽어와 소비할 때 개수를 설정하여 전달받을 수 있습니다.
$ bin/kafka-console-consumer.sh --bootstrap-sevrer <broker_host>:<broker_port> --topic <topic_name> --max-messages <num_messages>
ex)
$ bin/kafka-console-consumer.sh --bootstrap-sevrer localhost:9092 --topic testTopic --max-messages 100
- broker_host→ 토픽을 생성하려는 브로커 서버 호스트
- broker_port → 토픽을 생성하려는 브로커 서버 포트
- topic_name → 토픽의 이름
- num_messages → 소비할 메시지 개수
[ 6. 컨슈머 그룹을 설정하여 수신 ]
컨슈머 그룹은 그룹 별로 메시지를 어디까지 수신했는지에 대한 오프셋 관리 기능이 담겨 있습니다.
특정 토픽의 메세지를 소비할 때 컨슈머 그룹을 설정하고자 한다면 다음과 같이 사용할 수 있습니다.
$ bin/kafka-console-consumer.sh --bootstrap-sevrer <broker_host>:<broker_port> --topic <topic_name> --group <group_id>
ex)
$ bin/kafka-console-consumer.sh --bootstrap-sevrer localhost:9092 --topic testTopic --group testGroup
- broker_host→ 토픽을 생성하려는 브로커 서버 호스트
- broker_port → 토픽을 생성하려는 브로커 서버 포트
- topic_name → 토픽의 이름
- group_id → 메시지를 소비할 컨슈머 그룹 아이디
Kafka 컨슈머 그룹 (Consumer Group) 명령어
컨슈머 그룹은 위에서 얘기드린 바와 같이 메세지를 어디까지 소비했는지에 대한 오프셋 관리 기능을 제공합니다.
보통 컨슈머를 통해 메시지를 전달 받을 때 하나의 컨슈머가 아닌 다수의 컨슈머에서 동일한 토픽의 메세지를 소비하기 때문에 컨슈머 그룹을 통해 각각의 오프셋을 관리해 주는 것이 필요합니다.
[ 1. 컨슈머 그룹 목록 조회 ]
$ bin/kafka-consumer-groups.sh --bootstrap-server <broker_host>:<broker_port> --list
ex)
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
- broker_host→ 토픽을 생성하려는 브로커 서버 호스트
- broker_port → 토픽을 생성하려는 브로커 서버 포트
[ 2. 컨슈머 그룹 상세 조회 ]
$ bin/kafka-consumer-groups.sh --bootstrap-server <broker_host>:<broker_port> --describe --group <group_id>
ex)
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup
- broker_host→ 토픽을 생성하려는 브로커 서버 호스트
- broker_port → 토픽을 생성하려는 브로커 서버 포트
- group_id → 메시지를 소비할 컨슈머 그룹 아이디
[ 3. 컨슈머 그룹 오프셋 초기화 ]
// 가장 처음
$ bin/kafka-consumer-groups.sh --bootstrap-server <broker_host>:<broker_port> --group <group_id> --reset-offsets --to-earliest --execute --topic <topic_name>
ex)
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testGroup --reset-offsets --to-earliest --execute --topic testTopic
// 가장 마지막
$ bin/kafka-consumer-groups.sh --bootstrap-server <broker_host>:<broker_port> --group <group_id> --reset-offsets --to-latest --execute --topic <topic_name>
ex)
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testGroup --reset-offsets --to-latest --execute --topic testTopic
- broker_host→ 토픽을 생성하려는 브로커 서버 호스트
- broker_port → 토픽을 생성하려는 브로커 서버 포트
- group_id → 메시지를 소비할 컨슈머 그룹 아이디
- topic_name → 토픽의 이름
[ 4. 모든 파티션에서 컨슈머 그룹 오프셋 초기화 ]
// 가장 처음
$ bin/kafka-consumer-groups.sh --bootstrap-server <broker_host>:<broker_port> --group <group_id> --reset-offsets --to-earliest --execute --all-topics
ex)
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testGroup --reset-offsets --to-earliest --execute --all-topics
// 가장 마지막
$ bin/kafka-consumer-groups.sh --bootstrap-server <broker_host>:<broker_port> --group <group_id> --reset-offsets --to-latest --execute --all-topics
ex)
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testGroup --reset-offsets --to-latest --execute --all-topics
- broker_host→ 토픽을 생성하려는 브로커 서버 호스트
- broker_port → 토픽을 생성하려는 브로커 서버 포트
- group_id → 메시지를 소비할 컨슈머 그룹 아이디
이상으로 kafka로 분산형 데이터 처리하기 세 번째인 kafka에서 사용하는 명령어에 대해 간단하게 알아보는 시간이었습니다.
읽어주셔서 감사합니다.
'Spring > SpringBoot' 카테고리의 다른 글
[SpringBoot] Kafka로 분산형 데이터 처리하기 (4) - SpringBoot에 Kafka Producer 사용 환경 설정 (0) | 2024.09.17 |
---|---|
[SpringBoot] Kafka로 분산형 데이터 처리하기 (2) - Virtual Box에서 Kafka 설치하기 (8) | 2024.08.25 |
[SpringBoot] Kafka로 분산형 데이터 처리하기 (1) - Kafka란 무엇인가? (0) | 2024.08.08 |
[SpringBoot] Redis 테스트 환경 구축하기 (2) - Test Container (0) | 2024.07.27 |
[SpringBoot] Redis 테스트 환경 구축하기 (1) - Embedded (0) | 2024.07.07 |
댓글