当前位置: 首页 > news >正文

品牌网站设计制作哪家好营销网站建设培训学校

品牌网站设计制作哪家好,营销网站建设培训学校,域名如何备案教程,网站制作的困难与解决方案简介#xff1a;本文介绍了如何使用阿里云的Confluent Cloud和Databricks构建数据流和LakeHouse#xff0c;并介绍了如何使用Databricks提供的能力来挖掘数据价值#xff0c;使用Spark MLlib构建您的机器学习模型。 前提条件 已注册阿里云账号#xff0c;详情请参见阿里云…简介本文介绍了如何使用阿里云的Confluent Cloud和Databricks构建数据流和LakeHouse并介绍了如何使用Databricks提供的能力来挖掘数据价值使用Spark MLlib构建您的机器学习模型。 前提条件 已注册阿里云账号详情请参见阿里云账号注册流程已开通 Databricks 数据洞察服务已开通 OSS 对象存储服务已开通 Confluent 流数据服务 创建Databricks集群 Confluent集群 登录Confluent管理控制台创建Confluent集群并开启公网服务登录Databricks管理控制台创建Databricks集群 Databricks Worker节点公网访问 Databricks的worker节点暂时不支持公网访问为了能访问Confluent的公网地址请联系Databricks的开发人员添加NAT网关。 案例出租车数据入湖及分析 出租车和网约车在每天的运行中持续产生行驶轨迹和交易数据这些数据对于车辆调度流量预测安全监控等场景有着极大的价值。 本案例中我们使用纽约市的出租车数据来模拟网约车数据从产生发布到流数据服务Confluent通过Databricks Structured Streaming进行实时数据处理并存储到LakeHouse的整个流程。数据存储到LakeHouse后我们使用spark和spark sql对数据进行分析并使用Spark的MLlib进行机器学习训练。 前置准备 创建topic 登录Confluent的control center在左侧选中Topics点击Add a topic按钮创建一个名为nyc_taxi_data的topic将partition设置为3其他配置保持默认。 创建OSS bucket 在和Databricks同一Region的OSS中创建bucketbucket命名为databricks-confluent-integration 进入到Bucket列表页点击创建bucket按钮 创建好bucket之后在该bucket创建目录checkpoint_dir和data/nyc_taxi_data两个目录收集url用户名密码路径等以便后续使用a。confluent集群ID在csp的管控界面集群详情页获取 Confluent Control Center的用户名和密码路径 Databricks Structured Streaming的checkpoint存储目录采集的数据的存储目录 下面是我们后续会使用到的一些变量 # 集群管控界面获取 confluent_cluster_id your_confluent_cluster_id # 使用confluent集群ID拼接得到 confluent_server rb-{confluent_cluster_id}.csp.aliyuncs.com:9092 control_center_username your_confluent_control_center_username control_center_password your_confluent_control_center_passwordtopic nyc_taxi_datacheckpoint_location oss://databricks-confluent-integration/checkpoint_dir taxi_data_delta_lake oss://databricks-confluent-integration/data/nyc_taxi_data 数据的产生 在本案例中我们使用Kaggle上的NYC出租车数据集来模拟数据产生。 我们先安装confluent的python客户端其他语言的客户端参考confluent官网 pip install confluent_kafka 构造用于创建Kafka Producer的基础信息如bootstrap-servercontrol center的usernamepassword等 conf {bootstrap.servers: confluent_server,key.serializer: StringSerializer(utf_8),value.serializer: StringSerializer(utf_8),client.id: socket.gethostname(),security.protocol: SASL_SSL,sasl.mechanism: PLAIN,sasl.username: control_center_username,sasl.password: control_center_password } 创建Producer producer Producer(conf) 向Kafka中发送消息模拟数据的产生 with open(/Path/To/train.csv, rt) as f:float_field [fare_amount, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude]for row in reader:i 1try:for field in float_field:row[field] float(row[field])row[passenger_count] int(row[passenger_count])producer.produce(topictopic, valuejson.dumps(row))if i % 1000 0:producer.flush()if i 200000:breakexcept ValueError: # discard null/NAN datacontinue Kafka中的partition和offset 在使用spark读取Kafka中的数据之前我们回顾一下Kafka中的概念partition和offset partitionkafka为了能并行进行数据的写入将每个topic的数据分为多个partition每个partition由一个Broker负责向partition写入数据时负责该partition的Broker将消息复制给它的followeroffsetKafka会为每条写入partition里的消息进行编号消息的编号即为offset我们在读取Kafka中的数据时需要指定我们想要读取的数据该指定需要从两个维度partition的维度 offset的维度。 Earliest从每个partition的offset 0开始读取和加载Latest从每个partition最新的数据开始读取自定义指定每个partition的开始offset和结束offset 读取topic1 partition 0 offset 23和partition 0 offset -2之后的数据{topic1:{0:23,1:-2}} 除了指定start offset我们还可以通过endingOffsets参数指定读取到什么位置为止。 将数据存储到LakeHouseSpark集成Confluent 理解上述概念后Databricks和Confluent的集成非常简单只需要对spark session的readStream参数进行简单的设置就可以将Kafka中的实时流数据转换为Spark中的Dataframe lines (spark.readStream# 指定数据源: kafka.format(kafka)# 指定kafka bootstrap server的URL.option(kafka.bootstrap.servers, confluent_server)# 指定订阅的topic.option(subscribe, topic)# 指定想要读取的数据的offsetearliest表示从每个partition的起始点开始读取.option(startingOffsets, earliest)# 指定认证协议.option(kafka.security.protocol, SASL_SSL).option(kafka.sasl.mechanism, PLAIN)# 指定confluent的用户名和密码.option(kafka.sasl.jaas.config,forg.apache.kafka.common.security.plain.PlainLoginModule required username{control_center_username} password{control_center_password};).load()) 从kafka中读取的数据格式如下 root|-- key: binary (nullable true)|-- value: binary (nullable true)|-- topic: string (nullable true)|-- partition: integer (nullable true)|-- offset: long (nullable true)|-- timestamp: timestamp (nullable true)|-- timestampType: integer (nullable true) 由于key和value都是binary格式的我们需要将valuejson由binary转换为string格式并定义schema提取出Json中的数据并转换为对应的格式 schema (StructType().add(key, TimestampType()).add(fare_amount, FloatType()).add(pickup_datetime, TimestampType()).add(pickup_longitude, FloatType()).add(pickup_latitude, FloatType()).add(dropoff_longitude, FloatType()).add(dropoff_latitude, FloatType()).add(passenger_count, IntegerType()))# 将json中的列提取出来 lines (lines.withColumn(data, from_json(col(value).cast(string), # binary 转 stringschema)) # 解析为schema.select(col(data.*))) # select value中的所有列 过滤掉错误为空NaN的数据 lines (lines.filter(col(pickup_longitude) ! 0).filter(col(pickup_latitude) ! 0).filter(col(dropoff_longitude) ! 0).filter(col(dropoff_latitude) ! 0).filter(col(fare_amount) ! 0).filter(col(passenger_count) ! 0)) 最后我们将解析出来的数据输出到LakeHouse中以进行后续的分析和机器学习模型训练 # lakehouse 的存储格式为 delta query (lines.writeStream.format(delta).option(checkpointLocation, checkpoint_location).option(path, taxi_data_delta_lake).start()) # 执行job直到出现异常如果只想执行该Job一段时间可以指定timeout参数 query.awaitTermination() 数据分析 我们先将LakeHouse中的数据使用Spark加载进来 然后我们对该Dataframe创建一个Table View并探索fare_amount的分布 可以看到fare_amount的最小值是负数这显然是一条错误的数据我们将这些错误的数据过滤并探索fare_amount的分布 然后我们探索价格和年份月份星期打车时间的关系 从上面可以看出两点 出租车的价格和年份有很大关系从09年到15年呈不断增长的态势在中午和凌晨打车比上午和下午打车更贵一些。 我们再进一步探索价格和乘客数量的关系 此外出租车价格的另一个影响因素就是距离这里我们借助python的geopy包和Spark的UDF来计算给定两个位置的距离然后再分析费用和距离的关系。 经纬度的范围为[-90, 90]因此我们第一步是清除错误的数据 然后我们增加一列数据出租车行驶的距离并将距离离散化进行后续的分析 统计打车距离的分布 从上图可以看出打车距离分布在区间[0, 15]miles内我们继续统计在该区间内打车价格和打车距离的关系 如上图所示打车价格和打车距离呈现出线性增长的趋势。 机器学习建模 在上一小节的数据分析中我们已经提取了和出租车相关联的一些特征根据这些特征我们建立一个简单的线性回归模型 打车费用 ~ (年份打车时间乘客数距离) 先将特征和目标值提取出来 对特征做归一化 分割训练集和测试集 建立线性回归模型进行训练 训练结果统计 使用Evaluator对模型进行评价 总结 我们在本文中介绍了如何使用阿里云的Confluent Cloud和Databricks来构建您的数据流和LakeHouse并介绍了如何使用Databricks提供的能力来挖掘数据价值使用Spark MLlib构建您的机器学习模型。有了Confluent Cloud和Databricks您可以轻松实现数据入湖及时在最新的数据上进行探索挖掘您的数据价值。欢迎您试用阿里云Confluent和Databricks。 原文链接 本文为阿里云原创内容未经允许不得转载。
http://wiki.neutronadmin.com/news/6303/

