方式一:通过Debezium UI 创建connector

方式二:通过connect REST API 创建connector(curl方式)

查看所有connector

curl -H "Accept:application/json" localhost:8083/connectors/

创建connector

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "test-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "root", "database.password": "password", "database.server.id": "184054", "topic.prefix": "mysql", "database.include.list": "testdb", "schema.history.internal.kafka.bootstrap.servers": "kafka.kafka.svc.cluster.local:9092", "schema.history.internal.kafka.topic": "schemahistory.inventory" } }'

查看创建的test-connector

curl -H "Accept:application/json" localhost:8083/connectors/test-connector

test-connector:新建的connector名称

删除test-connector

curl -i -X DELETE -H "Accept:application/json" localhost:8083/connectors/test-connector

方式三:通过connect REST API 创建connector(post工具方式)

查看所有 connector

创建connector

{
	"name": "test-connector",
	"config": {
		"connector.class": "io.debezium.connector.mysql.MySqlConnector",
		"tasks.max": "1",  
        "database.hostname": "mysql",  
        "database.port": "3306",
        "database.user": "root",
        "database.password": "password",
        "database.server.id": "184054",
        "topic.prefix": "mysql",
        "database.include.list": "testdb",
        "table.include.list": "testdb.test_tables",
        "schema.history.internal.kafka.bootstrap.servers": "kafka.kafka.svc.cluster.local:9092",  
        "schema.history.internal.kafka.topic": "schema-changes.inventory"
    }
}

查看创建的test-connector

删除connector

注意事项

以上三种方式创建connector时,connect容器日志中会输出以下内容,验证新增的connector是否有效

获取所有connect plugins

curl http://localhost:8083/connector-plugins

查看表更新

# 启动一个pod watcher监控数据表增删改,只作用database.include.list定义的数据库 
kubectl run -n kafka -it --rm --image=quay.io/debezium/tooling:1.2  --restart=Never watcher -- kcat -b kafka-kafka-bootstrap:9092 -C -o beginning -t mysql
​
# kubectl run -n kafka -it --rm --image=harbor.basepoint.net/library/tooling:1.2  --restart=Never watcher -- kcat -b kafka.kafka.svc.cluster.local:9092 -C -o beginning -t mysql

查看数据更新

# 启动一个pod watcher2监控user表数据增删改
kubectl run -n kafka -it --rm --image=quay.io/debezium/tooling:1.2  --restart=Never watcher2 -- kcat -b kafka-kafka-bootstrap:9092 -C -o beginning -t mysql.testdb.user
​
# kubectl run -n kafka -it --rm --image=harbor.basepoint.net/library/tooling:1.2  --restart=Never watcher2 -- kcat -b kafka.kafka.svc.cluster.local:9092 -C -o beginning -t mysql.testdb.user

如果提示watcher或watcher2 Already exists,需要执行下面命令删除pod

kubectl delete pod -n kafka watcher --force
kubectl delete pod -n kafka watcher2 --force