Skip to main content

moregeek program

spark常规性能调优(二)_蓦然的博客-多极客编程

2、常规性能调优二:RDD优化

1)RDD复用

在对RDD进行算子时,要避免相同的算子和计算逻辑之下对RDD进行重复的计算,如下图所示

Spark常规性能调优(二)_spark

对上图中的RDD计算架构进行修改,得到下图所示的优化结果

Spark常规性能调优(二)_spark_02

2)RDD持久化

在Spark中,当多次对同一个RDD执行算子操作时,每一次都会对这个RDD以之前的父RDD重新计算一次,这种情况是必须要避免的,对同一个RDD的重复计算是对资源的极大浪费,因此,必须对多次使用的RDD进行持久化 ,通过持久化将公共RDD的数据缓存到内存/磁盘中,之后对于公共RDD的计算都会从内存/磁盘中直接获取RDD数据

对于RDD的持久化,有两点需要说明:

第一,RDD的持久化是可以进行序列化 的,当内存无法将RDD的数据完整的进行存放的时候,可以考虑使用序列化的方式减小数据体积,将数据完整存储在内存中

第二,如果对于数据的可靠性要求很高,并且内存充足,可以使用副本机制 ,对RDD数据进行持久化。当持久化启用了复本机制时,对于持久化的每个数据单元都存储一个副本,放在其他节点上面,由此实现数据的容错,一旦一个副本数据丢失,不需要重新计算,还可以使用另外一个副本

3)RDD尽可能早的filter操作

获取到初始RDD后,应该考虑尽早地过滤掉不需要的数据,进而减少对内存的占用,从而提升Spark作业的运行效率

3、常规性能调优三:并行度调节

Spark作业中的并行度指各个stage的task的数量

如果并行度设置不合理而导致并行度过低,会导致资源的极大浪费,例如,20个Executor,每个Executor分配3个CPU core,而Spark作业有40个task,这样每个Executor分配到的task个数是2个,这就使得每个Executor有一个CPU core空闲,导致资源的浪费

理想的并行度设置,应该是让并行度与资源相匹配,简单来说就是在资源允许的前提下,并行度要设置的尽可能大,达到可以充分利用集群资源。合理的设置并行度,可以提升整个Spark作业的性能和运行速度

Spark官方推荐,task数量应该设置为Spark作业总CPU core数量的2~3倍。之所以没有推荐task数量与CPU core总数相等,是因为task的执行时间不同,有的task执行速度快而有的task执行速度慢,如果task数量与CPU core总数相等,那么执行快的task执行完成后,会出现CPU core空闲的情况。如果task数量设置为CPU core总数的2~3倍,那么一个task执行完毕后,CPU core会立刻执行下一个task,降低了资源的浪费,同时提升了Spark作业运行的效率

Spark作业并行度的设置如下方代码清单所示

val conf = new SparkConf().set("spark.default.parallelism", "500")

4、常规性能调优四:广播大变量

默认情况下,task中的算子中如果使用了外部的变量,每个task都会获取一份变量的复本,这就造成了内存的极大消耗。一方面,如果后续对RDD进行持久化,可能就无法将RDD数据存入内存,只能写入磁盘,磁盘IO将会严重消耗性能;另一方面,task在创建对象的时候,也许会发现堆内存无法存放新创建的对象,这就会导致频繁的GC,GC会导致工作线程停止,进而导致Spark暂停工作一段时间,严重影响Spark性能。

假设当前任务配置了20个Executor,指定500个task,有一个20M的变量被所有task共用,此时会在500个task中产生500个副本,耗费集群10G的内存,如果使用了广播变量, 那么每个Executor保存一个副本,一共消耗400M内存,内存消耗减少了5倍。

广播变量在每个Executor保存一个副本,此Executor的所有task共用此广播变量,这让变量产生的副本数量大大减少 。

在初始阶段,广播变量只在Driver中有一份副本。task在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的Executor对应的BlockManager中尝试获取变量,如果本地没有,BlockManager就会从Driver或者其他节点的BlockManager上远程拉取变量的复本 ,并由本地的BlockManager进行管理;之后此Executor的所有task都会直接从本地的BlockManager中获取变量

5、常规性能调优五:Kryo序列化

默认情况下,Spark使用Java的序列化机制。Java的序列化机制使用方便,不需要额外的配置,在算子中使用的变量实现Serializable接口即可,但是,Java序列化机制的效率不高,序列化速度慢并且序列化后的数据所占用的空间依然较大。

Kryo序列化机制比Java序列化机制性能提高10倍左右,Spark之所以没有默认使用Kryo作为序列化类库,是因为它不支持所有对象的序列化,同时Kryo需要用户在使用前注册需要序列化的类型,不够方便,但从Spark 2.0.0版本开始,简单类型、简单类型数组、字符串类型的Shuffling RDDs 已经默认使用Kryo序列化方式了。

