当前位置: 首页 > news >正文

网站建设属于什么职位只做健康产品的网站

网站建设属于什么职位,只做健康产品的网站,河南新站关键词排名优化外包,房产cms系统二 根据用户行为数据创建ALS模型并召回商品 2.0 用户行为数据拆分 方便练习可以对数据做拆分处理 pandas的数据分批读取 chunk 厚厚的一块 相当大的数量或部分 import pandas as pd reader pd.read_csv(behavior_log.csv,chunksize100,iteratorTrue) count 0; for chunk in …二 根据用户行为数据创建ALS模型并召回商品 2.0 用户行为数据拆分 方便练习可以对数据做拆分处理 pandas的数据分批读取 chunk 厚厚的一块 相当大的数量或部分 import pandas as pd reader pd.read_csv(behavior_log.csv,chunksize100,iteratorTrue) count 0; for chunk in reader:count 1if count 1:chunk.to_csv(test4.csv,index False)elif count1 and count1000:chunk.to_csv(test4.csv,index False, mode a,header False)else:break pd.read_csv(test4.csv)2.1 预处理behavior_log数据集 创建spark session import os # 配置spark driver和pyspark运行时所使用的python解释器路径 PYSPARK_PYTHON /home/hadoop/miniconda3/envs/datapy365spark23/bin/python JAVA_HOME/home/hadoop/app/jdk1.8.0_191 # 当存在多个版本时不指定很可能会导致出错 os.environ[PYSPARK_PYTHON] PYSPARK_PYTHON os.environ[PYSPARK_DRIVER_PYTHON] PYSPARK_PYTHON os.environ[JAVA_HOME]JAVA_HOME # spark配置信息 from pyspark import SparkConf from pyspark.sql import SparkSessionSPARK_APP_NAME preprocessingBehaviorLog SPARK_URL spark://192.168.199.188:7077conf SparkConf() # 创建spark config对象 config ((spark.app.name, SPARK_APP_NAME), # 设置启动的spark的app名称没有提供将随机产生一个名称(spark.executor.memory, 6g), # 设置该app启动时占用的内存用量默认1g(spark.master, SPARK_URL), # spark master的地址(spark.executor.cores, 4), # 设置spark executor使用的CPU核心数# 以下三项配置可以控制执行器数量 # (spark.dynamicAllocation.enabled, True), # (spark.dynamicAllocation.initialExecutors, 1), # 1个执行器 # (spark.shuffle.service.enabled, True) # (spark.sql.pivotMaxValues, 99999), # 当需要pivot DF且值很多时需要修改默认是10000 ) # 查看更详细配置及说明https://spark.apache.org/docs/latest/configuration.htmlconf.setAll(config)# 利用config对象创建spark session spark SparkSession.builder.config(confconf).getOrCreate()从hdfs中加载csv文件为DataFrame # 从hdfs加载CSV文件为DataFrame df spark.read.csv(hdfs://localhost:9000/datasets/behavior_log.csv, headerTrue) df.show() # 查看dataframe默认显示前20条 # 大致查看一下数据类型 df.printSchema() # 打印当前dataframe的结构显示结果: ------------------------------- | user|time_stamp|btag| cate| brand| ------------------------------- |558157|1493741625| pv| 6250| 91286| |558157|1493741626| pv| 6250| 91286| |558157|1493741627| pv| 6250| 91286| |728690|1493776998| pv|11800| 62353| |332634|1493809895| pv| 1101|365477| |857237|1493816945| pv| 1043|110616| |619381|1493774638| pv| 385|428950| |467042|1493772641| pv| 8237|301299| |467042|1493772644| pv| 8237|301299| |991528|1493780710| pv| 7270|274795| |991528|1493780712| pv| 7270|274795| |991528|1493780712| pv| 7270|274795| |991528|1493780712| pv| 7270|274795| |991528|1493780714| pv| 7270|274795| |991528|1493780765| pv| 7270|274795| |991528|1493780714| pv| 7270|274795| |991528|1493780765| pv| 7270|274795| |991528|1493780764| pv| 7270|274795| |991528|1493780633| pv| 7270|274795| |991528|1493780764| pv| 7270|274795| ------------------------------- only showing top 20 rowsroot|-- user: string (nullable true)|-- time_stamp: string (nullable true)|-- btag: string (nullable true)|-- cate: string (nullable true)|-- brand: string (nullable true)从hdfs加载数据为dataframe并设置结构 from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType # 构建结构对象 schema StructType([StructField(userId, IntegerType()),StructField(timestamp, LongType()),StructField(btag, StringType()),StructField(cateId, IntegerType()),StructField(brandId, IntegerType()) ]) # 从hdfs加载数据为dataframe并设置结构 behavior_log_df spark.read.csv(hdfs://localhost:8020/datasets/behavior_log.csv, headerTrue, schemaschema) behavior_log_df.show() behavior_log_df.count() 显示结果: --------------------------------- |userId| timestamp|btag|cateId|brandId| --------------------------------- |558157|1493741625| pv| 6250| 91286| |558157|1493741626| pv| 6250| 91286| |558157|1493741627| pv| 6250| 91286| |728690|1493776998| pv| 11800| 62353| |332634|1493809895| pv| 1101| 365477| |857237|1493816945| pv| 1043| 110616| |619381|1493774638| pv| 385| 428950| |467042|1493772641| pv| 8237| 301299| |467042|1493772644| pv| 8237| 301299| |991528|1493780710| pv| 7270| 274795| |991528|1493780712| pv| 7270| 274795| |991528|1493780712| pv| 7270| 274795| |991528|1493780712| pv| 7270| 274795| |991528|1493780714| pv| 7270| 274795| |991528|1493780765| pv| 7270| 274795| |991528|1493780714| pv| 7270| 274795| |991528|1493780765| pv| 7270| 274795| |991528|1493780764| pv| 7270| 274795| |991528|1493780633| pv| 7270| 274795| |991528|1493780764| pv| 7270| 274795| --------------------------------- only showing top 20 rowsroot|-- userId: integer (nullable true)|-- timestamp: long (nullable true)|-- btag: string (nullable true)|-- cateId: integer (nullable true)|-- brandId: integer (nullable true)分析数据集字段的类型和格式 查看是否有空值查看每列数据的类型查看每列数据的类别情况 print(查看userId的数据情况, behavior_log_df.groupBy(userId).count().count()) # 约113w用户 #注意behavior_log_df.groupBy(userId).count() 返回的是一个dataframe这里的count计算的是每一个分组的个数但当前还没有进行计算 # 当调用df.count()时才开始进行计算这里的count计算的是dataframe的条目数也就是共有多少个分组查看user的数据情况 1136340print(查看btag的数据情况, behavior_log_df.groupBy(btag).count().collect()) # collect会把计算结果全部加载到内存谨慎使用 # 只有四种类型数据pv、fav、cart、buy # 这里由于类型只有四个所以直接使用collect把数据全部加载出来查看btag的数据情况 [Row(btagbuy, count9115919), Row(btagfav, count9301837), Row(btagcart, count15946033), Row(btagpv, count688904345)]print(查看cateId的数据情况, behavior_log_df.groupBy(cateId).count().count()) # 约12968类别id查看cateId的数据情况 12968print(查看brandId的数据情况, behavior_log_df.groupBy(brandId).count().count()) # 约460561品牌id查看brandId的数据情况 460561print(判断数据是否有空值, behavior_log_df.count(), behavior_log_df.dropna().count()) # 约7亿条目723268134 723268134 # 本数据集无空值条目可放心处理判断数据是否有空值 723268134 723268134pivot透视操作把某列里的字段值转换成行并进行聚合运算(pyspark.sql.GroupedData.pivot) 如果透视的字段中的不同属性值超过10000个则需要设置spark.sql.pivotMaxValues否则计算过程中会出现错误。文档介绍。 # 统计每个用户对各类商品的pv、fav、cart、buy数量 cate_count_df behavior_log_df.groupBy(behavior_log_df.userId, behavior_log_df.cateId).pivot(btag,[pv,fav,cart,buy]).count() cate_count_df.printSchema() # 此时还没有开始计算显示效果: root|-- userId: integer (nullable true)|-- cateId: integer (nullable true)|-- pv: long (nullable true)|-- fav: long (nullable true)|-- cart: long (nullable true)|-- buy: long (nullable true)统计每个用户对各个品牌的pv、fav、cart、buy数量并保存结果 # 统计每个用户对各个品牌的pv、fav、cart、buy数量 brand_count_df behavior_log_df.groupBy(behavior_log_df.userId, behavior_log_df.brandId).pivot(btag,[pv,fav,cart,buy]).count() # brand_count_df.show() # 同上 # 113w * 46w # 由于运算时间比较长所以这里先将结果存储起来供后续其他操作使用 # 写入数据时才开始计算 cate_count_df.write.csv(hdfs://localhost:9000/preprocessing_dataset/cate_count.csv, headerTrue) brand_count_df.write.csv(hdfs://localhost:9000/preprocessing_dataset/brand_count.csv, headerTrue)2.2 根据用户对类目偏好打分训练ALS模型 根据您统计的次数 打分规则 偏好打分数据集 ALS模型 # spark ml的模型训练是基于内存的如果数据过大内存空间小迭代次数过多的化可能会造成内存溢出报错 # 设置Checkpoint的话会把所有数据落盘这样如果异常退出下次重启后可以接着上次的训练节点继续运行 # 但该方法其实指标不治本因为无法防止内存溢出所以还是会报错 # 如果数据量大应考虑的是增加内存、或限制迭代次数和训练数据量级等 spark.sparkContext.setCheckpointDir(hdfs://localhost:8020/checkPoint/) from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, FloatType# 构建结构对象 schema StructType([StructField(userId, IntegerType()),StructField(cateId, IntegerType()),StructField(pv, IntegerType()),StructField(fav, IntegerType()),StructField(cart, IntegerType()),StructField(buy, IntegerType()) ])# 从hdfs加载CSV文件 cate_count_df spark.read.csv(hdfs://localhost:9000/preprocessing_dataset/cate_count.csv, headerTrue, schemaschema) cate_count_df.printSchema() cate_count_df.first() # 第一行数据显示结果: root|-- userId: integer (nullable true)|-- cateId: integer (nullable true)|-- pv: integer (nullable true)|-- fav: integer (nullable true)|-- cart: integer (nullable true)|-- buy: integer (nullable true)Row(userId1061650, cateId4520, pv2326, favNone, cart53, buyNone)处理每一行数据r表示row对象 def process_row(r):# 处理每一行数据r表示row对象# 偏好评分规则# m: 用户对应的行为次数# 该偏好权重比例次数上限仅供参考具体数值应根据产品业务场景权衡# pv: if m20: score0.2*m; else score4# fav: if m20: score0.4*m; else score8# cart: if m20: score0.6*m; else score12# buy: if m20: score1*m; else score20# 注意这里要全部设为浮点数spark运算时对类型比较敏感要保持数据类型都一致pv_count r.pv if r.pv else 0.0fav_count r.fav if r.fav else 0.0cart_count r.cart if r.cart else 0.0buy_count r.buy if r.buy else 0.0pv_score 0.2*pv_count if pv_count20 else 4.0fav_score 0.4*fav_count if fav_count20 else 8.0cart_score 0.6*cart_count if cart_count20 else 12.0buy_score 1.0*buy_count if buy_count20 else 20.0rating pv_score fav_score cart_score buy_score# 返回用户ID、分类ID、用户对分类的偏好打分return r.userId, r.cateId, rating返回一个PythonRDD类型 # 返回一个PythonRDD类型此时还没开始计算 cate_count_df.rdd.map(process_row).toDF([userId, cateId, rating])显示结果: DataFrame[userId: bigint, cateId: bigint, rating: double]用户对商品类别的打分数据 # 用户对商品类别的打分数据 # map返回的结果是rdd类型需要调用toDF方法转换为Dataframe cate_rating_df cate_count_df.rdd.map(process_row).toDF([userId, cateId, rating]) # 注意toDF不是每个rdd都有的方法仅局限于此处的rdd # 可通过该方法获得 user-cate-matrix # 但由于cateId字段过多这里运算量比很大机器内存要求很高才能执行否则无法完成任务 # 请谨慎使用# 但好在我们训练ALS模型时不需要转换为user-cate-matrix所以这里可以不用运行 # cate_rating_df.groupBy(userId).povit(cateId).min(rating) # 用户对类别的偏好打分数据 cate_rating_df显示结果: DataFrame[userId: bigint, cateId: bigint, rating: double]通常如果USER-ITEM打分数据应该是通过一下方式进行处理转换为USER-ITEM-MATRIX [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4dF4oXj6-1691649134814)(/img/CF%E4%BB%8B%E7%BB%8D.png)] 但这里我们将使用的Spark的ALS模型进行CF推荐因此注意这里数据输入不需要提前转换为矩阵直接是 USER-ITEM-RATE的数据 基于Spark的ALS隐因子模型进行CF评分预测 ALS的意思是交替最小二乘法Alternating Least Squares是Spark2.*中加入的进行基于模型的协同过滤model-based CF的推荐系统算法。 同SVD它也是一种矩阵分解技术对数据进行降维处理。 详细使用方法pyspark.ml.recommendation.ALS 注意由于数据量巨大因此这里也不考虑基于内存的CF算法 参考为什么Spark中只有ALS # 使用pyspark中的ALS矩阵分解方法实现CF评分预测 # 文档地址https://spark.apache.org/docs/2.2.2/api/python/pyspark.ml.html?highlightvectors#module-pyspark.ml.recommendation from pyspark.ml.recommendation import ALS # mldataframe mllibrdd# 利用打分数据训练ALS模型 als ALS(userColuserId, itemColcateId, ratingColrating, checkpointInterval5)# 此处训练时间较长 model als.fit(cate_rating_df)模型训练好后调用方法进行使用具体API查看 # model.recommendForAllUsers(N) 给所有用户推荐TOP-N个物品 ret model.recommendForAllUsers(3) # 由于是给所有用户进行推荐此处运算时间也较长 ret.show() # 推荐结果存放在recommendations列中 ret.select(recommendations).show()显示结果: -------------------------- |userId| recommendations| -------------------------- | 148|[[3347, 12.547271...| | 463|[[1610, 9.250818]...| | 471|[[1610, 10.246621...| | 496|[[1610, 5.162216]...| | 833|[[5607, 9.065482]...| | 1088|[[104, 6.886987],...| | 1238|[[5631, 14.51981]...| | 1342|[[5720, 10.89842]...| | 1580|[[5731, 8.466453]...| | 1591|[[1610, 12.835257...| | 1645|[[1610, 11.968531...| | 1829|[[1610, 17.576496...| | 1959|[[1610, 8.353473]...| | 2122|[[1610, 12.652732...| | 2142|[[1610, 12.48068]...| | 2366|[[1610, 11.904813...| | 2659|[[5607, 11.699315...| | 2866|[[1610, 7.752719]...| | 3175|[[3347, 2.3429515...| | 3749|[[1610, 3.641833]...| -------------------------- only showing top 20 rows-------------------- | recommendations| -------------------- |[[3347, 12.547271...| |[[1610, 9.250818]...| |[[1610, 10.246621...| |[[1610, 5.162216]...| |[[5607, 9.065482]...| |[[104, 6.886987],...| |[[5631, 14.51981]...| |[[5720, 10.89842]...| |[[5731, 8.466453]...| |[[1610, 12.835257...| |[[1610, 11.968531...| |[[1610, 17.576496...| |[[1610, 8.353473]...| |[[1610, 12.652732...| |[[1610, 12.48068]...| |[[1610, 11.904813...| |[[5607, 11.699315...| |[[1610, 7.752719]...| |[[3347, 2.3429515...| |[[1610, 3.641833]...| -------------------- only showing top 20 rowsmodel.recommendForUserSubset 给部分用户推荐TOP-N个物品 # 注意recommendForUserSubset API2.2.2版本中无法使用 dataset spark.createDataFrame([[1],[2],[3]]) dataset dataset.withColumnRenamed(_1, userId) ret model.recommendForUserSubset(dataset, 3)# 只给部分用推荐运算时间短 ret.show() ret.collect() # 注意 collect会将所有数据加载到内存慎用显示结果: -------------------------- |userId| recommendations| -------------------------- | 1|[[1610, 25.4989],...| | 3|[[5607, 13.665942...| | 2|[[5579, 5.9051886...| --------------------------[Row(userId1, recommendations[Row(cateId1610, rating25.498899459838867), Row(cateId5737, rating24.901548385620117), Row(cateId3347, rating20.736785888671875)]),Row(userId3, recommendations[Row(cateId5607, rating13.665942192077637), Row(cateId1610, rating11.770171165466309), Row(cateId3347, rating10.35690689086914)]),Row(userId2, recommendations[Row(cateId5579, rating5.90518856048584), Row(cateId2447, rating5.624575138092041), Row(cateId5690, rating5.2555742263793945)])]transform中提供userId和cateId可以对打分进行预测利用打分结果排序后 # transform中提供userId和cateId可以对打分进行预测利用打分结果排序后同样可以实现TOP-N的推荐 model.transform # 将模型进行存储 model.save(hdfs://localhost:8020/models/userCateRatingALSModel.obj) # 测试存储的模型 from pyspark.ml.recommendation import ALSModel # 从hdfs加载之前存储的模型 als_model ALSModel.load(hdfs://localhost:8020/models/userCateRatingALSModel.obj) # model.recommendForAllUsers(N) 给用户推荐TOP-N个物品 result als_model.recommendForAllUsers(3) result.show()显示结果: -------------------------- |userId| recommendations| -------------------------- | 148|[[3347, 12.547271...| | 463|[[1610, 9.250818]...| | 471|[[1610, 10.246621...| | 496|[[1610, 5.162216]...| | 833|[[5607, 9.065482]...| | 1088|[[104, 6.886987],...| | 1238|[[5631, 14.51981]...| | 1342|[[5720, 10.89842]...| | 1580|[[5731, 8.466453]...| | 1591|[[1610, 12.835257...| | 1645|[[1610, 11.968531...| | 1829|[[1610, 17.576496...| | 1959|[[1610, 8.353473]...| | 2122|[[1610, 12.652732...| | 2142|[[1610, 12.48068]...| | 2366|[[1610, 11.904813...| | 2659|[[5607, 11.699315...| | 2866|[[1610, 7.752719]...| | 3175|[[3347, 2.3429515...| | 3749|[[1610, 3.641833]...| -------------------------- only showing top 20 rows召回到redis import redis host 192.168.19.8 port 6379 # 召回到redis def recall_cate_by_cf(partition):# 建立redis 连接池pool redis.ConnectionPool(hosthost, portport)# 建立redis客户端client redis.Redis(connection_poolpool)for row in partition:client.hset(recall_cate, row.userId, [i.cateId for i in row.recommendations]) # 对每个分片的数据进行处理 #mapPartition Transformation map # foreachPartition Action操作 foreachRDD result.foreachPartition(recall_cate_by_cf)# 注意这里这是召回的是用户最感兴趣的n个类别 # 总的条目数查看redis中总的条目数是否一致 result.count()显示结果: 11363402.3 根据用户对品牌偏好打分训练ALS模型 from pyspark.sql.types import StructType, StructField, StringType, IntegerTypeschema StructType([StructField(userId, IntegerType()),StructField(brandId, IntegerType()),StructField(pv, IntegerType()),StructField(fav, IntegerType()),StructField(cart, IntegerType()),StructField(buy, IntegerType()) ]) # 从hdfs加载预处理好的品牌的统计数据 brand_count_df spark.read.csv(hdfs://localhost:8020/preprocessing_dataset/brand_count.csv, headerTrue, schemaschema) # brand_count_df.show() def process_row(r):# 处理每一行数据r表示row对象# 偏好评分规则# m: 用户对应的行为次数# 该偏好权重比例次数上限仅供参考具体数值应根据产品业务场景权衡# pv: if m20: score0.2*m; else score4# fav: if m20: score0.4*m; else score8# cart: if m20: score0.6*m; else score12# buy: if m20: score1*m; else score20# 注意这里要全部设为浮点数spark运算时对类型比较敏感要保持数据类型都一致pv_count r.pv if r.pv else 0.0fav_count r.fav if r.fav else 0.0cart_count r.cart if r.cart else 0.0buy_count r.buy if r.buy else 0.0pv_score 0.2*pv_count if pv_count20 else 4.0fav_score 0.4*fav_count if fav_count20 else 8.0cart_score 0.6*cart_count if cart_count20 else 12.0buy_score 1.0*buy_count if buy_count20 else 20.0rating pv_score fav_score cart_score buy_score# 返回用户ID、品牌ID、用户对品牌的偏好打分return r.userId, r.brandId, rating # 用户对品牌的打分数据 brand_rating_df brand_count_df.rdd.map(process_row).toDF([userId, brandId, rating]) # brand_rating_df.show()基于Spark的ALS隐因子模型进行CF评分预测 ALS的意思是交替最小二乘法Alternating Least Squares是Spark中进行基于模型的协同过滤model-based CF的推荐系统算法也是目前Spark内唯一一个推荐算法。 同SVD它也是一种矩阵分解技术但理论上ALS在海量数据的处理上要优于SVD。 更多了解pyspark.ml.recommendation.ALS 注意由于数据量巨大因此这里不考虑基于内存的CF算法 参考为什么Spark中只有ALS 使用pyspark中的ALS矩阵分解方法实现CF评分预测 # 使用pyspark中的ALS矩阵分解方法实现CF评分预测 # 文档地址https://spark.apache.org/docs/latest/api/python/pyspark.ml.html?highlightvectors#module-pyspark.ml.recommendation from pyspark.ml.recommendation import ALSals ALS(userColuserId, itemColbrandId, ratingColrating, checkpointInterval2) # 利用打分数据训练ALS模型 # 此处训练时间较长 model als.fit(brand_rating_df) # model.recommendForAllUsers(N) 给用户推荐TOP-N个物品 model.recommendForAllUsers(3).show() # 将模型进行存储 model.save(hdfs://localhost:9000/models/userBrandRatingModel.obj) # 测试存储的模型 from pyspark.ml.recommendation import ALSModel # 从hdfs加载模型 my_model ALSModel.load(hdfs://localhost:9000/models/userBrandRatingModel.obj) my_model # model.recommendForAllUsers(N) 给用户推荐TOP-N个物品 my_model.recommendForAllUsers(3).first()
http://wiki.neutronadmin.com/news/437205/

