본문 바로가기
Infra, Cloud/Kafka

Kafka Connect, Debezium으로 MySQL CDC 구축, Strimzi 없이 Docker Desktop 로컬 실습

by J4J 2025. 8. 4.
300x250
반응형

안녕하세요. J4J입니다.

 

이번 포스팅은 docker desktop 환경에서 strimzi 없이 kafka connect, debezium으로 mysql cdc 구축하는 방법에 대해 적어보는 시간을 가져보려고 합니다.

 

 

 

관련 글

 

Helm 이란? Helm 입문을 위한 기본 개념 설명

 

Helm 이란? Helm 입문을 위한 기본 개념 설명

안녕하세요. J4J입니다. 이번 포스팅은 helm 입문을 위한 기본 개념 설명하는 시간을 가져보려고 합니다. Helm 이란? helm이라고 하는 것은 kubernetes를 사용하는 환경에서 kubernetes cluster가 관리하는 영

jforj.tistory.com

 

Window에서 Chocolately로 Helm 설치하기 & Helm 명령어 정리

 

Window에서 Chocolately로 Helm 설치하기 & Helm 명령어 정리

안녕하세요. J4J입니다. 이번 포스팅은 window에서 chocolately로 helm 설치하는 방법과 사용할 수 있는 명령어에 대해 적어보는 시간을 가져보려고 합니다. 관련 글 Helm 이란? Helm 입문을 위한 기본 개

jforj.tistory.com

 

MSA 환경에서 필요한 CDC 개념 이해: Kafka Connector와 Debezium으로 데이터 동기화 구조 살펴보기

 

MSA 환경에서 필요한 CDC 개념 이해: Kafka Connector와 Debezium으로 데이터 동기화 구조 살펴보기

안녕하세요. J4J입니다. 이번 포스팅은 msa 환경에서 필요한 cdc 개념 이해와 kafka connector, debezium에 대해 이해하는 시간을 가져보려고 합니다. CDC 소개 cdc라고 하는 것은 change data capture의 약자로,

jforj.tistory.com

 

Kafka Connect, Debezium으로 MySQL CDC 구축: Strimzi 기반 Docker Desktop 로컬 실습

 

Kafka Connect, Debezium으로 MySQL CDC 구축: Strimzi 기반 Docker Desktop 로컬 실습

안녕하세요. J4J입니다. 이번 포스팅은 docker desktop 환경에서 strimzi 기반 kafka connect, debezium으로 mysql cdc 구축하는 방법에 대해 적어보는 시간을 가져보려고 합니다. 관련 글 Helm 이란? Helm 입문을 위

jforj.tistory.com

 

 

반응형

 

 

Kafka 설치

 

이번 글에서 cdc 구축 테스트를 위해 다음과 같은 환경들을 사용하고자 합니다.

 

  • k8s cluster >> docker desktop 활용
  • kafka >> helm을 이용하여 cluster 내부 설치
  • mysql >> 로컬에 설치되어 있는 mysql
  • strimzi >> 미 사용

 

 

 

strimzi 같은 경우는 클러스터 내부에서 kafka와 관련된 리소스들의 배포/관리를 효율적으로 할 수 있도록 도와주는 도구인데 단순 비교를 하기 위해 이번 글에서는 strimzi를 사용하지 않습니다.

 

다만, 많은 곳에서 운영을 할 때 strimzi를 이용한 설정을 많이 하기 때문에 strimzi 설정이 이루어진 것을 테스트해보고 싶으신 분은 다음 글을 참고해 주시면 좋을 것 같습니다.

 

Kafka Connect, Debezium으로 MySQL CDC 구축: Strimzi 기반 Docker Desktop 로컬 실습

안녕하세요. J4J입니다. 이번 포스팅은 docker desktop 환경에서 strimzi 기반 kafka connect, debezium으로 mysql cdc 구축하는 방법에 대해 적어보는 시간을 가져보려고 합니다. 관련 글 Helm 이란? Helm 입문을 위

jforj.tistory.com

 

 

 

