Spark 优化

zhuzhuya / 2024-11-13 / 原文

Spark 优化

  • 定义和目标
    定义: Spark 优化是指通过调整 Spark 应用程序的配置参数、代码结构和数据处理方式,以提高 Spark 作业的性能和效率。
    目标: 优化的目标包括减少作业的执行时间、降低资源消耗、提高吞吐量等。优化可以涉及到多个方面,如内存管理、数据分区、任务调度、代码优化等。

一、参数优化

1、num-executors: executor的数量
2、executor-memory:每个executor的内存
3、executor-cores:每个executor的核数
4、driver-memory:driver的内存
5、spark.storage.memoryFraction:用于缓存的内存占比默认0.6
6、spark.shuffle.memoryFraction:spark shuffle 使用内存占比,默认0.2
7、spark.locality.wait :task执行时,等待时间,默认3秒
50G的数据需要多少资源
1、资源充足:每一个task由一个core处理,效率最高的(会浪费资源)
总的资源:400core,800G内存
--num-executors 50
--executor-cores 8
--executor-memory 16G

2、资源充足,合理利用资源,核的数在task数量的1/3-1/2之间就可以(充分利用资源)
--num-executors 25
--executor-cores 8
--executor-memory 16G

3、资源不足:看剩余资源,总的核数在剩余核的1/3-1/2之间
5台服务器(48核,128G, 10TB硬盘)= 总资源大概(200核,500G)
executor的数量最好小于等于服务器数量
--num-executors 5
--executor-cores 20
--executor-memory 40G

二、代码优化

