This article examines Driver fault tolerance in Spark Streaming: which state the Driver holds, and how that state is protected so it can be restored after a failure.
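On the data side, ReceivedBlockTracker records the metadata for the entire Spark Streaming application.
On the scheduling side, DStreamGraph and JobGenerator are the core of Spark Streaming scheduling. They record how far scheduling has progressed, which depends on the business logic.
When ReceivedBlockTracker receives block metadata, it calls the addBlock method, which first writes the metadata to disk and only then writes it to memory.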
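The "disk first, memory second" ordering can be illustrated with a small model. This is only a sketch under stated assumptions, not Spark's code: `BlockInfo`, `InMemoryLog`, and `SketchTracker` are hypothetical names, and an in-memory buffer stands in for the log file on disk.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration; none of these are Spark classes.
final case class BlockInfo(streamId: Int, blockId: String)

// Stand-in for a write-ahead log; a real one would append to a file on disk.
final class InMemoryLog {
  val records = mutable.ArrayBuffer.empty[String]
  def write(record: String): Boolean = { records += record; true }
}

final class SketchTracker(log: InMemoryLog) {
  // Blocks received but not yet assigned to a batch, grouped by stream.
  private val unallocated = mutable.HashMap.empty[Int, mutable.Queue[BlockInfo]]

  // Log first; update the in-memory state only if the log write succeeded.
  def addBlock(info: BlockInfo): Boolean = {
    val logged = log.write(s"add:${info.streamId}:${info.blockId}")
    if (logged) {
      unallocated.getOrElseUpdate(info.streamId, mutable.Queue.empty[BlockInfo]) += info
    }
    logged
  }

  def pending(streamId: Int): List[BlockInfo] =
    unallocated.get(streamId).map(_.toList).getOrElse(Nil)
}
```

Because the log is written before memory is touched, anything visible in memory is guaranteed to have a log record that can be replayed after a crash.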
Based on batchTime, the blocks that belong to the current BatchDuration are allocated into the timeToAllocatedBlocks data structure.
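The allocation step can be sketched roughly as follows. The class `AllocationSketch` and its string block ids are hypothetical simplifications; Spark's own structure is keyed by Time and is richer than a list of strings.

```scala
import scala.collection.mutable

// Hypothetical simplified model of batch allocation; not Spark's implementation.
final class AllocationSketch {
  private val unallocated = mutable.Queue.empty[String]
  // Batch time in milliseconds -> ids of the blocks that batch must process.
  private val timeToAllocatedBlocks = mutable.HashMap.empty[Long, List[String]]

  def addBlock(blockId: String): Unit = unallocated += blockId

  // Move every block received so far into the batch identified by batchTime.
  // A batch time that was already allocated is left unchanged.
  def allocateBlocksToBatch(batchTime: Long): Unit = {
    if (!timeToAllocatedBlocks.contains(batchTime)) {
      timeToAllocatedBlocks(batchTime) = unallocated.toList
      unallocated.clear()
    }
  }

  def blocksOfBatch(batchTime: Long): List[String] =
    timeToAllocatedBlocks.getOrElse(batchTime, Nil)
}
```

Each block ends up in exactly one batch: the first batch whose allocation runs after the block arrived.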
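Time is a case class that records a point in time. It overloads the comparison and arithmetic operators and supplies an implicit Ordering in its companion object, a design worth borrowing.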
    case class Time(private val millis: Long) {

      def milliseconds: Long = millis

      def < (that: Time): Boolean = (this.millis < that.millis)

      def <= (that: Time): Boolean = (this.millis <= that.millis)

      def > (that: Time): Boolean = (this.millis > that.millis)

      def >= (that: Time): Boolean = (this.millis >= that.millis)

      def + (that: Duration): Time = new Time(millis + that.milliseconds)

      def - (that: Time): Duration = new Duration(millis - that.millis)

      def - (that: Duration): Time = new Time(millis - that.milliseconds)

      // Java-friendlier versions of the above.

      def less(that: Time): Boolean = this < that

      def lessEq(that: Time): Boolean = this <= that

      def greater(that: Time): Boolean = this > that

      def greaterEq(that: Time): Boolean = this >= that

      def plus(that: Duration): Time = this + that

      def minus(that: Time): Duration = this - that

      def minus(that: Duration): Time = this - that

      def floor(that: Duration): Time = {
        val t = that.milliseconds
        new Time((this.millis / t) * t)
      }

      def floor(that: Duration, zeroTime: Time): Time = {
        val t = that.milliseconds
        new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
      }

      def isMultipleOf(that: Duration): Boolean =
        (this.millis % that.milliseconds == 0)

      def min(that: Time): Time = if (this < that) this else that

      def max(that: Time): Time = if (this > that) this else that

      def until(that: Time, interval: Duration): Seq[Time] = {
        (this.milliseconds) until (that.milliseconds) by (interval.milliseconds) map (new Time(_))
      }

      def to(that: Time, interval: Duration): Seq[Time] = {
        (this.milliseconds) to (that.milliseconds) by (interval.milliseconds) map (new Time(_))
      }

      override def toString: String = (millis.toString + " ms")
    }

    object Time {
      implicit val ordering = Ordering.by((time: Time) => time.millis)
    }
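To see why this pattern is convenient, here is a miniature version of it. `Tick` and `TickDemo` are hypothetical names invented for this sketch, and plain Long values replace Spark's Duration so the example has no dependencies.

```scala
// Hypothetical miniature of the pattern; not Spark's Time class.
final case class Tick(millis: Long) {
  def <(that: Tick): Boolean = millis < that.millis
  def +(delta: Long): Tick = Tick(millis + delta)
  def -(that: Tick): Long = millis - that.millis
}

object Tick {
  // An implicit Ordering in the companion lets sorted, min, and max work directly.
  implicit val ordering: Ordering[Tick] = Ordering.by((t: Tick) => t.millis)
}

object TickDemo {
  val batches: List[Tick] = List(Tick(3000), Tick(1000), Tick(2000))
  val sorted: List[Tick] = batches.sorted // picks up Tick.ordering implicitly
  val next: Tick = sorted.last + 1000     // operator syntax reads like arithmetic
}
```

Placing the Ordering in the companion object means callers never have to import it: the compiler searches the companion of the element type when resolving the implicit.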
Tracing the Time object back: the batchTime argument of ReceiverTracker's allocateBlocksToBatch method is passed in by JobGenerator's generateJobs method.
JobGenerator's generateJobs method is in turn invoked when the timer posts a GenerateJobs message.
The time argument carried by GenerateJobs is nextTime. After each trigger, nextTime += period, where period is ssc.graph.batchDuration.milliseconds.
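The bookkeeping behind nextTime can be modelled in a few lines. `TimerSketch` is a hypothetical class for illustration; it involves no threads and no real clock, and only shows how successive trigger times are derived.

```scala
// Hypothetical model of the timer's bookkeeping; no threads or real clock involved.
final class TimerSketch(period: Long, startTime: Long) {
  private var nextTime: Long = startTime

  // Return the time for the current trigger, then advance by one period.
  def tick(): Long = {
    val fired = nextTime
    nextTime += period
    fired
  }
}
```

With a period of 5000 ms and a start time of 10000 ms, successive calls to `tick()` yield 10000, 15000, 20000, so batch times depend only on the start time and the period, not on when the callback actually ran.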
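The initial value of nextTime is the startTime passed to the start method, that is, the return value of RecurringTimer's getStartTime method: (floor(currentTime / period) + 1) * period, the first multiple of period after the current time.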
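A worked version of that arithmetic, as a sketch: `StartTimeSketch` is a hypothetical helper, and it assumes a non-negative clock value so that integer division behaves like floor.

```scala
object StartTimeSketch {
  // First multiple of period strictly after now: (floor(now / period) + 1) * period.
  // Assumes nowMillis >= 0, so integer division equals floor.
  def startTime(nowMillis: Long, periodMillis: Long): Long =
    (nowMillis / periodMillis + 1) * periodMillis
}
```

For example, with a period of 5000 ms and a current time of 12345 ms, the quotient is 2, so the start time is 3 * 5000 = 15000 ms. If the current time is already an exact multiple, such as 15000 ms, the result is the following multiple, 20000 ms.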
The period value is the Duration we pass in when constructing a StreamingContext with new StreamingContext.
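ReceivedBlockTracker also clears expired metadata by removing it from the HashMap. Here too, the operation is first written to disk and only then applied in memory.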
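Cleanup follows the same log-then-mutate order. The sketch below uses a hypothetical `CleanupSketch` class with an in-memory buffer standing in for the log on disk; the record format is invented for illustration.

```scala
import scala.collection.mutable

// Hypothetical model of log-first cleanup; not Spark's implementation.
final class CleanupSketch {
  // Stand-in for the log on disk.
  val log = mutable.ArrayBuffer.empty[String]
  private val timeToAllocatedBlocks = mutable.HashMap.empty[Long, List[String]]

  def allocate(batchTime: Long, blocks: List[String]): Unit =
    timeToAllocatedBlocks(batchTime) = blocks

  // Drop every batch older than threshold: record the cleanup first, then mutate the map.
  def cleanupOldBatches(threshold: Long): Unit = {
    val expired = timeToAllocatedBlocks.keys.filter(_ < threshold).toList.sorted
    log += s"cleanup:${expired.mkString(",")}"
    timeToAllocatedBlocks --= expired
  }

  def batchTimes: List[Long] = timeToAllocatedBlocks.keys.toList.sorted
}
```

Logging the removal matters as much as logging the addition: without it, a replay after a crash would resurrect batches that had already been discarded.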
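Every stage of the metadata lifecycle (creation, consumption, and destruction) goes through the WAL, so after a failure the state can be recovered from the log. The source code shows that the WAL mechanism is enabled only when a checkpoint directory has been set.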
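Recovery then amounts to replaying the logged events in order. The following is a minimal sketch of that idea; the event types, `TrackerState`, and `RecoverySketch` are all hypothetical and much simpler than what Spark actually logs.

```scala
// Hypothetical event types and replay logic, for illustration only.
sealed trait LogEvent
final case class BlockAdded(blockId: String) extends LogEvent
final case class BatchAllocated(batchTime: Long, blockIds: List[String]) extends LogEvent
final case class BatchCleaned(batchTimes: List[Long]) extends LogEvent

final case class TrackerState(unallocated: List[String], batches: Map[Long, List[String]])

object RecoverySketch {
  // Rebuild the in-memory state by replaying the logged events in order.
  def recover(events: Seq[LogEvent]): TrackerState =
    events.foldLeft(TrackerState(Nil, Map.empty[Long, List[String]])) {
      case (s, BlockAdded(id)) =>
        s.copy(unallocated = s.unallocated :+ id)
      case (s, BatchAllocated(t, ids)) =>
        TrackerState(s.unallocated.filterNot(id => ids.contains(id)), s.batches + (t -> ids))
      case (s, BatchCleaned(ts)) =>
        s.copy(batches = s.batches -- ts)
    }
}
```

Since each of the three lifecycle stages has its own event, the fold reconstructs exactly the state the tracker held when the last record was written.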
The log directory used for the WAL is created from the checkpoint directory that was passed in.
Specifically, a folder named receivedBlockMetadata is created under the checkpoint directory to hold the data recorded by the WAL.
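The relationship between the checkpoint directory and the log directory can be expressed as a small function. `LogDirSketch` is a hypothetical helper that uses plain string handling for the path; it only models the rule that no checkpoint directory means no log directory.

```scala
object LogDirSketch {
  // No checkpoint directory means no write-ahead log directory.
  def logDirFor(checkpointDir: Option[String]): Option[String] =
    checkpointDir.map(dir => s"${dir.stripSuffix("/")}/receivedBlockMetadata")
}
```

Modelling the directory as an Option makes the "WAL only with checkpointing" rule explicit in the type: callers must handle the case where there is nothing to log to.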
The current state of the DStream and of JobGenerator is then checkpointed. This method is invoked at the end of generateJobs, which posts a DoCheckpoint message to trigger it.
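The message flow can be sketched with a tiny single-threaded event loop. This is an illustrative model only: `GeneratorSketch` is hypothetical, and the two message classes here carry just a time value, which is simpler than the messages in Spark's source.

```scala
import scala.collection.mutable

// Hypothetical, simplified messages; they only carry the batch time.
sealed trait JobGeneratorEvent
final case class GenerateJobs(time: Long) extends JobGeneratorEvent
final case class DoCheckpoint(time: Long) extends JobGeneratorEvent

final class GeneratorSketch {
  private val queue = mutable.Queue.empty[JobGeneratorEvent]
  // Records the order in which events were handled.
  val trace = mutable.ArrayBuffer.empty[String]

  def post(event: JobGeneratorEvent): Unit = queue.enqueue(event)

  // Drain the queue; a handler may post follow-up events.
  def run(): Unit = while (queue.nonEmpty) {
    queue.dequeue() match {
      case GenerateJobs(t) =>
        trace += s"generate:$t"
        post(DoCheckpoint(t)) // last step of job generation
      case DoCheckpoint(t) =>
        trace += s"checkpoint:$t"
    }
  }
}
```

Posting the checkpoint as a message, rather than calling it inline, keeps job generation short and lets checkpoints be handled by the same loop that serializes all other scheduling events.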
Source:
http://cdkjz.cn/article/geegdc.html