当前位置: 首页 > news >正文

.net 获取网站域名做平台交易网站怎么收款

.net 获取网站域名,做平台交易网站怎么收款,three.js 做的网站,卖域名的网站哪些好前置知识#xff1a; Flume学习笔记#xff08;1#xff09;—— Flume入门-CSDN博客 Flume学习笔记#xff08;2#xff09;—— Flume进阶-CSDN博客 Flume 自定义组件 自定义 Interceptor 需求分析#xff1a;使用 Flume 采集服务器本地日志#xff0c;需要按照日志… 前置知识 Flume学习笔记1—— Flume入门-CSDN博客 Flume学习笔记2—— Flume进阶-CSDN博客 Flume 自定义组件 自定义 Interceptor 需求分析使用 Flume 采集服务器本地日志需要按照日志类型的不同将不同种类的日志发往不同的分析系统 需要使用Flume 拓扑结构中的 Multiplexing 结构Multiplexing的原理是根据 event 中 Header 的某个 key 的值将不同的 event 发送到不同的 Channel中所以我们需要自定义一个 Interceptor为不同类型的 event 的 Header 中的 key 赋予不同的值 实现流程 代码 导入依赖 dependenciesdependencygroupIdorg.apache.flume/groupIdartifactIdflume-ng-core/artifactIdversion1.9.0/version/dependency /dependencies 自定义拦截器的代码 package com.why.interceptor;import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor;import java.util.ArrayList; import java.util.List; import java.util.Map;public class TypeInterceptor implements Interceptor {//存放事件的集合private ListEvent addHeaderEvents;Overridepublic void initialize() {//初始化集合addHeaderEvents new ArrayList();}//单个事件拦截Overridepublic Event intercept(Event event) {//获取头信息MapString, String headers event.getHeaders();//获取body信息String body new String(event.getBody());//根据数据中是否包含”why“来分组if(body.contains(why)){headers.put(type,first);}else {headers.put(type,second);}return event;}//批量事件拦截Overridepublic ListEvent intercept(ListEvent events) {//清空集合addHeaderEvents.clear();//遍历eventsfor(Event event : events){//给每一个事件添加头信息addHeaderEvents.add(intercept(event));}return addHeaderEvents;}Overridepublic void close() {}//构建生成器public static class TypeBuilder implements Interceptor.Builder{Overridepublic Interceptor build() {return new TypeInterceptor();}Overridepublic void configure(Context context) {}}} 将代码打包放入flume安装路径下的lib文件夹中 配置文件 在job文件夹下创建group4目录添加配置文件 为 hadoop102 上的 Flume1 配置 1 个 netcat source1 个 sink group2 个 avro sink并配置相应的 ChannelSelector 和 interceptor # Name the components on this agent a1.sources r1 a1.sinks k1 k2 a1.channels c1 c2# Describe/configure the source a1.sources.r1.type netcat a1.sources.r1.bind localhost a1.sources.r1.port 44444 a1.sources.r1.interceptors i1 a1.sources.r1.interceptors.i1.type com.why.interceptor.TypeInterceptor$TypeBuilder a1.sources.r1.selector.type multiplexing a1.sources.r1.selector.header type a1.sources.r1.selector.mapping.first c1 a1.sources.r1.selector.mapping.second c2# Describe the sink a1.sinks.k1.type avro a1.sinks.k1.hostname hadoop103 a1.sinks.k1.port 4141 a1.sinks.k2.typeavro a1.sinks.k2.hostname hadoop104 a1.sinks.k2.port 4242# Use a channel which buffers events in memory a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 # Use a channel which buffers events in memory a1.channels.c2.type memory a1.channels.c2.capacity 1000 a1.channels.c2.transactionCapacity 100# Bind the source and sink to the channel a1.sources.r1.channels c1 c2 a1.sinks.k1.channel c1 a1.sinks.k2.channel c2 hadoop103配置一个 avro source 和一个 logger sink a1.sources r1 a1.sinks k1 a1.channels c1 a1.sources.r1.type avro a1.sources.r1.bind hadoop103 a1.sources.r1.port 4141 a1.sinks.k1.type logger a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 a1.sinks.k1.channel c1 a1.sources.r1.channels c1 hadoop104配置一个 avro source 和一个 logger sink a1.sources r1 a1.sinks k1 a1.channels c1 a1.sources.r1.type avro a1.sources.r1.bind hadoop104 a1.sources.r1.port 4242 a1.sinks.k1.type logger a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 a1.sinks.k1.channel c1 a1.sources.r1.channels c1 执行指令 hadoop102bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume-interceptor-flume.conf hadoop103bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume1-flume-logger.conf -Dflume.root.loggerINFO,console hadoop104bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume2-flume-logger.conf -Dflume.root.loggerINFO,console 然后hadoop102通过nc连接44444端口发送数据 在hadoop103和104上分别接收到 自定义 Source 官方提供的文档Flume 1.11.0 Developer Guide — Apache Flume 给出的示例代码如下 public class MySource extends AbstractSource implements Configurable, PollableSource {private String myProp;Overridepublic void configure(Context context) {String myProp context.getString(myProp, defaultValue);// Process the myProp value (e.g. validation, convert to another type, ...)// Store myProp for later retrieval by process() methodthis.myProp myProp;}Overridepublic void start() {// Initialize the connection to the external client}Overridepublic void stop () {// Disconnect from external client and do any additional cleanup// (e.g. releasing resources or nulling-out field values) ..}Overridepublic Status process() throws EventDeliveryException {Status status null;try {// This try clause includes whatever Channel/Event operations you want to do// Receive new dataEvent e getSomeData();// Store the Event into this Sources associated Channel(s)getChannelProcessor().processEvent(e);status Status.READY;} catch (Throwable t) {// Log exception, handle individual exceptions as neededstatus Status.BACKOFF;// re-throw all Errorsif (t instanceof Error) {throw (Error)t;}} finally {txn.close();}return status;} } 需要继承AbstractSource实现Configurable, PollableSource 实战需求分析 使用 flume 接收数据并给每条数据添加前缀输出到控制台。前缀可从 flume 配置文件中配置 代码 package com.why.source;import org.apache.flume.Context; import org.apache.flume.EventDeliveryException; import org.apache.flume.PollableSource; import org.apache.flume.conf.Configurable; import org.apache.flume.event.SimpleEvent; import org.apache.flume.source.AbstractSource;import java.util.HashMap; import java.util.concurrent.ConcurrentMap;public class MySource extends AbstractSource implements PollableSource, Configurable {//定义配置文件将来要读取的字段private Long delay;private String field;//获取数据封装成 event 并写入 channel这个方法将被循环调用Overridepublic Status process() throws EventDeliveryException {try {//事件头信息HashMapString,String headerMap new HashMap();//创建事件SimpleEvent event new SimpleEvent();//循环封装事件for (int i 0; i 5; i) {//设置头信息event.setHeaders(headerMap);//设置事件内容event.setBody((field i).getBytes());//将事件写入ChannelgetChannelProcessor().processEvent(event);Thread.sleep(delay);}}catch (InterruptedException e) {throw new RuntimeException(e);}return Status.READY;}//backoff 步长Overridepublic long getBackOffSleepIncrement() {return 0;}//backoff 最长时间Overridepublic long getMaxBackOffSleepInterval() {return 0;}//初始化 context读取配置文件内容Overridepublic void configure(Context context) {delay context.getLong(delay);field context.getString(field,Hello);}}打包放到flume安装路径下的lib文件夹中 配置文件 # Name the components on this agent a1.sources r1 a1.sinks k1 a1.channels c1# Describe/configure the source a1.sources.r1.type com.why.source.MySource a1.sources.r1.delay 1000 a1.sources.r1.field why# Describe the sink a1.sinks.k1.type logger# Use a channel which buffers events in memory a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100# Bind the source and sink to the channel a1.sources.r1.channels c1 a1.sinks.k1.channel c1 执行指令 hadoop102上bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group5/mysource.conf -Dflume.root.loggerINFO,console 结果如下 自定义 Sink Sink 不断地轮询 Channel 中的事件且批量地移除它们并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。 Sink 是完全事务性的。在从 Channel 批量删除数据之前每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume AgentSink 就利用 Channel 提交事务。事务一旦被提交该 Channel 从自己的内部缓冲区删除事件 官方文档Flume 1.11.0 Developer Guide — Apache Flume 接口实例 public class MySink extends AbstractSink implements Configurable {private String myProp;Overridepublic void configure(Context context) {String myProp context.getString(myProp, defaultValue);// Process the myProp value (e.g. validation)// Store myProp for later retrieval by process() methodthis.myProp myProp;}Overridepublic void start() {// Initialize the connection to the external repository (e.g. HDFS) that// this Sink will forward Events to ..}Overridepublic void stop () {// Disconnect from the external respository and do any// additional cleanup (e.g. releasing resources or nulling-out// field values) ..}Overridepublic Status process() throws EventDeliveryException {Status status null;// Start transactionChannel ch getChannel();Transaction txn ch.getTransaction();txn.begin();try {// This try clause includes whatever Channel operations you want to doEvent event ch.take();// Send the Event to the external repository.// storeSomeData(e);txn.commit();status Status.READY;} catch (Throwable t) {txn.rollback();// Log exception, handle individual exceptions as neededstatus Status.BACKOFF;// re-throw all Errorsif (t instanceof Error) {throw (Error)t;}}return status;} } 自定义MySink 需要继承 AbstractSink 类并实现 Configurable 接口 实战需求分析 使用 flume 接收数据并在 Sink 端给每条数据添加前缀和后缀输出到控制台。前后缀可在 flume 任务配置文件中配置 代码 package com.why.sink;import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink;import org.slf4j.Logger; import org.slf4j.LoggerFactory;public class MySink extends AbstractSink implements Configurable {//创建 Logger 对象private static final Logger LOG LoggerFactory.getLogger(AbstractSink.class);//前后缀private String prefix;private String suffix;Overridepublic Status process() throws EventDeliveryException {//声明返回值状态信息Status status;//获取当前 Sink 绑定的 ChannelChannel ch getChannel();//获取事务Transaction txn ch.getTransaction();//声明事件Event event;//开启事务txn.begin();//读取 Channel 中的事件直到读取到事件结束循环while (true) {event ch.take();if (event ! null) {break;}}try {//处理事件打印LOG.info(prefix new String(event.getBody()) suffix);//事务提交txn.commit();status Status.READY;} catch (Exception e) {//遇到异常事务回滚txn.rollback();status Status.BACKOFF;} finally {//关闭事务txn.close();}return status;}Overridepublic void configure(Context context) {prefix context.getString(prefix, hello);suffix context.getString(suffix);} }打包放到flume安装路径下的lib文件夹中 配置文件 # Name the components on this agent a1.sources r1 a1.sinks k1 a1.channels c1# Describe/configure the source a1.sources.r1.type netcat a1.sources.r1.bind localhost a1.sources.r1.port 44444# Describe the sink a1.sinks.k1.type com.why.sink.MySink a1.sinks.k1.prefix why: a1.sinks.k1.suffix :why# Use a channel which buffers events in memory a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100# Bind the source and sink to the channel a1.sources.r1.channels c1 a1.sinks.k1.channel c1 执行指令 hadoop102上bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group6/mysink.conf -Dflume.root.loggerINFO,console 结果如下
http://wiki.neutronadmin.com/news/184731/