그리고 kafka 같은 경우 보통은 이미 설치되어 있을 것으로 생각되지만, 이번 글에서는 helm을 이용하여 cluster 내부에 설치하려고 합니다.

 

설치되어 있으신 분들은 해당 파트는 스킵하시고 다음 파트부터 보시면 됩니다.

 

 

 

[ 1. 테스트를 위한 namespace 구성 ]

 

논리적으로 환경을 구분하기 위해 별도의 namespace를 생성하려고 합니다.

 

필수적인 설정은 아니기에 필요하지 않다고 생각하시는 분들은 스킵하셔도 됩니다.

 

$ kubectl create namespace kafka

 

 

 

[ 2. kafka values 구성 ]

 

kafka 설치를 위한 설정 정보에 사용하기 위한 values를 다음과 같이 구성했습니다.

 

사용되는 kafka 환경에 따라 다르겠지만, 실습을 위해 기본적인 것들만 설정해봤습니다.

 

// kafka-values.yaml

listeners:
  client:
    protocol: PLAINTEXT # 일반 tcp로 통신

extraEnvVars:
  - name: KAFKA_CFG_LISTENERS # kafka broker가 내부에서 접속되는 방식 설정
    value: PLAINTEXT://:9092
  - name: KAFKA_CFG_ADVERTISED_LISTENERS # kafka 외부에 kafka broker 접속 방식 설정
    value: PLAINTEXT://localhost:9092

# localhost container에서 kafka 접속을 위한 설정
externalAccess:
  enabled: true
  controller:
    service:
      loadBalancerNames: ["localhost", "localhost", "localhost"]

 

 

 

[ 3. bitnami repo 추가 ]

 

helm에서 bitnami를 이용하여 kafka를 설치하기 위해 repository를 다음과 같이 추가합니다.

 

$ helm repo add bitnami https://charts.bitnami.com/bitnami
$ helm repo update

 

 

 

[ 4. kafka 설치 ]

 

설치하기 전 먼저 위에서 작성한 kafka-values.yaml 파일이 있는 곳으로 명령어를 통해 이동해 줍니다.

 

그리고 다음과 같이 명령어를 입력하여 yaml 파일이 적용된 kafka를 설치해 줍니다.

 

$ helm install kafka bitnami/kafka -n kafka -f kafka-values.yaml

 

 

 

[ 5. kafka 설치 확인 ]

 

kafka가 올바르게 설치되었다면 다음과 같이 pod와 svc가 생성되어 있을 것입니다.

 

올바르게 동작이 되고 있는지 다음과 같이 확인해 보시면 됩니다.

 

$ kubectl get pod -n kafka # pod 상태 확인
$ kubectl get svc -n kafka # svc 상태 확인

 

 

 

 

Kafka Connect 배포

 

kafka connect를 배포하기 전 해야 되는 것이 한 가지 있습니다.

 

kafka connect는 여러 데이터베이스들을 연결하여 다양한 connector들을 만들 수 있지만, 기본적으로 kafka connect의 기본 이미지에는 데이터베이스와 관련된 설정 정보들이 담겨있지 않습니다.

 

그래서 kafka connect를 설치할 때 내가 사용할 connector의 종류에 맞는 데이터베이스 driver가 함께 관리되어야 합니다.

 

connector에 사용되는 driver를 관리하는 방법은 다양하게 있지만, 저는 confluent에서 공식적으로 제공해 주는 것을 활용합니다.

 

 

 

[ 1. confluent debezium mysql connector 설치 ]

 

위에서 얘기한 것처럼 저 같은 경우 kafka connect와 함께 debezium 기반의 mysql connector를 사용할 예정입니다.

 

그래서 confluent 공식 문서에서 제공해주는 mysql connector를 설치하여 사용해 보겠습니다.

 

먼저 Confluent 공식 문서에 접속합니다.

 

Debezium MySQL CDC Source Connector

Discover 200+ expert-built Apache Kafka connectors for seamless, real-time data streaming and integration. Connect with MongoDB, AWS S3, Snowflake, and more.

www.confluent.io

 

 

 

그리고 self hosted 기반으로 사용되기 때문에 self-hosted의 download 버튼을 클릭합니다.

 

