signed

QiShunwang

“诚信为本、客户至上”

spark项目小结

2020/8/20 15:18:12   来源:

部署

注意固定的ip地址和主机需要在同一个子网,选择桥接模式

桥接模式具有和宿主机同等地位,可以直接和同一网络内其他主机通信,nat模式依赖宿主机上网,只能和宿主机通信.

关于向yarn提交spark作业报错

通过以下方法查看yarn的报错日志.如果是找不到类,可能是输入命令格式有问题.

hadoop等启动

1.进入Hadoop的bin目录下,输入:start-all.sh 即可启动你所搭建的集群.如果配置过路径,直接start-all.sh可以启动hdfs和yarn

2.进入zookeeper的bin目录下,输入:zkServer.sh start 即可启动zookeeper,或者直接zkServer.sh start

3.进入kafka的目录下输入命令:nohup bin/kafka-server-start.sh config/server.properties &

JAVA

外部类和内部类

JavaBean

 

工厂模式

性能调优

资源和并行度分配

重构RDD和持久化

第一,RDD架构重构与优化

尽量去复用RDD,差不多的RDD,可以抽取称为一个共同的RDD,供后面的RDD计算时,反复使用。

第二,公共RDD一定要实现持久化

对于要多次计算和使用的公共RDD,一定要进行持久化。

持久化,也就是说,将RDD的数据缓存到内存中/磁盘中,(BlockManager),以后无论对这个RDD做多少次计算,那么都是直接取这个RDD的持久化的数据,比如从内存中或者磁盘中,直接提取一份数据。

第三,持久化,是可以进行序列化的

如果正常将数据持久化在内存中,那么可能会导致内存的占用过大,这样的话,也许,会导致OOM内存溢出。

当纯内存无法支撑公共RDD数据完全存放的时候,就优先考虑,使用序列化的方式在纯内存中存储。将RDD的每个partition的数据,序列化成一个大的字节数组,就一个对象;序列化后,大大减少内存的空间占用。

序列化的方式,唯一的缺点就是,在获取数据的时候,需要反序列化。

如果序列化纯内存方式,还是导致OOM,内存溢出;就只能考虑磁盘的方式,内存+磁盘的普通方式(无序列化)。

内存+磁盘,序列化

第四,为了数据的高可靠性,而且内存充足,可以使用双副本机制,进行持久化

持久化的双副本机制,持久化后的一个副本,因为机器宕机了,副本丢了,就还是得重新计算一次;持久化的每个数据单元,存储一份副本,放在其他节点上面;从而进行容错;一个副本丢了,不用重新计算,还可以使用另外一份副本。

这种方式,仅仅针对你的内存资源极度充足

广播大变量

 

这种默认的,task执行的算子中,使用了外部的变量,每个task都会获取一份变量的副本,有什么缺点呢?在什么情况下,会出现性能上的恶劣的影响呢?

map,本身是不小,存放数据的一个单位是Entry,还有可能会用链表的格式的来存放Entry链条。所以map是比较消耗内存的数据格式。

比如,map1M。总共,你前面调优都调的特好,资源给的到位,配合着资源,并行度调节的绝对到位,1000task。大量task的确都在并行运行。

举例,默认情况下,1000task1000份副本。10G的数据,网络传输,在集群中,耗费10G的内存资源。

如果使用了广播变量。50execurtor50个副本。500M的数据,网络传输,而且不一定都是从Driver传输到每个节点,还可能是就近从最近的节点的executorbockmanager上拉取变量副本,网络传输速度大大增加;500M的内存消耗。

10000M500M20倍。20~以上的网络传输性能消耗的降低;20倍的内存消耗的减少。

对性能的提升和影响,还是很客观的。

 

kryo序列化

默认情况下,Spark内部是使用Java的序列化机制,ObjectOutputStream / ObjectInputStream,对象输入输出流机制,来进行序列化

这种默认序列化机制的好处在于,处理起来比较方便;也不需要我们手动去做什么事情,只是,你在算子里面使用的变量,必须是实现Serializable接口的,可序列化即可。

但是缺点在于,默认的序列化机制的效率不高,序列化的速度比较慢;序列化以后的数据,占用的内存空间相对还是比较大。

可以手动进行序列化格式的优化

Spark支持使用Kryo序列化机制。Kryo序列化机制,比默认的Java序列化机制,速度要快,序列化后的数据要更小,大概是Java序列化机制的1/10

所以Kryo序列化优化以后,可以让网络传输的数据变少;在集群中耗费的内存资源大大减少。

fastutil

Spark中应用fastutil的场景:

1、如果算子函数使用了外部变量;那么第一,你可以使用Broadcast广播变量优化;第二,可以使用Kryo序列化类库,提升序列化性能和效率;第三,如果外部变量是某种比较大的集合,那么可以考虑使用fastutil改写外部变量,首先从源头上就减少内存的占用,通过广播变量进一步减少内存占用,再通过Kryo序列化类库进一步减少内存占用。

2、在你的算子函数里,也就是task要执行的计算逻辑里面,如果有逻辑中,出现,要创建比较大的MapList等集合,可能会占用较大的内存空间,而且可能涉及到消耗性能的遍历、存取等集合操作;那么此时,可以考虑将这些集合类型使用fastutil类库重写,使用了fastutil集合类以后,就可以在一定程度上,减少task创建出来的集合类型的内存占用。避免executor内存频繁占满,频繁唤起GC,导致性能下降。

关于fastutil调优的说明:

fastutil其实没有你想象中的那么强大,也不会跟官网上说的效果那么一鸣惊人。广播变量、Kryo序列化类库、fastutil,都是之前所说的,对于性能来说,类似于一种调味品,烤鸡,本来就很好吃了,然后加了一点特质的孜然麻辣粉调料,就更加好吃了一点。分配资源、并行度、RDD架构与持久化,这三个就是烤鸡;broadcastkryofastutil,类似于调料。

shuffle调优,15分钟;groupByKeyreduceByKey改写,执行本地聚合,也许10分钟;跟公司申请更多的资源,比如资源更大的YARN队列,1分钟。

fastutil的使用:

第一步:在pom.xml中引用fastutil的包

<dependency>

    <groupId>fastutil</groupId>

    <artifactId>fastutil</artifactId>

    <version>5.0.9</version>

</dependency>

List<Integer> => IntList

基本都是类似于IntList的格式,前缀就是集合的元素类型;特殊的就是MapInt2IntMap,代表了key-value映射的元素类型。除此之外,刚才也看到了,还支持objectreference

数据本地化等待时长

 

PROCESS_LOCAL:进程本地化,代码和数据在同一个进程中,也就是在同一个executor中;计算数据的taskexecutor执行,数据在executorBlockManager中;性能最好

NODE_LOCAL:节点本地化,代码和数据在同一个节点中;比如说,数据作为一个HDFS block块,就在节点上,而task在节点上某个executor中运行;或者是,数据和task在一个节点上的不同executor中;数据需要在进程间进行传输

NO_PREF:对于task来说,数据从哪里获取都一样,没有好坏之分

RACK_LOCAL:机架本地化,数据和task在一个机架的两个节点上;数据需要通过网络在节点之间进行传输

ANY:数据和task可能在集群中的任何地方,而且不在一个机架中,性能最差

spark.locality.wait,默认是3s

JVM调优

降低cache操作内存占比

