当前位置: 首页 > news >正文

免费企业自助建站信息发布网网址大全网站

免费企业自助建站信息发布网,网址大全网站,企业网络营销现状,深圳自助建站文章目录 1、基本操作1.1、创建SparkSession1.2、创建DataFrames1.3、创建Dataset操作1.4、运行sql查询1.5、创建全局临时视图1.6、创建Datasets1.7、与rdd进行互操作1.7.1、使用反射推断模式1.7.2、以编程方式指定模式 2、完整的测试例子 1、基本操作 1.1、创建SparkSession … 文章目录 1、基本操作1.1、创建SparkSession1.2、创建DataFrames1.3、创建Dataset操作1.4、运行sql查询1.5、创建全局临时视图1.6、创建Datasets1.7、与rdd进行互操作1.7.1、使用反射推断模式1.7.2、以编程方式指定模式 2、完整的测试例子 1、基本操作 1.1、创建SparkSession import org.apache.spark.sql.SparkSession;SparkSession spark SparkSession .builder() .appName(Java Spark SQL basic example) .config(spark.some.config.option, some-value) .getOrCreate();1.2、创建DataFrames import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row;DatasetRow df spark.read().json(examples/src/main/resources/people.json);// Displays the content of the DataFrame to stdout df.show(); // ----------- // | age| name| // ----------- // |null|Michael| // | 30| Andy| // | 19| Justin| // -----------1.3、创建Dataset操作 // col(...) is preferable to df.col(...) import static org.apache.spark.sql.functions.col;// Print the schema in a tree format df.printSchema(); // root // |-- age: long (nullable true) // |-- name: string (nullable true)// Select only the name column df.select(name).show(); // ------- // | name| // ------- // |Michael| // | Andy| // | Justin| // -------// Select everybody, but increment the age by 1 df.select(col(name), col(age).plus(1)).show(); // ---------------- // | name|(age 1)| // ---------------- // |Michael| null| // | Andy| 31| // | Justin| 20| // ----------------// Select people older than 21 df.filter(col(age).gt(21)).show(); // ------- // |age|name| // ------- // | 30|Andy| // -------// Count people by age df.groupBy(age).count().show(); // --------- // | age|count| // --------- // | 19| 1| // |null| 1| // | 30| 1| // ---------1.4、运行sql查询 import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row;// Register the DataFrame as a SQL temporary view df.createOrReplaceTempView(people);DatasetRow sqlDF spark.sql(SELECT * FROM people); sqlDF.show(); // ----------- // | age| name| // ----------- // |null|Michael| // | 30| Andy| // | 19| Justin| // -----------1.5、创建全局临时视图 // Register the DataFrame as a global temporary view df.createGlobalTempView(people);// Global temporary view is tied to a system preserved database global_temp spark.sql(SELECT * FROM global_temp.people).show(); // ----------- // | age| name| // ----------- // |null|Michael| // | 30| Andy| // | 19| Justin| // -----------// Global temporary view is cross-session spark.newSession().sql(SELECT * FROM global_temp.people).show(); // ----------- // | age| name| // ----------- // |null|Michael| // | 30| Andy| // | 19| Justin| // -----------1.6、创建Datasets import java.util.Arrays; import java.util.Collections; import java.io.Serializable;import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders;public static class Person implements Serializable {private String name;private long age;public String getName() {return name;}public void setName(String name) {this.name name;}public long getAge() {return age;}public void setAge(long age) {this.age age;} }// Create an instance of a Bean class Person person new Person(); person.setName(Andy); person.setAge(32);// Encoders are created for Java beans EncoderPerson personEncoder Encoders.bean(Person.class); DatasetPerson javaBeanDS spark.createDataset(Collections.singletonList(person),personEncoder ); javaBeanDS.show(); // ------- // |age|name| // ------- // | 32|Andy| // -------// Encoders for most common types are provided in class Encoders EncoderLong longEncoder Encoders.LONG(); DatasetLong primitiveDS spark.createDataset(Arrays.asList(1L, 2L, 3L), longEncoder); DatasetLong transformedDS primitiveDS.map((MapFunctionLong, Long) value - value 1L,longEncoder); transformedDS.collect(); // Returns [2, 3, 4]// DataFrames can be converted to a Dataset by providing a class. Mapping based on name String path examples/src/main/resources/people.json; DatasetPerson peopleDS spark.read().json(path).as(personEncoder); peopleDS.show(); // ----------- // | age| name| // ----------- // |null|Michael| // | 30| Andy| // | 19| Justin| // -----------1.7、与rdd进行互操作 1.7.1、使用反射推断模式 Spark SQL支持将JavaBeans的RDD自动转换为DataFrame。使用反射获得的BeanInfo定义了表的模式。目前Spark SQL不支持包含Map字段的JavaBeans。但是支持嵌套JavaBeans和List或Array字段。您可以通过创建一个实现Serializable的类来创建JavaBean并且该类的所有字段都有getter和setter。 import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders;// Create an RDD of Person objects from a text file JavaRDDPerson peopleRDD spark.read().textFile(examples/src/main/resources/people.txt).javaRDD().map(line - {String[] parts line.split(,);Person person new Person();person.setName(parts[0]);person.setAge(Integer.parseInt(parts[1].trim()));return person;});// Apply a schema to an RDD of JavaBeans to get a DataFrame DatasetRow peopleDF spark.createDataFrame(peopleRDD, Person.class); // Register the DataFrame as a temporary view peopleDF.createOrReplaceTempView(people);// SQL statements can be run by using the sql methods provided by spark DatasetRow teenagersDF spark.sql(SELECT name FROM people WHERE age BETWEEN 13 AND 19);// The columns of a row in the result can be accessed by field index EncoderString stringEncoder Encoders.STRING(); DatasetString teenagerNamesByIndexDF teenagersDF.map((MapFunctionRow, String) row - Name: row.getString(0),stringEncoder); teenagerNamesByIndexDF.show(); // ------------ // | value| // ------------ // |Name: Justin| // ------------// or by field name DatasetString teenagerNamesByFieldDF teenagersDF.map((MapFunctionRow, String) row - Name: row.StringgetAs(name),stringEncoder); teenagerNamesByFieldDF.show(); // ------------ // | value| // ------------ // |Name: Justin| // ------------1.7.2、以编程方式指定模式 当JavaBean类不能提前定义时(例如记录的结构被编码为字符串或者文本数据集将被解析字段将以不同的方式投影给不同的用户)可以通过三个步骤以编程方式创建dataset 。 从原始RDD的行创建一个RDD;创建由StructType表示的模式该模式与步骤1中创建的RDD中的Rows结构相匹配。通过SparkSession提供的createDataFrame方法将模式应用到RDD的行。 import java.util.ArrayList; import java.util.List;import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function;import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row;import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType;// Create an RDD JavaRDDString peopleRDD spark.sparkContext().textFile(examples/src/main/resources/people.txt, 1).toJavaRDD();// The schema is encoded in a string String schemaString name age;// Generate the schema based on the string of schema ListStructField fields new ArrayList(); for (String fieldName : schemaString.split( )) {StructField field DataTypes.createStructField(fieldName, DataTypes.StringType, true);fields.add(field); } StructType schema DataTypes.createStructType(fields);// Convert records of the RDD (people) to Rows JavaRDDRow rowRDD peopleRDD.map((FunctionString, Row) record - {String[] attributes record.split(,);return RowFactory.create(attributes[0], attributes[1].trim()); });// Apply the schema to the RDD DatasetRow peopleDataFrame spark.createDataFrame(rowRDD, schema);// Creates a temporary view using the DataFrame peopleDataFrame.createOrReplaceTempView(people);// SQL can be run over a temporary view created using DataFrames DatasetRow results spark.sql(SELECT name FROM people);// The results of SQL queries are DataFrames and support all the normal RDD operations // The columns of a row in the result can be accessed by field index or by field name DatasetString namesDS results.map((MapFunctionRow, String) row - Name: row.getString(0),Encoders.STRING()); namesDS.show(); // ------------- // | value| // ------------- // |Name: Michael| // | Name: Andy| // | Name: Justin| // -------------2、完整的测试例子 本例子代码是在window下测试需要下载https://github.com/steveloughran/winutils解压放在hadoop对应目录 package com.penngo.spark;import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType;import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List;import static org.apache.spark.sql.functions.col;public class SparkDataset {private static final String jsonPath D:\\hadoop\\spark\\resources\\people.json;private static final String txtPath D:\\hadoop\\spark\\resources\\people.txt;public static class Person implements Serializable {private String name;private long age;public String getName() {return name;}public void setName(String name) {this.name name;}public long getAge() {return age;}public void setAge(long age) {this.age age;}}public static void createDataFrame(SparkSession spark) throws Exception{// 创建DataFrameDatasetRow df spark.read().json(jsonPath);df.show();// 操作operations(df);// sql查询sqlQuery(spark, df);}public static void operations(DatasetRow df){df.printSchema();// root// |-- age: long (nullable true)// |-- name: string (nullable true)// Select only the name columndf.select(name).show();// -------// | name|// -------// |Michael|// | Andy|// | Justin|// -------// Select everybody, but increment the age by 1df.select(col(name), col(age).plus(1)).show();// ----------------// | name|(age 1)|// ----------------// |Michael| null|// | Andy| 31|// | Justin| 20|// ----------------// Select people older than 21df.filter(col(age).gt(21)).show();// -------// |age|name|// -------// | 30|Andy|// -------// Count people by agedf.groupBy(age).count().show();// ---------// | age|count|// ---------// | 19| 1|// |null| 1|// | 30| 1|// ---------}/*** SQL查询*/public static void sqlQuery(SparkSession spark, DatasetRow df) throws Exception{// 临时视图会话消失视图也会消失df.createOrReplaceTempView(people);DatasetRow sqlDF spark.sql(SELECT * FROM people);sqlDF.show();// 全局视图全局临时视图绑定到系统保留的数据库 global_temp df.createGlobalTempView(people);spark.sql(SELECT * FROM global_temp.people).show();// -----------// | age| name|// -----------// |null|Michael|// | 30| Andy|// | 19| Justin|// -----------// 全局临时视图是跨会话的spark.newSession().sql(SELECT * FROM global_temp.people).show();// -----------// | age| name|// -----------// |null|Michael|// | 30| Andy|// | 19| Justin|// -----------}public static void createDataset(SparkSession spark){// 列表转成datasetPerson person new Person();person.setName(Andy);person.setAge(32);EncoderPerson personEncoder Encoders.bean(Person.class);DatasetPerson javaBeanDS spark.createDataset(Collections.singletonList(person),personEncoder);System.out.println(createDataset show);javaBeanDS.show();// -------// |age|name|// -------// | 32|Andy|// -------EncoderLong longEncoder Encoders.LONG();DatasetLong primitiveDS spark.createDataset(Arrays.asList(1L, 2L, 3L), longEncoder);DatasetLong transformedDS primitiveDS.map((MapFunctionLong, Long) value - value 1L,longEncoder);transformedDS.collect(); // Returns [2, 3, 4]// 读取文件转成datasetDatasetPerson peopleDS spark.read().json(jsonPath).as(personEncoder);peopleDS.show();// -----------// | age| name|// -----------// |null|Michael|// | 30| Andy|// | 19| Justin|// -----------}/*** 非Bean的方式转换rdd-DataFrame-Dataset* param spark* throws Exception*/public static void rddToDataset(SparkSession spark) throws Exception{// 读取文件生成一个Person类型的RDDJavaRDDPerson peopleRDD spark.read().textFile(txtPath).javaRDD().map(line - {String[] parts line.split(,);Person person new Person();person.setName(parts[0]);person.setAge(Integer.parseInt(parts[1].trim()));return person;});// RDD转成DataFrameDatasetRow peopleDF spark.createDataFrame(peopleRDD, Person.class);// 把DataFrame注册为临时视图peopleDF.createOrReplaceTempView(people);// SQL语句可以通过spark提供的SQL方法来运行DatasetRow teenagersDF spark.sql(SELECT name FROM people WHERE age BETWEEN 13 AND 19);// 结果中一行的列可以通过字段索引访问EncoderString stringEncoder Encoders.STRING();DatasetString teenagerNamesByIndexDF teenagersDF.map((MapFunctionRow, String) row - Name: row.getString(0),stringEncoder);teenagerNamesByIndexDF.show();// ------------// | value|// ------------// |Name: Justin|// ------------// 也可以通过字段名访问DatasetString teenagerNamesByFieldDF teenagersDF.map((MapFunctionRow, String) row - Name: row.StringgetAs(name),stringEncoder);teenagerNamesByFieldDF.show();// ------------// | value|// ------------// |Name: Justin|// ------------}/*** 非Bean的方式转换rdd-DataFrame-Dataset* param spark* throws Exception*/public static void rddToDataset2(SparkSession spark) throws Exception{// 创建RDDJavaRDDString peopleRDD spark.sparkContext().textFile(txtPath, 1).toJavaRDD();// 字段字义String schemaString name age;// 根据schema的字符串生成schemaListStructField fields new ArrayList();for (String fieldName : schemaString.split( )) {StructField field DataTypes.createStructField(fieldName, DataTypes.StringType, true);fields.add(field);}StructType schema DataTypes.createStructType(fields);// 将RDD(people)的记录转换为视图的RowJavaRDDRow rowRDD peopleRDD.map((FunctionString, Row) record - {String[] attributes record.split(,);return RowFactory.create(attributes[0], attributes[1].trim());});// 将schema应用于RDD转为DataFrameDatasetRow peopleDataFrame spark.createDataFrame(rowRDD, schema);// 使用DataFrame创建临时视图peopleDataFrame.createOrReplaceTempView(people);// SQL可以在使用dataframe创建的临时视图上运行DatasetRow results spark.sql(SELECT name FROM people);// SQL查询的结果是dataframe支持所有正常的RDD操作// 结果行的列可以通过字段索引或字段名称访问DatasetString namesDS results.map((MapFunctionRow, String) row - Name: row.getString(0),Encoders.STRING());namesDS.show();// -------------// | value|// -------------// |Name: Michael|// | Name: Andy|// | Name: Justin|// -------------}public static void main(String[] args) throws Exception{Logger.getLogger(org.apache.spark).setLevel(Level.WARN);Logger.getLogger(org.apache.eclipse.jetty.server).setLevel(Level.OFF);//windows下调试spark需要使用https://github.com/steveloughran/winutilsSystem.setProperty(hadoop.home.dir, D:\\hadoop\\hadoop-3.3.1);System.setProperty(HADOOP_USER_NAME, root);SparkSession spark SparkSession.builder().appName(SparkDataset).master(local[*]).getOrCreate();createDataFrame(spark);createDataset(spark);rddToDataset(spark);rddToDataset2(spark);spark.stop();} } 参考自官方文档https://spark.apache.org/docs/3.1.2/sql-getting-started.html spark支持数据源https://spark.apache.org/docs/3.1.2/sql-data-sources.html spark sql语法相关https://spark.apache.org/docs/3.1.2/sql-ref.html
http://www.yutouwan.com/news/318325/

