Delta数据湖原来是强绑定于Spark引擎,而近期社区实现了使用Flink引擎将数据入湖,简单写个demo使用以下。
让客户满意是我们工作的目标,不断超越客户的期望值来自于我们对这个行业的热爱。我们立志把好的技术通过有效、简单的方式提供给客户,将通过不懈努力成为客户在信息化领域值得信任、有价值的长期合作伙伴,公司提供的服务项目有:主机域名、雅安服务器托管、营销软件、网站建设、连云网站维护、网站推广。设置下checkpoint的时间大小
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
2.2 构建MysqlSouce使用flink-cdc-mysql依赖中的方法,输入ip,表名等直接构建
MySqlSourcesource = MySqlSource
.builder()
.hostname("ip")
.port(3306)
.databaseList("database")
.tableList("database.table")
.username("username")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
2.3 Mysql表的Schema转变成Flink-RowType使用flink将数据入湖时,需要将source的Schema转换成Flink的RowType
通过RowType.RowField实现,这里定义三个字段的RowType
public static RowType getMysqlRowType(){return new RowType(Arrays.asList(
new RowType.RowField("id", new BigIntType()),
new RowType.RowField("name", new VarCharType(VarCharType.MAX_LENGTH)),
new RowType.RowField("dept_id",new IntType())
));
}
2.4 构建Sink使用delta-flink依赖中的DeltaSink
.forRowData()方法,指定lakePath,hadoop-conf,rowType,生成Sink
public static org.apache.hadoop.conf.Configuration getHadoopConf() {org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
conf.set("parquet.compression", "SNAPPY");
return conf;
}
public static DeltaSinkcreateDeltaSink(String deltaTablePath, RowType rowType) {return DeltaSink
.forRowData(
new Path(deltaTablePath),
getHadoopConf(),
rowType).build();
}
2.5 String转为RowDataSource端使用String类型,Sink端使用RowData类型,所以需要使用Map函数进行一次转换。
使用fastJson获取每个字段的值,然后变成Flink row类型,最后使用convertor转换为RowData
//存在于flink-table-runtime-blink_2.12依赖中
public static final DataFormatConverters.DataFormatConverterMYSQL_CONVERTER =
DataFormatConverters.getConverterForDataType(
TypeConversions.fromLogicalToDataType(getMysqlRowType())
);
public static RowData mysqlJsonToRowData(String line){String body = JSON.parseObject(line).getString("after");
Long id = JSON.parseObject(body).getLong("id");
String name = JSON.parseObject(body).getString("name");
Integer deptId = JSON.parseObject(body).getInteger("dept_id");
Row row = Row.of(id,name,deptId);
return MYSQL_CONVERTER.toInternal(row);
}
2.6 执行依次将source,sink放入env中执行即可
env.fromSource(source, WatermarkStrategy.noWatermarks(),"demo-mysql-cdc")
.setParallelism(2)
//将json数据转为FlinkRowData
.map(FlinkDeltaUtil::mysqlJsonToRowData)
.sinkTo(deltaSink)
.setParallelism(1);
env.execute("flink-cdc-to-delta");
3. 源码仓库地址 (https://gitee.com/zhiling-chen/demo-mysql-flink-delta)
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