- 测试目标
为了实现分库分表前期的安全操作, 希望分表的数据还是能够暂时合并到原表中, 使用基于kafka connect实现, debezium做connect source, kafka-jdbc-connector-sink做sink.
实现步骤
开启binlog的MySQL
- 创建测试数据库test
1
create database test;
- 初始化表 ``` create table if not exists tx_refund_bill( id bigint unsigned auto_increment comment ‘主键’ primary key, order_id bigint not null comment ‘订单id’, bill_type tinyint not null comment ‘11’ )comment ‘退款费用明细’ charset=utf8;
CREATE TABLE test_new1 LIKE tx_refund_bill;
1 |
|
INSERT INTO tx_refund_bill (order_id, bill_type) VALUES (1,3);
update tx_refund_bill set order_id = 3 where id = 1;
select * from tx_refund_bill;
select * from test_new1;
1 |
|
confluent local start
1 |
|
查看connectors
GET http://172.17.228.163:8083/connectors
delete connnector
curl -XDELETE ‘http://172.17.228.163:8083/connectors/debezium’
创建source debezium connector
curl -H “Content-Type:application/json” -XPUT ‘http://172.17.228.163:8083/connectors/debezium/config’ -d ‘ { “connector.class”: “io.debezium.connector.mysql.MySqlConnector”, “tasks.max”: “1”, “database.hostname”: “localhost”, “database.port”: “3306”, “database.user”: “root”, “database.password”: “Mo@123456”, “database.server.id”: “19991”, “database.server.name”: “test_0”, “database.whitelist”: “test”, “include.schema.changes”: “false”, “snapshot.mode”: “schema_only”, “snapshot.locking.mode”: “none”, “database.history.kafka.bootstrap.servers”: “localhost:9092”, “database.history.kafka.topic”: “dbhistory”, “decimal.handling.mode”: “string”, “table.whitelist”: “test.tx_refund_bill”, “database.history.store.only.monitored.tables.ddl”:”true”, “database.history.skip.unparseable.ddl”:”true” }’
查看source debezium connector status
GET http://172.17.228.163:8083/connectors/debezium/status
delete connnector
curl -XDELETE ‘http://172.17.228.163:8083/connectors/jdbc-sink’
创建sink jdbc connector
curl -H “Content-Type:application/json” -XPUT ‘http://172.17.228.163:8083/connectors/jdbc-sink/config’ -d ‘ { “connector.class”: “io.confluent.connect.jdbc.JdbcSinkConnector”, “connection.url”: “jdbc:mysql://localhost:3306/test?nullCatalogMeansCurrent=true”, “connection.user”: “root”, “connection.password”: “Mo@123456”, “tasks.max”: “1”, “topics”: “test_0.test.tx_refund_bill”, “table.name.format”: “test_new1”,
1 |
|
查看connectors status
GET http://172.17.228.163:8083/connectors/jdbc-sink/status
```
实验
- 在tx_refund_bill表中insert数据,观察test_new1的变化
- 在tx_refund_bill表中执行update语句,观察test_new1的变化