这篇文章将为大家详细讲解有关Flink 1.10中SQL、HiveCatalog与事件时间整合的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
涡阳网站建设公司创新互联,涡阳网站设计制作,有大型网站制作公司丰富经验。已为涡阳超过千家提供企业网站建设服务。企业网站搭建\外贸网站建设要多少钱,请找那个售后服务好的涡阳做网站的公司定做!
添加依赖项
Maven 下载:
https://maven.aliyun.com/mvn/search
2.11
1.10.0
1.1.0
org.apache.flink
flink-table-api-scala_${scala.bin.version}
${flink.version}
org.apache.flink
flink-table-api-scala-bridge_${scala.bin.version}
${flink.version}
org.apache.flink
flink-table-planner-blink_${scala.bin.version}
${flink.version}
org.apache.flink
flink-sql-connector-kafka-0.11_${scala.bin.version}
${flink.version}
org.apache.flink
flink-connector-hive_${scala.bin.version}
${flink.version}
org.apache.flink
flink-json
${flink.version}
org.apache.hive
hive-exec
${hive.version}
最后,找到 Hive 的配置文件 hive-site.xml,准备工作就完成了。
注册 HiveCatalog、创建数据库
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(5)
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tableEnvSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
val catalog = new HiveCatalog(
"rtdw", // catalog name
"default", // default database
"/Users/lmagic/develop", // Hive config (hive-site.xml) directory
"1.1.0" // Hive version
)
tableEnv.registerCatalog("rtdw", catalog)
tableEnv.useCatalog("rtdw")
val createDbSql = "CREATE DATABASE IF NOT EXISTS rtdw.ods"
tableEnv.sqlUpdate(createDbSql)
创建 Kafka 流表并指定事件时间
"eventType": "clickBuyNow", "userId": "97470180", "shareUserId": "", "platform": "xyz", "columnType": "merchDetail", "merchandiseId": "12727495", "fromType": "wxapp", "siteId": "20392", "categoryId": "", "ts": 1585136092541
CREATE TABLE rtdw.ods.streaming_user_active_log ( eventType STRING COMMENT '...', userId STRING, shareUserId STRING, platform STRING, columnType STRING, merchandiseId STRING, fromType STRING, siteId STRING, categoryId STRING, ts BIGINT, procTime AS PROCTIME(), -- 处理时间 eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间 WATERMARK FOR eventTime AS eventTime - INTERVAL '10' SECOND -- 水印) WITH ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'ng_log_par_extracted', 'connector.startup-mode' = 'latest-offset', -- 指定起始offset位置 'connector.properties.zookeeper.connect' = 'zk109:2181,zk110:2181,zk111:2181', 'connector.properties.bootstrap.servers' = 'kafka112:9092,kafka113:9092,kafka114:9092', 'connector.properties.group.id' = 'rtdw_group_test_1', 'format.type' = 'json', 'format.derive-schema' = 'true', -- 由表schema自动推导解析JSON 'update-mode' = 'append')
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'n' TIME_UNIT
https://www.jianshu.com/p/c612e95a5028
val createTableSql = """ |上文的SQL语句 |...... """.stripMargin tableEnv.sqlUpdate(createTableSql)
开窗计算 PV、UV
SELECT eventType,TUMBLE_START(eventTime, INTERVAL '30' SECOND) AS windowStart,TUMBLE_END(eventTime, INTERVAL '30' SECOND) AS windowEnd,COUNT(userId) AS pv,COUNT(DISTINCT userId) AS uvFROM rtdw.ods.streaming_user_active_logWHERE platform = 'xyz'GROUP BY eventType, TUMBLE(eventTime, INTERVAL '30' SECOND)
SQL 文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#group-windows
val queryActiveSql =
"""
|......
|......
""".stripMargin
val result = tableEnv.sqlQuery(queryActiveSql)
result
.toAppendStream[Row]
.print()
.setParallelism(1)
关于“Flink 1.10中SQL、HiveCatalog与事件时间整合的示例分析”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。