当前位置: 首页 > news >正文

怎样做网站快手刷粉做网站昆山

怎样做网站快手刷粉,做网站昆山,制作灯笼的手工做法视频,山东省建设工程质量安全协会网站用户行为数据由Flume从Kafka直接同步到HDFS#xff0c;由于离线数仓采用Hive的分区表按天统计#xff0c;所以目标路径要包含一层日期。具体数据流向如下图所示。 按照规划#xff0c;该Flume需将Kafka中topic_log的数据发往HDFS。并且对每天产生的用户行为日志进行区分由于离线数仓采用Hive的分区表按天统计所以目标路径要包含一层日期。具体数据流向如下图所示。 按照规划该Flume需将Kafka中topic_log的数据发往HDFS。并且对每天产生的用户行为日志进行区分将不同天的数据发往HDFS不同天的路径。 此处选择KafkaSource、FileChannel、HDFSSink。关键配置如下 日志消费者 Flume 实操 在hadoop101 节点的Flume 的 job目录下创建 kafka_to_hdfs_log.conf内容如下 配置注释 FileChannel优化配置 dataDirsk可以通过逗号分隔指向多个路径每个路径对应不同硬盘可以增加吞吐量。新增checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中保证checkpoint坏掉后可以快速使用backupCheckpointDir恢复数据 #定义组件 a1.sourcesr1 a1.channelsc1 a1.sinksk1#配置source1 a1.sources.r1.type org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize 5000 a1.sources.r1.batchDurationMillis 2000 a1.sources.r1.kafka.bootstrap.servers hadoop101:9092,hadoop102:9092 a1.sources.r1.kafka.topicstopic_log a1.sources.r1.interceptors i1 a1.sources.r1.interceptors.i1.type com.logan.gmall.flume.interceptor.TimestampInterceptor$Builder#配置channel a1.channels.c1.type file a1.channels.c1.checkpointDir /opt/module/flume/checkpoint/behavior1 a1.channels.c1.dataDirs /opt/module/flume/data/behavior1 a1.channels.c1.maxFileSize 2146435071 a1.channels.c1.capacity 1000000 a1.channels.c1.keep-alive 6#配置sink a1.sinks.k1.type hdfs a1.sinks.k1.hdfs.path /origin_data/gmall/log/topic_log/%Y-%m-%d a1.sinks.k1.hdfs.filePrefix log a1.sinks.k1.hdfs.round falsea1.sinks.k1.hdfs.rollInterval 10 a1.sinks.k1.hdfs.rollSize 134217728 a1.sinks.k1.hdfs.rollCount 0#控制输出文件类型 a1.sinks.k1.hdfs.fileType CompressedStream a1.sinks.k1.hdfs.codeC gzip#组装 a1.sources.r1.channels c1 a1.sinks.k1.channel c1HDFS Sink 优化 HDFS存入大量小文件有什么影响 元数据层面每个小文件都有一份元数据其中包括文件路径文件名所有者所属组权限创建时间等这些信息都保存在Namenode内存中。所以小文件过多会占用Namenode服务器大量内存影响Namenode性能和使用寿命。HDFS小文件处理。 官方默认三个参数配置写入HDFS 后会产生小文件: hdfs.rollInterval, hdfs.rollSize, hdfs.rollCount。 本次配置的参数为hdfs.rollInterval3600hdfs.rollSize134217728hdfs.rollCount 0。意味着文件在达到128M时会滚动生成新文件或者文件超过 3600 秒会生成新文件。 编写 Flume 拦截器 解决问题 在com.logan.gmall.flume.interceptor包下创建TimestampInterceptor类 package com.logan.gmall.flume.interceptor;import com.alibaba.fastjson.JSONObject; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map;public class TimestampInterceptor implements Interceptor {Overridepublic void initialize() {}Overridepublic Event intercept(Event event) {// 获取header和body数据MapString, String headers event.getHeaders();String body new String(event.getBody(), StandardCharsets.UTF_8);// 将body转换成JsonObject类型JSONObject jsonObject JSONObject.parseObject(body);// 将header中的timestamp时间转换成body中的timestamp解决数据漂移问题String ts jsonObject.getString(ts);headers.put(timestamp, ts);return event;}Overridepublic ListEvent intercept(ListEvent list) {for (Event event : list) {intercept(event);}return list;}public static class Builder implements Interceptor.Builder{Overridepublic Interceptor build() {return new TimestampInterceptor();}Overridepublic void configure(Context context) {}}Overridepublic void close() {} } 将打好的包放入到hadoop101的/opt/module/flume/lib文件夹下 启动测试 启动 Zookeeper、Kafka 集群启动 hadoop101 的消费Flume [loganhadoop101 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf -Dflume.root.loggerinfo,console生成模拟数据[loganhadoop101 ~]$ vim /opt/module/applog/log/app.2023-12-14.log {common:{ar:110000,ba:vivo,ch:oppo,is_new:0,md:vivo iqoo3,mid:mid_70997,os:Android 11.0,uid:776,vc:v2.1.134},start:{entry:icon,loading_time:11968,open_ad_id:16,open_ad_ms:7891,open_ad_skip_ms:0},ts:1672503309000} {common:{ar:110000,ba:vivo,ch:oppo,is_new:0,md:vivo iqoo3,mid:mid_70997,os:Android 11.0,uid:776,vc:v2.1.134},displays:[{display_type:activity,item:2,item_type:activity_id,order:1,pos_id:1},{display_type:activity,item:2,item_type:activity_id,order:2,pos_id:1},{display_type:query,item:9,item_type:sku_id,order:3,pos_id:1},{display_type:query,item:18,item_type:sku_id,order:4,pos_id:4},{display_type:promotion,item:35,item_type:sku_id,order:5,pos_id:4},{display_type:query,item:35,item_type:sku_id,order:6,pos_id:4},{display_type:recommend,item:13,item_type:sku_id,order:7,pos_id:5}],page:{during_time:14287,page_id:home},ts:1672503309000}检查HFDS是否生成数据当 HDFS 生成数据后增加[loganhadoop101 bin]$ vim f2.sh #!/bin/bashcase $1 in start)echo --------启动 hadoop101 日志数据flume-------ssh hadoop101 nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_log.conf /dev/null 21 ;; stop)echo --------停止 hadoop101 日志数据flume-------ssh hadoop101 ps -ef | grep kafka_to_hdfs_log | grep -v grep |awk {print \$2} | xargs -n1 kill ;; esac最终 HDFS 文件
http://www.huolong8.cn/news/95254/

