[TOC]
创新互联建站长期为成百上千家客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为华龙企业提供专业的成都做网站、成都网站制作,华龙网站改版等技术服务。拥有10多年丰富建站经验和众多成功案例,为您定制开发。 spark的计算本质是分布式计算,程序的性能受集群中的任何因素的影响,如:CPU、网络带宽、内存等。一般情况下,如果内存足够大,那么其他因素影响性能。然后出现调优需求时,更多是因为资源不够用的情况,所以才需要调节资源的使用情况,更加高效的使用资源。比如如果内存比较紧张,不足以存放所有数据(10亿条),需要针对内存的使用,进行调优来减少内存的消耗
Spark的性能优化,大部分的工作,是对于内存的使用,进行调优。通常情况下,Spark 处理的程序数据量较小,内存足够使用,只要保证网络通常,一般不会出现大的性能问题。但是,Spark应用程序的性能问题往往出现在针对大数据量进行计算时(数据突增)。这种情况往往是现环境是无法满足的,所以可能导致集群崩溃。
除了内存调优之外,还有一些手段可以优化性能。比如spark使用过程中有和mysql交互的话,此时调优也要考虑到mysql的性能问题。
1、使用高性能序列化类库。目的减少序列化时间以及序列化后数据的大小
2、优化数据结构。目的减少内存占用
3、对多次使用的RDD进行持久化(RDD cache)、checkpoint
4、使用序列化的持久化级别:MEMORY_ONLY不序列化,MEMORY_ONLY_SER序列化。
MEMORY_ONLY比MEMORY_ONLY_SER要占用更多内存空间。
但是要注意,序列化会增加cpu使用成本,所以要权衡好
5、Java虚拟机垃圾回收调优。
6、Shuffle调优,90%的问题都是shuffle导致(1.x版本时此问题严重,到2.x版本,官网基本已优化,所以到2.x版本,这个问题可忽略)
其他性能优化的方式:
提高计算并行度
广播共享数据
下面会针对这6点调优手段进行分析
1、每个 java/scala对象,由两部分组成,一个是对象头,占用16字节,主要包含对象的一些元信息,比如指向它的类的指针。另一个是对象本身。如果对象比较小,比如int,它的对象头比自己对象本身都大。
2、String对象,会比他内部的原始数据,多出40个字节,用于保存string类型的元信息
String内部使用char数组来保存字符串序列,并且还要保存诸如数组长度之类的信息。String使用UTF-16编码,所以每个字符会占用2个字节。
比如:包含10个字符的String,占用 2*10 + 40 个字节。
3、集合类型,比如HashMap和LinkedList,内部使用链表数据结构,对链表中的每个数据,使用Entry对象包装。Entry对象,不光有对象头,还有指向下一个Entry的指针,占用8个字节。所以一句话就是,这种内部还包含多个对象的类型,占用内存更多。因为对象多了,除了对象本身数据占用内存之外,更多对象也就会有更多对象头,占用了不少内存空间。
4、基本数据类型的集合,比如int集合,内部会使用对象的包装类 Integer来存储元素。
到driver日志目录下查看程序运行日志
less ${spark_home}/work/app-xxxxxx/0/stderr
观察到类似如下信息:
INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 320.9 KB, free 366.0 MB)
19/07/05 05:57:47 INFO MemoryStore: Block rdd_3_1 stored as values in memory (estimated size 26.0 MB, free 339.9 MB)
19/07/05 05:57:47 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2212 bytes result sent to driver
19/07/05 05:57:48 INFO MemoryStore: Block rdd_3_0 stored as values in memory (estimated size 26.7 MB, free 313.2 MB)
estimated size 320.9 KB:当前使用的内存大概大小
free 366.0 MB:剩余空闲内存大小
这样就可以知道任务使用内存的情况了
spark作为一个分布式系统,和其他分布式系统一样,都需要序列化。任何一个分布式系统中,序列化都是很重要的一环。如果使用的序列化技术,操作很慢,序列化后数据量大,会导致分布式系统应用程序性能下降很多。所以,Spark性能优化的第一步,就是进行序列化性能的优化。
spark在一些地方是会使用序列化,比如shuffle的时候,但是spark对便捷性和性能进行了取舍,spark为了便捷性,默认使用了java的序列化机制,java的序列化机制之前也讲过,性能不高,序列化速度慢,序列化后数据大。所以一般生产中,最好修改spark使用 的序列化机制
spark支持使用kryo来实现序列化。kryo序列化速度比java快,占用空间小,大概小10倍。但是使用起来,相对没有那么便捷。
配置spark使用kryo:
spark在读取配置时,会读取conf目录下的配置文件,其中有一个 spark-defaults.conf 文件就是用来指定spark的一些工作参数的。
vim spark-defaults.conf
spark.serializer org.apache.spark.serializer.KryoSerializer
这就配置了使用kryo,当然也可以在spark程序中使用 conf对象来来设置
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
(1)优化缓存大小
如果注册的序列化的自定义类型,本身特别大,比如包含了100个以上字段,就会导致序列化的对象过大。此时需要对kyro本身进行优化。因为kyro本身内部缓存不够存放这么大的对象。
设置:spark.kryoserializer.buffer.max 参数值调大,即可。
(2)提前注册自定义类型
使用kryo时,为了更高的性能,最好提前注册需要序列化的类,如:
在sparkConf 对象中注册
conf.registerKryoClasses(Array(classof[Student],classof[Teacher]))
注意:这里基本都针对自定义的类,而且用scala编写spark项目时,其实不会涉及到太多自定义类,不像java
优化数据结构,主要在于避免语法特性中所导致的额外内存开销。
核心:优化算子函数内部使用到的局部数据或者算子外部的数据。
目的:减少对内存的消耗和占用。
(1)优先使用数组以及字符串,而不是集合类。
即:优先使用array,而不是ArrayList,LinkedList,hashMap
使用int[] 比 List 节省内存。
前面也说过,集合类包含更多的额外数据,以及复杂的类结构,所以占用内存多。此举就是为了将结构简单化,满足使用的情况下,越简单越好
(2)将对象转换成字符串。
在企业中,将HashMap,List这种数据,统一使用String拼接成特殊格式的字符串。
举例:
Map persons = new HashMap<>()
优化为:
id:name,address,idCardNum,family......|id:name,address,idCardNum,family......
(3)避免使用多层嵌套对象结构。
public class Teacher{private List students = new ArrayList<>()}
以上例子不好,因为Teacher类的内部又嵌套了大量的小的Student对象。
改进:
转成json,处理字符串
{"teacherId":1,....,students[{"studentId":1,.....}]}
(4)对于能够避免的场景,使用int代替String
虽然String性能比List高,但是int占用更少内存。
比如:数据库主键,id,推荐使用自增主键,而不是uuid。
这个就很简单了,主要是将多次使用的RDD缓存在内存中,避免再次使用时重复计算。实现方法看前面spark core的文章
默认情况下,进行RDD缓存时,RDD对象是没有序列化的,也就是持久化级别为 MEMORY_ONLY。建议使用 MEMORY_ONLY_SER进行持久化,因为这种方式同时会进行序列化,序列化后占用更少的内存的空间。实现方法看前面spark core的文章
如果在持久化RDD的时候,持久化了大量的数据,那么Java虚拟机的垃圾回收就可能成为一个性能瓶颈。Java虚拟机会定期进行垃圾回收,此时就会追踪所有Java对象,并且在垃圾回收时,追中找到那些已经不再使用的对象,清理旧对象,给新对象腾出空间。
垃圾回收的性能开销,和内存中的对象数量成正比。而且要注意一点, 在做Java虚拟机调优前,必须要做好上面其他调优工作,这样才有意义。因为上面的调优工作,是为了节省内存的开销,更好、更高效的使用内存。上面的优化比起进行jvm调优获得的好处要大得多。并且jvm调优好了,但是上层应用没有好的内存使用方式,jvm优化了也白搭。
这里提到这个,更多是让读者自己去理解这个原理,随便百度都可以找到了,这里不重复。
我们可以对垃圾回收进行监测,包括多久进行一次垃圾回收,以及每次垃圾回收耗费的时间。
在 spark-submit脚本中,添加一个配置:
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimesStamps"
注意:输出到worker的日志中,而不是driver日志。
/usr/local/spark-2.1.0-bin-hadoop2.7/work/app-20190705055405-0000/0
这是driver日志
/usr/local/spark-2.1.0-bin-hadoop2.7/logs
这是worker日志
对于GC调优来说,最重要的调节,RDD缓存占用的内存空间 与 算子执行是创建对象所占用的内存空间 的比例。默认情况下,Spark使用每个Executor 60%的内存空间来缓存RDD,那么在task执行期间创建的对象,只有40%的内存空间来存放对象。
在这种情况下,很有可能因为内存不足,task创建的对象过大,导致40%的内存空间不够用,触发Java虚拟机垃圾回收操作。在极端的情况下,垃圾回收操作会被频繁触发。
根据实际情况,可以增大对象存储空间,减少gc发生概率,方式:
conf.set("spark.storage.memoryFraction",0.5)
将RDD缓存占用空间比例降低到50%
以往在spark1.x版本中,如果有shuffle时,那么每个map task就会根据result task(也可以叫reduce task)的个数,对map结果进行分区,分别给不同的result task处理,每个分区产生一个文件。当map数量很多时,就会产生大量文件,这会带来性能问题。
在spark2.x中,将一个map task输出的数据都放在一个文件中,然后加上一个索引文件,用于标识不同分区数据在文件中的位置,这样就保证了一个task只产生一个文件。从而降低了IO压力
另外有需要云服务器可以了解下创新互联cdcxhl.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。