辽宁省住房和城乡建设厅官方网站,公司起名字,企业门户网站开发源码,公众号怎么制作内容前言
本节内容我们主要介绍几个Flume数据采集的基本案例#xff0c;包括监控端口数据、实时监控单个追加文件、实时监控目录下多个新文件、实时监控目录下的多个追加文件等案例。完成flume数据监控的基本使用。
正文
监控端口数据 ①需求说明 - 使用 Flume 监听一个端口包括监控端口数据、实时监控单个追加文件、实时监控目录下多个新文件、实时监控目录下的多个追加文件等案例。完成flume数据监控的基本使用。
正文
监控端口数据 ①需求说明 - 使用 Flume 监听一个端口收集该端口数据并打印到控制台 ②需求分析 ③安装netcat 工具sudo yum install -y nc ④查看监听端口1111是否被占用注意测试端口的范围是0-65535 ⑤在flume安装目录下创建一个job目录用与存放监听数据的配置文件 ⑥在job目录下创建监听数据的配置文件job-netcat-flume-console.conf # Name the components on this agent
#a1:表示agent的名称,不能重复
a1.sources r1 #r1:表示a1的Source的名称
a1.sinks k1 #k1:表示a1的Sink的名称
a1.channels c1 #c1:表示a1的Channel的名称
# Describe/configure the source
a1.sources.r1.type netcat #表示a1的输入源类型为netcat端口类型
a1.sources.r1.bind localhost #表示a1的监听的主机
a1.sources.r1.port 1111 #表示a1的监听的端口号
# Describe the sink
a1.sinks.k1.type logger #表示a1的输出目的地是控制台logger类型
# Use a channel which buffers events in memory
a1.channels.c1.type memory #表示a1的channel类型是memory内存型
a1.channels.c1.capacity 1000 #表示a1的channel总容量1000个event
a1.channels.c1.transactionCapacity 100 #表示a1的channel传输时收集到了100条event以后再去提交事务
# Bind the source and sink to the channel
a1.sources.r1.channels c1 #表示将r1和c1连接起来
a1.sinks.k1.channel c1 #表示将k1和c1连接起来 ⑦开启 flume服务监听端口 bin/flume-ng agent -c conf/ -n a1 -f job/job-netcat-flume-console.conf -Dflume.root.loggerINFO,console ⑧启动参数说明 --conf/-c表示配置文件存储在 conf/目录 --name/-n表示给 agent 起名为 a1 --conf-file/-fflume本次启动读取的配置文件是在job文件夹下的job-netcat-flume-console.conf文件 -Dflume.root.loggerINFO,console -D 表示 flume 运行时动态修改 flume.root.logger 参数属性值并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、 error ⑨使用netcat 工具向本机的1111端口发送内容 实时监控单个追加文件 ①监控需求 - 实时监控Hive日志并上传到HDFS ②需求分析 ③在job目录下创建监听数据的配置文件job-file-flume-hdfs.conf # Name the components on this agent
a2.sources r2
a2.sinks k2
a2.channels c2
# Describe/configure the source
a2.sources.r2.type exec
#hive日志的默认位置
a2.sources.r2.command tail -F /tmp/hadoop/hive.log
# Describe the sink
a2.sinks.k2.type hdfs
a2.sinks.k2.hdfs.path hdfs://hadoop101:8020/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp true
#积攒多少个Event才flush到 HDFS一次
a2.sinks.k2.hdfs.batchSize 100
#设置文件类型可支持压缩
a2.sinks.k2.hdfs.fileType DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount 0
# Use a channel which buffers events in memory
a2.channels.c2.type memory
a2.channels.c2.capacity 1000
a2.channels.c2.transactionCapacity 100
# Bind the source and sink to the channel
a2.sources.r2.channels c2
a2.sinks.k2.channel c2④启动hadoop集群 ⑤启动flume监控任务 bin/flume-ng agent --conf conf/ --name a2 --conf-file job/job-file-flume-hdfs.conf -Dflume.root.loggerINFO,console ⑥启动hive ⑦查看hdfs是否有监控日志 ⑧存在的问题 - tail命令不能实现断点续传监控的功能可能会有数据丢失的情况或者数据重复的问题 - Exec source 适用于监控一个实时追加的文件不能实现断点续传 实时监控目录下多个新文件 ①监控需求 - 使用 Flume 监听整个目录的文件并上传至 HDFS ②需求分析 ③在job目录下创建监听目录数据的配置文件job-dir-flume-hdfs.conf a3.sources r3
a3.sinks k3
a3.channels c3
# Describe/configure the source
a3.sources.r3.type spooldir
a3.sources.r3.spoolDir /opt/module/apache-flume-1.9.0/upload
a3.sources.r3.fileSuffix .COMPLETED
a3.sources.r3.fileHeader true
#忽略所有以.tmp 结尾的文件不上传
a3.sources.r3.ignorePattern ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type hdfs
a3.sinks.k3.hdfs.path hdfs://hadoop101:8020/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize 100
#设置文件类型可支持压缩
a3.sinks.k3.hdfs.fileType DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount 0
# Use a channel which buffers events in memory
a3.channels.c3.type memory
a3.channels.c3.capacity 1000
a3.channels.c3.transactionCapacity 100
# Bind the source and sink to the channel
a3.sources.r3.channels c3
a3.sinks.k3.channel c3④启动hadoop集群 ⑤创建upload监控目录 ⑥启动目录监控任务 bin/flume-ng agent -c conf/ -n a3 -f job/job-dir-flume-hdfs.conf -Dflume.root.loggerINFO,console ⑦在upload中上传文件 ⑧查看hdfs中是否上传成功 ⑨存在的问题 - 相同文件名的文件不能重复上传只能上传一次修改了也不会再次上传 - 忽略的文件和配置后缀.COMPLETED的文件不能重复上传 - Spooldir Source 适合用于同步新文件但不适合对实时追加日志的文件进行监听并同步 实时监控目录下的多个追加文件 ①案例需求 - 使用Flume监听整个目录的实时追加文件并上传至HDFS - 使用Taildir Source适合用于监听多个实时追加的文件并且能够实现断点续传 ②需求分析 ③在job目录下创建监听目录数据的配置文件job-taildir-flume-hdfs.conf a4.sources r4
a4.sinks k4
a4.channels c4
# Describe/configure the source
a4.sources.r4.type TAILDIR
a4.sources.r4.positionFile /opt/module/apache-flume-1.9.0/tail_dir.json
a4.sources.r4.filegroups f1 f2
a4.sources.r4.filegroups.f1 /opt/module/apache-flume-1.9.0/files/.*file.*
a4.sources.r4.filegroups.f2 /opt/module/apache-flume-1.9.0/files2/.*log.*
# Describe the sink
a4.sinks.k4.type hdfs
a4.sinks.k4.hdfs.path hdfs://hadoop101:8020/flume/upload2/%Y%m%d/%H
#上传文件的前缀
a4.sinks.k4.hdfs.filePrefix upload-
#是否按照时间滚动文件夹
a4.sinks.k4.hdfs.round true
#多少时间单位创建一个新的文件夹
a4.sinks.k4.hdfs.roundValue 1
#重新定义时间单位
a4.sinks.k4.hdfs.roundUnit hour
#是否使用本地时间戳
a4.sinks.k4.hdfs.useLocalTimeStamp true
#积攒多少个 Event 才 flush 到 HDFS 一次
a4.sinks.k4.hdfs.batchSize 100
#设置文件类型可支持压缩
a4.sinks.k4.hdfs.fileType DataStream
#多久生成一个新的文件
a4.sinks.k4.hdfs.rollInterval 60
#设置每个文件的滚动大小大概是 128M
a4.sinks.k4.hdfs.rollSize 134217700
#文件的滚动与 Event 数量无关
a4.sinks.k4.hdfs.rollCount 0
# Use a channel which buffers events in memory
a4.channels.c4.type memory
a4.channels.c4.capacity 1000
a4.channels.c4.transactionCapacity 100
# Bind the source and sink to the channel
a4.sources.r4.channels c4
a4.sinks.k4.channel c4④启动hadoop集群 ⑤创建监控目录文件files和files2 ⑥启动flume监控 bin/flume-ng agent -c conf/ -n a4 -f job/job-taildir-flume-hdfs.conf -Dflume.root.loggerINFO,console ⑦往files和files2目录中的文件写数据 ⑧在hdfs中查看数据 结语
关于Flume数据采集的基本案例实战到这里就结束了我们下期见。。。。。。