Kryo序列化注册方式的实例代码如下方代码清单所示

public class MyKryoRegistrator implements KryoRegistrator {  @Override  public void registerClasses(Kryo kryo)  {  kryo.register(StartupReportLogs.class);  } }

配置Kryo序列化方式的实例代码如下方代码清单所示

//创建SparkConf对象 val conf = new SparkConf().setMaster(…).setAppName(…) //使用Kryo序列化库,如果要使用Java序列化库,需要把该行屏蔽掉 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");  //在Kryo序列化库中注册自定义的类集合,如果要使用Java序列化库,需要把该行屏蔽掉 conf.set("spark.kryo.registrator", "atguigu.com.MyKryoRegistrator");

6、常规性能调优六:调节本地化等待时长

Spark作业运行过程中,Driver会对每一个stage的task进行分配。根据Spark的task分配算法,Spark希望task能够运行在它要计算的数据算在的节点 (数据本地化思想),这样就可以避免数据的网络传输。通常来说,task可能不会被分配到它处理的数据所在的节点,因为这些节点可用的资源可能已经用尽,此时,Spark会等待一段时间,默认3s,如果等待指定时间后仍然无法在指定节点运行,那么会自动降级,尝试将task分配到比较差的本地化级别所对应的节点上 ,比如将task分配到离它要计算的数据比较近的一个节点,然后进行计算,如果当前级别仍然不行,那么继续降级

当task要处理的数据不在task所在节点上时,会发生数据的传输。task会通过所在节点的BlockManager获取数据,BlockManager发现数据不在本地时,户通过网络传输组件从数据所在节点的BlockManager处获取数据

网络传输数据的情况是我们不愿意看到的,大量的网络传输会严重影响性能,因此,我们希望通过调节本地化等待时长,如果在等待时长这段时间内,目标节点处理完成了一部分task,那么当前的task将有机会得到执行,这样就能够改善Spark作业的整体性能

Spark的本地化等级如下表所示

名称

解析

PROCESS_LOCAL

进程本地化,task和数据在同一个Executor中,性能最好

NODE_LOCAL

节点本地化,task和数据在同一个节点中,但是task和数据不在同一个Executor中,数据需要在进程间进行传输

RACK_LOCAL

机架本地化,task和数据在同一个机架的两个节点上,数据需要通过网络在节点之间进行传输

NO_PREF

对于task来说,从哪里获取都一样,没有好坏之分

ANY

task和数据可以在集群的任何地方,而且不在一个机架中,性能最差

在Spark项目开发阶段,可以使用client模式对程序进行测试,此时,可以在本地看到比较全的日志信息,日志信息中有明确的task数据本地化的级别,如果大部分都是PROCESS_LOCAL,那么就无需进行调节,但是如果发现很多的级别都是NODE_LOCAL、ANY,那么需要对本地化的等待时长进行调节,通过延长本地化等待时长,看看task的本地化级别有没有提升,并观察Spark作业的运行时间有没有缩短

注意,过犹不及,不要将本地化等待时长延长地过长,导致因为大量的等待时长,使得Spark作业的运行时间反而增加了

Spark本地化等待时长的设置如下方代码清单所示

val conf = new SparkConf().set("spark.locality.wait", "6")

©著作权归作者所有:来自51CTO博客作者蓦然1607的原创作品,请联系作者获取转载授权,否则将追究法律责任

【gis开发】esri shapefile(.shp)矢量数据文件读取(c++、python)_爱看书的小沐的博客-多极客编程

1、简介 1.1 什么是Shapefile <font color=blue>ESRI Shapefile(shp),或简称shapefile,是美国环境系统研究所公司(ESRI)开发的一种空间数据开放格式。该文件格式已经成为了地理信息软件界的一个开放标准,这表明ESRI公司在全球的地理信息系统市场的重要性。 GIS 保留的数据大致分为栅格数据和矢量数据: 而矢量数据文件主要有如下

设计模式之桥接模式_wx63311348dcab6的博客-多极客编程

1 桥接模式的定义桥接模式(Bridge Pattem):将抽象部分和实现部分分离,使它们都可以独立地变化。它是一种对象结构型模式,又称柄体模式或者接口模式。2 为什么引入桥接模式当用户采用多继承的方式实现代码时,增加一个新的种类非常不方便(可拓展性差)。如上图,我想增加一个奥迪类,则需要在跑车中增加奥迪跑车类,在SUV中增加奥迪SUV类,这仅仅是两个,如果是多的话会更加的麻烦。同时也违反了 单一

sql进阶篇之约束(constraints)_davieyang的博客-多极客编程