confluent debezium mysql connector download

 

 

 

설치된 파일은 zip을 풀고 lib에 있는 jar만 사용됩니다.

 

"{kafka connect 관리 폴더 경로}/plugins" 하위 경로에 debezium-connector-mysql 폴더를 생성하고, 해당 폴더에 lib에 담겨 있던 jar를 모두 옮겨줍니다.

 

 

 

[ 2. mysql connector 연결된 커스텀 kafka connect 이미지 생성 ]

 

kafka connect에 사용될 mysql connector를 다운로드하였다면 kafka connect 기본 이미지를 활용하여 mysql connector plugin인이 설정된 커스텀 kafka connect 이미지를 생성해야 합니다.

 

커스텀 이미지 파일을 생성하기 위해 다음과 같이 {kafka connect 관리 폴더 경로}에 Dockerfile을 정의합니다.

 

// Dockerfile

FROM confluentinc/cp-kafka-connect:7.9.1 # kafka connect 기본 이미지
COPY ./plugins/ /opt/kafka/plugins/ # local에 있는 plugins 폴더를 복사

 

 

 

 

그리고 다음의 순서를 통해 이미지를 생성하고 개별적으로 사용하는 docker registry에 이미지를 올려줍니다.

 

여기서 저는 docker desktop의 my hub를 사용합니다.

 

그래서 저와 동일하게 docker desktop으로 하시는 분들은 registry 주소를 다음 위치에서 확인된 ID 값을 사용하시면 됩니다.

 

docker desktop my hub id

 

$ docker build -t kafka-connect . # Dockerfile을 이용하여 kafka-connect image 생성
$ docker tag kafka-connect {registry 주소}/kafka-connect:latest # image 태깅
$ docker push {registry 주소}/kafka-connect:latest # image 업로드

 

 

 

[ 3. kafka connect 배포 ]

 

kafka connect를 k8s에 배포하기 위해 yaml 파일을 정의합니다.

 

이곳에서는 strimzi를 사용하지 않기 때문에 svc/deployment 기반으로 kafka connect 설정을 진행합니다.

 

// kafka-connect-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-connect
  namespace: kafka # namespace를 사용하는 경우 설정
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-connect
  template:
    metadata:
      labels:
        app: kafka-connect
    spec:
      containers:
        - name: kafka-connect
          image: {registry 주소}/kafka-connect:latest # 커스텀 kafka connect image 설정
          ports:
            - containerPort: 8083
          env:
            - name: CONNECT_BOOTSTRAP_SERVERS # kafka broker 주소
              value: "kafka.kafka.svc.cluster.local:9092"
            - name: CONNECT_REST_PORT # kafka connect rest api port
              value: "8083"
            - name: CONNECT_GROUP_ID # kafka connect group id (여러 kafka connect 인스턴스의 클러스터링 처리를 위함)
              value: "kafka-connect-group"
            - name: CONNECT_CONFIG_STORAGE_TOPIC # kafka connect 설정 저장 topic 이름
              value: "connect-configs"
            - name: CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR # kafka connect 설정 저장 복제본 개수
              value: "3"
            - name: CONNECT_OFFSET_STORAGE_TOPIC # kafka connect offset 저장 topic 이름
              value: "connect-offsets"
            - name: CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR # kafka connect offset 저장 복제본 개수
              value: "3"
            - name: CONNECT_STATUS_STORAGE_TOPIC # kafka connect 상태 저장 topic 이름
              value: "connect-status"
            - name: CONNECT_STATUS_STORAGE_REPLICATION_FACTOR # kafka connect 상태 저장 복제본 개수
              value: "3"
            - name: CONNECT_KEY_CONVERTER # kafka connect message key 변환 포맷
              value: "org.apache.kafka.connect.json.JsonConverter"
            - name: CONNECT_VALUE_CONVERTER # kafka connect message value 변환 포맷
              value: "org.apache.kafka.connect.json.JsonConverter"
            - name: CONNECT_INTERNAL_KEY_CONVERTER # kafka connect 관리용 topic에 저장되는 message key 변환 포맷
              value: "org.apache.kafka.connect.json.JsonConverter"
            - name: CONNECT_INTERNAL_VALUE_CONVERTER # kafka connect 관리용 topic에 저장되는 message value 변환 포맷
              value: "org.apache.kafka.connect.json.JsonConverter"
            - name: CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE # key converter에 schema 정보 포함 여부
              value: "false"
            - name: CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE # value converter에 schema 정보 포함 여부
              value: "false"
            - name: CONNECT_PLUGIN_PATH # connector plugin 위치하는 경로
              value: "/opt/kafka/plugins"
            - name: CONNECT_REST_ADVERTISED_HOST_NAME # 외부에 노출되는 kafka connect 호스트 이름
              value: "kafka-connect"

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-connect
  namespace: kafka # namespace를 사용하는 경우 설정
