Prerequisites
Install Kafka Connect (Debezium)
Architecture
List all connector plugins
curl http://localhost:3083/connector-plugins | jq
type: source marks a source data plugin.
type: sink marks a sink (target) data plugin.
Plugins built into the image by default:
[kafka@kafka-connect-769747b4bb-vqkld connect]$ ls -l
total 52
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-db2
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-ibmi
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-informix
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-jdbc
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-mongodb
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-mysql
drwxr-xr-x 2 kafka kafka 8192 Jul 26 04:26 debezium-connector-oracle
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-postgres
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-spanner
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-sqlserver
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-vitess
Source plugin used in this walkthrough:
io.debezium.connector.mysql.MySqlConnector
By default the Debezium image ships with only one sink plugin, the JDBC sink:
io.debezium.connector.jdbc.JdbcSinkConnector
curl http://localhost:3083/connector-plugins | jq | grep sink -C 1
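The grep filter above can also be sketched in Python. This is a minimal sketch that assumes the /connector-plugins response is a JSON array of objects with class, type, and version fields; the sample payload, its placeholder versions, and the helper name plugins_of_type are illustrative and not taken from a real cluster.

```python
import json

# Illustrative sample of a /connector-plugins response (versions are placeholders).
SAMPLE_RESPONSE = """
[
  {"class": "io.debezium.connector.jdbc.JdbcSinkConnector", "type": "sink", "version": "x.y.z"},
  {"class": "io.debezium.connector.mysql.MySqlConnector", "type": "source", "version": "x.y.z"}
]
"""


def plugins_of_type(response_text, plugin_type):
    """Return the class names of all plugins whose type equals plugin_type."""
    plugins = json.loads(response_text)
    return [p["class"] for p in plugins if p.get("type") == plugin_type]


sink_plugins = plugins_of_type(SAMPLE_RESPONSE, "sink")
source_plugins = plugins_of_type(SAMPLE_RESPONSE, "source")
```

Against a live worker, the text returned by the curl command would take the place of SAMPLE_RESPONSE.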
Create the source connector
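{
  "name": "source-mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    // Source database settings
    "database.hostname": "mysql.mysql.svc.cluster.local",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "password",
    // Numeric ID of this database client; it must be unique. The connector uses this ID to join the MySQL cluster as another server.
    "database.server.id": "184054",
    // Topic prefix for the source connector; topics under this prefix carry the insert, update, and delete events of the tables
    "topic.prefix": "mysql",
    // Capture only the testdb database
    "database.include.list": "testdb",
    // Sync only the test_tables table in the testdb database
    "table.include.list": "testdb.test_tables",
    // Kafka address
    "schema.history.internal.kafka.bootstrap.servers": "kafka.kafka.svc.cluster.local:9092",
    // Schema history topic, used internally by Debezium only
    "schema.history.internal.kafka.topic": "schema-changes.testdb",
    // Message key columns: for tables whose fully-qualified name starts with testdb, use the id column
    "message.key.columns": "testdb(.*):id",
    // How often (in milliseconds) the connector sends heartbeat messages to a Kafka topic; default 0
    "heartbeat.interval.ms": "100",
    // Whether the connector publishes database schema changes to a Kafka topic named after the topic prefix
    "include.schema.changes": "true",
    // Whether the connector parses and publishes table and column comments on metadata objects
    "include.schema.comments": "true",
    // Snapshot mode; initial is already the default
    "snapshot.mode": "initial"
    // Use a route transform to take the table name (third segment of the topic) as the Kafka topic name
    // "transforms": "route",
    // "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    // "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    // "transforms.route.replacement": "$3",
    // Whether a delete event is followed by a tombstone event; true is already the default
    // "tombstones.on.delete": "true"
  }
}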
Parameter reference: https://debezium.io/documentation/reference/stable/connectors/mysql.html
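The annotated config above uses // comment lines, which strict JSON parsers reject, so the file most likely needs to be cleaned before it is posted. The following is a minimal sketch that drops whole-line comments and then validates the result; the helper name load_annotated_json and the miniature sample config are hypothetical. It deliberately does not touch // inside string values (such as a JDBC URL), and a stray trailing comma still raises json.JSONDecodeError, which doubles as a syntax check.

```python
import json


def load_annotated_json(text):
    """Drop lines that consist only of a // comment, then parse the rest as JSON."""
    kept = [line for line in text.splitlines()
            if not line.lstrip().startswith("//")]
    return json.loads("\n".join(kept))


# Hypothetical miniature config in the same annotated style as the one above.
ANNOTATED = """
{
  "name": "source-mysql-connector",
  "config": {
    // Source database settings
    "database.hostname": "mysql.mysql.svc.cluster.local",
    // Only one task
    "tasks.max": "1"
  }
}
"""

config = load_annotated_json(ANNOTATED)
# Comment-free JSON text that could be written to source-mysql-connector.json.
cleaned = json.dumps(config, indent=2)
```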
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:3083/connectors -d @source-mysql-connector.json
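Once the source connector is running, its change events land in topics named <topic.prefix>.<database>.<table>, which is why the sink connector below subscribes to mysql.testdb.test_tables. The sketch below derives those names from the two relevant settings. It assumes the entries in table.include.list are literal table names rather than regular expressions, and the helper name change_topics is hypothetical.

```python
def change_topics(config):
    """List the change-event topics implied by topic.prefix and table.include.list.

    Assumes every entry in table.include.list is a literal <database>.<table>
    name; regular-expression entries are not expanded.
    """
    prefix = config["topic.prefix"]
    tables = [t.strip() for t in config["table.include.list"].split(",") if t.strip()]
    return [f"{prefix}.{table}" for table in tables]


# The two settings taken from the source connector config above.
source_config = {
    "topic.prefix": "mysql",
    "table.include.list": "testdb.test_tables",
}

topics = change_topics(source_config)
```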
Create the sink connector
{
  "name": "sink-mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    // Target database: jdbc:mysql://<mysql-address>/<database>
    "connection.url": "jdbc:mysql://mysqltest.mysql.svc.cluster.local:3306/test",
    "connection.username": "root",
    "connection.password": "password",
    "tasks.max": "1",
    // Source Kafka topic
    "topics": "mysql.testdb.test_tables",
    "dialect.name": "MySqlDatabaseDialect",
    // Parameter intended to make sure deletes are consumed correctly
    "debezium.sink.databend.upsert-keep-deletes": "false",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "schema.evolution": "basic",
    // If the primary key does not exist the connector performs an INSERT; if it exists, an UPDATE
    "insert.mode": "upsert",
    // Whether the connector processes delete events and removes the matching row
    "delete.enabled": "true",
    // Primary key; must not be empty when "delete.enabled" is "true"
    "primary.key.fields": "id",
    "primary.key.mode": "record_key",
    "database.time_zone": "UTC"
  }
}
Parameter reference: https://debezium.io/documentation/reference/stable/connectors/jdbc.html
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:3083/connectors -d @sink-mysql-connector.json
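Before testing, it can help to confirm that both connectors and their tasks are actually running. The sketch below assumes the standard Kafka Connect status endpoint GET /connectors/<name>/status, whose body carries a connector.state field and a tasks list with per-task state; the helper names is_healthy and fetch_status and the sample payloads are illustrative.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:3083"  # same worker address as the curl commands


def is_healthy(status):
    """Return True when the connector and every one of its tasks report RUNNING."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)


def fetch_status(name, base_url=CONNECT_URL):
    """GET /connectors/<name>/status from the worker and decode the JSON body."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


# Illustrative payloads in the shape the status endpoint returns.
SAMPLE_OK = {
    "name": "sink-mysql-connector",
    "connector": {"state": "RUNNING"},
    "tasks": [{"id": 0, "state": "RUNNING"}],
}
SAMPLE_FAILED = {
    "name": "sink-mysql-connector",
    "connector": {"state": "RUNNING"},
    "tasks": [{"id": 0, "state": "FAILED"}],
}
```

With the worker reachable, is_healthy(fetch_status("source-mysql-connector")) and the same call for "sink-mysql-connector" should both return True; a FAILED task usually comes with a trace field in the payload that explains the error.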
Test
In the source database, insert some rows, then delete some of them.
Check the result in the target database
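The target table is named after the Kafka topic, with . replaced by _ (for example, the topic mysql.testdb.test_tables becomes the table mysql_testdb_test_tables).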
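The naming rule can be sketched as follows, together with what the commented-out RegexRouter settings from the source connector would do to the topic name if they were enabled. Both helper names are hypothetical, and routed_topic is only an approximation of the router in Python: it rewrites the topic only when the whole name matches, and Java's $3 is written as \3 here.

```python
import re


def default_table_name(topic):
    """Table name as observed above: the topic name with dots replaced by underscores."""
    return topic.replace(".", "_")


def routed_topic(topic, regex=r"([^.]+)\.([^.]+)\.([^.]+)", replacement=r"\3"):
    """Approximate the commented-out route transform: rewrite only on a full match."""
    match = re.fullmatch(regex, topic)
    return match.expand(replacement) if match else topic


topic = "mysql.testdb.test_tables"
table = default_table_name(topic)               # name used without the route transform
routed = routed_topic(topic)                    # third segment only, i.e. the table name
routed_table = default_table_name(routed)
```

With the route transform enabled on the source side, the sink's topics setting would have to follow the shortened topic name as well.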