相关文章:

  • 做市场推广应该掌握什么技巧佛山seo外包平台
  • 阿里巴巴网站如何做固定背景婚纱销售网站
  • 云南省做网站开发的公司排名wordpress 阿里云主机
  • 上海网站架设wordpress整站模板
  • 展示网站源码下载百度的网站网址
  • 深圳专业专业网站设计公司做网站赚50万
  • 清远网站开发公司钢构网架加工厂
  • 东台市住房和城乡建设局网站wordpress 企业沟通插件
  • 临清网站建设费用品牌建设与市场推广
  • 成都都江堰网站建设北京网站代运营公司
  • wordpress+药品食品网站快照优化公司
  • 企业网站建设定制软件技术一个月工资多少
  • 人才招聘网站模板html做微整去那个网站找好的医院
  • 广州做外贸网站建设构建自己网站
  • 中国芗城区城乡建设局网站怎么查看网站打开速度
  • 商务网站建设评估的指标前端网课
  • 商城网站建设快速服务订货网站开发价格
  • 微站设计做设计用哪个素材网站好
  • 自考网站建设与管理资料成都网页设计师
  • c2c网站制作定制网站开发食道里感觉有东西堵
  • 宁波网站怎么建设网站建设 深圳
  • 做证券考试的网站wordpress主题js文件在哪
  • 工商注册在哪个网站前端开发是程序员吗
  • 薛城网站建设怎么弄一个自己的网址
  • 做网站虚拟主机推荐为什么想做网站运营
  • 赵公口网站建设网络推广app是干什么的
  • 卖高权重网站做跳转网站主办者和所有者区别
  • 网站简介 更改做网站和推广
  • 深圳比较好的ui设计公司小果seo实战培训课程
  • 网站制作定制图pc网站如何做sp