안녕하세요. J4J입니다.
이번 포스팅은 docker desktop 환경에서 strimzi 기반 kafka connect, debezium으로 mysql cdc 구축하는 방법에 대해 적어보는 시간을 가져보려고 합니다.
관련 글
Helm 이란? Helm 입문을 위한 기본 개념 설명
안녕하세요. J4J입니다. 이번 포스팅은 helm 입문을 위한 기본 개념 설명하는 시간을 가져보려고 합니다. Helm 이란? helm이라고 하는 것은 kubernetes를 사용하는 환경에서 kubernetes cluster가 관리하는 영
jforj.tistory.com
Window에서 Chocolately로 Helm 설치하기 & Helm 명령어 정리
Window에서 Chocolately로 Helm 설치하기 & Helm 명령어 정리
안녕하세요. J4J입니다. 이번 포스팅은 window에서 chocolately로 helm 설치하는 방법과 사용할 수 있는 명령어에 대해 적어보는 시간을 가져보려고 합니다. 관련 글 Helm 이란? Helm 입문을 위한 기본 개
jforj.tistory.com
MSA 환경에서 필요한 CDC 개념 이해: Kafka Connector와 Debezium으로 데이터 동기화 구조 살펴보기
MSA 환경에서 필요한 CDC 개념 이해: Kafka Connector와 Debezium으로 데이터 동기화 구조 살펴보기
안녕하세요. J4J입니다. 이번 포스팅은 msa 환경에서 필요한 cdc 개념 이해와 kafka connector, debezium에 대해 이해하는 시간을 가져보려고 합니다. CDC 소개 cdc라고 하는 것은 change data capture의 약자로,
jforj.tistory.com
Kafka Connect, Debezium으로 MySQL CDC 구축: Strimzi 없이 Docker Desktop 로컬 실습
Kafka Connect, Debezium으로 MySQL CDC 구축: Strimzi 없이 Docker Desktop 로컬 실습
안녕하세요. J4J입니다. 이번 포스팅은 docker desktop 환경에서 strimzi 없이 kafka connect, debezium으로 mysql cdc 구축하는 방법에 대해 적어보는 시간을 가져보려고 합니다. 관련 글 Helm 이란? Helm 입문을 위
jforj.tistory.com
Kafka 설치
이번 글에서 cdc 구축 테스트를 위해 다음과 같은 환경들을 사용하고자 합니다.
- k8s cluster >> docker desktop 활용
- kafka >> helm을 이용하여 cluster 내부 설치
- mysql >> 로컬에 설치되어 있는 mysql
- strimzi >> helm을 이용하여 cluster 내부 설치
strimzi 같은 경우는 클러스터 내부에서 kafka와 관련된 리소스들의 배포/관리를 효율적으로 할 수 있도록 도와주는 도구이며 많은 곳에서 운영을 할 때 strimzi를 이용한 설정을 활용하고 있습니다.
혹시, strimzi를 사용하지 못하는 경우에서는 어떻게 구성되는지에 대해 궁금하시다면 이전 글을 확인해주시면 됩니다.
Kafka Connect, Debezium으로 MySQL CDC 구축: Strimzi 없이 Docker Desktop 로컬 실습
안녕하세요. J4J입니다. 이번 포스팅은 docker desktop 환경에서 strimzi 없이 kafka connect, debezium으로 mysql cdc 구축하는 방법에 대해 적어보는 시간을 가져보려고 합니다. 관련 글 Helm 이란? Helm 입문을 위
jforj.tistory.com
그리고 kafka 같은 경우 보통은 이미 설치되어 있을 것으로 생각되지만, 이번 글에서는 helm을 이용하여 cluster 내부에 설치하려고 합니다.
설치되어 있으신 분들은 해당 파트는 스킵하시고 다음 파트부터 보시면 됩니다.
[ 1. 테스트를 위한 namespace 구성 ]
논리적으로 환경을 구분하기 위해 별도의 namespace를 생성하려고 합니다.
필수적인 설정은 아니기에 필요하지 않다고 생각하시는 분들은 스킵하셔도 됩니다.
$ kubectl create namespace kafka
[ 2. kafka values 구성 ]
kafka 설치를 위한 설정 정보에 사용하기 위한 values를 다음과 같이 구성했습니다.
사용되는 kafka 환경에 따라 다르겠지만, 실습을 위해 기본적인 것들만 설정해봤습니다.
// kafka-values.yaml
listeners:
client:
protocol: PLAINTEXT # 일반 tcp로 통신
extraEnvVars:
- name: KAFKA_CFG_LISTENERS # kafka broker가 내부에서 접속되는 방식 설정
value: PLAINTEXT://:9092
- name: KAFKA_CFG_ADVERTISED_LISTENERS # kafka 외부에 kafka broker 접속 방식 설정
value: PLAINTEXT://localhost:9092
# localhost container에서 kafka 접속을 위한 설정
externalAccess:
enabled: true
controller:
service:
loadBalancerNames: ["localhost", "localhost", "localhost"]
[ 3. bitnami repo 추가 ]
helm에서 bitnami를 이용하여 kafka를 설치하기 위해 repository를 다음과 같이 추가합니다.
$ helm repo add bitnami https://charts.bitnami.com/bitnami
$ helm repo update
[ 4. kafka 설치 ]
설치하기 전 먼저 위에서 작성한 kafka-values.yaml 파일이 있는 곳으로 명령어를 통해 이동해 줍니다.
그리고 다음과 같이 명령어를 입력하여 yaml 파일이 적용된 kafka를 설치해 줍니다.
$ helm install kafka bitnami/kafka -n kafka -f kafka-values.yaml
[ 5. kafka 설치 확인 ]
kafka가 올바르게 설치되었다면 다음과 같이 pod와 svc가 생성되어 있을 것입니다.
올바르게 동작이 되고 있는지 다음과 같이 확인해 보시면 됩니다.
$ kubectl get pod -n kafka # pod 상태 확인
$ kubectl get svc -n kafka # svc 상태 확인
Strimzi 설치
strimzi는 위에서 얘기한 것과 같이 클러스터 내부에서 kafka와 관련된 리소스들의 배포/관리를 효율적으로 할 수 있도록 도와줍니다.
그리고 kafka connect를 이용하여 cdc를 구축할 때 strimzi를 사용한다면, 클러스터에서 사용할 수 있는 기본적인 리소스 타입 말고 kafka connect와 kafka connector를 위한 커스텀 리소스 타입을 활용하게 됩니다.
그 결과로 다음과 같은 이점들을 제공합니다.
- rest api를 호출하던 kafka connect 생성을 클러스터 리소스 관리하는 방식과 동일하게 관리
- 롤링 업데이트 및 장애 복구 자동화를 통해 운영 자동화
- 확장(= scale out) 이 필요한 경우 리소스에서 제공해주는 설정 값을 통해 유연하게 확장
- 외부 script 요소 없이 yaml 파일만을 이용한 ci/cd 파이프라인 구성 가능
등등 이 외에도 다양한 이점들을 확인 해볼 수 있습니다.
strimzi에 대해서 더 많은 정보를 알고 싶으신 분들은 strimzi 공식 문서를 참고해주시면 좋을 것 같습니다.
Strimzi Overview (0.47.0)
Operators are Kubernetes components that package, deploy, and manage applications by extending the Kubernetes API. They simplify administrative tasks and reduce manual intervention. Strimzi provides a set of operators to automate the deployment and managem
strimzi.io
strimzi를 설치하는 방법은 다양하게 존재합니다.
다만, 저 같은 경우 kafka를 포함한 여러 리소스들을 helm을 이용하여 설치를 하고 있기 때문에 strimzi도 동일하게 helm을 활용하여 설치해보겠습니다.
[ 1. strimzi repo 추가 ]
kafka는 bitami repo를 이용하여 설치를 했다면 strimzi는 strimzi repo를 이용하여 설치해보겠습니다.
다음과 같이 strimzi repo를 추가해주시면 됩니다.
$ helm repo add strimzi https://strimzi.io/charts
$ helm repo update
[ 2. strimzi 설치 ]
repository가 추가되었다면 별도의 행위 없이 바로 strimzi를 다음과 같이 설치해주시면 됩니다.
$ helm install strimzi-kafka-operator strimzi/strimzi-kafka-operator -n kafka
[ 3. strimzi 설치 확인 ]
strimzi가 올바르게 설치되었다면 strimzi와 관련된 pod가 생성되어 있습니다.
pod 목록을 확인하여 strimzi가 올바르게 실행되고 있는지 확인해주시면 됩니다.
$ kubectl get pod -n kafka # pod 상태 확인
Kafka Connect 배포
kafka connect를 배포하기 전 해야 되는 것이 한 가지 있습니다.
kafka connect는 여러 데이터베이스들을 연결하여 다양한 connector들을 만들 수 있지만, 기본적으로 kafka connect의 기본 이미지에는 데이터베이스와 관련된 설정 정보들이 담겨있지 않습니다.
그래서 kafka connect를 설치할 때 내가 사용할 connector의 종류에 맞는 데이터베이스 driver가 함께 관리되어야 합니다.
connector에 사용되는 driver를 관리하는 방법은 다양하게 있지만, 저는 confluent에서 공식적으로 제공해 주는 것을 활용합니다.
[ 1. confluent debezium mysql connector 설치 ]
위에서 얘기한 것처럼 저 같은 경우 kafka connect와 함께 debezium 기반의 mysql connector를 사용할 예정입니다.
그래서 confluent 공식 문서에서 제공해주는 mysql connector를 설치하여 사용해 보겠습니다.
먼저 Confluent 공식 문서에 접속합니다.
Debezium MySQL CDC Source Connector
Discover 200+ expert-built Apache Kafka connectors for seamless, real-time data streaming and integration. Connect with MongoDB, AWS S3, Snowflake, and more.
www.confluent.io
그리고 self hosted 기반으로 사용되기 때문에 self-hosted의 download 버튼을 클릭합니다.

