当前位置: 首页 > news >正文

建设部网站统计网站推广主要方法

建设部网站统计,网站推广主要方法,自媒体代运营怎么收费,为什么做腾讯网站Spark Streaming官方提供Receiver-based和Direct Approach两种方法接入Kafka数据#xff0c;本文简单介绍两种方式的pyspark实现。1、Spark Streaming接入Kafka方式介绍Spark Streaming 官方提供了两种方式读取Kafka数据#xff1a;一是Receiver-based Approach。该种读取模式… Spark Streaming官方提供Receiver-based和Direct Approach两种方法接入Kafka数据本文简单介绍两种方式的pyspark实现。1、Spark Streaming接入Kafka方式介绍Spark Streaming 官方提供了两种方式读取Kafka数据一是Receiver-based Approach。该种读取模式官方最先支持并在Spark 1.2提供了数据零丢失(zero-data loss)的支持一是Direct Approach (No Receivers)。该种读取方式在Spark 1.3引入。1.1 Receiver-based ApproachReceiver-based的Kafka读取方式是基于Kafka高阶(high-level) api来实现对Kafka数据的消费。在提交Spark Streaming任务后Spark集群会划出指定的Receivers来专门、持续不断、异步读取Kafka的数据读取时间间隔以及每次读取offsets范围可以由参数来配置。读取的数据保存在Receiver中具体StorageLevel方式由用户指定诸如MEMORY_ONLY等。当driver 触发batch任务的时候Receivers中的数据会转移到剩余的Executors中去执行。在执行完之后Receivers会相应更新ZooKeeper的offsets。如要确保at least once的读取方式可以设置spark.streaming.receiver.writeAheadLog.enable为true。具体Receiver执行流程见下图需要借助Write Ahead Logs 来保证数据的不丢失如果启用了Write Ahead Logs复制到文件系统如HDFS那么storage level需要设置成StorageLevel.MEMORY_AND_DISK_SER也就是KafkaUtils.createStream(…, StorageLevel.MEMORY_AND_DISK_SER)在Receiver的方式中Spark中的partition和kafka中的partition并不是相关的所以如果我们加大每个topic的partition数量仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度对于不同的Group和topic我们可以使用多个Receiver创建不同的Dstream来并行接收数据之后可以利用union来统一成一个Dstream1.2 Direct Approach (No Receivers)Direct方式采用Kafka简单的consumer api方式来读取数据无需经由ZooKeeper此种方式不再需要专门Receiver来持续不断读取数据。当batch任务触发时由Executor读取数据并参与到其他Executor的数据计算过程中去。由drive来决定读取多少offsets并将offsets交由checkpoints来维护。将触发下次batch任务再由Executor读取Kafka数据并计算。从此过程可以发现Direct方式无需Receiver读取数据而是需要计算时再读取数据所以Direct方式的数据消费对内存的要求不高只需要考虑批量计算所需要的内存即可另外batch任务堆积时也不会影响数据堆积。其具体读取方式如下图简化的并行在Receiver的方式中提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据这种映射关系也更利于理解和优化。高效在Receiver的方式中为了达到0数据丢失需要将数据存入Write Ahead Log中这样在Kafka和日志中就保存了两份数据浪费而第二种方式不存在这个问题只要我们Kafka的数据保留时间足够长我们都能够从Kafka进行数据恢复。精确一次在Receiver的方式中使用的是Kafka的高阶API接口从Zookeeper中获取offset值这也是传统的从Kafka中读取数据的方式但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步这种方式偶尔会造成数据重复消费。而第二种方式直接使用了简单的低阶Kafka APIOffsets则利用Spark Streaming的checkpoints进行记录消除了这种不一致性。2、Spark Streaming接入Kafka数据实现以wordcount统计为例kafka生产端输入词组Spark端读取kafka流数据并统计词频2.1 Receiver方式收取数据1)Import KafkaUtils并创建DStreamfrom pyspark.streaming.kafka import KafkaUtilskafkaStream KafkaUtils.createStream(streamingContext, \ [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])ZK QuorumZookeeper quorum (hostname:port,hostname:port,..)Groupid消费者的groupidTopics{topic_name : numPartitions}2)具体实现代码如下from pyspark import SparkContextfrom pyspark.streaming import StreamingContextfrom pyspark.streaming.kafka import KafkaUtilsif __name__ __main__: #if len(sys.argv) ! 3: # print(Usage: kafka_wordcount.py , filesys.stderr) # exit(-1) sc SparkContext(appNamePythonStreamingKafkaWordCount) ssc StreamingContext(sc, 10) zkQuorum 192.168.112.101:2181,192.168.112.102:2181,192.168.112.103:2181 groupid spark-streaming-consumer topic {kafka_spark_test1:0,kafka_spark_test1:1,kafka_spark_test1:2} #zkQuorum, topic sys.argv[1:] kvs KafkaUtils.createStream(ssc, zkQuorum, groupid, topic) lines kvs.map(lambda x: x[1]) counts lines.flatMap(lambda line: line.split( )) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda a, b: ab) counts.pprint() ssc.start() ssc.awaitTermination()在Spark目录执行命令spark-submit --master yarn --deploy-mode client --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /usr/local/spark/ipynotebook/03-kafka2streaming-01.py2.2 Direct方式收取数据1)Import KafkaUtils并创建DStreamfrom pyspark.streaming.kafka import KafkaUtils directKafkaStream KafkaUtils.createDirectStream(ssc, [topic], {metadata.broker.list: brokers})sscStreamingContexttopics消费的topics清单{metadata.broker.list: brokers}kafka参数可以指定为 metadata.broker.list或bootstrap.servers默认情况下从每个kafka分区的最新的offset进行消费如果在kafka参数中设置了auto.offset.reset 为smallest则会从最小的offset进行消费如果希望保存每个批量消费的kafka offset可以进行如下操作offsetRanges []def storeOffsetRanges(rdd):global offsetRanges offsetRanges rdd.offsetRanges()return rdddef printOffsetRanges(rdd):for o in offsetRanges:print %s %s %s %s % (o.topic, o.partition, o.fromOffset, o.untilOffset) directKafkaStream \ .transform(storeOffsetRanges) \ .foreachRDD(printOffsetRanges)如果希望使用基于Zookeeper的Kafka监控也可以通过这种方法展现Streaming的进程。2)具体实现代码如下from pyspark import SparkContextfrom pyspark.streaming import StreamingContextfrom pyspark.streaming.kafka import KafkaUtilsoffsetRanges []def storeOffsetRanges(rdd): global offsetRanges offsetRanges rdd.offsetRanges() return rdddef printOffsetRanges(rdd): for o in offsetRanges: print(%s %s %s %s % (o.topic, o.partition, o.fromOffset, o.untilOffset))if __name__ __main__: #if len(sys.argv) ! 3: # print(Usage: direct_kafka_wordcount.py , filesys.stderr) # exit(-1) sc SparkContext(appNamePythonStreamingDirectKafkaWordCount) ssc StreamingContext(sc, 10) #brokers, topic sys.argv[1:] topickafka_spark_test1 brokers 192.168.112.101:9092,192.168.112.102:9092,192.168.112.103:9092 kvs KafkaUtils.createDirectStream(ssc, [topic], {metadata.broker.list: brokers}) lines kvs.map(lambda x: x[1]) counts lines.flatMap(lambda line: line.split( )) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda a, b: ab) kvs.transform(storeOffsetRanges).foreachRDD(printOffsetRanges) counts.pprint() ssc.start() # Start the computation ssc.awaitTermination() # Wait for the computation to terminate在Spark根目录执行命令spark-submit --master yarn --deploy-mode client --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /usr/local/spark/ipynotebook/03-kafka2streaming-02.py2.3 Kafka生产者配置Kafka集群环境的安装配置参考之前的文档大数据系列之Kafka集群环境部署中相关内容1)启动zookeeper[roottango-centos01 kafka_2.11-1.1.0]# nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties [roottango-centos02 kafka_2.11-1.1.0]# nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties [roottango-centos03 kafka_2.11-1.1.0]# nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties 2)启动Kafka集群[roottango-centos01 kafka_2.11-1.1.0]# nohup ./bin/kafka-server-start.sh ./config/server.properties [roottango-centos02 kafka_2.11-1.1.0]# nohup ./bin/kafka-server-start.sh ./config/server.properties [roottango-centos03 kafka_2.11-1.1.0]# nohup ./bin/kafka-server-start.sh ./config/server.properties 3)创建Kafka topic[roottango-centos01 kafka_2.11-1.1.0]# ./bin/kafka-topics.sh --create --zookeeper 192.168.112.101:2181,192.168.112.102:2181,192.168.112.103:2181 --replication-factor 2 --partitions 3 --topic kafka_spark_test1Created topic kafka_spark_test1.创建名为kafka_spark_test1 的Topic复制因子设为2同时分区数为3注意分区数是read parallelisms的最大值4)查看Topic详情[roottango-centos01 kafka_2.11-1.1.0]# ./bin/kafka-topics.sh --describe --zookeeper 192.168.112.101:2181,192.168.112.102:2181,192.168.112.103:2181 --topic kafka_spark_test1Topic:kafka_spark_test1 PartitionCount:3 ReplicationFactor:2 Configs: Topic: kafka_spark_test1 Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: kafka_spark_test1 Partition: 1 Leader: 3 Replicas: 3,1 Isr: 3,1 Topic: kafka_spark_test1 Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2指定--zookeeper选项的值为192.168.112.101:2181,192.168.112.102:2181,192.168.112.103:2181对应的Topic即刚创建的kafka_spark_test12.4 Kafka-Spark Streaming流测试1)下载依赖的jars包2)启动kafka生产者[roottango-centos01 kafka_2.11-1.1.0]# ./bin/kafka-console-producer.sh --broker-list 192.168.112.101:9092 --topic kafka_spark_test13)运行Spark Streaming流数据处理程序[roottango-spark01 spark-2.3.0]# spark-submit --master yarn --deploy-mode client --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /usr/local/spark/ipynotebook/03-kafka2streaming-01.py[roottango-spark01 spark-2.3.0]# spark-submit --master yarn --deploy-mode client --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /usr/local/spark/ipynotebook/03-kafka2streaming-02.py4)在Kafka生产端输入流数据[roottango-centos01 kafka_2.11-1.1.0]# ./bin/kafka-console-producer.sh --broker-list 192.168.112.101:9092 --topic kafka_spark_test1hello worldhello tango hellohello tango tango5)终端打印结果-------------------------------------------Time: 2018-08-08 11:03:15-------------------------------------------(utango, 2)(uhello, 1)6)登录SparkWeb UI查看Spark Streaming的的运行情况a) spark-submit时候指定spark-submit --master spark://192.168.112.121:7077才能在8080端口看到数据b) 如果通过yarn模式调度可通过8088端口查看2.5 Spark写入Kafka1)安装Kafka插件Pyspark访问Kafka需要使用到kafka安装包使用以下命令安装pip install --no-index --find-links../kafka-1.3.5-py2.py3-none.any.whl kafka2)调用KafkaProducer模块spark作为生产者将数据传输到kafka端from kafka import KafkaProducerto_kafka KafkaProducer(bootstrap_serversbroker_list)to_kafka.send(topic_name,send_msg,encode(‘utf8’))to_kafka.flush()参考资料http://spark.apache.org/docs/latest/streaming-kafka-integration.html大数据系列之Kafka集群环境部署
http://www.huolong8.cn/news/322080/