相关文章:

  • 网站过期怎么找回来我想借个企业邮箱
  • 做一个网站的市场价怎么创建网站教程
  • 做网站如何挣钱苏州建设职业培训中心
  • 北京正规网站建设经历深圳58同城招聘网
  • 网站开发需要配置哪些人员企业网站框架
  • 个人网站制作在线宁波网页制作设计营销
  • 网站开发与电子商务抖音关键词优化
  • 电子商务网站建设客户需求调查表权威发布新冠疫苗接种禁忌
  • 南京网站制作公司有哪些公司黄页查询
  • 做文字的网站网站建设公司有前途吗
  • wordpress 多站点教程米拓网站建设教程
  • 如何做公司自己的网站土巴兔网站开发技术
  • 网站设计项目建设内容网站建设制作包括哪些方面
  • 企业网站模板 html株洲网站开发公司
  • 专门做软陶的网站厦门一个平台做网站啥的
  • 深圳福田网站设计wordpress怎么设置伪静态页面
  • 网站开发在哪里接活网站后台内容编辑器
  • 国外皇色网站专业网站建设专家
  • 我要自学网网站开发免费软件下载网站排行
  • 校园网站模版十大软件app排行榜下载
  • 电商网站的数据库设计开发板是什么
  • 云南网站备案难吗下什么软件做网站
  • 模板网站哪家好中国建设银行南京分行网站首页
  • 不懂网站怎么做平台西安微信网站制作
  • 网站设计项目总结合肥做网站加盟
  • 30天网站建设实录我市建设车辆违章查询网站 病句
  • 外贸平台自建站标智客logo设计免费生成
  • 佛山网站建设公司有哪些兼职做设计的网站
  • 怎么编辑网站源代码如何登录qq网页版
  • 网站建设策划包括哪些内容seo在线诊断工具