每一次放对象的时候,都是放入eden区域,和其中一个survivor区域;另外一个survivor区域是空闲的。

eden区域和一个survivor区域放满了以后(spark运行过程中,产生的对象实在太多了),就会触发minor gc,小型垃圾回收。把不再使用的对象,从内存中清空,给后面新创建的对象腾出来点儿地方。

清理掉了不再使用的对象之后,那么也会将存活下来的对象(还要继续使用的),放入之前空闲的那一个survivor区域中。这里可能会出现一个问题。默认edensurvior1survivor2的内存占比是8:1:1。问题是,如果存活下来的对象是1.5,一个survivor区域放不下。此时就可能通过JVM的担保机制(不同JVM版本可能对应的行为),将多余的对象,直接放入老年代了。

如果你的JVM内存不够大的话,可能导致频繁的年轻代内存满溢,频繁的进行minor gc。频繁的minor gc会导致短时间内,有些存活的对象,多次垃圾回收都没有回收掉。会导致这种短声明周期(其实不一定是要长期使用的)对象,年龄过大,垃圾回收次数太多还没有回收到,跑到老年代。

老年代中,可能会因为内存不足,囤积一大堆,短生命周期的,本来应该在年轻代中的,可能马上就要被回收掉的对象。此时,可能导致老年代频繁满溢。频繁进行full gc(全局/全面垃圾回收)。full gc就会去回收老年代中的对象。full gc由于这个算法的设计,是针对的是,老年代中的对象数量很少,满溢进行full gc的频率应该很少,因此采取了不太复杂,但是耗费性能和时间的垃圾回收算法。full gc很慢。

full gc / minor gc,无论是快,还是慢,都会导致jvm的工作线程停止工作,stop the world。简而言之,就是说,gc的时候,spark停止工作了。等着垃圾回收结束。

内存不充足的时候,问题:

1、频繁minor gc,也会导致频繁spark停止工作

2、老年代囤积大量活跃对象(短生命周期的对象),导致频繁full gcfull gc时间很长,短则数十秒,长则数分钟,甚至数小时。可能导致spark长时间停止工作。

3、严重影响咱们的spark的性能和运行的速度。

----------------------------------------------------------------

JVM调优的第一个点:降低cache操作的内存占比

spark中,堆内存又被划分成了两块儿,一块儿是专门用来给RDDcachepersist操作进行RDD数据缓存用的;另外一块儿,就是我们刚才所说的,用来给spark算子函数的运行使用的,存放函数中自己创建的对象。

默认情况下,给RDD cache操作的内存占比,是0.660%的内存都给了cache操作了。但是问题是,如果某些情况下,cache不是那么的紧张,问题在于task算子函数中创建的对象过多,然后内存又不太大,导致了频繁的minor gc,甚至频繁full gc,导致spark频繁的停止工作。性能影响会很大。

针对上述这种情况,大家可以在之前我们讲过的那个spark uiyarn去运行的话,那么就通过yarn的界面,去查看你的spark作业的运行统计,很简单,大家一层一层点击进去就好。可以看到每个stage的运行情况,包括每个task的运行时间、gc时间等等。如果发现gc太频繁,时间太长。此时就可以适当调价这个比例。

降低cache操作的内存占比,大不了用persist操作,选择将一部分缓存的RDD数据写入磁盘,或者序列化方式,配合Kryo序列化类,减少RDD缓存的内存占用;降低cache操作内存占比;对应的,算子函数的内存占比就提升了。这个时候,可能,就可以减少minor gc的频率,同时减少full gc的频率。对性能的提升是有一定的帮助的。

一句话,让task执行算子函数时,有更多的内存可以使用。

spark.storage.memoryFraction0.6 -> 0.5 -> 0.4 -> 0.2

executor对外内存与连接等待时长

executor堆外内存

有时候,如果你的spark作业处理的数据量特别特别大,几亿数据量;然后spark作业一运行,时不时的报错,shuffle file cannot findexecutortask lostout of memory(内存溢出);

可能是说executor的堆外内存不太够用,导致executor在运行的过程中,可能会内存溢出;然后可能导致后续的stagetask在运行的时候,可能要从一些executor中去拉取shuffle map output文件,但是executor可能已经挂掉了,关联的block manager也没有了;所以可能会报shuffle output file not foundresubmitting taskexecutor lostspark作业彻底崩溃。

上述情况下,就可以去考虑调节一下executor的堆外内存。也许就可以避免报错;此外,有时,堆外内存调节的比较大的时候,对于性能来说,也会带来一定的提升。

 

--conf spark.yarn.executor.memoryOverhead=2048

spark-submit脚本里面,去用--conf的方式,去添加配置;一定要注意!!!切记,不是在你的spark作业代码中,用new SparkConf().set()这种方式去设置,不要这样去设置,是没有用的!一定要在spark-submit脚本中去设置。

spark.yarn.executor.memoryOverhead(看名字,顾名思义,针对的是基于yarn的提交模式)

默认情况下,这个堆外内存上限大概是300M;后来我们通常项目中,真正处理大数据的时候,这里都会出现问题,导致spark作业反复崩溃,无法运行;此时就会去调节这个参数,到至少1G1024M),甚至说2G4G

通常这个参数调节上去以后,就会避免掉某些JVM OOM的异常问题,同时呢,会让整体spark作业的性能,得到较大的提升。

此时呢,就会没有响应,无法建立网络连接;会卡住;okspark默认的网络连接的超时时长,是60s;如果卡住60s都无法建立连接的话,那么就宣告失败了。

碰到一种情况,偶尔,偶尔,偶尔!!!没有规律!!!某某file。一串file iduuiddsfsfd-2342vs--sdf--sdfsd)。not foundfile lost

这种情况下,很有可能是有那份数据的executorjvm gc。所以拉取数据的时候,建立不了连接。然后超过默认60s以后,直接宣告失败。

报错几次,几次都拉取不到数据的话,可能会导致spark作业的崩溃。也可能会导致DAGScheduler,反复提交几次stageTaskScheduler,反复提交几次task。大大延长我们的spark作业的运行时间。

可以考虑调节连接的超时时长。

--conf spark.core.connection.ack.wait.timeout=300

spark-submit脚本,切记,不是在new SparkConf().set()这种方式来设置的。

spark.core.connection.ack.wait.timeout(spark coreconnection,连接,ackwait timeout,建立不上连接的时候,超时等待时长)

调节这个值比较大以后,通常来说,可以避免部分的偶尔出现的某某文件拉取失败,某某文件lost掉了。。。

---------------------------------------------------------

为什么在这里讲这两个参数呢?

因为比较实用,在真正处理大数据(不是几千万数据量、几百万数据量),几亿,几十亿,几百亿的时候。很容易碰到executor堆外内存,以及gc引起的连接超时的问题。file not foundexecutor losttask lost

调节上面两个参数,还是很有帮助的。

shuffle调优

什么样的情况下,会发生shuffle

spark中,主要是以下几个算子:groupByKeyreduceByKeycountByKeyjoin,等等。

什么是shuffle

groupByKey,要把分布在集群各个节点上的数据中的同一个key,对应的values,都给集中到一块儿,集中到集群中同一个节点上,更严密一点说,就是集中到一个节点的一个executor的一个task中。

