做年会的网站,wordpress 采集图片,网站开发广告怎么写,网页设计需要学什么软件知乎在《基于Flume的美团日志收集系统(一)架构和设计》中#xff0c;我们详述了基于Flume的美团日志收集系统的架构设计#xff0c;以及为什么做这样的设计。在本节中#xff0c;我们将会讲述在实际部署和使用过程中遇到的问题#xff0c;对Flume的功能改进和对系统做的优化。 … 在《基于Flume的美团日志收集系统(一)架构和设计》中我们详述了基于Flume的美团日志收集系统的架构设计以及为什么做这样的设计。在本节中我们将会讲述在实际部署和使用过程中遇到的问题对Flume的功能改进和对系统做的优化。 1 Flume的问题总结 在Flume的使用过程中遇到的主要问题如下 a. Channel“水土不服”使用固定大小的MemoryChannel在日志高峰时常报队列大小不够的异常使用FileChannel又导致IO繁忙的问题 b. HdfsSink的性能问题使用HdfsSink向Hdfs写日志在高峰时间速度较慢 c. 系统的管理问题配置升级模块重启等 2 Flume的功能改进和优化点 从上面的问题中可以看到有一些需求是原生Flume无法满足的因此基于开源的Flume我们增加了许多功能修改了一些Bug并且进行一些调优。下面将对一些主要的方面做一些说明。 2.1 增加Zabbix monitor服务 一方面Flume本身提供了http, ganglia的监控服务而我们目前主要使用zabbix做监控。因此我们为Flume添加了zabbix监控模块和sa的监控服务无缝融合。 另一方面净化Flume的metrics。只将我们需要的metrics发送给zabbix避免 zabbix server造成压力。目前我们最为关心的是Flume能否及时把应用端发送过来的日志写到Hdfs上 对应关注的metrics为 Source : 接收的event数和处理的event数Channel : Channel中拥堵的event数Sink : 已经处理的event数2.2 为HdfsSink增加自动创建index功能 首先我们的HdfsSink写到hadoop的文件采用lzo压缩存储。 HdfsSink可以读取hadoop配置文件中提供的编码类列表然后通过配置的方式获取使用何种压缩编码我们目前使用lzo压缩数据。采用lzo压缩而非bz2压缩是基于以下测试数据 event大小(Byte)sink.batch-sizehdfs.batchSize压缩格式总数据大小(G)耗时(s)平均events/s压缩后大小(G)54430010000bz29.1244868331.3654430010000lzo9.1612273333.49其次我们的HdfsSink增加了创建lzo文件后自动创建index功能。Hadoop提供了对lzo创建索引使得压缩文件是可切分的这样Hadoop Job可以并行处理数据文件。HdfsSink本身lzo压缩但写完lzo文件并不会建索引我们在close文件之后添加了建索引功能。 /**
* Rename bucketPath file from .tmp to permanent location.
*/
private void renameBucket() throws IOException, InterruptedException {if(bucketPath.equals(targetPath)) {return;}final Path srcPath new Path(bucketPath);final Path dstPath new Path(targetPath);callWithTimeout(new CallRunnerObject() {Overridepublic Object call() throws Exception {if(fileSystem.exists(srcPath)) { // could blockLOG.info(Renaming srcPath to dstPath);fileSystem.rename(srcPath, dstPath); // could block//index the dstPath lzo fileif (codeC ! null .lzo.equals(codeC.getDefaultExtension()) ) {LzoIndexer lzoIndexer new LzoIndexer(new Configuration());lzoIndexer.index(dstPath);}}return null;}});
}2.3 增加HdfsSink的开关 我们在HdfsSink和DualChannel中增加开关当开关打开的情况下HdfsSink不再往Hdfs上写数据并且数据只写向DualChannel中的FileChannel。以此策略来防止Hdfs的正常停机维护。 2.4 增加DualChannel Flume本身提供了MemoryChannel和FileChannel。MemoryChannel处理速度快但缓存大小有限且没有持久化FileChannel则刚好相反。我们希望利用两者的优势在Sink处理速度够快Channel没有缓存过多日志的时候就使用MemoryChannel当Sink处理速度跟不上又需要Channel能够缓存下应用端发送过来的日志时就使用FileChannel由此我们开发了DualChannel能够智能的在两个Channel之间切换。 其具体的逻辑如下 /**** putToMemChannel indicate put event to memChannel or fileChannel* takeFromMemChannel indicate take event from memChannel or fileChannel* */
private AtomicBoolean putToMemChannel new AtomicBoolean(true);
private AtomicBoolean takeFromMemChannel new AtomicBoolean(true);void doPut(Event event) {if (switchon putToMemChannel.get()) {//往memChannel中写数据memTransaction.put(event);if ( memChannel.isFull() || fileChannel.getQueueSize() 100) {putToMemChannel.set(false);}} else {//往fileChannel中写数据fileTransaction.put(event);}
}Event doTake() {Event event null;if ( takeFromMemChannel.get() ) {//从memChannel中取数据event memTransaction.take();if (event null) {takeFromMemChannel.set(false);}} else {//从fileChannel中取数据event fileTransaction.take();if (event null) {takeFromMemChannel.set(true);putToMemChannel.set(true);}}return event;
}2.5 增加NullChannel Flume提供了NullSink可以把不需要的日志通过NullSink直接丢弃不进行存储。然而Source需要先将events存放到Channel中NullSink再将events取出扔掉。为了提升性能我们把这一步移到了Channel里面做所以开发了NullChannel。 2.6 增加KafkaSink 为支持向Storm提供实时数据流我们增加了KafkaSink用来向Kafka写实时数据流。其基本的逻辑如下 public class KafkaSink extends AbstractSink implements Configurable {private String zkConnect;private Integer zkTimeout;private Integer batchSize;private Integer queueSize;private String serializerClass;private String producerType;private String topicPrefix;private ProducerString, String producer;public void configure(Context context) {//读取配置并检查配置}Overridepublic synchronized void start() {//初始化producer}Overridepublic synchronized void stop() {//关闭producer}Overridepublic Status process() throws EventDeliveryException {Status status Status.READY;Channel channel getChannel();Transaction tx channel.getTransaction();try {tx.begin();//将日志按category分队列存放MapString, ListString topic2EventList new HashMapString, ListString();//从channel中取batchSize大小的日志从header中获取category生成topic并存放于上述的Map中//将Map中的数据通过producer发送给kafkatx.commit();} catch (Exception e) {tx.rollback();throw new EventDeliveryException(e);} finally {tx.close();}return status;}
}2.7 修复和scribe的兼容问题 Scribed在通过ScribeSource发送数据包给Flume时大于4096字节的包会先发送一个Dummy包检查服务器的反应而Flume的ScribeSource对于logentry.size()0的包返回TRY_LATER此时Scribed就认为出错断开连接。这样循环反复尝试无法真正发送数据。现在在ScribeSource的Thrift接口中对size为0的情况返回OK保证后续正常发送数据。 3. Flume系统调优经验总结 3.1 基础参数调优经验 HdfsSink中默认的serializer会每写一行在行尾添加一个换行符我们日志本身带有换行符这样会导致每条日志后面多一个空行修改配置不要自动添加换行符lc.sinks.sink_hdfs.serializer.appendNewline false 调大MemoryChannel的capacity尽量利用MemoryChannel快速的处理能力调大HdfsSink的batchSize增加吞吐量减少hdfs的flush次数适当调大HdfsSink的callTimeout避免不必要的超时错误3.2 HdfsSink获取Filename的优化 HdfsSink的path参数指明了日志被写到Hdfs的位置该参数中可以引用格式化的参数将日志写到一个动态的目录中。这方便了日志的管理。例如我们可以将日志写到category分类的目录并且按天和按小时存放 lc.sinks.sink_hdfs.hdfs.path /user/hive/work/orglog.db/%{category}/dt%Y%m%d/hour%H HdfsS ink中处理每条event时都要根据配置获取此event应该写入的Hdfs path和filename默认的获取方法是通过正则表达式替换配置中的变量获取真实的path和filename。因为此过程是每条event都要做的操作耗时很长。通过我们的测试20万条日志这个操作要耗时6-8s左右。 由于我们目前的path和filename有固定的模式可以通过字符串拼接获得。而后者比正则匹配快几十倍。拼接定符串的方式20万条日志的操作只需要几百毫秒。 3.3 HdfsSink的b/m/s优化 在我们初始的设计中所有的日志都通过一个Channel和一个HdfsSink写到Hdfs上。我们来看一看这样做有什么问题。 首先我们来看一下HdfsSink在发送数据的逻辑 //从Channel中取batchSize大小的events
for (txnEventCount 0; txnEventCount batchSize; txnEventCount) {//对每条日志根据category append到相应的bucketWriter上bucketWriter.append(event);
for (BucketWriter bucketWriter : writers) {//然后对每一个bucketWriter调用相应的flush方法将数据flush到Hdfs上bucketWriter.flush();假设我们的系统中有100个categorybatchSize大小设置为20万。则每20万条数据就需要对100个文件进行append或者flush操作。 其次对于我们的日志来说基本符合80/20原则。即20%的category产生了系统80%的日志量。这样对大部分日志来说每20万条可能只包含几条日志也需要往Hdfs上flush一次。 上述的情况会导致HdfsSink写Hdfs的效率极差。下图是单Channel的情况下每小时的发送量和写hdfs的时间趋势图。 鉴于这种实际应用场景我们把日志进行了大小归类分为big, middle和small三类这样可以有效的避免小日志跟着大日志一起频繁的flush提升效果明显。下图是分队列后big队列的每小时的发送量和写hdfs的时间趋势图。 4 未来发展 目前Flume日志收集系统提供了一个高可用高可靠可扩展的分布式服务已经有效地支持了美团的日志数据收集工作。 后续我们将在如下方面继续研究 日志管理系统图形化的展示和控制日志收集系统跟进社区发展跟进Flume 1.5的进展同时回馈社区