网站怎么做网盘,桂林论坛网网站电话,潍坊网站制作公司,数据网站建设工具模板spark的使用 spark是一款分布式的计算框架#xff0c;用于调度成百上千的服务器集群。 安装pyspark
# os.environ[PYSPARK_PYTHON]解析器路径 pyspark_python配置解析器路径
import os
os.environ[PYSPARK_PYTHON]D:/dev/python/python3.11.4/python.exepip inst…spark的使用 spark是一款分布式的计算框架用于调度成百上千的服务器集群。 安装pyspark
# os.environ[PYSPARK_PYTHON]解析器路径 pyspark_python配置解析器路径
import os
os.environ[PYSPARK_PYTHON]D:/dev/python/python3.11.4/python.exepip install pyspark # 原始国外安装
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark #网址安装java安装
前置安装软件java包 java官网下载地址 一键下一步安装配置环境变量 首先创建一个JAVA_HOME的全局变量然后在path中通过%%引入执行下面的bin 路径%JAVA_HOME%\bin 执行成功
from pyspark import SparkConf,SparkContext# 创建sparkConf 类对象
conf SparkConf().setMaster(local[*]).setAppName(test_spark_app)
# 创建sparkConf类对象创建sparkContext对象
sc SparkContext(confconf)
# 打印pySpark的运行脚本
print(sc.version)
# 停止sparkContext对象的运行停止pySpark程序)
sc.stop()PySpark的数据计算都是基于RDD对象来进行的RDD对象内置丰富的成员方法算子 map算子
功能map算子是将RDD的数据一条条处理处理的逻辑基于map算子中接收的处理函数返回新的RDD语法
# 简单执行map将数据乘以10返回如果不引入python解析器的路径引入就会报错
from pyspark import SparkConf, SparkContext
# 指定spark的python解析器路径
import os
os.environ[PYSPARK_PYTHON]D:/dev/python/python3.11.4/python.exe
# 创建sparkConf 类对象
conf SparkConf().setMaster(local[*]).setAppName(test_spark_app)
# 创建sparkConf类对象创建sparkContext对象
sc SparkContext(confconf)rdd sc.parallelize([1, 2, 3, 4, 5, 6])def func(data):return data * 10# map传入一个参数有返回值是函数或者是值
rdd2 rdd.map(func)
print(rdd2.collect())flatMap flatMap跟map差不多就是在最后做了一个解除嵌套的功能 from pyspark import SparkConf, SparkContext
import os
os.environ[PYSPARK_PYTHON]D:/dev/python/python3.11.4/python.exe
# 创建sparkConf 类对象
conf SparkConf().setMaster(local[*]).setAppName(test_spark_app)
# 创建sparkConf类对象创建sparkContext对象
sc SparkContext(confconf)rdd sc.parallelize([中石科技 时间还复活甲 如今房价,慰问金 咖啡机 姐夫哥,格很高 客服管家二恶烷 可归结为])rdd2 rdd.flatMap(lambda x:x.split( ))print(rdd2.collect()) map的结果
reduceByKey reduceByKey对数据进行分组可以两两计算 from pyspark import SparkConf, SparkContext
import osos.environ[PYSPARK_PYTHON] D:/dev/python/python3.11.4/python.exe
# 创建sparkConf 类对象
conf SparkConf().setMaster(local[*]).setAppName(test_spark_app)
# 创建sparkConf类对象创建sparkContext对象
sc SparkContext(confconf)rdd sc.parallelize([(男, 11), (男, 22), (女, 21), (男, 31), (女, 99)])
# 把男女进行分组value值进行计算
rdd2 rdd.reduceByKey(lambda a, b:ab)print(rdd2.collect()) # [(女, 120), (男, 64)]
reduce
与reduce的区别就是没有进行分组
take
取出前几个数据
...
rdd sc.parallelize([1,2,3,4,5]).take(3) # [1,2,3]count
计算rdd中的数据个数
filter from pyspark import SparkConf,SparkContext
import os
os.environ[PYSPARK_PYTHON]D:/dev/python/python3.11.4/python.execonfSparkConf().setMaster(local[*]).setAppName(test_spark)
scSparkContext(confconf)rddsc.parallelize([1,2,3,4,5])rdd2rdd.filter(lambda a:a%20)
print(rdd2.collect()) # [2,4]
distinct
进行数据去重
from pyspark import SparkConf,SparkContext
import os
os.environ[PYSPARK_PYTHON]D:/dev/python/python3.11.4/python.execonfSparkConf().setMaster(local[*]).setAppName(test_spark)
scSparkContext(confconf)add sc.parallelize([1,2,3,4,5,6,73,3,2,4,56,3,5])add2add.distinct()
print(add2.collect()) # [56, 1, 73, 2, 3, 4, 5, 6]sortBy排序
from pyspark import SparkConf, SparkContext
import osos.environ[PYSPARK_PYTHON] D:/dev/python/python3.11.4/python.execonf SparkConf().setMaster(local[*]).setAppName(test_spark)
sc SparkContext(confconf)add sc.textFile(D:/wordText.txt)word_rdd add.flatMap(lambda x: x.split( ))
word_with_rdd word_rdd.map(lambda word: (word, 1))
result_rdd word_with_rdd.reduceByKey(lambda a,b:ab)
result_numresult_rdd.sortBy(lambda x:x[1],ascendingFalse,numPartitions1) # 1.根据什么排序2.True 升序 False降序 3.分布式分区
print(result_num.collect())collect
将rdd内容变成list从而就可以打印出来