然后呢,集中一个key对应的values之后,才能交给我们来进行处理,<key, Iterable<value>>reduceByKey,算子函数去对values集合进行reduce操作,最后变成一个valuecountByKey,需要在一个task中,获取到一个key对应的所有的value,然后进行计数,统计总共有多少个valuejoinRDD<key, value>RDD<key, value>,只要是两个RDD中,key相同对应的2value,都能到一个节点的executortask中,给我们进行处理。

reduceByKey(_+_)

问题在于,同一个单词,比如说(hello, 1),可能散落在不同的节点上;对每个单词进行累加计数,就必须让所有单词都跑到同一个节点的一个task中,给一个task来进行处理。

合并map输出文件

new SparkConf().set("spark.shuffle.consolidateFiles", "true")

开启shuffle map端输出文件合并的机制;默认情况下,是不开启的,就是会发生如上所述的大量map端输出文件的操作,严重影响性能。

开启了map端输出文件的合并机制之后:

第一个stage,同时就运行cpu coretask,比如cpu core2个,并行运行2task;每个task都创建下一个stagetask数量个文件;

第一个stage,并行运行的2task执行完以后;就会执行另外两个task;另外2task不会再重新创建输出文件;而是复用之前的task创建的map端输出文件,将数据写入上一批task的输出文件中。

第二个stagetask在拉取数据的时候,就不会去拉取上一个stage每一个task为自己创建的那份输出文件了;而是拉取少量的输出文件,每个输出文件中,可能包含了多个task给自己的map端输出。

------------------------------------------------------------

提醒一下(map端输出文件合并):

只有并行执行的task会去创建新的输出文件;下一批并行执行的task,就会去复用之前已有的输出文件;但是有一个例外,比如2task并行在执行,但是此时又启动要执行2task;那么这个时候的话,就无法去复用刚才的2task创建的输出文件了;而是还是只能去创建新的输出文件。

要实现输出文件的合并的效果,必须是一批task先执行,然后下一批task再执行,才能复用之前的输出文件;负责多批task同时起来执行,还是做不到复用的。

-------------------------------------------------------------

分享一下,实际在生产环境中,使用了spark.shuffle.consolidateFiles机制以后,实际的性能调优的效果:对于上述的这种生产环境的配置,性能的提升,还是相当的客观的。spark作业,5个小时 -> 2~3个小时。

大家不要小看这个map端输出文件合并机制。实际上,在数据量比较大,你自己本身做了前面的性能调优,executor上去->cpu core上去->并行度(task数量)上去,shuffle没调优,shuffle就很糟糕了;大量的map端输出文件的产生。对性能有比较恶劣的影响。

这个时候,去开启这个机制,可以很有效的提升性能。

调节map端内存缓冲与reduce端内存占比

spark.shuffle.file.buffer,默认32k

spark.shuffle.memoryFraction,0.2

map端内存缓冲,reduce端内存占比;很多资料、网上视频,都会说,这两个参数,是调节shuffle性能的不二选择,很有效果的样子,实际上,不是这样的。

以实际的生产经验来说,这两个参数没有那么重要,往往来说,shuffle的性能不是因为这方面的原因导致的

但是,有一点点效果的,broadcast,数据本地化等待时长;这两个shuffle调优的小点,其实也是需要跟其他的大量的小点配合起来使用,一点一点的提升性能,最终很多个性能调优的小点的效果,汇集在一起之后,那么就会有可以看见的还算不错的性能调优的效果。

reducetask,在拉取到数据之后,会用hashmap的数据格式,来对各个key对应的values进行汇聚。

针对每个key对应的values,执行我们自定义的聚合函数的代码,比如_ + _(把所有values累加起来)

reduce task,在进行汇聚、聚合等操作的时候,实际上,使用的就是自己对应的executor的内存,executorjvm进程,堆),默认executor内存中划分给reduce task进行聚合的比例,是0.2

问题来了,因为比例是0.2,所以,理论上,很有可能会出现,拉取过来的数据很多,那么在内存中,放不下;这个时候,默认的行为,就是说,将在内存放不下的数据,都spill(溢写)到磁盘文件中去。

----------------------------------------------------------------------------

原理说完之后,来看一下,默认情况下,不调优,可能会出现什么样的问题?

默认,map端内存缓冲是每个task32kb

默认,reduce端聚合内存比例,是0.2,也就是20%

如果map端的task,处理的数据量比较大,但是呢,你的内存缓冲大小是固定的。可能会出现什么样的情况?

每个task就处理320kb32kb,总共会向磁盘溢写320 / 32 = 10次。

每个task处理32000kb32kb,总共会向磁盘溢写32000 / 32 = 1000次。

map task处理的数据量比较大的情况下,而你的task的内存缓冲默认是比较小的,32kb。可能会造成多次的map端往磁盘文件的spill溢写操作,发生大量的磁盘IO,从而降低性能。

reduce端聚合内存,占比。默认是0.2。如果数据量比较大,reduce task拉取过来的数据很多,那么就会频繁发生reduce端聚合内存不够用,频繁发生spill操作,溢写到磁盘上去。而且最要命的是,磁盘上溢写的数据量越大,后面在进行聚合操作的时候,很可能会多次读取磁盘中的数据,进行聚合。

默认不调优,在数据量比较大的情况下,可能频繁地发生reduce端的磁盘文件的读写。

这两个点之所以放在一起讲,是因为他们俩是有关联的。数据量变大,map端肯定会出点问题;reduce端肯定也会出点问题;出的问题是一样的,都是磁盘IO频繁,变多,影响性能。

------------------------------------------------------------

调节map task内存缓冲:spark.shuffle.file.buffer,默认32k(spark 1.3.x不是这个参数,后面还有一个后缀,kbspark 1.5.x以后,变了,就是现在这个参数)

调节reduce端聚合内存占比:spark.shuffle.memoryFraction,0.2

在实际生产环境中,我们在什么时候来调节两个参数?

Spark UI,如果你的公司是决定采用standalone模式,那么狠简单,你的spark跑起来,会显示一个Spark UI的地址,4040的端口,进去看,依次点击进去,可以看到,你的每个stage的详情,有哪些executor,有哪些task,每个taskshuffle writeshuffle read的量,shuffle的磁盘和内存,读写的数据量;如果是用的yarn模式来提交,课程最前面,从yarn的界面进去,点击对应的application,进入Spark UI,查看详情。

如果发现shuffle 磁盘的writeread,很大。这个时候,就意味着最好调节一些shuffle的参数。进行调优。首先当然是考虑开启map端输出文件合并机制。

调节上面说的那两个参数。调节的时候的原则。spark.shuffle.file.buffer,每次扩大一倍,然后看看效果,64128;spark.shuffle.memoryFraction,每次提高0.1,看看效果。

不能调节的太大,太大了以后过犹不及,因为内存资源是有限的,你这里调节的太大了,其他环节的内存使用就会有问题了。

调节了以后,效果?map task内存缓冲变大了,减少spill到磁盘文件的次数;reduce端聚合内存变大了,减少spill到磁盘的次数,而且减少了后面聚合读取磁盘文件的数量。

hashshufflemanager和sortshufflemanager

首先先声明一点:

之前我们所讲的,其实都是已经属于Spark中,比较老旧的一种shuffle managerHashShuffleManager;这种manager,实际上,从spark 1.2.x版本以后,就不再是默认的选择了。