缓存
缓存会将表的数据加载到Executo`r的内存或者磁盘上,如果表的数据量太大了,超过内存的上线,就没有必要使用缓存了,所以在使用缓存时需要注意以下几点:

1、理解缓存的作用和优势

(1)提高性能
当一个 RDD(弹性分布式数据集)被缓存后,Spark 会将其数据存储在内存或磁盘中,以便后续的操作可以更快地访问这些数据,避免重复计算。
例如,在一个迭代算法中,如果中间结果被缓存,那么每次迭代都可以直接从缓存中读取数据,而不需要重新计算,从而大大提高了算法的执行速度。
(2)减少网络传输
如果一个 RDD 是从远程数据源(如 HDFS 或 S3)读取的,缓存可以避免每次操作都从远程数据源读取数据,减少网络传输开销。
例如,在一个数据仓库应用中,如果需要多次查询同一个数据源,将该数据源对应的 RDD 缓存起来可以显著减少网络传输时间。

2、选择合适的存储级别

Spark 提供了多种存储级别,包括 MEMORY_ONLY、MEMORY_AND_DISK、DISK_ONLY 等。在选择存储级别时,需要注意各自的特点:

MEMORY_ONLY:内存充足,最快的访问速度,适用内存充足数据量不是很大时,建议优先选择这个

MEMORY_AND_DISK_SER:压缩之后放内存,放不下在放磁盘

MEMORY_ONLY_SER:压缩之后放内存,CPU的时间换内存空间

DISK_ONLY:如果数据可靠性要求较高,可以选择将数据存储在磁盘上,但尽量不选择

3、代码实现

-- 在sql中使用 
cache table students;
uncache table students;

-- 在DSL中使用
 studentDF.cache()
 studentDF.unpersist()

 -- 在RDD中使用
 rdd.cache()
 rdd.unpersist()

使用高性能的算子

使用reduceByKey/aggregateByKey替代groupByKey

reduceByKey

1、对相同key的value进行聚合计算
2、会在每一个map task中对相同key的value预聚合,可以减少shuffle过程中传输的数据量,提高效率
3、只能处理相对简单的逻辑

studentsRDD
      .reduceByKey(_+_)
      .foreach(println)

aggregateByKey是一个对(K,V)类型的 RDD(弹性分布式数据集)进行聚合操作的函数。它允许你在每个分区内进行部分聚合,然后在跨分区进行全局聚合,从而有效地处理大规模数据集。

val countRDD: RDD[(String, Int)] = kvRD
      .aggregateByKey(0)(
        seqOp = (u, v) => u + v, //map端聚合逻辑
        combOp = (u1, u2) => u1 + u2 // reduce端聚合逻辑
      )

使用mapPartitions替代普通map Transformation算子

mapPartitionsforeachPartition一样,当与需要读取外部数据时使用

比如创建数据库连接,使用MapPartitions时,只会在分区内创建一次

使用foreachPartitions替代foreach Action算子

 object DemoToMysql {
  def main(args: Array[String]): Unit = {
    //1、创建spark的执行环境
    val conf = new SparkConf()
    //设置运行模式
    conf.setMaster("local")
    conf.setAppName("wc")
    val sc = new SparkContext(conf)
    //2、读取数据
    //RDD:弹性的分布式数据集(相当于List)
    val linesRDD: RDD[String] = sc.textFile("data/words.txt")
    
    //一行转换多行
    val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(","))
    val kvRD: RDD[(String, Int)] = wordsRDD.map(word => (word, 1))

    //统计单词的数量
    val countRDD: RDD[(String, Int)] = kvRD.reduceByKey((x, y) => x + y)

    val start: Long = System.currentTimeMillis()
    //foreachPartition: 训练分区
    countRDD.foreachPartition(iter => {
      //1 创建数据库连接
      //每一个分区创建一个数据库连接
      val con: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata", "root", "123456")
      val end: Long = System.currentTimeMillis()
      println(end - start)

      //在分区内循环
      iter.foreach {
        case (word, count) =>
          //2 编写sql插入数据
          val stat: PreparedStatement = con.prepareStatement("insert into word_count values(?,?)")
          stat.setString(1, word)
          stat.setInt(2, count)
          stat.execute()
      }
      con.close()
    })

coalesce(numPartitions,false) 减少分区 没有shuffle只是合并 partition

object DemoRePartition {
  def main(args: Array[String]): Unit = {
    //1、创建spark的执行环境
    val conf = new SparkConf()
    //设置运行模式
    conf.setMaster("local")
    conf.setAppName("wc")
    val sc = new SparkContext(conf)

    //可以在读取数据时指定最小分区数设定RDD的分区数,需要保证每一个分区被等分
    val studentsRDD: RDD[String] = sc.textFile("data/test3", 3)
    println(s"studentsRDD分区数:${studentsRDD.getNumPartitions}")

    //读取数据时,如果有很多小文件,可以合并小文件
    val mergeRDD: RDD[String] = studentsRDD.coalesce(2, shuffle = false)
    println(s"mergeRDD分区数:${mergeRDD.getNumPartitions}")
    mergeRDD.foreach(println)


//    //repartition: 重分区,会产生shuffle
//    val rePartitionRDD: RDD[String] = studentsRDD.repartition(100)
//    println(s"rePartitionRDD分区数:${rePartitionRDD.getNumPartitions}")
//    rePartitionRDD.saveAsTextFile("data/test3")
//
//    //coalesce(1, shuffle = false): 合并分区,不会产生shuffle
//    //一班用于最后合并小文件
//    val coalesceRDD: RDD[String] = rePartitionRDD.coalesce(1, shuffle = false)
//    println(s"coalesceRDD分区数:${rePartitionRDD.getNumPartitions}")
//
//    coalesceRDD.saveAsTextFile("data/test4")
  }
}

广播大变量(Map join)
概念
在 Spark 中,广播变量是一种共享变量。当在分布式计算环境中,多个任务(Task)可能需要访问相同的数据时,如果每个任务都去获取原始数据副本,会导致数据的重复存储和传输,浪费大量的网络资源和内存。广播变量就是为了解决这个问题而设计的。它允许程序员将一个只读的数据变量(如一个大的查找表或者配置参数)在每个节点(Node)上缓存一份副本,这样各个任务就可以在本地访问这个数据,而不需要通过网络反复传输。
例如,假设你有一个机器学习任务,需要使用一个预训练好的模型参数文件,这个文件比较大。如果没有广播变量,每次在节点上执行的任务都要从存储系统中读取这个文件,这会导致大量的 I/O 开销。通过使用广播变量,这个文件可以被广播到各个节点并缓存起来,任务在本地就可以直接使用,大大提高了效率。
创建和使用
创建广播变量:在 Spark 中,可以使用SparkContext.broadcast()方法来创建广播变量。

//1、将变量广播到Executor端
    val broIds: Broadcast[Array[String]] = sc.broadcast(ids)

使用广播变量:在 Spark 的转换(Transform)和动作(Action)操作中,可以使用广播变量。在 RDD(弹性分布式数据集)的操作函数(如map、filter等)中,可以通过value属性来访问广播变量的值

//2、获取广播变量
        broIds.value.contains(id)

广播变量的优势和注意事项
优势
减少网络传输:通过在每个节点上缓存数据,避免了数据的重复网络传输,特别是对于大数据集或者频繁使用的数据,这可以显著减少网络 I/O 开销。
提高性能:由于数据在本地节点缓存,任务可以更快地访问数据,从而提高了整体计算速度。对于一些对性能敏感的应用场景,如实时数据处理或者迭代计算,广播变量的使用可以带来明显的性能提升。
注意事项
数据不可变性:广播变量是只读的。这是因为在分布式环境中,如果多个任务可以修改广播变量的值,会导致数据的不一致性和难以预测的结果。所以在使用广播变量时,要确保广播的数据在整个计算过程中不需要修改。
变量生命周期:广播变量的生命周期是与 SparkContext 相关联的。当 SparkContext 被销毁时,广播变量也会被销毁。因此,在使用广播变量时,要注意 SparkContext 的生命周期管理,避免在不适当的时候引用已经销毁的广播变量。
大小:一般来说,广播变量的变量大小不超过100M
今天的分享就到这里了,期待下次给你们分享更多干货!!!