Spark2.x中Shuffle演进历程及Shuffle两阶段划分是这样的,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。
成都创新互联公司自成立以来,一直致力于为企业提供从网站策划、网站设计、成都做网站、成都网站建设、电子商务、网站推广、网站优化到为企业提供个性化软件开发等基于互联网的全面整合营销服务。公司拥有丰富的网站建设和互联网应用系统开发管理经验、成熟的应用系统解决方案、优秀的网站开发工程师团队及专业的网站设计师团队。
一、概述
我自己也去看了Spark2.2.0的源码,在Spark-env初始化中只保留了两种Shuffle:Sort、Tungsten-Sort,下面是源码截图:
由于Spark Shuffle是Spark的核心之核心,为了对Spark Shuffle有更全面的认识,这里先来讲解一下Spark的技术演进历程及Shuffle阶段换分。
二、Spark Shuffle技术演进历程
1.Spark0.8以前,都是采用的HashShuffle,这是最开始的Shuffle,会存在生成很多小文件的问题,文件数M*R,其中M表示ShuffleMapTask个数,R表示Result个数;
2.Spark0.8.1中,引入了Consolidation优化机制,减少了小文件的生产,文件数变成了E*(C/T)*R,其中E表示Executor个数,C表示每个Executor中可用Core的个数,T表示Task所分配的Core的个数(默认值为1)。
3.Spark 0.9 中,引入ExternalAppendOnlyMap,combine的时候,可以将数据spill到磁盘,然后通过堆排序merge;
4.Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle,稍后会详解这种机制;
5.Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle;
6.Spark 1.4 引入Tungsten-Sort Based Shuffle,将数据记录用序列化的二进制方式存储,把排序转化成指针数组的排序,引入堆外内存空间和新的内存管理模型,这些技术决定了使用Tungsten-Sort要符合一些严格的限制,比如Shuffle dependency不能带有aggregation、输出不能排序等。由于堆外内存的管理基于JDK Sun Unsafe API,故Tungsten-Sort Based Shuffle也被称为Unsafe Shuffle;
7.Spark 1.6 Tungsten-sort并入Sort Based Shuffle;
8.Spark 2.0 Hash Based Shuffle被启用,Sort Based Shuffle成为默认Shuffle机制。
三、Spark Shuffle的两阶段
Spark中有宽依赖、窄依赖两种,宽依赖会触发Stage的划分,这时候需要进行Shuffle,需要对RDD中的每个Paritioon数据进行重新分区,Spark Shuffle主要分成了两个阶段:Shuffle Write、Shuffle Read,两个阶段分属两个人Stage,前者属于父Stage,后者属于子Stage。
结合上篇文章的图我这里给他用不同颜色进行了标识,上面的部分是stage1中的ShuffleWrite负责Shuffle阶段数据写,下面就是stage0的中的ShuffleWrite负责Shuffle阶段数据读,如图所示:
这里有一点需要说明一下:如果要按照map端和reduce端来分析的话,ShuffleMapTask可以即是map端任务,又是reduce端任务,因为Spark中的Shuffle是可以串行的;ResultTask则只能充当reduce端任务的角色。
Shuffle Write阶段
shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据按key进行“分类”。所谓“分类”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。
Shuffle Read阶段
shuffle read,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。由于shuffle write的过程中,task为下游stage的每个task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。
shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注创新互联行业资讯频道,感谢您对创新互联的支持。