HashShuffleManager的原理,以及对应的一些性能调优的点,基本上,之前几讲,咱们就都讲过了。

spark 1.2.x版本以后,默认的shuffle manager,是什么呢?SortShuffleManager

来一个总结,现在相当于把sparkshuffle的东西又多讲了一些。大家理解的更加深入了。hashsorttungsten-sort。如何来选择?

1、需不需要数据默认就让spark给你进行排序?就好像mapreduce,默认就是有按照key的排序。如果不需要的话,其实还是建议搭建就使用最基本的HashShuffleManager,因为最开始就是考虑的是不排序,换取高性能;

2、什么时候需要用sort shuffle manager?如果你需要你的那些数据按key排序了,那么就选择这种吧,而且要注意,reduce task的数量应该是超过200的,这样sortmerge(多个文件合并成一个)的机制,才能生效把。但是这里要注意,你一定要自己考量一下,有没有必要在shuffle的过程中,就做这个事情,毕竟对性能是有影响的。

3、如果你不需要排序,而且你希望你的每个task输出的文件最终是会合并成一份的,你自己认为可以减少性能开销;可以去调节bypassMergeThreshold这个阈值,比如你的reduce task数量是500,默认阈值是200,所以默认还是会进行sort和直接merge的;可以将阈值调节成550,不会进行sort,按照hash的做法,每个reduce task创建一份输出文件,最后合并成一份文件。(一定要提醒大家,这个参数,其实我们通常不会在生产环境里去使用,也没有经过验证说,这样的方式,到底有多少性能的提升)

4、如果你想选用sort based shuffle manager,而且你们公司的spark版本比较高,是1.5.x版本的,那么可以考虑去尝试使用tungsten-sort shuffle manager。看看性能的提升与稳定性怎么样。

总结:

1、在生产环境中,不建议大家贸然使用第三点和第四点:

2、如果你不想要你的数据在shuffle时排序,那么就自己设置一下,用hash shuffle manager

3、如果你的确是需要你的数据在shuffle时进行排序的,那么就默认不用动,默认就是sort shuffle manager;或者是什么?如果你压根儿不care是否排序这个事儿,那么就默认让他就是sort的。调节一些其他的参数(consolidation机制)。(80%,都是用这种)

spark.shuffle.manager:hashsort、tungsten-sort

new SparkConf().set("spark.shuffle.manager", "hash")

new SparkConf().set("spark.shuffle.manager", "tungsten-sort")

// 默认就是,new SparkConf().set("spark.shuffle.manager", "sort")

new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold", "550")

算子调优

mappartition

什么时候比较适合用MapPartitions系列操作,就是说,数据量不是特别大的时候,都可以用这种MapPartitions系列操作,性能还是非常不错的,是有提升的。比如原来是15分钟,(曾经有一次性能调优),12分钟。10分钟->9分钟。

但是也有过出问题的经验,MapPartitions只要一用,直接OOM,内存溢出,崩溃。

在项目中,自己先去估算一下RDD的数据量,以及每个partition的量,还有自己分配给每个executor的内存资源。看看一下子内存容纳所有的partition数据,行不行。如果行,可以试一下,能跑通就好。性能肯定是有提升的。

但是试了一下以后,发现,不行,OOM了,那就放弃吧。

coalesce

foreachpartition

repartition

并行度:之前说过,并行度是自己可以调节,或者说是设置的。

1spark.default.parallelism

2textFile(),传入第二个参数,指定partition数量(比较少用)

咱们的项目代码中,没有设置并行度,实际上,在生产环境中,是最好自己设置一下的。官网有推荐的设置方式,你的spark-submit脚本中,会指定你的application总共要启动多少个executor100个;每个executor多少个cpu core2~3个;总共application,有cpu core200个。

官方推荐,根据你的application的总cpu core数量(在spark-submit中可以指定,200个),自己手动设置spark.default.parallelism参数,指定为cpu core总数的2~3倍。400~600个并行度。600

承上启下

你设置的这个并行度,在哪些情况下会生效?哪些情况下,不会生效?

如果你压根儿没有使用Spark SQLDataFrame),那么你整个spark application默认所有stage的并行度都是你设置的那个参数。(除非你使用coalesce算子缩减过partition数量)

问题来了,Spark SQL,用了。用Spark SQL的那个stage的并行度,你没法自己指定。Spark SQL自己会默认根据hive表对应的hdfs文件的block,自动设置Spark SQL查询所在的那个stage的并行度。你自己通过spark.default.parallelism参数指定的并行度,只会在没有Spark SQLstage中生效。

比如你第一个stage,用了Spark SQLhive表中查询出了一些数据,然后做了一些transformation操作,接着做了一个shuffle操作(groupByKey);下一个stage,在shuffle操作之后,做了一些transformation操作。hive表,对应了一个hdfs文件,有20block;你自己设置了spark.default.parallelism参数为100

你的第一个stage的并行度,是不受你的控制的,就只有20task;第二个stage,才会变成你自己设置的那个并行度,100

问题在哪里?

Spark SQL默认情况下,它的那个并行度,咱们没法设置。可能导致的问题,也许没什么问题,也许很有问题。Spark SQL所在的那个stage中,后面的那些transformation操作,可能会有非常复杂的业务逻辑,甚至说复杂的算法。如果你的Spark SQL默认把task数量设置的很少,20个,然后每个task要处理为数不少的数据量,然后还要执行特别复杂的算法。

这个时候,就会导致第一个stage的速度,特别慢。第二个stage1000task,刷刷刷,非常快。

--------------------------------------------------------------

解决上述Spark SQL无法设置并行度和task数量的办法,是什么呢?

repartition算子,你用Spark SQL这一步的并行度和task数量,肯定是没有办法去改变了。但是呢,可以将你用Spark SQL查询出来的RDD,使用repartition算子,去重新进行分区,此时可以分区成多个partition,比如从20partition,分区成100个。

然后呢,从repartition以后的RDD,再往后,并行度和task数量,就会按照你预期的来了。就可以避免跟Spark SQL绑定在一个stage中的算子,只能使用少量的task去处理大量数据以及复杂的算法逻辑。

redecebykey

总结:

reduceByKey在什么情况下使用呢?

1、非常普通的,比如说,就是要实现类似于wordcount程序一样的,对每个key对应的值,进行某种数据公式或者算法的计算(累加、类乘)

2、对于一些类似于要对每个key进行一些字符串拼接的这种较为复杂的操作,可以自己衡量一下,其实有时,也是可以使用reduceByKey来实现的。但是不太好实现。如果真能够实现出来,对性能绝对是有帮助的。(shuffle基本上就占了整个spark作业的90%以上的性能消耗,主要能对shuffle进行一定的调优,都是有价值的)

troubleshooting

reduce缓冲大小与oom

reduce端缓冲(buffer),可能会出什么问题?

可能是会出现,默认是48MB,也许大多数时候,reducetask一边拉取一边计算,不一定一直都会拉满48M的数据。可能大多数时候,拉取个10M数据,就计算掉了。

大多数时候,也许不会出现什么问题。但是有的时候,map端的数据量特别大,然后写出的速度特别快。reduce端所有task,拉取的时候,全部达到自己的缓冲的最大极限值,缓冲,48M,全部填满。

这个时候,再加上你的reduce端执行的聚合函数的代码,可能会创建大量的对象。也许,一下子,内存就撑不住了,就会OOMreduce端的内存中,就会发生内存溢出的问题。

