Kafka Connect
- 특정한 Code 없이 Configuration으로 Data를 Import/Export 가능
- Standalone/Distribution Mode 지원
- RESTful API 통해 지원
- Stream 또는 Batch 형태로 데이터 전송 가능
- 커스텀 Connector를 통한 다양한 Plugin 제공(File, S3, Hive, MySQL, etc)
- Kafka Connect Source : 특정 시스템에서 Kafak Cluster로 데이터를 가져오는 Layer
- Kafka Connect Sink : Kafka CLuster의 데이터를 특정 시스템으로 보내는 Layer
Kafka Connect 설치
Kafka Connect 설정
- $KAFKA_HOME/config/connect-distributed.properties
Kafka Connect 실행
- ./bin/connect-distributed ./etc/kafka/connect-distributed.properties
Kafka Connect Topic 조회
- ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
Kafka Connect에서 JDBC Connector를 이용한 데이터 Export를 위한 Plugin
~etc/kafka/connect-distributed.properties 파일 마지막 아래 plugin 정보 추가
- plugin.path=~/confluentinc-kafka-connect-jdbc-10.0.1/lib
- JdbcSourceConnector에서 RDB 사용을 위한 드라이버 복사
- ./shre/java/kafka/ 폴더에 mariadb-java-client-2.7.2.jar 파일 복사
docker-compose.yml
주문 생성을 위한 Kafka Sink Connector 추가
POST 127.0.0.1:8083/connectors
{
"name": "order-sink-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://127.0.0.1:3306/order",
"connection.user": "ecommerce",
"connection.password": "password",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "false",
"tasks.max": "1",
"topics": "orders"
}
}
Kafka Connect REST API
- GET /connectors : 커넥터 목록 조회
- GET /connectors/{name} : {name}을 갖는 커넥터 상세 정보 조회
- POST /connectors : 커넥터 생성 (Body = JSON Object 타입의 커넥터 config정보)
- GET /connectors/{name}/status : 이 커넥터가 running인지, failed인지 paused 인지 현재 상태 조회
- DELETE /connectors/{name} : {name}을 갖는 커넥터 삭제
- GET /connector-plugins : 카프카 커넥터 클러스터 내부에 설치된 플러그인 목록 조회
Reference URL
Kafka Connect
Kafka Connect 설치
Kafka Connect 설정
Kafka Connect 실행
Kafka Connect Topic 조회
Kafka Connect에서 JDBC Connector를 이용한 데이터 Export를 위한 Plugin
~etc/kafka/connect-distributed.properties 파일 마지막 아래 plugin 정보 추가
docker-compose.yml
주문 생성을 위한 Kafka Sink Connector 추가
POST 127.0.0.1:8083/connectors
{ "name": "order-sink-connect", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": "jdbc:mysql://127.0.0.1:3306/order", "connection.user": "ecommerce", "connection.password": "password", "auto.create": "true", "auto.evolve": "true", "delete.enabled": "false", "tasks.max": "1", "topics": "orders" } }Kafka Connect REST API
Reference URL