相关文章:

  • 天津营销网站建设联系方式成都网站建设 平易云
  • 网站帮助中心设计邯郸招工信息网
  • 怎样在手机上制作网站visual studio
  • 网站双机热备怎么做西安市做网站公司有哪些
  • 网站没有icp备案怎么访问南昌装修公司
  • 网站建设 案例互联网装饰网站
  • 销售网站模板搭建视频服务器
  • 网站开发流程pptwordpress指定关键词自动内链
  • 珠宝企业的门户网站开发知乎自媒体平台注册
  • wamp在网站建设中的功能及协作关系深圳网站seo优化公司
  • 阿里云备案 网站备案域名购买什么网站从做系统
  • 营销型网站建设调查表如何做公司介绍视频网站
  • 如何备份网站网站做营销推广的公司
  • 商丘网站建设推广渠道企业网站建设的意义
  • 票务网站开发端口iis网站目录权限
  • 做灯饰的企业都会在哪些网站网站设计规划说明书
  • 网站推广哪个平台好中英文网站栏目修改
  • 海南网站优化公司广州市网络预约出租汽车综合业务管理平台
  • 新做的网站怎样让百度收录金华网站建设公司
  • 上海市网站建设公司58wordpress 猫
  • 自适应自助建站网站人才招聘网站建设
  • 外贸网站搜索引擎优化方法杭州网站建设app
  • 绍兴网页设计优化师是一份怎样的工作
  • 做网站源码需要多少钱泊头在哪做网站比较好
  • 搜狗网站做滤芯怎么样营销型企业网站功能
  • 江西网站建设与推广湛江网站建设外包
  • 做网站注意设么专业网站制作推荐
  • 湛江公司做网站景德镇陶瓷学院校友做网站的
  • 深圳公司网站开发苏州网站开发建设公司
  • 用vs2010做免费网站模板下载地址模板网站怎么修改