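Prerequisites
Install Kafka Connect (Debezium).
Architecture
List all connector plugins
curl http://localhost:3083/connector-plugins | jq
type: source marks a source plugin, which reads data from a source system.
type: sink marks a sink plugin, which writes data to a target system.
Plugins bundled by default: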
[kafka@kafka-connect-769747b4bb-vqkld connect]$ ls -l
total 52
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-db2
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-ibmi
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-informix
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-jdbc
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-mongodb
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-mysql
drwxr-xr-x 2 kafka kafka 8192 Jul 26 04:26 debezium-connector-oracle
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-postgres
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-spanner
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-sqlserver
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-vitess
The source plugin used here:
io.debezium.connector.mysql.MySqlConnector
By default, the Debezium image ships only one sink plugin, the JDBC one:
io.debezium.connector.jdbc.JdbcSinkConnector
curl http://localhost:3083/connector-plugins | jq | grep sink -C 1
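The grep filter above matches any line containing "sink", including class names. As a minimal sketch, the same check can be done on the type field itself with jq. The helper name plugins_of_type is hypothetical; the class and type fields are the ones the Kafka Connect REST API returns for each plugin.

```shell
# Hypothetical helper: print the class name of every plugin of the given
# type ("source" or "sink"). Reads the JSON array returned by
# GET /connector-plugins on stdin, so it can be tested without a server.
plugins_of_type() {
  jq -r --arg t "$1" '.[] | select(.type == $t) | .class'
}
```

Typical use, assuming the same endpoint as above: curl -s http://localhost:3083/connector-plugins | plugins_of_type sink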
Create the source connector
{
Parameter reference: https://debezium.io/documentation/reference/stable/connectors/mysql.html
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:3083/connectors -d @source-mysql-connector.json
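The body of source-mysql-connector.json is not reproduced in this post. As a rough illustration only, a file of that shape might look like the sketch below. Every value (host names, credentials, server id, topic prefix, database name, Kafka address) is a placeholder assumption, not the author's configuration; the property names follow the Debezium 2.x MySQL connector reference linked above.

```shell
# Hypothetical helper: write a PLACEHOLDER source connector config.
# All values are assumptions for illustration; replace them with your own.
write_source_config() {
  cat > "${1:-source-mysql-connector.json}" <<'EOF'
{
  "name": "source-mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql-source",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "change-me",
    "database.server.id": "184054",
    "topic.prefix": "demo",
    "database.include.list": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-history.demo"
  }
}
EOF
}
```

With these placeholder values, change events for a table named customers would land on the topic demo.inventory.customers, since the MySQL connector names topics as prefix, database, and table joined by dots.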
Create the sink connector
{
Parameter reference: https://debezium.io/documentation/reference/stable/connectors/jdbc.html
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:3083/connectors -d @sink-mysql-connector.json
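The body of sink-mysql-connector.json is likewise not reproduced here. The sketch below shows what such a file could contain, assuming a MySQL target and a single topic. The topic name, JDBC URL, and credentials are placeholders; the property names come from the Debezium JDBC sink reference linked above. Deletes are enabled because the test below removes rows, and the JDBC sink requires primary.key.mode to be record_key when delete.enabled is true.

```shell
# Hypothetical helper: write a PLACEHOLDER sink connector config.
# All values are assumptions for illustration; replace them with your own.
write_sink_config() {
  cat > "${1:-sink-mysql-connector.json}" <<'EOF'
{
  "name": "sink-mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "topics": "demo.inventory.customers",
    "connection.url": "jdbc:mysql://mysql-target:3306/inventory_copy",
    "connection.username": "debezium",
    "connection.password": "change-me",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "primary.key.mode": "record_key",
    "schema.evolution": "basic"
  }
}
EOF
}
```

Here upsert keeps repeated events for the same key idempotent, and schema.evolution set to basic lets the sink create the target table if it does not exist.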
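Test
In the source database, insert some rows, then delete a few of them.
Check the result in the target database.
The target table name is the Kafka topic name with each . replaced by _.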
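To know which table to look for in the target database, the naming rule above can be sketched as a one-line helper. The function name topic_to_table and the topic demo.inventory.customers are hypothetical; the helper only mirrors the default behavior described here and does not account for a customized naming setting on the sink.

```shell
# Hypothetical helper: derive the target table name from a Kafka topic
# name by replacing every "." with "_", as described above.
topic_to_table() {
  printf '%s\n' "$1" | tr '.' '_'
}
```

For example, topic_to_table demo.inventory.customers prints demo_inventory_customers.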