网站建站和推广服务公司,建站知乎,网站建设有免费的空间吗,WordPress如何设置站点名称目录
1. RDD队列
2 textFileStream
3 DIY采集器
4 kafka数据源【重点】 1. RDD队列 a、使用场景#xff1a;测试 b、实现方式: 通过ssc.queueStream(queueOfRDDs)创建DStream#xff0c;每一个推送这个队列的RDD#xff0c;都会作为一个DStream处理 val sparkco…目录
1. RDD队列
2 textFileStream
3 DIY采集器
4 kafka数据源【重点】 1. RDD队列 a、使用场景测试 b、实现方式: 通过ssc.queueStream(queueOfRDDs)创建DStream每一个推送这个队列的RDD都会作为一个DStream处理 val sparkconf: SparkConf new SparkConf().setMaster(local[*]).setAppName(stream)val ssc new StreamingContext(sparkconf,Seconds(3))// 创建一个队列对象队列中存放的是RDDval queue new mutable.Queue[RDD[String]]()// 通过队列创建DStreamval queueDS: InputDStream[String] ssc.queueStream(queue)queueDS.print()// 启动采集器ssc.start()//这个操作之所以放在这个位置是为了模拟流式的感觉数据源源不断的生产for(i - 1 to 5 ){// 循环创建rddval rdd: RDD[String] ssc.sparkContext.makeRDD(List(i.toString))// 将RDD存放到队列中queue.enqueue(rdd)// 当前线程休眠1秒Thread.sleep(6000) }// 等待采集器的结束ssc.awaitTermination()}2 textFileStream val sparkConf: SparkConf new SparkConf().setMaster(local[*]).setAppName(textFileStream)val ssc new StreamingContext(sparkConf,Seconds(3))//从文件中读取数据val textDS: DStream[String] ssc.textFileStream(in)textDS.print()// 启动采集器ssc.start()// 等待采集器的结束ssc.awaitTermination() 3 DIY采集器 1. 自定义采集器 2. 什么情况下需要自定采集器呢 比如从mysql、hbase中读取数据。 采集器的作用是从指定的地方按照采集周期对数据进行采集。 目前有采集kafka、采集netcat工具的指定端口的数据、采集文件目录中的数据等 3. 自定义采集器的步骤,模仿socketTextStream a、自定采集器类继承extends并指定数据泛型同时对父类的属性赋值指定数据存储的级别 b、重写onStart和onStop方法 onStart:采集器的如何启动 onStop:采集的如何停止
val sparkConf: SparkConf new SparkConf().setMaster(local[*]).setAppName(DIY)val ssc new StreamingContext(sparkConf, Seconds(3))// 获取采集的流val ds: ReceiverInputDStream[String] ssc.receiverStream(new MyReciver(localhost,9999))ds.print()ssc.start()ssc.awaitTermination()}// 继承extends Reciver并指定数据泛型同时对父类的属性赋值指定数据存储的级别class MyReciver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {private var socket: Socket _def receive {// 获取输入流val reader new BufferedReader(new InputStreamReader(socket.getInputStream,UTF-8))// 设定一个间接变量var s: String nullwhile (true) {// 按行读取数据s reader.readLine()if (s ! null) {// 将数据进行封装store(s)}}}// 1. 启动采集器override def onStart(): Unit {socket new Socket(host, port)new Thread(Socket Receiver) {setDaemon(true)override def run() {receive}}.start()}// 2. 停止采集器override def onStop(): Unit {socket.close()socket null}} 4 kafka数据源【重点】 -- DirectAPI是由计算的Executor来主动消费Kafka的数据速度由自身控制。 -- 配置信息基本上是固定写法 // TODO Spark环境// SparkStreaming使用核数最少是2个val sparkConf new SparkConf().setMaster(local[*]).setAppName(streaming)val ssc new StreamingContext(sparkConf, Seconds(3))// TODO 使用SparkStreaming读取Kafka的数据// Kafka的配置信息val kafkaPara: Map[String, Object] Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG - hadoop105:9092,hadoop106:9092,hadoop107:9092,ConsumerConfig.GROUP_ID_CONFIG - atguigu,key.deserializer - org.apache.kafka.common.serialization.StringDeserializer,value.deserializer - org.apache.kafka.common.serialization.StringDeserializer)val kafkaDStream: InputDStream[ConsumerRecord[String, String]] KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set(atguigu), kafkaPara))// 获取数据key是nullvalue是真实的数据val valueDStream: DStream[String] kafkaDStream.map(record record.value())valueDStream.flatMap(_.split( )).map((_, 1)).reduceByKey(_ _).print()ssc.start()// 等待采集器的结束ssc.awaitTermination()