相关文章:

  • 兴安盟网站建设网络广告名词解释
  • 网站代码素材建设品牌推广工作职责
  • 培训机构停课烟台seo做的好的网站
  • 自己制作网站的步骤广州建网站站公司
  • app开发导入网站模板wordpress视频插件w
  • 青海省建设厅查询网站wordpress如何撤销301
  • 网站设计 网站推广 网站优化怎么制作平台
  • 站长工具在线查询婴儿睡袋网站建设
  • 中国哪家网站做仿古做的好nginx 网站建设
  • 网站二级分类长沙装修公司名单
  • 邢台网站建设好蜘蛛临沂中小企业网站制作
  • 哪个网站看电影做便宜建网站代理哪个
  • 空间设计网站大全怎么建立购物网站
  • 实物黄金哪个网站做的好wordpress 输出短代码
  • 最专业网站建设开发淮安住房与城乡建设部网站
  • 广东网站建设案例新野微网站开发
  • 怎样用wordpress搭建网站网站备案没公司名称
  • 高青网站建设做网站优化有什么好处
  • 合肥 做网站的联系昆明网站建设
  • 网站建设域名多少钱公司章程范本
  • ASP做旅游网站代码滨江网站开发
  • 搞网站开发的程序员属于哪一类浙江企业黄页大全
  • 泉州seo网站关键词优做旅游宣传哪个网站好
  • 怎么做好网站wordpress图片替换不掉
  • 济南建网站价格消费品展wordpress rss小工具
  • 滁州建设局网站网站建设 建站知识
  • 济南模板建站多少钱微站和网站数据
  • 成都建站模板网站开发北京网站托管维护
  • 网站开发常用技术wordpress 媒体库清理
  • 优化网站网站静态页面