설치된 파일은 zip을 풀고 lib에 있는 jar만 사용됩니다.
"{kafka connect 관리 폴더 경로}/plugins" 하위 경로에 debezium-connector-mysql 폴더를 생성하고, 해당 폴더에 lib에 담겨 있던 jar를 모두 옮겨줍니다.
[ 2. mysql connector 연결된 커스텀 kafka connect 이미지 생성 ]
kafka connect에 사용될 mysql connector를 다운로드하였다면 kafka connect 기본 이미지를 활용하여 mysql connector plugin인이 설정된 커스텀 kafka connect 이미지를 생성해야 합니다.
커스텀 이미지 파일을 생성하기 위해 다음과 같이 {kafka connect 관리 폴더 경로}에 Dockerfile을 정의합니다.
// Dockerfile
FROM quay.io/strimzi/kafka:0.47.0-kafka-4.0.0 # strimzi 제공 kafka 기본 이미지
USER root:root # copy를 위하여 root 사용자로 변경
COPY ./plugins/ /opt/kafka/plugins/ # local에 있는 plugins 폴더를 복사
USER 1001 # copy 이후 root가 아닌 사용자로 변경
그리고 다음의 순서를 통해 이미지를 생성하고 개별적으로 사용하는 docker registry에 이미지를 올려줍니다.
여기서 저는 docker desktop의 my hub를 사용합니다.
그래서 저와 동일하게 docker desktop으로 하시는 분들은 registry 주소를 다음 위치에서 확인된 ID 값을 사용하시면 됩니다.