spec:
  selector:
    app: kafka-connect
  ports:
    - name: http
      port: 8083
      targetPort: 8083

 

 

 

yaml 파일 정의가 되었다면 다음 명령어를 통해 k8s에 배포합니다.

 

배포가 정상적으로 된 것인지는 k8s의 pod 상태 정보를 조회하여 확인해 볼 수 있습니다.

 

$ kubectl apply -f kafka-connect-deployment.yaml

 

 

 

 

MySQL Connector 배포

 

[ 1. 연결될 mysql 상태 확인 ]

 

kafka connect까지 배포가 완료되었다면 mysql connector를 배포해야 합니다.

 

다만, mysql connector를 배포하기 위해서는 mysql과 통신할 계정 정보와 database, table들이 필요합니다.

 

저 같은 경우는 다음과 같은 mysql 환경을 기반으로 설정하려고 합니다.

 

  • mysql 계정 > root (root가 아닌 경우 connector 처리를 위한 권한 부여가 되어 있어야 함)
  • mysql database 명 > connector
  • mysql table 명  > connector_table

 

 

 

[ 2. mysql connector 배포 ]

 

mysql connector를 k8s에 배포하기 위해 yaml 파일을 정의합니다.

 

보통 strimzi를 사용하지 않고 빠르게 배포하는 방법은 curl 명령어를 통해 connector 등록을 위한 api를 호출하는 것입니다.

 

curl이 편한 경우 curl로만 사용해도 되지만, 저는 job을 등록하여 mysql connector를 배포하려고 합니다.

 

// mysql-connector-job.yaml
// 실제 사용할 때는 args 하위에 있는 # 기반 주석을 모두 제거해주세요 !!!!!

apiVersion: batch/v1
kind: Job
metadata:
  name: mysql-connector
  namespace: kafka # namespace를 사용하는 경우 설정
spec:
  backoffLimit: 0 # job 실행 실패 시 재시작 횟수 설정
  template:
    spec:
      restartPolicy: Never # 자동 재 시작 설정
      containers:
        - name: mysql-connector
          image: curlimages/curl:latest
          command: [ "sh", "-c" ]
          args:
            - |
              curl -X POST http://kafka-connect.kafka.svc.cluster.local:8083/connectors \ # kafka connect 경로
                -H "Content-Type: application/json" \
                -d '{
                  "name": "mysql-connector",
                  "config": {
                    "connector.class": "io.debezium.connector.mysql.MySqlConnector", # 사용될 connector class
                    "database.hostname": "host.docker.internal", # database 호스트 이름
                    "database.port": "3306", # database port
                    "database.user": "root", # database 계정 id
                    "database.password": {password}, # database 계정 password
                    "database.serverTimeZone": "UTC", # database timezone
                    "database.include.list": "connector", # connector에 등록될 대상이 되는 database 목록 (여러 개인 경우 ,로 구분)
                    "database.server.id": "00001", # 서버 식별 id
                    "topic.prefix": "mysql", # database의 변경 사항이 발생되는 경우 생성되는 topic 이름 prefix
                    "table.include.list": "connector.connector_table", # connector에 등록될 대상이 되는 table 목록 (여러 개인 경우 ,로 구분)
                    "include.schema.changes": "true", # schema 변경 사항 포함 여부
                    "schema.history.internal.kafka.bootstrap.servers": "http://kafka.kafka.svc.cluster.local:9092", # schema 변경 사항 저장되는 kafka broker 주소
                    "schema.history.internal.kafka.topic": "connector-schema-history", # schema 변경 사항 저장되는 topic 이름
                    "snapshot.mode": "initial" # connector가 처음 실행될 때 전체 데이터 snapshot 모드
                  }
                }'

 

 

 

