当前位置: 首页 > news >正文

运城做网站费用高吗数据处理网站开发

运城做网站费用高吗,数据处理网站开发,ssr和wordpress,做网站域名是赠送的吗文章目录 一、上传压缩包二、解压压缩包三、监控本地文件#xff08;file to kafka#xff09;3.1 编写配置文件3.2 自定义拦截器3.2.1 开发拦截器jar包#xff08;1#xff09;创建maven项目#xff08;2#xff09;开发拦截器类#xff08;3#xff09;开发pom文件file to kafka3.1 编写配置文件3.2 自定义拦截器3.2.1 开发拦截器jar包1创建maven项目2开发拦截器类3开发pom文件4打成jar包上传到Flume 3.2.3 修改配置文件 3.3 创建Kafka Topic3.4 启动Flume3.5 停止Flume 四、监控Kafkakafka to hdfs3.0 将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.33.1 自定义拦截器3.2 编写配置文件3.3 启动Flume3.4 停止Flume 五、监控 ipportTODO 一、上传压缩包 官网https://flume.apache.org/ 二、解压压缩包 [mallmall software]$ tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/三、监控本地文件file to kafka Flume是用java写的所以需要确保JDK环境可用 需求描述监控目录下多个文件写入Kafka TAILDIR SOURCE本质是tail -F [file]命令只能监控文件的新增和修改不能处理历史文件。 3.1 编写配置文件 [mallmall ~]$ cd /opt/module/apache-flume-1.9.0-bin/ [mallmall apache-flume-1.9.0-bin]$ mkdir job [mallmall apache-flume-1.9.0-bin]$ cd job/ [mallmall job]$ vim file_to_kafka.conf内容 # 0、配置agent给source channel sink组件命名 a1.sources r1 a1.channels c1# 1、配置source组件 a1.sources.r1.type TAILDIR a1.sources.r1.filegroups f1 a1.sources.r1.filegroups.f1 /opt/module/applog/app.* # 断点续传标记信息存储位置 a1.sources.r1.positionFile /opt/module/apache-flume-1.9.0-bin/taildir_position.json# 2、配置channel组件event临时缓冲区 a1.channels.c1.type org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers hadoop102:9092,hadoop103:9092,hadoop104:9092 a1.channels.c1.kafka.topic topic_mall_applog# 按照字符串类型传到kafka去 a1.channels.c1.parseAsFlumeEvent false# 3、配置source、channel、sink之间的连接关系 a1.sources.r1.channels c13.2 自定义拦截器 作用拦截events经拦截器处理输出处理后的events。 开发创建maven项目打成jar包形式上传到flume所在机器 3.2.1 开发拦截器jar包 1创建maven项目 2开发拦截器类 package com.songshuang.flume.interceptor;import com.alibaba.fastjson.JSONException; import com.alibaba.fastjson.JSONObject; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.List;/*** date 2023/11/21 20:40* 功能剔除掉非json格式数据** 1、实现接口* 2、实现抽象方法* 3、建造者模式静态内部类*/ public class ETLInterceptor implements Interceptor {public void initialize() {}// 将log中event为非json格式数据置为nullpublic Event intercept(Event event) {byte[] body event.getBody();// byte数组转为字符串String log new String(body, StandardCharsets.UTF_8);boolean flag false;// 判断log是否是json格式try {JSONObject jsonObject JSONObject.parseObject(log);flag true;} catch (JSONException e) {}return flag ? event : null;}// 将log中event为null的删掉public ListEvent intercept(ListEvent events) {// 遍历eventsIteratorEvent iterator events.iterator();while (iterator.hasNext()) {Event event iterator.next();if (intercept(event) null) {iterator.remove();}}return events;}public void close() {}// 建造者模式public static class Builder implements Interceptor.Builder {Overridepublic Interceptor build() {return new ETLInterceptor();}Overridepublic void configure(Context context) {}} }3开发pom文件 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.songshuang/groupIdartifactIdflume_interceptor/artifactIdversion1.0-SNAPSHOT/versiondependenciesdependencygroupIdorg.apache.flume/groupIdartifactIdflume-ng-core/artifactIdversion1.9.0/versionscopeprovided/scope/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.62/version/dependency/dependenciesbuildpluginspluginartifactIdmaven-compiler-plugin/artifactIdversion2.3.2/versionconfigurationsource1.8/sourcetarget1.8/target/configuration/pluginpluginartifactIdmaven-assembly-plugin/artifactIdconfigurationdescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configurationexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalsingle/goal/goals/execution/executions/plugin/plugins/build /project4打成jar包上传到Flume 上传到 /opt/module/apache-flume-1.9.0-bin/lib 目录下 3.2.3 修改配置文件 [mallmall job]$ vim file_to_kafka.conf新增内容 # 自定义拦截器 a1.sources.r1.interceptors i1 # 指定自定义拦截器的建造者类名入口 a1.sources.r1.interceptors.i1.type com.songshuang.flume.interceptor.ETLInterceptor$Builder3.3 创建Kafka Topic 为什么要手动创建topicflume自动创建的topic默认1个分区每个分区1个副本。手动创建可以指定分区和副本数可以有效利用Kafka集群资源。 –bootstrap-server参数作用连接Kafka集群 [hadoophadoop102 kafka_2.11-2.4.1]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --create --replication-factor 2 --partitions 3 --topic topic_mall_applog3.4 启动Flume 注意放开Kafka集群所在机器9092端口对Flume所在机器放开。 原因Flume需要向Kafka集群写入数据所以需要具有访问Kafka集群端口的权限。 – conf参数配置文件存储所在目录 – name参数agent名称每个Flume配置文件就是一个agent。 – conf-file参数flume本次启动读取的配置文件 nohup配合后台运行 /dev/null将标准输出重定向到 /dev/null 即丢弃所有输出 2/dev/null将标准错误输出重定向到 /dev/null 即丢弃所有错误输出 [mallmall ~]$ cd /opt/module/apache-flume-1.9.0-bin/ [mallmall apache-flume-1.9.0-bin]$ nohup bin/flume-ng agent --conf conf/ --name a1 --conf-file job/file_to_kafka.conf /dev/null 2/dev/null 3.5 停止Flume [mallmall apache-flume-1.9.0-bin]$ ps -ef | grep file_to_kafka.conf [mallmall apache-flume-1.9.0-bin]$ kill 11001四、监控Kafkakafka to hdfs 需求描述监控Kafka将数据写入HDFS 如果想要从头消费需要设置kafka.consumer.auto.offset.reset earliest默认从最新offset开始 注意需要在HDFS所在机器部署FLume需要调用HADOOP相关jar包。 3.0 将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3 否则Flume向HDFS写数据时会失败 [hadoophadoop104 ~]$ rm /opt/module/apache-flume-1.9.0-bin/lib/guava-11.0.2.jar3.1 自定义拦截器 作用按照kafka消息中的时间字段决定消息存储到hdfs的哪个文件中。 代码 package com.songshuang.flume.interceptor;import com.alibaba.fastjson.JSONObject; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map;/*** date 2023/11/22 16:52* 作用获取kafka中时间戳字段放入event头中flume写入hdfs时从头部获取时间作为该event放入hdfs的文件夹名称*/ public class TimestampInterceptor implements Interceptor {Overridepublic void initialize() {}// 获取kafka时间戳字段放入event的headerOverridepublic Event intercept(Event event) {byte[] body event.getBody();String log new String(body, StandardCharsets.UTF_8);JSONObject jsonObject JSONObject.parseObject(log);String ts jsonObject.getString(ts);MapString, String headers event.getHeaders();headers.put(timestamp,ts); // event是引用变量类型存储的是地址header变了自然event所对应地址上的值就变了return event;}Overridepublic ListEvent intercept(ListEvent events) {for (Event event : events) {intercept(event);}return events;}Overridepublic void close() {}// 建造者模式public static class Builder implements Interceptor.Builder {Overridepublic Interceptor build() {return new TimestampInterceptor();}Overridepublic void configure(Context context) {}} }3.2 编写配置文件 [hadoophadoop104 job]$ vim kafka_to_hdfs.conf内容 a1.sources.r1.kafka.consumer.group.id消费者组名。 a1.channels.c1.typefile类型channel缓冲数据放在磁盘中而不是内存中。 a1.channels.c1.dataDirsfile channel缓冲内容落盘地址。 a1.channels.c1.checkpointDir检查点存放位置用于断点续传。 # Name the components on this agent a1.sources r1 a1.sinks k1 a1.channels c1# 配置source a1.sources.r1.type org.apache.flume.source.kafka.KafkaSource a1.sources.r1.kafka.bootstrap.servers hadoop102:9092,hadoop103:9092,hadoop104:9092 a1.sources.r1.kafka.topics topic_mall_applog a1.sources.r1.kafka.consumer.group.id consumer_group_flume # 指定consumer从哪个offset开始消费默认latest # a1.sources.r1.kafka.consumer.auto.offset.reset earliest # 自定义拦截器 a1.sources.r1.interceptors i1 a1.sources.r1.interceptors.i1.type com.songshuang.flume.interceptor.TimestampInterceptor$Builder# 配置sink a1.sinks.k1.type hdfs a1.sinks.k1.hdfs.path /warehouse/applog/%Y-%m-%d a1.sinks.k1.hdfs.codeC gzip# 配置channel a1.channels.c1.type file a1.channels.c1.dataDirs /opt/module/apache-flume-1.9.0-bin/data/kafka_to_hdfs a1.channels.c1.checkpointDir /opt/module/apache-flume-1.9.0-bin/checkpoint/kafka_to_hdfs# Bind the source and sink to the channel a1.sources.r1.channels c1 a1.sinks.k1.channel c13.3 启动Flume 注意1需要放开kafka端口即9092端口Flume要读Kafka。 [hadoophadoop104 job]$ cd /opt/module/apache-flume-1.9.0-bin/ [hadoophadoop104 apache-flume-1.9.0-bin]$ nohup bin/flume-ng agent --conf conf/ --name a1 --conf-file job/kafka_to_hdfs.conf /dev/null 2/dev/null 3.4 停止Flume [mallmall apache-flume-1.9.0-bin]$ ps -ef | grep kafka_to_hdfs.conf [mallmall apache-flume-1.9.0-bin]$ kill 11001五、监控 ipportTODO
http://wiki.neutronadmin.com/news/236099/