针对上述的可能出现的问题,我们该怎么来解决呢?

这个时候,就应该减少reducetask缓冲的大小。我宁愿多拉取几次,但是每次同时能够拉取到reduce端每个task的数量,比较少,就不容易发生OOM内存溢出的问题。(比如,可以调节成12M

在实际生产环境中,我们都是碰到过这种问题的。这是典型的以性能换执行的原理。reduce端缓冲小了,不容易OOM了,但是,性能一定是有所下降的,你要拉取的次数就多了。就走更多的网络传输开销。

这种时候,只能采取牺牲性能的方式了,spark作业,首先,第一要义,就是一定要让它可以跑起来。分享一个经验,曾经写过一个特别复杂的spark作业,写完代码以后,半个月之内,就是跑不起来,里面各种各样的问题,需要进行troubleshooting。调节了十几个参数,其中就包括这个reduce端缓冲的大小。总算作业可以跑起来了。然后才去考虑性能的调优。

----------------------------------------------------

再来说说,reduce端缓冲大小的另外一面,关于性能调优的一面:

咱们假如说,你的Map端输出的数据量也不是特别大,然后你的整个application的资源也特别充足。200executor5cpu core10G内存。

其实可以尝试去增加这个reduce端缓冲大小的,比如从48M,变成96M。那么这样的话,每次reduce task能够拉取的数据量就很大。需要拉取的次数也就变少了。比如原先需要拉取100次,现在只要拉取50次就可以执行完了。

对网络传输性能开销的减少,以及reduce端聚合操作执行的次数的减少,都是有帮助的。

最终达到的效果,就应该是性能上的一定程度上的提升。一定要注意,资源足够的时候,再去做这个事儿。

spark.reducer.maxSizeInFlight,48

spark.reducer.maxSizeInFlight,24

GC导致shuffle文件拉去失败

spark.shuffle.io.maxRetries 3

第一个参数,意思就是说,shuffle文件拉取的时候,如果没有拉取到(拉取失败),最多或重试几次(会重新拉取几次文件),默认是3次。

spark.shuffle.io.retryWait 5s

第二个参数,意思就是说,每一次重试拉取文件的时间间隔,默认是5s钟。

默认情况下,假如说第一个stageexecutor正在进行漫长的full gc。第二个stageexecutor尝试去拉取文件,结果没有拉取到,默认情况下,会反复重试拉取3次,每次间隔是五秒钟。最多只会等待3 * 5s = 15s。如果15s内,没有拉取到shuffle file。就会报出shuffle file not found

针对这种情况,我们完全可以进行预备性的参数调节。增大上述两个参数的值,达到比较大的一个值,尽量保证第二个stagetask,一定能够拉取到上一个stage的输出文件。避免报shuffle file not found。然后可能会重新提交stagetask去执行。那样反而对性能也不好。

spark.shuffle.io.maxRetries 60

spark.shuffle.io.retryWait 60s

最多可以忍受1个小时没有拉取到shuffle file。只是去设置一个最大的可能的值。full gc不可能1个小时都没结束吧。

这样呢,就可以尽量避免因为gc导致的shuffle file not found,无法拉取到的问题。

----------------------------------------------------

有时会出现的一种情况,非常普遍,在spark的作业中;shuffle file not found。(spark作业中,非常非常常见的)而且,有的时候,它是偶尔才会出现的一种情况。有的时候,出现这种情况以后,会重新去提交stagetask。重新执行一遍,发现就好了。没有这种错误了。

log怎么看?用client模式去提交你的spark作业。比如standalone clientyarn client。一提交作业,直接可以在本地看到刷刷刷更新的log

yarn队列资源不足导致application直接失败

现象:

如果说,你是基于yarn来提交spark。比如yarn-cluster或者yarn-client。你可以指定提交到某个hadoop队列上的。每个队列都是可以有自己的资源的。

跟大家说一个生产环境中的,给spark用的yarn资源队列的情况:500G内存,200cpu core

比如说,某个spark application,在spark-submit里面你自己配了,executor80个;每个executor4G内存;每个executor2cpu core。你的spark作业每次运行,大概要消耗掉320G内存,以及160cpu core

乍看起来,咱们的队列资源,是足够的,500G内存,280cpu core

首先,第一点,你的spark作业实际运行起来以后,耗费掉的资源量,可能是比你在spark-submit里面配置的,以及你预期的,是要大一些的。400G内存,190cpu core

那么这个时候,的确,咱们的队列资源还是有一些剩余的。但是问题是,如果你同时又提交了一个spark作业上去,一模一样的。那就可能会出问题。

第二个spark作业,又要申请320G内存+160cpu core。结果,发现队列资源不足。。。。

此时,可能会出现两种情况:(备注,具体出现哪种情况,跟你的YARNHadoop的版本,你们公司的一些运维参数,以及配置、硬件、资源肯能都有关系)

1YARN,发现资源不足时,你的spark作业,并没有hang在那里,等待资源的分配,而是直接打印一行faillog,直接就fail掉了。

2YARN,发现资源不足,你的spark作业,就hang在那里。一直等待之前的spark作业执行完,等待有资源分配给自己来执行。

采用如下方案:

1、在你的J2EE(我们这个项目里面,spark作业的运行,之前说过了,J2EE平台触发的,执行spark-submit脚本),限制,同时只能提交一个spark作业到yarn上去执行,确保一个spark作业的资源肯定是有的。

2、你应该采用一些简单的调度区分的方式,比如说,你有的spark作业可能是要长时间运行的,比如运行30分钟;有的spark作业,可能是短时间运行的,可能就运行2分钟。此时,都提交到一个队列上去,肯定不合适。很可能出现30分钟的作业卡住后面一大堆2分钟的作业。分队列,可以申请(跟你们的YARNHadoop运维的同学申请)。你自己给自己搞两个调度队列。每个队列的根据你要执行的作业的情况来设置。在你的J2EE程序里面,要判断,如果是长时间运行的作业,就干脆都提交到某一个固定的队列里面去把;如果是短时间运行的作业,就统一提交到另外一个队列里面去。这样,避免了长时间运行的作业,阻塞了短时间运行的作业。

3、你的队列里面,无论何时,只会有一个作业在里面运行。那么此时,就应该用我们之前讲过的性能调优的手段,去将每个队列能承载的最大的资源,分配给你的每一个spark作业,比如80executor6G的内存;3cpu core。尽量让你的spark作业每一次运行,都达到最满的资源使用率,最快的速度,最好的性能;并行度,240cpu core720task

4、在J2EE中,通过线程池的方式(一个线程池对应一个资源队列),来实现上述我们说的方案。

序列化导致报错

你会看到什么样的序列化导致的报错?

client模式去提交spark作业,观察本地打印出来的log。如果出现了类似于SerializableSerialize等等字眼,报错的log,那么恭喜大家,就碰到了序列化问题导致的报错。

虽然是报错,但是序列化报错,应该是属于比较简单的了,很好处理。

序列化报错要注意的三个点:

1、你的算子函数里面,如果使用到了外部的自定义类型的变量,那么此时,就要求你的自定义类型,必须是可序列化的。

final Teacher teacher = new Teacher("leo");

studentsRDD.foreach(new VoidFunction() {

public void call(Row row) throws Exception {

  String teacherName = teacher.getName();

  .... 

}

});

public class Teacher implements Serializable {

}

2、如果要将自定义的类型,作为RDD的元素类型,那么自定义的类型也必须是可以序列化的

JavaPairRDD<Integer, Teacher> teacherRDD

JavaPairRDD<Integer, Student> studentRDD

studentRDD.join(teacherRDD)

public class Teacher implements Serializable {

}

public class Student implements Serializable {

}

3、不能在上述两种情况下,去使用一些第三方的,不支持序列化的类型

Connection conn =

studentsRDD.foreach(new VoidFunction() {

public void call(Row row) throws Exception {

  conn.....

}

});

Connection是不支持序列化的

算子函数返回null

在算子函数中,返回null

//  return actionRDD.mapToPair(new PairFunction<Row, String, Row>() {

//  private static final long serialVersionUID = 1L;

//  @Override

//  public Tuple2<String, Row> call(Row row) throws Exception {

//  return new Tuple2<String, Row>("-999", RowFactory.createRow("-999")); 

//  }

//  });

大家可以看到,在有些算子函数里面,是需要我们有一个返回值的。但是,有时候,我们可能对某些值,就是不想有什么返回值。我们如果直接返回NULL的话,那么可以不幸的告诉大家,是不行的,会报错的。

Scala.Math(NULL),异常

如果碰到你的确是对于某些值,不想要有返回值的话,有一个解决的办法:

1、在返回的时候,返回一些特殊的值,不要返回null,比如“-999”

2、在通过算子获取到了一个RDD之后,可以对这个RDD执行filter操作,进行数据过滤。filter内,可以对数据进行判定,如果是-999,那么就返回false,给过滤掉就可以了。

3、大家不要忘了,之前咱们讲过的那个算子调优里面的coalesce算子,在filter之后,可以使用coalesce算子压缩一下RDDpartition的数量,让各个partition的数据比较紧凑一些。也能提升一些性能。

yarn-client导致网卡流量激增

yarn-client模式下,会产生什么样的问题呢?

由于咱们的driver是启动在本地机器的,而且driver是全权负责所有的任务的调度的,也就是说要跟yarn集群上运行的多个executor进行频繁的通信(中间有task的启动消息、task的执行统计消息、task的运行状态、shuffle的输出结果)。

咱们来想象一下。比如你的executor100个,stage10个,task1000个。每个stage运行的时候,都有1000task提交到executor上面去运行,平均每个executor10task。接下来问题来了,driver要频繁地跟executor上运行的1000task进行通信。通信消息特别多,通信的频率特别高。运行完一个stage,接着运行下一个stage,又是频繁的通信。

在整个spark运行的生命周期内,都会频繁的去进行通信和调度。所有这一切通信和调度都是从你的本地机器上发出去的,和接收到的。这是最要人命的地方。你的本地机器,很可能在30分钟内(spark作业运行的周期内),进行频繁大量的网络通信。那么此时,你的本地机器的网络通信负载是非常非常高的。会导致你的本地机器的网卡流量会激增!!!

你的本地机器的网卡流量激增,当然不是一件好事了。因为在一些大的公司里面,对每台机器的使用情况,都是有监控的。不会允许单个机器出现耗费大量网络带宽等等这种资源的情况。运维人员。可能对公司的网络,或者其他(你的机器还是一台虚拟机),对其他机器,都会有负面和恶劣的影响。

---------------------------------------------

解决的方法:

实际上解决的方法很简单,就是心里要清楚,yarn-client模式是什么情况下,可以使用的?yarn-client模式,通常咱们就只会使用在测试环境中,你写好了某个spark作业,打了一个jar包,在某台测试机器上,用yarn-client模式去提交一下。因为测试的行为是偶尔为之的,不会长时间连续提交大量的spark作业去测试。还有一点好处,yarn-client模式提交,可以在本地机器观察到详细全面的log。通过查看log,可以去解决线上报错的故障(troubleshooting)、对性能进行观察并进行性能调优。

实际上线了以后,在生产环境中,都得用yarn-cluster模式,去提交你的spark作业。

yarn-cluster模式,就跟你的本地机器引起的网卡流量激增的问题,就没有关系了。也就是说,就算有问题,也应该是yarn运维团队和基础运维团队之间的事情了。使用了yarn-cluster模式以后,就不是你的本地机器运行Driver,进行task调度了。是yarn集群中,某个节点会运行driver进程,负责task调度。

yarn-cluster导致jvm内存溢出无法执行问题

实践经验,碰到的yarn-cluster的问题:

有的时候,运行一些包含了spark sqlspark作业,可能会碰到yarn-client模式下,可以正常提交运行;yarn-cluster模式下,可能是无法提交运行的,会报出JVMPermGen(永久代)的内存溢出,OOM

yarn-client模式下,driver是运行在本地机器上的,spark使用的JVMPermGen的配置,是本地的spark-class文件(spark客户端是默认有配置的),JVM的永久代的大小是128M,这个是没有问题的;但是呢,在yarn-cluster模式下,driver是运行在yarn集群的某个节点上的,使用的是没有经过配置的默认设置(PermGen永久代大小),82M

spark-sql,它的内部是要进行很复杂的SQL的语义解析、语法树的转换等等,特别复杂,在这种复杂的情况下,如果说你的sql本身特别复杂的话,很可能会比较导致性能的消耗,内存的消耗。可能对PermGen永久代的占用会比较大。

所以,此时,如果对永久代的占用需求,超过了82M的话,但是呢又在128M以内;就会出现如上所述的问题,yarn-client模式下,默认是128M,这个还能运行;如果在yarn-cluster模式下,默认是82M,就有问题了。会报出PermGen Out of Memory error log

----------------------------------------------------

如何解决这种问题?

既然是JVMPermGen永久代内存溢出,那么就是内存不够用。咱们呢,就给yarn-cluster模式下的,driverPermGen多设置一些。

spark-submit脚本中,加入以下配置即可:

--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"

这个就设置了driver永久代的大小,默认是128M,最大是256M。那么,这样的话,就可以基本保证你的spark作业不会出现上述的yarn-cluster模式导致的永久代内存溢出的问题。

--------------------------------------------------------

spark sqlsql,要注意,一个问题

sql,有大量的or语句。比如where keywords='' or keywords='' or keywords=''

当达到or语句,有成百上千的时候,此时可能就会出现一个driver端的jvm stack overflowJVM栈内存溢出的问题

JVM栈内存溢出,基本上就是由于调用的方法层级过多,因为产生了大量的,非常深的,超出了JVM栈深度限制的,递归。递归方法。我们的猜测,spark sql,有大量or语句的时候,spark sql内部源码中,在解析sql,比如转换成语法树,或者进行执行计划的生成的时候,对or的处理是递归。or特别多的话,就会发生大量的递归。

JVM Stack Memory Overflow,栈内存溢出。

这种时候,建议不要搞那么复杂的spark sql语句。采用替代方案:将一条sql语句,拆解成多条sql语句来执行。每条sql语句,就只有100or子句以内;一条一条SQL语句来执行。根据生产环境经验的测试,一条sql语句,100or子句以内,是还可以的。通常情况下,不会报那个栈内存溢出。

错误的持久化方式以及checkpoint使用

出现上述情况的时候,接下来,如果要对这个RDD执行某些操作,可能会发现RDD的某个partition找不到了。

对消失的partition重新计算,计算完以后再缓存和使用。

有些时候,计算某个RDD,可能是极其耗时的。可能RDD之前有大量的父RDD。那么如果你要重新计算一个partition,可能要重新计算之前所有的父RDD对应的partition

这种情况下,就可以选择对这个RDD进行checkpoint,以防万一。进行checkpoint,就是说,会将RDD的数据,持久化一份到容错的文件系统上(比如hdfs)。

在对这个RDD进行计算的时候,如果发现它的缓存数据不见了。优先就是先找一下有没有checkpoint数据(到hdfs上面去找)。如果有的话,就使用checkpoint数据了。不至于说是去重新计算。

checkpoint,其实就是可以作为是cache的一个备胎。如果cache失效了,checkpoint就可以上来使用了。

checkpoint有利有弊,利在于,提高了spark作业的可靠性,一旦发生问题,还是很可靠的,不用重新计算大量的rdd;但是弊在于,进行checkpoint操作的时候,也就是将rdd数据写入hdfs中的时候,还是会消耗性能的。checkpoint,用性能换可靠性。

---------------------------------------------------

checkpoint原理:

1、在代码中,用SparkContext,设置一个checkpoint目录,可以是一个容错文件系统的目录,比如hdfs

2、在代码中,对需要进行checkpointrdd,执行RDD.checkpoint()

3RDDCheckpointDataspark内部的API),接管你的RDD,会标记为marked for checkpoint,准备进行checkpoint