yaml 파일 정의가 되었다면 다음 명령어를 통해 k8s에 배포합니다.

 

배포가 정상적으로 된 것인지는 k8s의 job 및 pod 상태 정보를 조회하여 확인해 볼 수 있습니다.

 

$ kubectl apply -f mysql-connector-job.yaml

 

 

 

 

테이블 변경 사항 감지 확인

 

테이블 변경 사항이 발생되었을 때 설정한 topic에 변경 사항이 올바르게 전달되는지 확인해 보겠습니다.

 

확인하는 방법은 여러 가지가 있겠지만, kafka container 내부에서 topic list와 topic 정보를 consuming 해보겠습니다.

 

 

 

저와 동일한 과정을 통해 해 오신 분들은 다음 순서대로 진행해 주시면 됩니다.

 

$ docker ps # 동작 중인 docker container 목록 확인
$ docker exec -it {kafka container 명} /bin/bash # kafka container bash 접속
$ cd /opt/bitnami/kafka/bin # kafka shell script가 담겨 있는 bin 폴더로 이동
$ kafka-topics.sh --bootstrap-server localhost:9092 --list # topic list 확인

 

 

 

만약 위와 같이 kafka container 내부로 들어가서 topic list를 확인해 보면 다음과 같이 kafka connect를 배포할 때 설정했던 connect 관리용 topic과 mysql connector를 배포할 때 설정했던 topic들이 생성되어 있는 것을 확인할 수 있습니다.

 

connect / connector 배포 topic

 

 

 

 

다음으로는 connector에 등록된 테이블에 데이터 insert를 해보겠습니다.

 

데이터 insert를 하면 생성된 데이터를 감지하여 다음과 같이 테이블에 매핑되는 topic이 자동으로 생성된 것을 확인할 수 있습니다.

 

테이블 cdc topic 생성

 

 

 

topic에서 관리되는 메시지를 소비하기 위해 다음과 같이 consumer 기반의 shell script를 실행 해볼 수 있습니다.

 

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mysql.connector.connector_table --from-beginning --property print.key=true --property key.separator=:

 

 

 

그러면 다음과 같이 topic에 저장되어 있는 메세지를 확인해볼 수 있습니다.

 

{
  "no": 2, // key
} : {
  "before": null,
  "after": {
    "no": 2,
    "name": "connector_name",
    "description": "connector_description"
  }, // insert 처리된 데이터
  "source": {
    "version": "3.0.8.Final",
    "connector": "mysql",
    "name": "mysql",
    "ts_ms": 1754323457000,
    "snapshot ": "false",
    "db": "connector",
    "sequence": null,
    "ts_us": 1754323457000000,
    "ts_ns": 1754323457000000000,
    "table": "connector_table",
    "server_id": 1,
    "gtid": null,
    "file": "000192",
    "pos": 2406,
    "row": 0,
    "thread": 24,
    "query": null
  },
  "transaction": null,
  "op": "c", // operator 정보 (insert 처리는 c)
  "ts_ms": 1754323457768,
  "ts_us": 1754323457768132,
  "ts_ns": 1754323457768132522
}

 

 

 

또한 삭제나 변경에 대해서도 before / after / op 정보 등을 통해서 어떤 값이 어떻게 변경이 이루어졌는지 확인할 수 있습니다.

 

 

 

 

 

 

 

이상으로 docker desktop 환경에서 strimzi 없이 kafka connect, debezium으로 mysql cdc 구축하는 방법에 대해 간단하게 알아보는 시간이었습니다.

 

읽어주셔서 감사합니다.

 

 

 

 

728x90
반응형

댓글