本篇文章给大家分享的是有关Databricks如何使用Spark Streaming和Delta Lake对流式数据进行数据质量监控,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
创新互联是一家专业提供硚口企业网站建设,专注与成都网站设计、成都做网站、H5高端网站建设、小程序制作等业务。10年已为硚口众多企业、政府机构等服务。创新互联专业网站制作公司优惠进行中。
小编主要对Databricks如何使用Spark Streaming和Delta Lake对流式数据进行数据质量监控的方法和架构进行了介绍,下面探讨了一种数据管理架构,该架构可以在数据到达时,通过主动监控和分析来检测流式数据中损坏或不良的数据,并且不会造成瓶颈。
构建流式数据分析和监控流程
在Databricks,我们看到客户中不断涌现出许多数据处理模式,这些新模式的产生推动了可能的极限,在速度和质量问题上也不例外。为了帮助解决这一矛盾,我们开始考虑使用正确的工具,不仅可以支持所需的数据速度,还可以提供可接受的数据质量水平。Structured Streaming和Delta Lake非常适合用于数据获取和存储层,因为他们能够配合创造一个具有扩展性、容错性和类实时的系统,并且具有exactly-once处理保证。
为企业数据质量分析找到可接受的工具要困难一些,特别是这个工具需要具有对数据质量指标的状态汇总的能力。另外,还需要能够对整个数据集进行检查(例如检测出多少比例的记录为空值),这些都会随着所提取的数据量的增加而增加计算成本。这对所有流式系统而言都是需要的,这一要求就排除了很多可用的工具。
在我们最初的解决方案中,我们选择了Amazon的数据质量检测工具Deequ,因为它能提供简单而强大的API,有对数据质量指标进行状态聚合的能力,以及对Scala的支持。将来,其他Spark原生的工具将提供额外的选择。
我们通过在EC2实例上运行一个小型的Kafka producer来模拟数据流,该实例将模拟的股票交易信息写入Kafka topic,并使用原生的Databricks连接器将这些数据导入到Delta Lake表当中。为了展示Spark Streaming中数据质量检查的功能,我们选择在整个流程中实现Deequ的不同功能:
根据历史数据生成约束条件;
使用foreachBatch算子对到达的数据进行增量质量分析;
使用foreachBatch算子对到达的数据执行(较小的)单元测试,并将质量不佳的batch隔离到质量不佳记录表中;
对于每个到达的batch,将最新的状态指标写入到Delta表当中;
对整个数据集定期执行(较大的)单元测试,并在MLFlow中跟踪结果;
根据验证结果发送通知(如通过电子邮件或Slack);
捕获MLFlow中的指标以进行可视化和记录。
我们结合了MLFlow来跟踪一段时间内数据性能指标的质量、Delta表的版本迭代以及结合了一个用于通知和告警的Slack连接器。整个流程可以用如下的图片进行表示:
由于Spark中具有统一的批处理/流式处理接口,因此我们能够在这个流程的任何位置提取报告、告警和指标,作为实时更新或批处理快照。这对于设置触发器或限制特别有用,因此,如果某个指标超过了阈值,则可以执行数据质量改善措施。还要注意的是,我们并没有对初始到达的原始数据造成影响,这些数据将立即提交到我们的Delta表,这意味着我们不会限制数据输入的速率。下游系统可以直接从该表中读取数据,如果超过了上述任何触发条件或质量阈值,则可能会中断。此外,我们可以轻松地创建一个排除质量不佳记录的view以提供一个干净的表。
在一个较高的层次,执行我们的数据质量跟踪和验证的代码如下所示:
spark.readStream
.table("trades_delta")
.writeStream
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
// reassign our current state to the previous next state
val stateStoreCurr = stateStoreNext
// run analysis on the current batch, aggregate with saved state
val metricsResult = AnalysisRunner.run(data=batchDF, ...)
// verify the validity of our current microbatch
val verificationResult = VerificationSuite()
.onData(batchDF)
.addCheck(...).run()
// if verification fails, write batch to bad records table
if (verificationResult.status != CheckStatus.Success) {...}
// write the current results into the metrics table
Metric_results.write
.format("delta")
.mode("overwrite")
.saveAsTable("deequ_metrics")
}
.start()
使用数据质量工具Deequ
在Databricks中使用Deequ是相对比较容易的事情,你需要首先定义一个analyzer,然后在dataframe上运行该analyzer。例如,我们可以跟踪Deequ本地提供的几个相关指标检查,包括检查数量和价格是否为非负数、原始IP地址是否不为空以及符号字段在所有事务中的唯一性。Deequ的StateProvider对象在流式数据配置中特别有用,它能允许用户将我们指标的状态保存在内存或磁盘中,并在以后汇总这些指标。这意味着每个处理的批次仅分析该批次中的数据记录,而不会分析整个表。即使随着数据大小的增长,这也可以使性能保持相对稳定,这在长时间运行的生产环境中很重要,因为生产环境需要在任意数量的数据上保持一致。
MLFlow还可以很好地跟踪指标随时间的演变,在我们的notebook中,我们跟踪在foreachBatch代码中分析的所有Deequ约束作为指标,并使用Delta的versionID和时间戳作为参数。在Databricks的notebook中,集成的MLFlow服务对于指标跟踪特别方便。
通过使用Structured Streaming、Delta Lake和Deequ,我们能够消除传统情况下数据质量和速度之间的权衡,而专注于实现两者的可接受水平。这里特别重要的是灵活性——不仅在如何处理不良记录(隔离、报错、告警等),而且在体系结构上(例如何时以及在何处执行检查?)和生态上(如何使用我们的数据?)。开源技术(如Delta Lake、Structured Streaming和Deequ)是这种灵活性的关键。随着技术的发展,能够使用最新最、最强大的解决方案是提升其竞争优势的驱动力。最重要的是,你的数据的速度和质量一定不能对立,而要保持一致,尤其是在流式数据处理越来越靠近核心业务运营时。很快,这将不会是一种选择,而是一种期望和要求,我们正朝着这个未来方向一次一小步地不断前进。
以上就是Databricks如何使用Spark Streaming和Delta Lake对流式数据进行数据质量监控,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注创新互联行业资讯频道。