Method 1: Create a connector through the Debezium UI
Method 2: Create a connector through the Kafka Connect REST API (using curl)
List all connectors
curl -H "Accept:application/json" localhost:8083/connectors/
Create a connector
curl -i -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  localhost:8083/connectors/ \
  -d '{
    "name": "test-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "root",
      "database.password": "password",
      "database.server.id": "184054",
      "topic.prefix": "mysql",
      "database.include.list": "testdb",
      "schema.history.internal.kafka.bootstrap.servers": "kafka.kafka.svc.cluster.local:9092",
      "schema.history.internal.kafka.topic": "schemahistory.inventory"
    }
  }'
View the newly created test-connector
curl -H "Accept:application/json" localhost:8083/connectors/test-connector
test-connector is the name of the newly created connector.
Delete test-connector
curl -i -X DELETE -H "Accept:application/json" localhost:8083/connectors/test-connector
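The curl calls of Method 2 can be wrapped in small shell helpers so that the host and connector name are not retyped each time. This is only a sketch: the function names and the CONNECT_URL variable are illustrative and do not come from the original notes, and create_connector assumes the connector JSON has been saved to a file.

```shell
#!/bin/sh
# Sketch only: helper names and CONNECT_URL are illustrative assumptions.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Build the REST URL for the connector collection (no argument)
# or for a single connector (connector name as $1).
connector_url() {
  printf '%s/connectors/%s\n' "${CONNECT_URL%/}" "${1:-}"
}

# List all connectors.
list_connectors() {
  curl -s -H "Accept:application/json" "$(connector_url)"
}

# Create a connector. $1: path to a JSON file with "name" and "config".
create_connector() {
  curl -i -X POST \
    -H "Accept:application/json" \
    -H "Content-Type:application/json" \
    --data "@$1" \
    "$(connector_url)"
}

# Show one connector. $1: connector name.
get_connector() {
  curl -s -H "Accept:application/json" "$(connector_url "$1")"
}

# Delete one connector. $1: connector name.
delete_connector() {
  curl -i -X DELETE -H "Accept:application/json" "$(connector_url "$1")"
}
```

After sourcing the file, a typical sequence would be: create_connector ./test-connector.json, then get_connector test-connector, and finally delete_connector test-connector.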
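Method 3: Create a connector through the Kafka Connect REST API (using an HTTP client tool such as Postman)
List all connectors: send GET to localhost:8083/connectors/
Create a connector: send POST to localhost:8083/connectors/ with the following JSON body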
{
  "name": "test-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "password",
    "database.server.id": "184054",
    "topic.prefix": "mysql",
    "database.include.list": "testdb",
    "table.include.list": "testdb.test_tables",
    "schema.history.internal.kafka.bootstrap.servers": "kafka.kafka.svc.cluster.local:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory"
  }
}
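View the newly created test-connector: send GET to localhost:8083/connectors/test-connector
Delete the connector: send DELETE to localhost:8083/connectors/test-connector
Notes
Whichever of the three methods above is used, the Connect container writes log output when the connector is created. Check that output to verify that the new connector is valid.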
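Besides reading the container log, the connector state can also be queried through the Kafka Connect REST API, which exposes GET /connectors/{name}/status. The helper below is a rough sketch, not part of the original notes: connector_is_running is a hypothetical name, and it parses the JSON with grep and sed instead of a real JSON parser, so treat it as a quick check only.

```shell
#!/bin/sh
# Sketch only: connector_is_running is an illustrative helper name.
# Reads the JSON returned by GET /connectors/<name>/status on stdin and
# succeeds only if every "state" field (connector and tasks) is RUNNING.
connector_is_running() {
  # Extract the value of every "state" field, one per line.
  states=$(grep -o '"state" *: *"[A-Z_]*"' | sed 's/.*: *"\(.*\)"/\1/')
  # No state found at all: treat as not running.
  [ -n "$states" ] || return 1
  # Fail if any extracted state differs from RUNNING.
  ! printf '%s\n' "$states" | grep -qv '^RUNNING$'
}

# Example call, to be run where Kafka Connect is reachable:
# curl -s localhost:8083/connectors/test-connector/status | connector_is_running \
#   && echo "test-connector is running"
```

A state such as FAILED or PAUSED on the connector or on any task makes the helper return a non-zero exit code, which is a cue to look at the container log for details.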
List all Connect plugins
curl http://localhost:8083/connector-plugins
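The plugin list is mostly useful for confirming that the connector class used in the configuration is actually installed. A minimal sketch of that check follows; has_plugin is a hypothetical helper name, and it simply searches the response for the quoted class name rather than parsing the JSON.

```shell
#!/bin/sh
# Sketch only: has_plugin is an illustrative helper name.
# Reads the JSON from GET /connector-plugins on stdin and succeeds
# if the given connector class ($1) appears in it as a quoted string.
has_plugin() {
  grep -qF "\"$1\""
}

# Example call, to be run where Kafka Connect is reachable:
# curl -s http://localhost:8083/connector-plugins \
#   | has_plugin io.debezium.connector.mysql.MySqlConnector \
#   && echo "MySQL connector plugin is installed"
```

If the check fails, creating a connector with that connector.class is expected to be rejected, so it is worth running before the POST request.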
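View table (schema) changes
# Start a pod named watcher that monitors tables being created, dropped, or altered; this only covers the databases listed in database.include.list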
kubectl run -n kafka -it --rm --image=quay.io/debezium/tooling:1.2 --restart=Never watcher -- kcat -b kafka-kafka-bootstrap:9092 -C -o beginning -t mysql
# kubectl run -n kafka -it --rm --image=harbor.basepoint.net/library/tooling:1.2 --restart=Never watcher -- kcat -b kafka.kafka.svc.cluster.local:9092 -C -o beginning -t mysql
View data changes
# Start a pod named watcher2 that monitors inserts, updates, and deletes on the user table
kubectl run -n kafka -it --rm --image=quay.io/debezium/tooling:1.2 --restart=Never watcher2 -- kcat -b kafka-kafka-bootstrap:9092 -C -o beginning -t mysql.testdb.user
# kubectl run -n kafka -it --rm --image=harbor.basepoint.net/library/tooling:1.2 --restart=Never watcher2 -- kcat -b kafka.kafka.svc.cluster.local:9092 -C -o beginning -t mysql.testdb.user
If kubectl reports that watcher or watcher2 already exists (AlreadyExists), delete the pod with the following commands:
kubectl delete pod -n kafka watcher --force
kubectl delete pod -n kafka watcher2 --force
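The two topic names passed to kcat follow Debezium's default naming for the MySQL connector: schema changes go to a topic named after topic.prefix, and each captured table gets a topic named <topic.prefix>.<database>.<table>. The helpers below are a small sketch of that rule with hypothetical function names. They assume the default topic naming is in use.

```shell
#!/bin/sh
# Sketch only: function names are illustrative; default Debezium topic naming is assumed.

# Schema change topic: same as topic.prefix. $1: topic.prefix
schema_change_topic() {
  printf '%s\n' "$1"
}

# Data change topic for one table. $1: topic.prefix, $2: database, $3: table
table_topic() {
  printf '%s.%s.%s\n' "$1" "$2" "$3"
}

# Example: print the topics used by the watcher and watcher2 pods.
schema_change_topic mysql
table_topic mysql testdb user
```

Note that when table.include.list is set, as in the Method 3 example (testdb.test_tables), only the listed tables are captured, so the topic to watch would be the one for that table rather than mysql.testdb.user.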