淘宝店铺网站建设,网站价值 批量查询,合肥网站建设 微笑互联,做谷歌推广一定要网站吗本文主要从以下几个方面介绍SparkStreaming读取Kafka的两种方式:一、SparkStreaming简介二、Kafka简介三、Redis简介(可用于保存历史数据或偏移量数据)四、SparkStreaming读取Kafka数据的两种方式 五、演示Demo一、SparkStreaming简介可以参考这篇文章#xff1a;SparkStreami…本文主要从以下几个方面介绍SparkStreaming读取Kafka的两种方式:一、SparkStreaming简介二、Kafka简介三、Redis简介(可用于保存历史数据或偏移量数据)四、SparkStreaming读取Kafka数据的两种方式 五、演示Demo一、SparkStreaming简介可以参考这篇文章SparkStreaming 详解 二、Kafka简介可以参考这篇文章Kafka(分布式发布订阅消息系统) 简介 三、Redis简介可以参考这篇文章Redis简介四、SparkStreaming读取Kafka数据的两种方式 spark streaming提供了两种获取方式一种是利用接收器(receiver)和kafaka的高层API实现。一种是不利用接收器直接用kafka底层的API来实现(spark1.3以后引入)。1、reciver链接方式(有些问题开发中不采用这种方式)用KafkaUtils.createDstream创建链接。Receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的然后Spark Streaming启动的job会去处理那些数据。Receiver方式是通过zookeeper来连接kafka队列调用Kafka高阶APIoffset存储在zookeeper由Receiver维护。在executor上会有receiver从kafka接收数据并存储在Spark executor中在到了batch时间后触发job去处理接收到的数据1个receiver占用1个core使用wal预写机制因为需要使用hdfs等存储,因此会降低性能。receiver方式基于Receiver方式存在的问题启用WAL机制每次处理之前需要将该batch内的数据备份到checkpoint目录中这降低了数据处理效率同时加重了Receiver的压力另外由于数据备份机制会受到负载影响负载一高就会出现延迟的风险导致应用崩溃。采用MEMORY_AND_DISK_SER降低对内存的要求但是在一定程度上影响了计算的速度。单Receiver内存。由于Receiver是属于Executor的一部分为了提高吞吐量提高Receiver的内存。但是在每次batch计算中参与计算的batch并不会使用这么多内存导致资源严重浪费。提高并行度采用多个Receiver来保存kafka的数据。Receiver读取数据是异步的不会参与计算。如果提高了并行度来平衡吞吐量很不划算。Receiver和计算的Executor是异步的在遇到网络等因素时会导致计算出现延迟计算队列一直在增加而Receiver一直在接收数据这非常容易导致程序崩溃。在程序失败恢复时有可能出现数据部分落地但是程序失败未更新offsets的情况这会导致数据重复消费。2、Direct直连方式(开发中使用的方式)使用KafkaUtils.createDirectStream创建链接。这种方式定期从kafka的topic下对应的partition中查询最新偏移量并在每个批次中根据相应的定义的偏移范围进行处理。Spark通过调用kafka简单的消费者API读取一定范围的数据。Direct方式是直接连接kafka分区来获取数据。从每个分区直接读取数据大大提高了并行能力Direct方式调用Kafka低阶API(底层API)offset自己存储和维护默认由Spark维护在checkpoint中消除了与zk不一致的情况当然也可以自己手动维护把offset存在mysql、redis中所以基于Direct模式可以在开发中使用且借助Direct模式的特点手动操作可以保证数据的Exactly once 精准一次基于Direct方式的优势简化并行读取如果要读取多个partition不需要创建多个输入DStream然后对他们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition并且会并行从kafka中读取数据。所以在kafka partition和RDD partition之间有一一对应的关系。高性能如果要保证数据零丢失在基于Receiver的方式中需要开启WAL机制。这种方式其实效率很低因为数据实际被复制了两份kafka自己本身就有高可靠的机制会对数据复制一份而这里又会复制一份到WAL中。而基于Direct的方式不依赖于Receiver不需要开启WAL机制只要kafka中做了数据的复制那么就可以通过kafka的副本进行恢复。强一致语义基于Receiver的方式使用kafka的高阶API来在Zookeeper中保存消费过的offset。这是消费kafka数据的传统方式。这种方式配合WAL机制可以保证数据零丢失的高可靠性但是却无法保证数据被处理一次且仅一次可能会处理两次。因为Spark和Zookeeper之间可能是不同步的。基于Direct的方式使用kafka的简单apiSpark Streaming自己就负责追踪消费的offset并保存在checkpoint中。Spark自己一定是同步的因此可以保证数据时消费一次且仅消费一次。降低资源Direct不需要Receiver其申请的Executors全部参与到计算任务中而Receiver则需要专门的Receivers来读取kafka数据且不参与计算。因此相同的资源申请Direct能够支持更大的业务。Receiver与其他Executor是异步的并持续不断接收数据对于小业务量的场景还好如果遇到大业务量时需要提高Receiver的内存但是参与计算的Executor并不需要那么多的内存而Direct因为没有Receiver而是在计算的时候读取数据然后直接计算所以对内存的要求很低。鲁棒性更好基于Receiver方式需要Receiver来异步持续不断的读取数据因此遇到网络、存储负载等因素导致实时任务出现堆积但Receiver却还在持续读取数据此种情况容易导致计算崩溃。Direct则没有这种顾虑其Driver在触发batch计算任务时才会读取数据并计算队列出现堆积并不不会引起程序的失败。基于Direct方式的不足Direct方式需要采用checkpoint或者第三方存储来维护offset而不是像Receiver那样通过Zookeeper来维护offsets提高了用户的开发成本。基于Receiver方式指定topic指定consumer的消费情况均能够通过Zookeeper来监控而Direct则没有这么便利如果想做监控并可视化则需要投入人力开发。五、演示Demo1、reciver链接方式package xxximport org.apache.spark.SparkConfimport org.apache.spark.storage.StorageLevelimport org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}import org.apache.spark.streaming.kafka.KafkaUtilsimport org.apache.spark.streaming.{Seconds, StreamingContext}/** * Receiver链接方式 */object KafkaWordCount { def main(args: Array[String]): Unit { val conf new SparkConf().setAppName(KafkaWordCount).setMaster(local[*]) val ssc new StreamingContext(conf, Seconds(5)) val zkQuorum slave2:2181,slave3:2181,slave4:2181 val groupId g1 val topic Map[String, Int](test1 - 1) //创建DStream需要KafkaDStream val data: ReceiverInputDStream[(String, String)] KafkaUtils.createStream(ssc, zkQuorum, groupId, topic, StorageLevel.MEMORY_AND_DISK_SER) //对数据进行处理 //Kafak的ReceiverInputDStream[(String, String)]里面装的是一个元组(key是写入的keyvalue是实际写入的内容) val lines: DStream[String] data.map(_._2) //对DSteam进行操作操作这个抽象(代理描述)就像操作一个本地的集合一样类似于RDD val words: DStream[String] lines.flatMap(_.split( )) val wordAndOne: DStream[(String, Int)] words.map((_, 1)) val reduced: DStream[(String, Int)] wordAndOne.reduceByKey(__) //打印结果(Action) reduced.print() //启动sparksteaming程序 ssc.start() //等待优雅的退出 ssc.awaitTermination() }}2、直连方式(用zookeeper存储偏移量)步骤准备zookeeper集群存储读取到额kafka数据的每个分区的偏移量调用KafkaUtils.createDirectStream建立直连链接读取zookeeper集群中的已经存储的每个数据分区地偏移量根据该偏移量继续读取数据。或者从头(当前)位置读取数据调用kafkaStream.transform遍历每个RDD获取该RDD对应数据的偏移量对RDD进行操作并将zookeeper中保存的数据偏移量进行更新package sparkStreamingAndKafkaimport kafka.common.TopicAndPartitionimport kafka.message.MessageAndMetadataimport kafka.serializer.StringDecoderimport kafka.utils.{ZKGroupTopicDirs, ZkUtils}import org.I0Itec.zkclient.ZkClientimport org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.{DStream, InputDStream}import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}/** * 直连方式用zookeeper存偏移量 */object KafkaDirection1 { def main(args: Array[String]): Unit { val conf: SparkConf new SparkConf().setAppName(kafkaDirection).setMaster(local[*]) val ssc new StreamingContext(conf, Seconds(3)) val group group1 // 分组 val topic wordCount // topic val brokerList slave1:9092,slave2:9092,slave3:9092 // broker集群sparkStream的Task直接连到kafka分区上 val zkQuorum slave2:2181,slave3:2181,slave4:2181 // zookeeper集群用于记录偏移量(也可以选择MySQL、Redis等记录偏移量) val topics Set(topic) // 创建stream时使用的topic名字集合sparkStreaming可同时消费多个topic val topicDirs new ZKGroupTopicDirs(group, topic) // 创建一个ZKGroupTopicDirs对象其实就是指定往zookeeper中写入数据的目录该目录用于保存偏移量 val zkTopicPath: String topicDirs.consumerOffsetDir // 获取zookeeper中的路径/group1/offsets/wordCount/ // 准备kafka参数 val kafkaParams Map( metadata.broker.list - brokerList, group.id - group, auto.offset.reset - kafka.api.OffsetRequest.SmallestTimeString // 偏移量最开始的时候从哪读SmallestTimeString表示从头开始读, // LargestTimeString表示从启动时刻产生的数据读 ) val zkClient new ZkClient(zkQuorum) // zookeeper的客户端可以从zk中读取偏移量数据并更新偏移量 val numOfzkChildren: Int zkClient.countChildren(zkTopicPath) // 检查该路径下是否保存有数据(偏移量) // 例如/group1/offsets/wordCount/2/1003 表示2号分区有偏移量1003 var kafkaStream: InputDStream[(String, String)] null // 如果zookeeper中保存有偏移量offfset则利用这个偏移量作为kafkaStream的起始位置 var fromOffsets: Map[TopicAndPartition, Long] Map() if (numOfzkChildren 0){ // 如果保存过offset for (i 1003 fromOffsets (tp - fromOffset.toLong) // 将topic不同分区所对应的偏移量放入集合中 } //Key: kafka的key values: hello tom hello jerry //这个会将 kafka 的消息进行 transform最终 kafka 的数据都会变成 (kafka的key, message) 这样的 tuple val messageHandler (mmd: MessageAndMetadata[String, String]) (mmd.key(), mmd.message()) // 读数据的规则 //通过KafkaUtils创建直连的DStream(fromOffsets参数的作用是:按照前面计算好了的偏移量继续消费数据) // 泛型参数说明 //[String, String, StringDecoder, StringDecoder, (String, String)] // key value key的解码方式 value的解码方式 处理完成后Dstream中的数据类型 kafkaStream KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler) }else{ // 没有保存过offset相当于从头读 //如果未保存根据 kafkaParam 的配置使用最新(largest)或者最旧的(smallest) offset //[String, String, StringDecoder, StringDecoder] // key value key的解码方式 value的解码方式 kafkaStream KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) } //偏移量的范围 var offsetRanges Array[OffsetRange]() //从kafka读取的消息DStream的Transform方法可以将当前批次的RDD获取出来 //该transform方法计算获取到当前批次RDD,然后将RDD的偏移量取出来然后在将RDD返回到DStream val transform: DStream[(String, String)] kafkaStream.transform { rdd //得到该 RDD对应 kafka 的消息的 offset //该RDD是一个KafkaRDD可以获得它的偏移量的范围 offsetRanges rdd.asInstanceOf[HasOffsetRanges].offsetRanges // 偏移量范围 rdd // 不对RDD进行操作再放回DStream } // DStream 是RDD的工厂每隔一段时间产生一个RDD val messages: DStream[String] transform.map(_._2) //依次迭代DStream中的RDD messages.foreachRDD { rdd // foreachRDD,每隔一段时间产生一个RDD rdd.foreachPartition(partition // foreachPartition 每个分区一个连接链接 partition.foreach(x { // foreach 分区中的每条数据 println(x) }) ) // 更新偏移量offset for (o 但是在这个方案中为了获取偏移量需要遍历RDD后续又要遍历RDD操作RDD代码冗余3、直连方式(获取数据偏移量的同时处理数据)package xxximport kafka.common.TopicAndPartitionimport kafka.message.MessageAndMetadataimport kafka.serializer.StringDecoderimport kafka.utils.{ZKGroupTopicDirs, ZkUtils}import org.I0Itec.zkclient.ZkClientimport org.apache.spark.SparkConfimport org.apache.spark.rdd.RDDimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.streaming.dstream.InputDStreamimport org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}/** * 直连方式用zookeeper存偏移量(获取偏移量的同时对数据进行操作) */object kafkaDirection2 { def main(args: Array[String]): Unit { val conf: SparkConf new SparkConf().setAppName(kafkaDirection).setMaster(local[*]) val ssc new StreamingContext(conf, Seconds(3)) val group group3 // 分组 val topic wordCount // topic val brokerList slave1:9092,slave2:9092,slave3:9092 // broker集群sparkStream的Task直接连到kafka分区上 val zkQuorum slave2:2181,slave3:2181,slave4:2181 // zookeeper集群用于记录偏移量(也可以选择MySQL、Redis等记录偏移量) val topics Set(topic) // 创建stream时使用的topic名字集合sparkStreaming可同时消费多个topic val topicDirs new ZKGroupTopicDirs(group, topic) // 创建一个ZKGroupTopicDirs对象其实就是指定往zookeeper中写入数据的目录该目录用于保存偏移量 val zkTopicPath: String topicDirs.consumerOffsetDir // 获取zookeeper中的路径/group1/offsets/wordCount/ // 准备kafka参数 val kafkaParams Map( metadata.broker.list - brokerList, group.id - group, auto.offset.reset - kafka.api.OffsetRequest.SmallestTimeString // 偏移量最开始的时候从哪读SmallestTimeString表示从头开始读, // LargestTimeString表示从启动时刻产生的数据读 ) val zkClient new ZkClient(zkQuorum) // zookeeper的客户端可以从zk中读取偏移量数据并更新偏移量 val numOfzkChildren: Int zkClient.countChildren(zkTopicPath) // 检查该路径下是否保存有数据(偏移量) // 例如/group1/offsets/wordCount/2/1003 表示2号分区有偏移量1003 var kafkaStream: InputDStream[(String, String)] null // 如果zookeeper中保存有偏移量offfset则利用这个偏移量作为kafkaStream的起始位置 var fromOffsets: Map[TopicAndPartition, Long] Map() if (numOfzkChildren 0){ // 如果保存过offset for (i 1003 fromOffsets (tp - fromOffset.toLong) // 将topic不同分区所对应的偏移量放入集合中 } //Key: kafka的key values: hello tom hello jerry //这个会将 kafka 的消息进行 transform最终 kafka 的数据都会变成 (kafka的key, message) 这样的 tuple val messageHandler (mmd: MessageAndMetadata[String, String]) (mmd.key(), mmd.message()) // 读数据的规则 //通过KafkaUtils创建直连的DStream(fromOffsets参数的作用是:按照前面计算好了的偏移量继续消费数据) // 泛型参数说明 //[String, String, StringDecoder, StringDecoder, (String, String)] // key value key的解码方式 value的解码方式 处理完成后Dstream中的数据类型 kafkaStream KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler) }else{ // 没有保存过offset相当于从头读 //如果未保存根据 kafkaParam 的配置使用最新(largest)或者最旧的(smallest) offset //[String, String, StringDecoder, StringDecoder] // key value key的解码方式 value的解码方式 kafkaStream KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) } //偏移量的范围 var offsetRanges Array[OffsetRange]() // 获取偏移量的同时处理数据 // 直连方式只有在kakaDstream中的RDD才能获取偏移量那么就不能调用DStream的Transformation // 所以只能在KafkaStream中调用foreachRDD获取RDD的偏移量然后就是对RDD进行操作了 //依次迭代DStream中的RDD // 如果使用直连方式进行累加数据就需要在外部的数据库中进行累加(用kay-value的内存数据库NoSQL型数据库 Redis) kafkaStream.foreachRDD { kafkaRDD { // 只有kafkaRDD可以强转成HashOffSetRanges,并获取偏移量 val offsetRanges: Array[OffsetRange] kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges val lines: RDD[String] kafkaRDD.map(_._2) val words: RDD[String] lines.flatMap(u { u.split( ) }) val wordsAndOne: RDD[(String, Int)] words.map(word { (word, 1) }) val reduced: RDD[(String, Int)] wordsAndOne.reduceByKey((a, b) { a b }) reduced.foreach(println) // 更新偏移量offset for (o 但是该方案无法获取历史数据。这里统计到的wordcount只是某一时间片内对应数据的统计结果并不包含历史数据。4、直连方式zookeeper存储偏移量数据redis存储历史数据。redis的连接池package xxximport redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}object JedisConnectePool { val config new JedisPoolConfig() //最大连接数, config.setMaxTotal(20) //最大空闲连接数, config.setMaxIdle(10) //当调用borrow Object方法时是否进行有效性检查 -- config.setTestOnBorrow(true) //10000代表超时时间(10秒) val pool new JedisPool(config, 192.168.247.8, 6379, 10000, 123) def getConnection():Jedis{ pool.getResource }}package xxximport jedis.JedisConnectionPoolimport kafka.common.TopicAndPartitionimport kafka.message.MessageAndMetadataimport kafka.serializer.StringDecoderimport kafka.utils.{ZKGroupTopicDirs, ZkUtils}import org.I0Itec.zkclient.ZkClientimport org.apache.spark.SparkConfimport org.apache.spark.rdd.RDDimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.streaming.dstream.{DStream, InputDStream}import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}import redis.clients.jedis.Jedis/** * 直连方式在获取RDD偏移量的同时操作偏移量并且能够wordcount统计时包含历史统计数据 */object kafkaDirection3 { def main(args: Array[String]): Unit { val conf: SparkConf new SparkConf().setAppName(kafkaDirection).setMaster(local[*]) val ssc new StreamingContext(conf, Seconds(3)) val group group2 // 分组 val topic wordCount // topic val brokerList slave1:9092,slave2:9092,slave3:9092 // broker集群sparkStream的Task直接连到kafka分区上 val zkQuorum slave2:2181,slave3:2181,slave4:2181 // zookeeper集群用于记录偏移量(也可以选择MySQL、Redis等记录偏移量) val topics Set(topic) // 创建stream时使用的topic名字集合sparkStreaming可同时消费多个topic val topicDirs new ZKGroupTopicDirs(group, topic) // 创建一个ZKGroupTopicDirs对象其实就是指定往zookeeper中写入数据的目录该目录用于保存偏移量 val zkTopicPath: String topicDirs.consumerOffsetDir // 获取zookeeper中的路径/group1/offsets/wordCount/ // 准备kafka参数 val kafkaParams Map( metadata.broker.list - brokerList, group.id - group, auto.offset.reset - kafka.api.OffsetRequest.SmallestTimeString // 偏移量最开始的时候从哪读SmallestTimeString表示从头开始读, // LargestTimeString表示从启动时刻产生的数据读 ) val zkClient new ZkClient(zkQuorum) // zookeeper的客户端可以从zk中读取偏移量数据并更新偏移量 val numOfzkChildren: Int zkClient.countChildren(zkTopicPath) // 检查该路径下是否保存有数据(偏移量) // 例如/group1/offsets/wordCount/2/1003 表示2号分区有偏移量1003 var kafkaStream: InputDStream[(String, String)] null // 如果zookeeper中保存有偏移量offfset则利用这个偏移量作为kafkaStream的起始位置 var fromOffsets: Map[TopicAndPartition, Long] Map() if (numOfzkChildren 0){ // 如果保存过offset for (i 1003 fromOffsets (tp - fromOffset.toLong) // 将topic不同分区所对应的偏移量放入集合中 } //Key: kafka的key values: hello tom hello jerry //这个会将 kafka 的消息进行 transform最终 kafka 的数据都会变成 (kafka的key, message) 这样的 tuple val messageHandler (mmd: MessageAndMetadata[String, String]) (mmd.key(), mmd.message()) // 读数据的规则 //通过KafkaUtils创建直连的DStream(fromOffsets参数的作用是:按照前面计算好了的偏移量继续消费数据) // 泛型参数说明 //[String, String, StringDecoder, StringDecoder, (String, String)] // key value key的解码方式 value的解码方式 处理完成后Dstream中的数据类型 kafkaStream KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler) }else{ // 没有保存过offset相当于从头读 //如果未保存根据 kafkaParam 的配置使用最新(largest)或者最旧的(smallest) offset //[String, String, StringDecoder, StringDecoder] // key value key的解码方式 value的解码方式 kafkaStream KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) } //偏移量的范围 var offsetRanges Array[OffsetRange]() // 直连方式只有在kakaDstream中的RDD才能获取偏移量那么就不能调用DStream的Transformation // 所以只能在KafkaStream中调用foreachRDD获取RDD的偏移量然后就是对RDD进行操作了 //依次迭代DStream中的RDD // 如果使用直连方式进行累加数据就需要在外部的数据库中进行累加(用kay-value的内存数据库NoSQL型数据库 Redis) kafkaStream.foreachRDD { kafkaRDD { // 只有kafkaRDD可以强转成HashOffSetRanges,并获取偏移量 val offsetRanges: Array[OffsetRange] kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges val lines: RDD[String] kafkaRDD.map(_._2) val words: RDD[String] lines.flatMap(u { u.split( ) }) val wordsAndOne: RDD[(String, Int)] words.map(word { (word, 1) }) val reduced: RDD[(String, Int)] wordsAndOne.reduceByKey((a, b) { a b }) val stated: RDD[(String, Int)] reduced.map(u { // 获取redis存放的历史统计数据 val conn: Jedis JedisConnectionPool.getConnection() val str: String conn.get(u._1) var num 0 if(str ! null){ num str.toInt } val value: Int u._2 val value1: Int numvalue // 更新redis中的统计数据 conn.set(u._1, value1.toString) conn.close() (u._1, value1) }) stated.foreach(println) // 更新偏移量offset for (o