相关文章:

  • 网站怎么记录搜索引擎的关键词php网站欣赏
  • 长沙网站运营网站推广的软文
  • 小公司建网站 优帮云有网站做淘宝客
  • 股票做空网站设计师个人网页设计
  • 广西建设网桂建云网站惠州高端模板建站
  • a站为什么会凉wordpress 加载页面
  • 郑州做网站好的公2817网站
  • wordpress拖曳式建站正邦设计创始人
  • 网站前置审批 公司名称做网站的html代码格式
  • 房产网站建设方案论文ks数据分析神器
  • 做推广效果哪个网站好怎样找人做网站
  • 如何做qq钓鱼网站无锡企业自助建站系统
  • 广西新农村建设工作专题网站免费推广网站2023mmm
  • 萧山网站建设争锋网络如何建立自己的手机网站
  • 阿里云 网站部署aspcms 网站统计
  • angular 做网站wordpress做网站
  • 怎样做外贸网站建设手机端网站建站
  • 如何建立自已的购物网站徐州网站开发案例
  • 企业网站建设itcask怎么免费建立个人网站
  • 网站开发团队 人员企业网站推广联系方式
  • 网站建设需要哪些必备文件成都网站推广
  • 沧州网站建设益志科技wordpress集成
  • 关于网站建设的求职意向网站设计公司排名
  • 免费源码下载网站福建建设培训中心网站
  • 推荐佛山企业网站建设手机app游戏制作软件
  • 网站推广seo教程进度圈wordpress
  • 网站备案接入商乌市网络营销公司
  • 手机网站制作代理wordpress 发送邮件 名称
  • 施工企业负责人每月带班时间不少于如何做好网站的优化的监测评价
  • 中企动力网站后台网站制作公司需要什么资质