$ docker build -t kafka-connect . # Dockerfile을 이용하여 kafka-connect image 생성
$ docker tag kafka-connect {registry 주소}/kafka-connect:latest # image 태깅
$ docker push {registry 주소}/kafka-connect:latest # image 업로드
[ 3. kafka connect 배포 ]
kafka connect를 클러스터에 배포하기 위해 yaml 파일을 정의합니다.
strimzi를 사용하기 때문에 활용할 수 있는 커스텀 리소스 정보를 이용하여 다음과 같이 필요한 정보들을 설정할 수 있습니다.
// kafka-connect-deployment.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: kafka-connect
namespace: kafka # namespace를 사용하는 경우 설정
annotations:
strimzi.io/use-connector-resources: "true" # connector resource 사용 설정
spec:
replicas: 1
image: {registry 주소}/kafka-connect:latest # 커스텀 kafka connect image 설정
bootstrapServers: kafka.kafka.svc.cluster.local:9092 # operator에 관리되지 않는 외부 kafka를 사용하는 경우 broker server 주소 설정
config:
group.id: kafka-connect-group # kafka connect group id (여러 kafka connect 인스턴스의 클러스터링 처리를 위함)
config.storage.topic: connect-configs # kafka connect 설정 저장 topic 이름
config.storage.replication.factor: 3 # kafka connect 설정 저장 복제본 개수
offset.storage.topic: connect-offsets # kafka connect offset 저장 topic 이름
offset.storage.replication.factor: 3 # kafka connect offset 저장 복제본 개수
status.storage.topic: connect-status # kafka connect 상태 저장 topic 이름
status.storage.replication.factor: 3 # kafka connect 상태 저장 복제본 개수
key.converter: org.apache.kafka.connect.json.JsonConverter # kafka connect message key 변환 포맷
value.converter: org.apache.kafka.connect.json.JsonConverter # kafka connect message value 변환 포맷
internal.key.converter: org.apache.kafka.connect.json.JsonConverter # kafka connect 관리용 topic에 저장되는 message key 변환 포맷
internal.value.converter: org.apache.kafka.connect.json.JsonConverter # kafka connect 관리용 topic에 저장되는 message value 변환 포맷
key.converter.schemas.enable: "false" # key converter에 schema 정보 포함 여부
value.converter.schemas.enable: "false" # value converter에 schema 정보 포함 여부
plugin.path: /opt/kafka/plugins # connector plugin 위치하는 경로
yaml 파일 정의가 되었다면 다음 명령어를 통해 클러스터에 배포합니다.
배포가 정상적으로 된 것인지는 클러스터의 pod 상태 정보를 조회하여 확인해 볼 수 있습니다.
$ kubectl apply -f kafka-connect-deployment.yaml
MySQL Connector 배포
[ 1. 연결될 mysql 상태 확인 ]
kafka connect까지 배포가 완료되었다면 mysql connector를 배포해야 합니다.
다만, mysql connector를 배포하기 위해서는 mysql과 통신할 계정 정보와 database, table들이 필요합니다.
저 같은 경우는 다음과 같은 mysql 환경을 기반으로 설정하려고 합니다.
- mysql 계정 > root (root가 아닌 경우 connector 처리를 위한 권한 부여가 되어 있어야 함)
- mysql database 명 > connector
- mysql table 명 > connector_table
[ 2. mysql connector 배포 ]
mysql connector를 클러스터에 배포하기 위해 yaml 파일을 정의합니다.
strimzi를 사용하지 않는다면 yaml 파일 안에 api를 호출하는 정보를 넣거나 또는 별도의 yaml 파일 정의 없이 api 자체를 curl 기반으로 직접 호출하는 방법을 사용합니다.
하지만 strimzi를 사용하기 때문에 kafka connect와 동일하게 커스텀 리소스 정보를 활용하여 다음과 같이 작성할 수 있습니다.
// mysql-connector-deployment.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: mysql-connector
namespace: kafka # namespace를 사용하는 경우 설정
labels:
strimzi.io/cluster: kafka-connect # connector를 등록할 kafka connect
spec:
class: io.debezium.connector.mysql.MySqlConnector # 사용될 connector class
tasksMax: 1 # connector가 실행되는 task 설정 (병렬 처리 필요한 경우 값 증가)
config:
database.hostname: host.docker.internal # database 호스트 이름
database.port: 3306 # database port
database.user: root # database 계정 id
database.password: root # database 계정 password
database.serverTimeZone: UTC # database timezone
database.include.list: connector # connector에 등록될 대상이 되는 database 목록 (여러 개인 경우 ,로 구분)
database.server.id: 00001 # connector 인스턴스마다 고유한 replication별 id
topic.prefix: mysql # database의 변경 사항이 발생되는 경우 생성되는 topic 이름 prefix
table.include.list: connector.connector_table # connector에 등록될 대상이 되는 table 목록 (여러 개인 경우 ,로 구분)
include.schema.changes: true # schema 변경 사항 포함 여부
schema.history.internal.kafka.bootstrap.servers: http://kafka.kafka.svc.cluster.local:9092 # schema 변경 사항 저장되는 kafka broker 주소
schema.history.internal.kafka.topic: connector-schema-history # schema 변경 사항 저장되는 topic 이름
snapshot.mode: initial # connector가 처음 실행될 때 전체 데이터 snapshot 모드
yaml 파일 정의가 되었다면 다음 명령어를 통해 클러스터에 배포합니다.
$ kubectl apply -f mysql-connector-deployment.yaml
배포가 정상적으로 된 것인지는 클러스터의 kafka connector 상태 정보를 조회하여 확인해 볼 수 있습니다.
$ kubectl get kafkaconnector -n kafka # kafka connector 상태 확인
테이블 변경 사항 감지 확인
테이블 변경 사항이 발생되었을 때 설정한 topic에 변경 사항이 올바르게 전달되는지 확인해 보겠습니다.
확인하는 방법은 여러 가지가 있겠지만, kafka container 내부에서 topic list와 topic 정보를 consuming 해보겠습니다.
저와 동일한 과정을 통해 해 오신 분들은 다음 순서대로 진행해 주시면 됩니다.
$ docker ps # 동작 중인 docker container 목록 확인
$ docker exec -it {kafka container 명} /bin/bash # kafka container bash 접속
$ cd /opt/bitnami/kafka/bin # kafka shell script가 담겨 있는 bin 폴더로 이동
$ kafka-topics.sh --bootstrap-server localhost:9092 --list # topic list 확인
만약 위와 같이 kafka container 내부로 들어가서 topic list를 확인해 보면 다음과 같이 kafka connect를 배포할 때 설정했던 connect 관리용 topic과 mysql connector를 배포할 때 설정했던 topic들이 생성되어 있는 것을 확인할 수 있습니다.

