来自哈罗单车开源的组件。支持同步PG数据到kafka或者ES。
目前创新互联已为成百上千家的企业提供了网站建设、域名、虚拟主机、网站托管、服务器托管、企业网站设计、梅州网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。
https://github.com/hellobike/tunnel
tunnel整体的部署比较简单的
需要事先部署好zk和kafka(我下面演示的是单节点的zk和kafka)
节点部署关系:
192.168.2.4 部署zk、kafka、pg10运行在1921端口
192.168.2.189 部署tunnel
确保已开启PG的逻辑复制
wal_level = 'logical';
max_replication_slots = 20
注意这个设置要重启PG进程的
然后,创建测试库表和同步用的账号
CREATE DATABASE test_database;
\c test_database
create table test_1 (id int primary key , name char(40));
create table test_2 (id int primary key , name char(40));
CREATE ROLE test_rep LOGIN ENCRYPTED PASSWORD 'xxxx' REPLICATION;
GRANT CONNECT ON DATABASE test_database to test_rep;
vim pg_hba.conf增加2行配置:
host all test_rep 192.168.2.0/24 md5
host replication test_rep 192.168.2.0/24 md5
然后 reload 下PG
到192.168.2.189机器上去编译tunnel:
注意: tunnel的启动需要事先安装好oracle jdk 1.8
git clone https://github.com/hellobike/tunnel
cd tunnel
mvn clean package -Dmaven.test.skip=true
cd target
unzip AppTunnelService.zip
cd AppTunnelService
vim conf/test.yml内容如下:
tunnel_subscribe_config:
pg_dump_path: '/usr/local/pgsql-10.10/bin/pg_dump'
subscribes:
- slotName: slot_for_test
pgConnConf:
host: 192.168.2.4
port: 1921
database: test_database
user: test_rep
password: xxxx
rules:
- {table: test_1, pks: ['id'], topic: test_1_logs}
- {table: test_2, pks: ['id'], topic: test_2_logs}
kafkaConf:
addrs:
- 192.168.2.4:9092
tunnel_zookeeper_address:192.168.2.4:2181
前台启动:
java -server -classpath conf/*:lib/* com.hellobike.base.tunnel.TunnelLauncher -u false -c cfg.properties -p 7788 #暴露prometheus metric在7788端口(配置监控不是这里的重点,也很简单,暂时先跳过)
然后,我们再在PG10上面的test_database的2张表随便造些数据,然后可以看到kafka里面已经有数据了(下图是通过kafkamanager和 kafka-eagle的结果)。
格式化下,数据就是这样的:
UPDATE的记录的样子:
{
"dataList": [{
"dataType": "integer",
"name": "id",
"value": "1111"
}, {
"dataType": "character",
"name": "name",
"value": "大狗蛋 "
}],
"eventType": "UPDATE",
"lsn": 10503246616,
"schema": "public",
"table": "test_1"
}
DELETE的记录的样子:
{
"dataList": [{
"dataType": "integer",
"name": "id",
"value": "3"
}],
"eventType": "DELETE",
"lsn": 10503247064,
"schema": "public",
"table": "test_1"
}