4、你的job运行完之后,会调用一个finalRDD.doCheckpoint()方法,会顺着rdd lineage,回溯扫描,发现有标记为待checkpointrdd,就会进行二次标记,inProgressCheckpoint,正在接受checkpoint操作

5job执行完之后,就会启动一个内部的新job,去将标记为inProgressCheckpointrdd的数据,都写入hdfs文件中。(备注,如果rdd之前cache过,会直接从缓存中获取数据,写入hdfs中;如果没有cache过,那么就会重新计算一遍这个rdd,再checkpoint

6、将checkpoint过的rdd之前的依赖rdd,改成一个CheckpointRDD*,强制改变你的rddlineage。后面如果rddcache数据获取失败,直接会通过它的上游CheckpointRDD,去容错的文件系统,比如hdfs,中,获取checkpoint的数据。

数据倾斜

发生数据倾斜以后的现象:

spark数据倾斜,有两种表现:

1、你的大部分的task,都执行的特别特别快,刷刷刷,就执行完了(你要用client模式,standalone clientyarn client,本地机器主要一执行spark-submit脚本,就会开始打印log),task175 finished;剩下几个task,执行的特别特别慢,前面的task,一般1s可以执行完5个;最后发现1000task998999 task,要执行1个小时,2个小时才能执行完一个task

出现数据倾斜了

还算好的,因为虽然老牛拉破车一样,非常慢,但是至少还能跑。

2、运行的时候,其他task都刷刷刷执行完了,也没什么特别的问题;但是有的task,就是会突然间,啪,报了一个OOMJVM Out Of Memory,内存溢出了,task failedtask lostresubmitting task。反复执行几次都到了某个task就是跑不通,最后就挂掉。

某个task就直接OOM,那么基本上也是因为数据倾斜了,task分配的数量实在是太大了!!!所以内存放不下,然后你的task每处理一条数据,还要创建大量的对象。内存爆掉了。

出现数据倾斜了

这种就不太好了,因为你的程序如果不去解决数据倾斜的问题,压根儿就跑不出来。

--------------------------------

定位原因与出现问题的位置:

根据log去定位

出现数据倾斜的原因,基本只可能是因为发生了shuffle操作,在shuffle的过程中,出现了数据倾斜的问题。因为某个,或者某些key对应的数据,远远的高于其他的key

1、你在自己的程序里面找找,哪些地方用了会产生shuffle的算子,groupByKeycountByKeyreduceByKeyjoin

2、看log

log一般会报是在你的哪一行代码,导致了OOM异常;或者呢,看log,看看是执行到了第几个stage!!!

我们这里不会去剖析stage的划分算法spark代码,是怎么划分成一个一个的stage的。哪一个stagetask特别慢,就能够自己用肉眼去对你的spark代码进行stage的划分,就能够通过stage定位到你的代码,哪里发生了数据倾斜去找找,代码那个地方,是哪个shuffle操作。

聚合源数据

数据倾斜,解决方案,第一个方案和第二个方案,一起来讲。最朴素、最简谱、最直接、最有效、最简单的,解决数据倾斜问题的方案。

第一个方案:聚合源数据

第二个方案:过滤导致倾斜的key

重剑无锋。后面的五个方案,尤其是最后4个方案,都是那种特别炫酷的方案。双重group聚合方案;sample抽样分解聚合方案;如果碰到了数据倾斜的问题。上来就先考虑考虑第一个和第二个方案,能不能做,如果能做的话,后面的5个方案,都不用去搞了。

-----------------------------------------------

第一个方案:聚合源数据

咱们现在,做一些聚合的操作,groupByKeyreduceByKeygroupByKey,说白了,就是拿到每个key对应的valuesreduceByKey,说白了,就是对每个key对应的values执行一定的计算。

现在这些操作,比如groupByKeyreduceByKey,包括之前说的join。都是在spark作业中执行的。

spark作业的数据来源,通常是哪里呢?90%的情况下,数据来源都是hive表(hdfs,大数据分布式存储系统)。hdfs上存储的大数据。hive表,hive表中的数据,通常是怎么出来的呢?有了spark以后,hive比较适合做什么事情?hive就是适合做离线的,晚上凌晨跑的,ETLextract transform load,数据的采集、清洗、导入),hive sql,去做这些事情,从而去形成一个完整的hive中的数据仓库;说白了,数据仓库,就是一堆表。

