手机网站建设网,安徽金鹏建设集团网站,html静态网站模板下载,提供网站建设的功能简述 SparkRDD中可以包含任何类型的对象#xff0c;在实际应用中#xff0c;“键值对”是一种比较常见的RDD元素类型#xff0c;分组和聚合操作中经常会用到#xff0c;尤其是groupByKey和reduceByKey。 Spark操作中经常会用到“键值对RDD”#xff08;Pair RDD在实际应用中“键值对”是一种比较常见的RDD元素类型分组和聚合操作中经常会用到尤其是groupByKey和reduceByKey。 Spark操作中经常会用到“键值对RDD”Pair RDD用于完成聚合计算。普通RDD里面存储的数据类型是Int、String等而“键值对RDD”里面存储的数据类型是“键值对”。
生产环境用到的操作 以下为我在生产环境用到的操作
WordCount 统计文本中每个单词出现的次数使用Pair RDD将每个单词作为键将出现次数作为值然后进行reduceByKey操作进行聚合。
分组聚合 将具有相同键的元素分组在一起并对每个键的值进行聚合操作如groupByKey、reduceByKey等。
数据连接和关联 使用键值对进行数据的连接和关联操作如join、cogroup等。
数据预处理 对数据进行分组、排序、过滤等预处理操作如groupBy、sortByKey、filter等。
数据分析和统计 使用Pair RDD进行数据分析和统计操作如计算平均值、求和、最大值、最小值等。 通过Pair RDD可以更方便地处理键值对数据实现更灵活和复杂的数据处理和分析需求。
Pair RDD的创建方式
第一种从文件中加载数据创建pairRDD
//测试数据,自己编的,文件名为personID
591,2021,15448329898,北京,彩信
592,2022,15648029823,河北,微信
593,2022,16742329894,山西,电话
594,2020,17748529893,海南,微信
595,2020,19048729896,大连,QQ代码及运行结果
scala val lines sc.textFile(file:///data/testdata/personID.txt)
lines: org.apache.spark.rdd.RDD[String] file:///data/testdata/personID.txt MapPartitionsR DD[1] at textFile at console:23scala val pairRDD lines.flatMap(elem (elem 1))
pairRDD: org.apache.spark.rdd.RDD[Char] MapPartitionsRDD[2] at flatMap at console:23scala val pairRDD lines.flatMap(line line.split(,)).map(word (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] MapPartitionsRDD[4] at map at console: 23scala pairRDD.foreach(println)
(591,1)0: (0 1) / 1]
(2023,1)
(15448329898,1)
(北京,1)
(彩信,1)
(592,1)
......从代码执行结果来看 返回的结果是键值对类型的RDD即RDD[(String, Int)]。从pairRDD.foreach(println)执行的打印输出结果也可以看到都是由(单词,1)这种形式的键值对。
第二种通过数组Array或集合List创建pairRDD
案例
//使用array数组
scala val array Array(spark, hadoop, flink, hive)
array: Array[String] Array(spark, hadoop, flink, hive)
scala val rdd sc.parallelize(array)
rdd: org.apache.spark.rdd.RDD[String] ParallelCollectionRDD[5] at parallelize at console:24
scala val pairRDD rdd.map(word (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] MapPartitionsRDD[6] at map at console:23
scala pairRDD.foreach(println)
(spark,1)
(hadoop,1)
(flink,1)
(hive,1)//使用list集合
scala val list List(hadoop,spark,hive)
list: List[String] List(hadoop, spark, hive)scala val rdd sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] ParallelCollectionRDD[7] at parallelize at console:24scala val pairRDD rdd.map(word (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] MapPartitionsRDD[8] at map at console:23 ^scala pairRDD.foreach(println)
(hadoop,1)
(spark,1)
(hive,1)常用键值对转换操作
常用的键值对转换操作包括reduceByKey()、groupByKey()、sortByKey()、join()、cogroup()等
reduceByKey(func)
功能使用func函数合并具有相同键的值。注意这里强调合并相同键。
比如reduceByKey((a,b) ab)有五个键值对(nlp,1) (nlp,1) (spark,1) (nlp,1) (hadoop,1) (hadoop,1)
对具有相同key的键值对进行合并后的结果就是 (spark,1) (hadoop,2) (nlp,3 我们对上面第二种方式创建List集合得到的pairRDD进行reduceByKey()操作代码如下
scala val list List(nlp,nlp,spark,nlp,hadoop,hadoop)
list: List[String] List(nlp, nlp, spark, nlp, hadoop, hadoop)scala val rdd sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] ParallelCollectionRDD[9] at parallelize at console:24scala val pairRDD rdd.map(word (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] MapPartitionsRDD[10] at map at console:23scala pairRDD.foreach(println)
(nlp,1)
(nlp,1)
(spark,1)
(nlp,1)
(hadoop,1)
(hadoop,1)scala pairRDD.reduceByKey((a,b) a b).foreach(println)
(spark,1)
(hadoop,2)
(nlp,3)groupByKey()
功能对具有相同键的值进行分组。注意这里强调对相同的键分成一组。
比如groupByKey((a,b) ab)有五个键值对(nlp,1) (nlp,1) (spark,1) (nlp,1) (hadoop,1) (hadoop,1)
我们对上面第二种方式创建得到的pairRDD进行groupByKey()操作代码如下
scala pairRDD.groupByKey()
res17: org.apache.spark.rdd.RDD[(String, Iterable[Int])] ShuffledRDD[13] at groupByKey at console:24
// 分组后value被保存到Iterable[Int]中scala pairRDD.groupByKey().foreach(println)
(spark,CompactBuffer(1))
(hadoop,CompactBuffer(1, 1))
(nlp,CompactBuffer(1, 1, 1))
keys
功能会把键值对RDD中的key返回形成一个新的RDD。
scala pairRDD.keys
res20: org.apache.spark.rdd.RDD[String] MapPartitionsRDD[16] at keys at console:24scala pairRDD.keys.foreach(println)
nlp
nlp
spark
nlp
hadoop
hadoop可以对返回的key的集合进行操作比如说写入一个List集合中
scala val prirRDDkeysList pairRDD.keys.collect().toList
prirRDDkeysList: List[String] List(nlp, nlp, spark, nlp, hadoop, hadoop)scala val prirRDDkeysArray pairRDD.keys.collect()
prirRDDkeysArray: Array[String] Array(nlp, nlp, spark, nlp, hadoop, hadoop)values
功能 把键值对RDD中的value返回形成一个新的RDD。
scala pairRDD.foreach(println)
(nlp,1)
(nlp,1)
(spark,1)
(nlp,1)
(hadoop,1)
(hadoop,1)scala pairRDD.values.foreach(println)
1
1
1
1
1
1将得到的值保存到数组或集合中
scala val prirRDDValuesList pairRDD.values.collect().toList
prirRDDValuesList: List[Int] List(1, 1, 1, 1, 1, 1)scala val prirRDDValueArray pairRDD.values.collect()
prirRDDValueArray: Array[Int] Array(1, 1, 1, 1, 1, 1)注意 为什么会报错value collect is not a member of Unit 因为foreach方法返回的是Unit类型它没有collect方法。
scala val prirRDDValuesList pairRDD.values.foreach(println).collect().toList
:26: error: value collect is not a member of Unit
val prirRDDValuesList pairRDD.values.foreach(println).collect().toList
工作中使用collect导致的内存不足调优 当处理大数据集时可以考虑使用Spark的分布式计算能力来处理数据而不是将所有数据收集到驱动程序中。这样可以避免内存不足的问题。 我使用collect方法将这个RDD中的元素收集到驱动程序并返回一个数组。如果pairRDD中的数据量很大collect操作可能会导致内存不足的问题建议在处理大数据集时谨慎使用collect方法。我们可以用很多方法来避免
使用RDD转换操作可以使用各种RDD转换操作如map、filter、reduceByKey等对数据集进行转换和聚合操作。这些操作在分布式环境下进行可以利用集群中的多个节点进行计算。使用RDD的collect和take方法如果只需要获取部分数据可以使用collect方法将数据收集到驱动程序中确保数据量不会导致内存不足可以使用take方法获取RDD中的前几个元素。使用RDD的sample方法可以使用sample方法对数据进行采样从而获取数据集的一个子集。这样可以在处理大数据集时降低计算和内存的压力。使用Spark SQL或DataFrame如果数据集结构化且存储在支持Spark SQL的数据源中可以使用Spark SQL或DataFrame API进行数据操作和分析。这些API提供了更高级的数据操作和查询功能。使用持久化存储如果需要将处理结果保存下来或供其他程序使用可以将结果存储在持久化存储系统中如HDFS或数据库。这样可以避免将所有数据收集到驱动程序中。 利用集群中的计算资源进行并行计算避免将所有数据收集到驱动程序中可以使用RDD转换操作、采样、分页获取等技术来处理数据。
sortByKey()
功能是返回一个根据键排序的RDD。
scala pairRDD.sortByKey().foreach(println)
(hadoop,1)
(hadoop,1)
(nlp,1)
(nlp,1)
(nlp,1)
(spark,1)mapValues(func) 常用
功能对键值对RDD中的每个value都应用一个函数但是key不会发生变化。 即我只对键值对RDD的value部分进行处理而不是同时对key和value进行处理。例如对四个键值对(spark,1)、(spark,2)、(hadoop,3)和(hadoop,5)构成的pairRDD如果执行pairRDD.mapValues(x x1)就会得到一个新的键值对RDD它包含下面四个键值对(spark,2)、(spark,3)、(hadoop,4)和(hadoop,6)。
scala pairRDD.mapValues(a a*2).foreach(println)
(nlp,2)
(nlp,2)
(spark,2)
(nlp,2)
(hadoop,2)
(hadoop,2)join 常用
功能对于给定的两个输入数据集(K,V1)和(K,V2)只有在两个数据集中都存在的key才会被输出最终得到一个(K,(V1,V2))类型的数据集。 join(连接)操作是键值对常用的操作。“连接”(join)这个概念来自于关系数据库领域因此join的类型也和关系数据库中的join一样包括内连接(join)、左外连接(leftOuterJoin)、右外连接(rightOuterJoin)等。最常用的情形是内连接所以join就表示内连接。 对于内连接对于给定的两个输入数据集(K,V1)和(K,V2)只有在两个数据集中都存在的key才会被输出最终得到一个(K,(V1,V2))类型的数据集。 比如pairRDD1是一个键值对集合{(spark,1)、(spark,2)、(hadoop,3)和(hadoop,5)}pairRDD2是一个键值对集合{(spark,fast)}那么pairRDD1.join(pairRDD2)的结果就是一个新的RDD这个新的RDD是键值对集合{(spark,1,fast),(spark,2,fast)}。
案例代码
scala val paRDD1 sc.parallelize(Array((spark,2),(hadoop,3),(spark,1),(hive,4),(hadoop,2)))
paRDD1: org.apache.spark.rdd.RDD[(String, Int)] ParallelCollectionRDD[28] at parallelize at console:26scala val paRDD2 sc.parallelize(Array((spark,nicetry),(hadoop,good),(spark,234),(hive,2314),(hadoop,ohho)))
paRDD2: org.apache.spark.rdd.RDD[(String, Any)] ParallelCollectionRDD[29] at parallelize at console:26scala paRDD1.join(paRDD2).foreach(println)
(spark,(2,nicetry))
(spark,(2,234))
(spark,(1,nicetry))
(spark,(1,234))
(hive,(4,2314))
(hadoop,(3,good))
(hadoop,(3,ohho))
(hadoop,(2,good))
(hadoop,(2,ohho))eg现在来看林子雨教授讲解的是真清晰温故而知新。
一个完整实例-计算每种图书的每天平均销量
思路
计算一天中各种类图书卖出去的平均值,键值对的key表示图书名称value表示某天图书销量请计算每个键对应的平均值也就是计算每种图书的每天平均销量
步骤
1、构建数组包含对应键值对调用parallelize方法生成 RDD
2、针对构建得到的rdd我们调用mapValues()函数把rdd中的每个每个键值对(key,value)的value部分进行修改把value转换成键值对(value,1)其中数值1表示这个key在rdd中出现了1次为什么要记录出现次数呢因为我们最终要计算每个key对应的平均值所以必须记住这个key出现了几次最后用value的总和除以key的出现次数就是这个key对应的平均值。
注collect()是一个行动操作功能是以数组的形式返回数据集中的所有元素当我们要实时查看一个RDD中的元素内容时就可以调用collect()函数。
3、调用reduceByKey()函数此处必须要十分准确地理解reduceByKey()函数的功能 合并具有相同键的值。 reduceByKey(func)的功能是使用func函数合并具有相同键的值。这里的func函数就是Lamda表达式(x,y) (x._1y._1,x._2 y._2)这个表达式中x和y都是value而且是具有相同key的两个键值对所对应的value。
4、 计算最终结果。对得到的几个键值对构成的RDD执行mapValues()操作得到每种书的每天平均销量。mapValues,key不变只对值记性操作。value会被赋值给Lamda表达式x (x._1 / x._2中的xx的值就是(22,2)x._1就是22表示hadoop书总销量是22x._2就是2表示2天因此hadoop书籍的每天平均销量就是x._1 / x._2也就是11。mapValues()输出的一个键值对就是(hadoop,11)其他同理。
代码
//构建书籍及销量
scala val books sc.parallelize(Array((book1,5),(book2,10),(book3,8),(book1,6),(book2,12)))
books: org.apache.spark.rdd.RDD[(String, Int)] ParallelCollectionRDD[1] at parallelize at console:23
// 统计
scala val sum_books books.mapValues(x (x,1)).foreach(println)
(book1,(5,1))
(book2,(10,1))
(book3,(8,1))
(book1,(6,1))
(book2,(12,1))
sum_books: Unit ()
//计算出现次数,value中前面是总数后面是天数如11,2表示2天卖出11本
scala val average_books books.mapValues(x (x,1)).reduceByKey((x,y) (x._1 y._1 , x._2 y._2)).foreach(println)
(book1,(11,2))
(book3,(8,1))
(book2,(22,2))
average_books: Unit ()//平均值统计
scala val average_books books.mapValues(x (x,1)).reduceByKey((x,y) (x._1 y._1 , x._2 y._2)).mapValues(x x._1 / x._2).foreach(println)
(book1,5)
(book3,8)
(book2,11)
average_books: Unit ()