做网站送的小程序有什么用,如何在网站找做贸易的客户,关键词优化是什么工作,网页设计论文结论Flume 文章目录FlumeFlume介绍Flume核心概念Flume NG的体系结构SourceChannelSinkFlume的部署类型单一流程多代理流程#xff08;多个agent顺序连接#xff09;流的合并#xff08;多个Agent的数据汇聚到同一个Agent #xff09;多路复用流#xff08;多级流#xff09;l…Flume 文章目录FlumeFlume介绍Flume核心概念Flume NG的体系结构SourceChannelSinkFlume的部署类型单一流程多代理流程多个agent顺序连接流的合并多个Agent的数据汇聚到同一个Agent 多路复用流多级流load balance功能Flume组件选型SourceChannelFileChannel和MemoryChannel区别FileChannel优化SinkHDFS小文件处理案例日志采集发送到KafkaFlume配置日志采集流程配置如下Flume拦截器开发Flume采集脚本批量启动停止案例消费Kafka保存到HDFS配置如下FileChannel优化HDFS小文件处理Flume时间戳拦截器项目经验之Flume内存优化Flume介绍
Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方用于收集数据;同时Flume提供对数据进行简单处理并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。
flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位它携带日志数据(字节数组形式)并且携带有头信息这些Event由Agent外部的Source生成当Source捕获事件后会进行特定的格式化然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。 flume的可靠性 当节点出现故障时日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障从强到弱依次分别为end-to-end收到数据agent首先将event写到磁盘上当数据传送成功后再删除如果数据发送失败可以重新发送。Store on failure这也是scribe采用的策略当数据接收方crash时将数据写到本地待恢复后继续发送Besteffort数据发送到接收方后不会进行确认。 flume的可恢复性 还是靠Channel。推荐使用FileChannel事件持久化在本地文件系统里(性能较差)。
Flume核心概念
ClientClient生产数据运行在一个独立的线程。Event一个数据单元消息头和消息体组成。Events可以是日志记录、 avro 对象等。FlowEvent从源点到达目的点的迁移的抽象。Agent一个独立的Flume进程包含组件Source、 Channel、 Sink。Agent使用JVM 运行Flume。每台机器运行一个agent但是可以在一个agent中包含多个sources和sinks。Source数据收集组件。source从Client收集数据传递给ChannelChannel中转Event的一个临时存储保存由Source组件传递过来的Event。Channel连接 sources 和 sinks 这个有点像一个队列。Sink从Channel中读取并移除Event 将Event传递到FlowPipeline中的下一个Agent如果有的话Sink从Channel收集数据运行在一个独立线程。
Flume NG的体系结构
Flume 运行的核心是 Agent。Flume以agent为最小的独立运行单位。一个agent就是一个JVM。它是一个完整的数据收集工具含有三个核心组件分别是source、 channel、 sink。通过这些组件 Event 可以从一个地方流向另一个地方如下图所示。 Source
Source是数据的收集端负责将数据捕获后进行特殊的格式化将数据封装到事件event 里然后将事件推入Channel中。
Flume提供了各种source的实现包括Avro Source、Exce Source、Spooling Directory Source、NetCat Source、Syslog Source、Syslog TCP Source、Syslog UDP Source、HTTP Source、HDFS Sourceetc。如果内置的Source无法满足需要 Flume还支持自定义Source。 Channel
Channel是连接Source和Sink的组件大家可以将它看做一个数据的缓冲区数据队列它可以将事件暂存到内存中也可以持久化到本地磁盘上 直到Sink处理完该事件。
Flume对于Channel则提供了Memory Channel、JDBC Chanel、File Channeletc。
MemoryChannel可以实现高速的吞吐但是无法保证数据的完整性。
MemoryRecoverChannel在官方文档的建议上已经建义使用FileChannel来替换。
FileChannel保证数据的完整性与一致性。在具体配置不现的FileChannel时建议FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘以便提高效率。
Sink
Flume Sink取出Channel中的数据进行相应的存储文件系统数据库或者提交到远程服务器。
Flume也提供了各种sink的实现包括HDFS sink、Logger sink、Avro sink、File Roll sink、Null sink、HBase sinketc。
Flume Sink在设置存储数据时可以向文件系统中数据库中hadoop中储数据在日志数据较少时可以将数据存储在文件系中并且设定一定的时间间隔保存数据。在日志数据较多时可以将相应的日志数据存储到Hadoop中便于日后进行相应的数据分析。
Flume的部署类型
单一流程 多代理流程多个agent顺序连接 可以将多个Agent顺序连接起来将最初的数据源经过收集存储到最终的存储系统中。这是最简单的情况一般情况下应该控制这种顺序连接的Agent 的数量因为数据流经的路径变长了如果不考虑failover的话出现故障将影响整个Flow上的Agent收集服务。
流的合并多个Agent的数据汇聚到同一个Agent 这种情况应用的场景比较多比如要收集Web网站的用户行为日志 Web网站为了可用性使用的负载集群模式每个节点都产生用户行为日志可以为每 个节点都配置一个Agent来单独收集日志数据然后多个Agent将数据最终汇聚到一个用来存储数据存储系统如HDFS上。
多路复用流多级流
Flume还支持多级流什么多级流来举个例子当syslog java nginx、 tomcat等混合在一起的日志流开始流入一个agent后可以agent中将混杂的日志流分开然后给每种日志建立一个自己的传输通道。
load balance功能
下图Agent1是一个路由节点负责将Channel暂存的Event均衡到对应的多个Sink组件上而每个Sink组件分别连接到一个独立的Agent上 。
Flume组件选型
Source Taildir Source相比Exec Source、Spooling Directory Source的优势 TailDir Source断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置实现断点续传。不会丢数据但是有可能会导致数据重复。 Exec Source可以实时搜集数据但是在Flume不运行或者Shell命令出错的情况下数据将会丢失。 Spooling Directory Source监控目录支持断点续传。 batchSize大小如何设置 答Event 1K左右时500-1000合适默认为100
Channel
采用Kafka Channel省去了Sink提高了效率。KafkaChannel数据存储在Kafka里面所以数据是存储在磁盘中。
注意在Flume1.7以前Kafka Channel很少有人使用因为发现parseAsFlumeEvent这个配置起不了作用。也就是无论parseAsFlumeEvent配置为true还是false都会转为Flume Event。这样的话造成的结果是会始终都把Flume的headers中的信息混合着内容一起写入Kafka的消息中这显然不是我所需要的我只是需要把内容写入即可。
FileChannel和MemoryChannel区别
MemoryChannel:
传输数据速度更快但因为数据保存在JVM的堆内存中Agent进程挂掉会导致数据丢失适用于对数据质量要求不高的需求。
FileChannel:
传输速度相对于Memory慢但数据安全保障高Agent进程挂掉也可以从失败中恢复数据。
选型
金融类公司、对钱要求非常准确的公司通常会选择FileChannel
传输的是普通日志信息京东内部一天丢100万-200万条这是非常正常的通常选择MemoryChannel。
FileChannel优化
通过配置dataDirs指向多个路径每个路径对应不同的硬盘增大Flume吞吐量。
官方说明如下
Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformancecheckpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中保证checkpoint坏掉后可以快速使用backupCheckpointDir恢复数据。
FileChannel原理
Sink
HDFS小文件处理
HDFS存入大量小文件有什么影响
**元数据层面**每个小文件都有一份元数据其中包括文件路径文件名所有者所属组权限创建时间等这些信息都保存在Namenode内存中。所以小文件过多会占用Namenode服务器大量内存影响Namenode性能和使用寿命
**计算层面**默认情况下MR会对每个小文件启用一个Map任务计算非常影响计算性能。同时也影响磁盘寻址时间。
官方默认的这三个参数配置写入HDFS后会产生小文件hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
基于以上hdfs.rollInterval3600hdfs.rollSize134217728hdfs.rollCount 0几个参数综合作用效果如下
①文件在达到128M时会滚动生成新文件
②文件创建超3600秒时会滚动生成新文件
案例日志采集发送到KafkaFlume配置
日志采集流程 配置如下
在/opt/module/flume/conf目录下创建file-flume-kafka.conf文件 vim file-flume-kafka.conf# 为各组件命名
a1.sources r1
a1.channels c1# 描述source
a1.sources.r1.type TAILDIR
a1.sources.r1.filegroups f1
a1.sources.r1.filegroups.f1 /opt/program/logs/app.*
a1.sources.r1.positionFile /opt/program/logs/taildir_position.json
a1.sources.r1.interceptors i1
a1.sources.r1.interceptors.i1.type com.jast.flume.ETLInterceptor$Builder# 描述channel
a1.channels.c1.type org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers 192.168.60.14:9092,192.168.60.15:9092
a1.channels.c1.kafka.topic topic_log
a1.channels.c1.parseAsFlumeEvent false# 绑定source和channel以及sink和channel的关系
a1.sources.r1.channels c1注意com.jast.flume.ETLInterceptor是自定义的拦截器的全类名。需要根据用户自定义的拦截器做相应修改。 Flume拦截器开发 创建Maven工程flume-interceptor 创建包名com.jast.flume.interceptor 在pom.xml文件中添加如下配置 dependenciesdependencygroupIdorg.apache.flume/groupIdartifactIdflume-ng-core/artifactIdversion1.9.0/versionscopeprovided/scope/dependencydependencygroupIdcn.hutool/groupIdartifactIdhutool-all/artifactIdversion5.7.19/version/dependency/dependenciesbuildpluginspluginartifactIdmaven-compiler-plugin/artifactIdversion2.3.2/versionconfigurationsource1.8/sourcetarget1.8/target/configuration/pluginpluginartifactIdmaven-assembly-plugin/artifactIdconfigurationdescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configurationexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalsingle/goal/goals/execution/executions/plugin/plugins/build注意scope中provided的含义是编译时用该jar包。打包时时不用。因为集群上已经存在flume的jar包。只是本地编译时用一下。 在com.jast.flume.ETLInterceptor包下创建ETLInterceptor类 package com.jast.flume;import cn.hutool.json.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;/*** author Jast* description 自定义拦截器*/
public class ETLInterceptor implements Interceptor {Overridepublic void initialize() {}Overridepublic Event intercept(Event event) {byte[] body event.getBody();String text new String(body, StandardCharsets.UTF_8);if(JSONUtil.isJson(text)){return event;}System.out.println(非json格式过滤掉);return null;}Overridepublic ListEvent intercept(ListEvent list) {IteratorEvent iterator list.iterator();while (iterator.hasNext()){Event next iterator.next();if(intercept(next)null){iterator.remove();}}return list;}Overridepublic void close() {}public static class Builder implements Interceptor.Builder{Overridepublic Interceptor build() {return new ETLInterceptor();}Overridepublic void configure(Context context) {}}
} 打包 自己部署的Flume需要先将打好的包放入到flume/lib文件夹下面 cp flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar flume/libCDH版本Flume需要先将打好的包放入到flume/lib文件夹下面 具体的目录/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/flume-ng/lib/ cp flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/flume-ng/lib启动Flume nohup flume-ng agent --conf-file /opt/program/flume/file-flume-kafka-1.conf --name a1 -Dflume.root.loggerINFO,LOGFILE /opt/program/flume/log1.txt 21 测试 随便在app.log加入一条数据拦截器检测到然后打印出非json格式过滤掉。 2022-03-17 09:42:42,148 INFO taildir.ReliableTaildirEventReader: Pos 9 is larger than file size! Restarting from pos 0, file: /opt/program/logs/app.log, inode: 5810713
2022-03-17 09:42:42,148 INFO taildir.TailFile: Updated position, file: /opt/program/logs/app.log, inode: 5810713, pos: 0
非json格式过滤掉
2022-03-17 09:45:17,236 INFO taildir.TaildirSource: Closed file: /opt/program/logs/app.log, inode: 5810713, pos: 22输入一条json数据在kafka消费时正常消费配置成功 22/03/17 09:15:41 INFO internals.Fetcher: [Consumer clientIdconsumer-1, groupIdconsole-consumer-3667] Resetting offset for partition topic_log-0 to offset 0.
{en:小张}Flume采集脚本批量启动停止 在/home/atguigu/bin目录下创建脚本f1.sh vim f1.sh#! /bin/bashcase $1 in
start){for i in localhostdoecho --------启动 $i 采集flume-------ssh $i nohup flume-ng agent --conf-file /opt/program/flume/file-flume-kafka-1.conf --name a1 -Dflume.root.loggerINFO,LOGFILE /opt/program/flume/log1.txt 21 done
};;
stop){for i in localhostdoecho --------停止 $i 采集flume-------ssh $i ps -ef | grep file-flume-kafka-1.conf | grep -v grep |awk {print \$2} | xargs -n1 kill -9 done};;
esac说明1nohup该命令可以在你退出帐户/关闭终端之后继续运行相应的进程。nohup就是不挂起的意思不挂断地运行命令。 说明2awk 默认分隔符为空格 说明3$2是在“”双引号内部会被解析为脚本的第二个参数但是这里面想表达的含义是awk的第二个值所以需要将他转义用$2表示。 说明4xargs 表示取出前面命令运行的结果作为后面命令的输入参数。ls 增加脚本执行权限 chmod ux f1.shf1集群启动脚本 f1.sh startf1集群停止脚本 f1.sh stop案例消费Kafka保存到HDFS 配置如下
## 组件
a1.sourcesr1
a1.channelsc1
a1.sinksk1## source1
a1.sources.r1.type org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize 5000
a1.sources.r1.batchDurationMillis 2000
a1.sources.r1.kafka.bootstrap.servers 192.168.60.15:9092,192.168.60.14:9092
a1.sources.r1.kafka.topicstopic_log
#a1.sources.r1.interceptors i1
#a1.sources.r1.interceptors.i1.type com.atguigu.flume.interceptor.TimeStampInterceptor$Builder## channel1
a1.channels.c1.type file
a1.channels.c1.checkpointDir /opt/program/flume/checkpoint/behavior1
a1.channels.c1.dataDirs /opt/program/flume/data/behavior1/## sink1
a1.sinks.k1.type hdfs
a1.sinks.k1.hdfs.path /jast_root/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix log-
a1.sinks.k1.hdfs.round false#控制生成的小文件
# 每隔多少秒生成一个
a1.sinks.k1.hdfs.rollInterval 10
# 128M生成一个文件
a1.sinks.k1.hdfs.rollSize 134217728
a1.sinks.k1.hdfs.rollCount 0## 控制输出文件是原生文件。
a1.sinks.k1.hdfs.fileType CompressedStream
a1.sinks.k1.hdfs.codeC GzipCodec## 拼装
a1.sources.r1.channels c1
a1.sinks.k1.channel c1CDH 版本Flume写入HDFS时Flume部署的服务器需要安装HDFS Gateway 自己部署版本Flume与Hadoop集群不在一起的话需要配置Hadoop环境变量 FileChannel优化
通过配置dataDirs指向多个路径每个路径对应不同的硬盘增大Flume吞吐量。
官方说明如下
Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformancecheckpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中保证checkpoint坏掉后可以快速使用backupCheckpointDir恢复数据。
FileChannel原理 HDFS小文件处理
HDFS存入大量小文件有什么影响
**元数据层面**每个小文件都有一份元数据其中包括文件路径文件名所有者所属组权限创建时间等这些信息都保存在Namenode内存中。所以小文件过多会占用Namenode服务器大量内存影响Namenode性能和使用寿命
**计算层面**默认情况下MR会对每个小文件启用一个Map任务计算非常影响计算性能。同时也影响磁盘寻址时间。
官方默认的这三个参数配置写入HDFS后会产生小文件hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
基于以上hdfs.rollInterval3600hdfs.rollSize134217728hdfs.rollCount 0几个参数综合作用效果如下
①文件在达到128M时会滚动生成新文件
②文件创建超3600秒时会滚动生成新文件
Flume时间戳拦截器
由于Flume默认会用Linux系统时间作为输出到HDFS路径的时间。如果数据是23:59分产生的。Flume消费Kafka里面的数据时有可能已经是第二天了那么这部门数据会被发往第二天的HDFS路径。我们希望的是根据日志里面的实际时间发往HDFS的路径所以下面拦截器作用是获取日志中的实际时间。
解决的思路拦截json日志通过fastjson框架解析json获取实际时间ts。将获取的ts时间写入拦截器header头header的key必须是timestamp因为Flume框架会根据这个key的值识别为时间写入到HDFS。 在com.jast.flume.interceptor包下创建TimeStampInterceptor类 package com.jast.flume.interceptor;import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;public class TimeStampInterceptor implements Interceptor {private ArrayListEvent events new ArrayList();Overridepublic void initialize() {}Overridepublic Event intercept(Event event) {MapString, String headers event.getHeaders();String log new String(event.getBody(), StandardCharsets.UTF_8);JSONObject jsonObject JSONObject.parseObject(log);String ts jsonObject.getString(ts);headers.put(timestamp, ts);return event;}Overridepublic ListEvent intercept(ListEvent list) {events.clear();for (Event event : list) {events.add(intercept(event));}return events;}Overridepublic void close() {}public static class Builder implements Interceptor.Builder {Overridepublic Interceptor build() {return new TimeStampInterceptor();}Overridepublic void configure(Context context) {}}
}重新打包 自己部署的Flume需要先将打好的包放入到flume/lib文件夹下面 cp flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar flume/libCDH版本Flume需要先将打好的包放入到flume/lib文件夹下面 具体的目录/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/flume-ng/lib/ cp flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/flume-ng/lib项目经验之Flume内存优化 问题描述如果启动消费Flume抛出如下异常 ERROR hdfs.HDFSEventSink: process failed
java.lang.OutOfMemoryError: GC overhead limit exceeded解决方案步骤 在服务器的flume/conf/flume-env.sh文件中增加如下配置 export JAVA_OPTS-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote Flume内存参数设置及优化 JVM heap一般设置为4G或更高 -Xmx与-Xms最好设置一致减少内存抖动带来的性能影响如果设置不一致容易导致频繁fullgc。 -Xms表示JVM Heap堆内存最小尺寸初始分配-Xmx 表示JVM Heap(堆内存)最大允许的尺寸按需分配。如果不设置一致容易在初始化时由于内存不够频繁触发fullgc。 参考内容 https://www.cnblogs.com/qingyunzong/p/8994494.html https://flume.apache.org/