相关文章:

  • 建站系统软件有哪些合肥建站公司排名前十名
  • 微信网站开发 js框架涿州网站制作多少钱
  • 在哪里可以改动网站标题和外国人做古玩生意的网站
  • 如何查询网站的服务器成都必去的地方排行榜
  • 建设网站要花多少钱义乌市微畅网络科技有限公司
  • 广州做网站的价格网站建设的小故事
  • 网站注册域名免费wordpress缓存插件 w3
  • 新浦建设集团网站dedecms 网站首页标签
  • 做购物网站收费标准网站付款链接怎么做
  • 沈阳网站建设哪家公司好电商运营怎么做如何从零开始
  • 綦江建设银行网站.简述网站开发的流程
  • 网站页面布局和样式设计linux主机做网站
  • 公司想建网站企业信息管理系统的组成不包括
  • 网站设计主要包括哪些步骤百度推广官网网站
  • 河北seo网站开发国内免费plm
  • 制作人在那个网站能看彩票网站自己可以做吗
  • 温州建设诚信评价网站公示wordpress 查看文章404
  • 住建部工程建设标准网站百度一下手机版网页
  • app软件开发网站网页制作与网站建设宝典 第2版
  • 南阳做网站优化的公司汕头网站建设技术托管
  • 武安网站建设最挣钱的三个销售行业
  • 网站如何做分站付网站建设服务费记账
  • 外贸建个网站多少钱支付宝接口 网站备案
  • 天鸿建设集团有限公司 网站wordpress 模板添加图片
  • 乐清网站建设做网站799元南阳网(网站).
  • 十堰专业网站设计制作郑州企业建站设计
  • 网站被k多久恢复中国建设工程信息网官网查询系统
  • 网站开发开发优势做运营有前途吗
  • 兰州城建设计院网站门户网站建设的企业
  • 中国建设银行个人网站注册网站建设app