다음으로는 connector에 등록된 테이블에 데이터 insert를 해보겠습니다.
데이터 insert를 하면 생성된 데이터를 감지하여 다음과 같이 테이블에 매핑되는 topic이 자동으로 생성된 것을 확인할 수 있습니다.

topic에서 관리되는 메시지를 소비하기 위해 다음과 같이 consumer 기반의 shell script를 실행 해볼 수 있습니다.
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mysql.connector.connector_table --from-beginning --property print.key=true --property key.separator=:
그러면 다음과 같이 topic에 저장되어 있는 메세지를 확인해볼 수 있습니다.
{
"no": 1, // key
} : {
"before": null,
"after": {
"no": 1,
"name": "connector_name",
"description": "connector_description"
}, // insert 처리된 데이터
"source": {
"version": "3.0.8.Final",
"connector": "mysql",
"name": "mysql",
"ts_ms": 1755016617000,
"snapshot ": "false",
"db": "connector",
"sequence": null,
"ts_us": 1755016617000000,
"ts_ns": 1755016617000000000,
"table": "connector_table",
"server_id": 1,
"gtid": null,
"file": "000192",
"pos": 3106,
"row": 0,
"thread": 878,
"query": null
},
"transaction": null,
"op": "c", // operator 정보 (insert 처리는 c)
"ts_ms": 1755016617845,
"ts_us": 1755016617845401,
"ts_ns": 1755016617845401176
}
또한 삭제나 변경에 대해서도 before / after / op 정보 등을 통해서 어떤 값이 어떻게 변경이 이루어졌는지 확인할 수 있습니다.
이상으로 docker desktop 환경에서 strimzi 기반 kafka connect, debezium으로 mysql cdc 구축하는 방법에 대해 간단하게 알아보는 시간이었습니다.
읽어주셔서 감사합니다.
'Infra, Cloud > Kafka' 카테고리의 다른 글
| Kafka Connect, Debezium으로 MySQL CDC 구축, Strimzi 없이 Docker Desktop 로컬 실습 (1) | 2025.08.04 |
|---|---|
| MSA 환경에서 필요한 CDC 개념 이해, Kafka Connector와 Debezium으로 데이터 동기화 구조 살펴보기 (1) | 2025.07.27 |
댓글