SQL 约束 约束用于限制加入表的数据的类型,可以在创建表时规定约束(通过 CREATE TABLE 语句),或者在表创建之后也可以(通过 ALTER TABLE 语句) SQL约束主要包括以下几种约束: NOT NULL UNIQUE PRIMARY KEY FOREIGN KEY CHECK DEFAULT SQL NOT NULL 约束 NOT NULL 约束强制列不接受 NULL

阿里云大数据开发一面面经,已过,面试题已配答案_蓦然的博客-多极客编程

这份面试题时群里一位小伙伴分享的,我给这份面试题找了一些参考答案参考答案来源:​​大数据面试题V3.0,523道题,779页,46w字​​1、实习经历这一点就不多说了,每个人都不一样,根据自己的介绍就行。2、简单介绍wordcount先来看一张图具体各个阶段做了什么spliting :Documents会根据切割规则被切成若干块,map阶段:然后进行Map过程,Map会并行读取文本,对读取的单词进

一文读懂hbase_程序员路遥的博客-多极客编程

HBase是一个分布式、可扩展、支持海量数据存储的NoSQL数据库。底层物理存储是以Key-Value的数据格式存储的,HBase中的所有数据文件都存储在Hadoop HDFS文件系统上。一、主要组件HBase详细架构图解注意:HBase是依赖ZooKeeper和HDFS的,需要启动ZooKeeper和HDFS。1. Client提供了访问HBase的一系列API接口,如Java Native A

sql进阶篇之高级应用_davieyang的博客-多极客编程

SQL CREATE DATABASE 语句 CREATE DATABASE 用于创建数据库。 SQL CREATE DATABASE 语法 CREATE DATABASE database_name SQL CREATE DATABASE 实例 创建一个名为 "my_db" 的数据库,使用下面的 CREATE DATABASE 语句: CREATE DATABASE my_db SQL

sql进阶篇之约束(constraints)_davieyang的博客-多极客编程

SQL 约束 约束用于限制加入表的数据的类型,可以在创建表时规定约束(通过 CREATE TABLE 语句),或者在表创建之后也可以(通过 ALTER TABLE 语句) SQL约束主要包括以下几种约束: NOT NULL UNIQUE PRIMARY KEY FOREIGN KEY CHECK DEFAULT SQL NOT NULL 约束 NOT NULL 约束强制列不接受 NULL

sql进阶篇之多表联查_davieyang的博客-多极客编程

SQL Alias(别名) 通过使用 SQL,可以为列名称和表名称指定别名(Alias) 表的 SQL Alias 语法 SELECT column_name(s) FROM table_name AS alias_name 列的 SQL Alias 语法 SELECT column_name AS alias_name FROM table_name Alias 实例: 使用表名称别名

spark常见数据倾斜情况及调优方案_蓦然的博客-多极客编程

1、数据倾斜Spark中的数据倾斜问题主要指shuffle过程中出现的数据倾斜问题,是由于不同的key对应的数据量不同导致的不同task所处理的数据量不同的问题例如,reduce点一共要处理100万条数据,第一个和第二个task分别被分配到了1万条数据,计算5分钟内完成,第三个task分配到了98万数据,此时第三个task可能需要10个小时完成,这使得整个Spark作业需要10个小时才能运行完成,

阿里云大数据开发一面面经,已过,面试题已配答案_蓦然的博客-多极客编程

这份面试题时群里一位小伙伴分享的,我给这份面试题找了一些参考答案参考答案来源:​​大数据面试题V3.0,523道题,779页,46w字​​1、实习经历这一点就不多说了,每个人都不一样,根据自己的介绍就行。2、简单介绍wordcount先来看一张图具体各个阶段做了什么spliting :Documents会根据切割规则被切成若干块,map阶段:然后进行Map过程,Map会并行读取文本,对读取的单词进

sql进阶篇之高级应用_davieyang的博客-多极客编程

SQL CREATE DATABASE 语句 CREATE DATABASE 用于创建数据库。 SQL CREATE DATABASE 语法 CREATE DATABASE database_name SQL CREATE DATABASE 实例 创建一个名为 "my_db" 的数据库,使用下面的 CREATE DATABASE 语句: CREATE DATABASE my_db SQL

sql进阶篇之函数_davieyang的博客-多极客编程

SQL 拥有很多可用于计数和计算的内建函数 函数的语法 SELECT function(列) FROM 表 函数类型 Aggregate 函数:操作面向一系列的值,并返回一个单一的值(注:如果在 SELECT 语句的项目列表中的众多其它表达式中使用 SELECT 语句,则这个 SELECT 必须使用 GROUP BY 语句) Scalar 函数:操作面向某个单一的值,并返回基于输入值的一个单一