spark作业的源表,hive表,其实通常情况下来说,也是通过某些hive etl生成的。hive etl可能是晚上凌晨在那儿跑。今天跑昨天的数九。

数据倾斜,某个key对应的80万数据,某些key对应几百条,某些key对应几十条;现在,咱们直接在生成hive表的hive etl中,对数据进行聚合。比如按key来分组,将key对应的所有的values,全部用一种特殊的格式,拼接到一个字符串里面去,比如“key=sessionid, value: action_seq=1|user_id=1|search_keyword=火锅|category_id=001;action_seq=2|user_id=1|search_keyword=涮肉|category_id=001

key进行group,在spark中,拿到key=sessionidvalues<Iterable>hive etl中,直接对key进行了聚合。那么也就意味着,每个key就只对应一条数据。在spark中,就不需要再去执行groupByKey+map这种操作了。直接对每个key对应的values字符串,map操作,进行你需要的操作即可。key,values串。

spark中,能对这个操作,就不需要执行shffule操作了,也就根本不可能导致数据倾斜。

或者是,对每个keyhive etl中进行聚合,对所有values聚合一下,不一定是拼接起来,可能是直接进行计算。reduceByKey,计算函数,应用在hive etl中,每个keyvalues

-------------------------------------------------

聚合源数据方案,第二种做法

你可能没有办法对每个key,就聚合出来一条数据;

那么也可以做一个妥协;对每个key对应的数据,10万条;有好几个粒度,比如10万条里面包含了几个城市、几天、几个地区的数据,现在放粗粒度;直接就按照城市粒度,做一下聚合,几个城市,几天、几个地区粒度的数据,都给聚合起来。比如说