相关文章:

  • 成都网站建设益友网络wordpress手机号码登录
  • 成绩查询网站开发土特产网站模板 织梦
  • 专业服务网页制作上海网站seo外包
  • 建立个人网站多少钱和卫通app下载
  • 电商网站功能设计用jquery制作网页
  • 网站的特效代码厦门网站建设那家好
  • 秦皇岛做网站的公司哪家好政务网站建设存在问题
  • 做网站常见问题模板免备案的网站空间
  • 国土局网站建设经验上海搭建工厂
  • 企业网站页脚模版免费网站
  • 重庆建设工程网站wordpress调用留言
  • 网站建设方案书怎么签字上传制作一个网站就等于制作一个网页
  • 网域高科学校网站管理系统漏洞wordpress 含演示数据库
  • 西宁个人网站建设重庆专业做网站
  • 杭州城乡建设网站红尘直播
  • 乔拓云网站建设app开发公司成员
  • 做渔具最大的外贸网站河北交通建设投资集团公司网站
  • 工信部网站备案查询官网oppo软件商店
  • 在线考试系统网站开发自媒体135免费版下载
  • 去百度建网站wordpress不同分类文章不同模板
  • 阳江网站建设推广郑州市二七建设局网站
  • 黄骅市网站建设公司旅游网站开发设计报告书
  • 网站的关键词搜索怎么做微信公司
  • 盐城网站建设做纸箱在什么网站找客户
  • 网站建设如何把代码做推广app赚钱的项目
  • 网络公司制作网站企业咨询管理服务
  • 做网站的费用入什么科目什么网站可以做微招聘
  • 亚马逊服务器永久免费高级seo
  • 长安网站建设多少钱pc网站建设怎么做
  • 公众号网站怎么建如何查询网站icp备案