方式一:通过Debezium UI 创建connector
方式二:通过connect REST API 创建connector(curl方式)
查看所有connector
1 | curl -H "Accept:application/json" localhost:8083/connectors/ |
创建connector
1 | curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "test-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "root", "database.password": "password", "database.server.id": "184054", "topic.prefix": "mysql", "database.include.list": "testdb", "schema.history.internal.kafka.bootstrap.servers": "kafka.kafka.svc.cluster.local:9092", "schema.history.internal.kafka.topic": "schemahistory.inventory" } }' |
查看创建的test-connector
1 | curl -H "Accept:application/json" localhost:8083/connectors/test-connector |
test-connector:新建的connector名称
删除test-connector
1 | curl -i -X DELETE -H "Accept:application/json" localhost:8083/connectors/test-connector |
方式三:通过connect REST API 创建connector(post工具方式)
查看所有 connector
创建connector
1 | { |
查看创建的test-connector
删除connector
注意事项
以上三种方式创建connector时,connect容器日志中会输出以下内容,验证新增的connector是否有效
获取所有connect plugins
1 | curl http://localhost:8083/connector-plugins |
查看表更新
1 | # 启动一个pod watcher监控数据表增删改,只作用database.include.list定义的数据库 |
查看数据更新
1 | # 启动一个pod watcher2监控user表数据增删改 |
如果提示watcher或watcher2 Already exists,需要执行下面命令删除pod
1 | kubectl delete pod -n kafka watcher --force |