city_id date area_id

select ... from ... group by city_id

尽量去聚合,减少每个key对应的数量,也许聚合到比较粗的粒度之后,原先有10万数据量的key,现在只有1万数据量。减轻数据倾斜的现象和问题。

-----------------------------------------------

第二个方案:过滤导致倾斜的key

如果你能够接受某些数据,在spark作业中直接就摒弃掉,不使用。比如说,总共有100万个key。只有2key,是数据量达到10万的。其他所有的key,对应的数量都是几十。

这个时候,你自己可以去取舍,如果业务和需求可以理解和接受的话,在你从hive表查询源数据的时候,直接在sql中用where条件,过滤掉某几个key

那么这几个原先有大量数据,会导致数据倾斜的key,被过滤掉之后,那么在你的spark作业中,自然就不会发生数据倾斜了。

提高reduce并行度

提升shuffle reduce端并行度,怎么来操作?

很简单,主要给我们所有的shuffle算子,比如groupByKeycountByKeyreduceByKey。在调用的时候,传入进去一个参数。一个数字。那个数字,就代表了那个shuffle操作的reduce端的并行度。那么在进行shuffle操作的时候,就会对应着创建指定数量的reduce task

这样的话,就可以让每个reduce task分配到更少的数据。基本可以缓解数据倾斜的问题。

比如说,原本某个task分配数据特别多,直接OOM,内存溢出了,程序没法运行,直接挂掉。按照log,找到发生数据倾斜的shuffle操作,给它传入一个并行度数字,这样的话,原先那个task分配到的数据,肯定会变少。就至少可以避免OOM的情况,程序至少是可以跑的。

-----------------------------------------

提升shuffle reduce并行度的缺陷

治标不治本的意思,因为,它没有从根本上改变数据倾斜的本质和问题。不像第一个和第二个方案(直接避免了数据倾斜的发生)。原理没有改变,只是说,尽可能地去缓解和减轻shuffle reduce task的数据压力,以及数据倾斜的问题。

实际生产环境中的经验。

1、如果最理想的情况下,提升并行度以后,减轻了数据倾斜的问题,或者甚至可以让数据倾斜的现象忽略不计,那么就最好。就不用做其他的数据倾斜解决方案了。

2、不太理想的情况下,就是比如之前某个task运行特别慢,要5个小时,现在稍微快了一点,变成了4个小时;或者是原先运行到某个task,直接OOM,现在至少不会OOM了,但是那个task运行特别慢,要5个小时才能跑完。

那么,如果出现第二种情况的话,各位,就立即放弃第三种方案,开始去尝试和选择后面的四种方案。

随机key实现双重聚合

reduce join 转换为 map join

reduce join转换为map join,适合在什么样的情况下,可以来使用?

如果两个RDD要进行join,其中一个RDD是比较小的。一个RDD100万数据,一个RDD1万数据。(一个RDD1亿数据,一个RDD100万数据)

其中一个RDD必须是比较小的,broadcast出去那个小RDD的数据以后,就会在每个executorblock manager中都驻留一份。要确保你的内存足够存放那个小RDD中的数据

这种方式下,根本不会发生shuffle操作,肯定也不会发生数据倾斜;从根本上杜绝了join操作可能导致的数据倾斜的问题;

对于join中有数据倾斜的情况,大家尽量第一时间先考虑这种方式,效果非常好;如果某个RDD比较小的情况下。

sample采样倾斜key两次join

这种方案什么时候适合使用?

优先对于join,肯定是希望能够采用上一讲讲的,reduce join转换map join。两个RDD数据都比较大,那么就不要那么搞了。

针对你的RDD的数据,你可以自己把它转换成一个中间表,或者是直接用countByKey()的方式,你可以看一下这个RDD各个key对应的数据量;此时如果你发现整个RDD就一个,或者少数几个key,是对应的数据量特别多;尽量建议,比如就是一个key对应的数据量特别多。

此时可以采用咱们的这种方案,单拉出来那个最多的key;单独进行join,尽可能地将key分散到各个task上去进行join操作。

什么时候不适用呢?

如果一个RDD中,导致数据倾斜的key,特别多;那么此时,最好还是不要这样了;还是使用我们最后一个方案,终极的join数据倾斜的解决方案。

使用随机数以及扩容表join

这个点感觉有问题

局限性:

1、因为你的两个RDD都很大,所以你没有办法去将某一个RDD扩的特别大,一般咱们就是10倍。

2、如果就是10倍的话,那么数据倾斜问题,的确是只能说是缓解和减轻,不能说彻底解决。

spark streaming 优化

1、并行化数据接收:处理多个topic的数据时比较有效

int numStreams = 5;

List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(numStreams);

for (int i = 0; i < numStreams; i++) {

  kafkaStreams.add(KafkaUtils.createStream(...));

}

JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));

unifiedStream.print();

2、spark.streaming.blockInterval:增加block数量,增加每个batch rddpartition数量,增加处理并行度

receiver从数据源源源不断地获取到数据;首先是会按照block interval,将指定时间间隔的数据,收集为一个block;默认时间是200ms,官方推荐不要小于50ms;接着呢,会将指定batch interval时间间隔内的block,合并为一个batch;创建为一个rdd,然后启动一个job,去处理这个batch rdd中的数据

batch rdd,它的partition数量是多少呢?一个batch有多少个block,就有多少个partition;就意味着并行度是多少;就意味着每个batch rdd有多少个task会并行计算和处理。

当然是希望可以比默认的task数量和并行度再多一些了;可以手动调节block interval;减少block interval;每个batch可以包含更多的block;有更多的partition;也就有更多的task并行处理每个batch rdd

定死了,初始的rdd过来,直接就是固定的partition数量了

3、inputStream.repartition(<number of partitions>):重分区,增加每个batch rddpartition数量

有些时候,希望对某些dstream中的rdd进行定制化的分区

dstream中的rdd进行重分区,去重分区成指定数量的分区,这样也可以提高指定dstreamrdd的计算并行度

4、调节并行度

spark.default.parallelism

reduceByKey(numPartitions)

5、使用Kryo序列化机制:

spark streaming,也是有不少序列化的场景的

提高序列化task发送到executor上执行的性能,如果task很多的时候,task序列化和反序列化的性能开销也比较可观

默认输入数据的存储级别是StorageLevel.MEMORY_AND_DISK_SER_2,receiver接收到数据,默认就会进行持久化操作;首先序列化数据,存储到内存中;如果内存资源不够大,那么就写入磁盘;而且,还会写一份冗余副本到其他executorblock manager中,进行数据冗余。

6batch interval:每个的处理时间必须小于batch interval

实际上你的spark streaming跑起来以后,其实都是可以在spark ui上观察它的运行情况的;可以看到batch的处理时间;

如果发现batch的处理时间大于batch interval,就必须调节batch interval

尽量不要让batch处理时间大于batch interval

比如你的batch每隔5秒生成一次;你的batch处理时间要达到6秒;就会出现,batch在你的内存中日积月累,一直囤积着,没法及时计算掉,释放内存空间;而且对内存空间的占用越来越大,那么此时会导致内存空间快速消耗

如果发现batch处理时间比batch interval要大,就尽量将batch interval调节大

spark2.0

spark core and spark sql