网站素材图标,it行业哪个专业最吃香,有关做详情页的参考网站,wordpress实现文章连载目录1、通过ALS模型实现用户/商品Embedding的效果#xff0c;获得其向量表示 准备训练数据#xff0c; M (U , I, R) 即 用户集U、商品集I、及评分数据R。
#xff08;1#xff09;商品集I的选择#xff1a;可以根据业务目标确定商品候选集#xff0c;比如TopK热度召回、或…1、通过ALS模型实现用户/商品Embedding的效果获得其向量表示 准备训练数据 M (U , I, R) 即 用户集U、商品集I、及评分数据R。
1商品集I的选择可以根据业务目标确定商品候选集比如TopK热度召回、或者流行度不高但在业务用户中区分度比较高的商品集等。个人建议量级控制在5W内1W-2W左右比较合适太大的话用户产生行为的商品比较少评分数据会非常的稀疏。
2用户集U的选择 最好是粗召回策略确定的用户范围因为ALS模型会生成所有U用户的特征向量表示对于没有见过的用户u没有其向量表示其推荐也是冷启动策略。这里可以根据业务需要限制一个大范围比如4000W-5000W的或大几百万的用户从计算效率和内存使用上个人建议500W内比较合适。比如用户U定义为某些类目下购买人群、或者近期活跃人群等符合业务人群目标的潜在客户群。模型训练完之后也是在这个用户集U中筛选出TopK相似的用户做推荐或扩量。
3评分数据R的选择我们能采集到的大多是隐式反馈的数据比如购买行为、浏览行为、收藏行为等。确定了U、I确定了评分指标类型就可以统计一段时间内U对I的反馈数据R。数据量级大约在7亿条-10亿条在模型参数设置合理的情况下大约20-30分钟就可以训练完。
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import expr, isnull ALS模型参数解读和大小设置建议
:paramrank10, maxIter10, regParam0.1, numUserBlocks10,numItemBlocks10, implicitPrefsFalse, alpha1.0, userColuser, itemColitem,seedNone, ratingColrating, nonnegativeFalse, checkpointInterval10,intermediateStorageLevelMEMORY_AND_DISK,finalStorageLevelMEMORY_AND_DISK, coldStartStrategynan, blockSize4096NumBlocks分块数分块是为了并行计算默认为10。可以根据数据量级适当放大比如20。 可以对 numUserBlocks\numItemBlocks 单独进行配置并行度 也可以通过setNumBlocks(30)一起设置。正则化参数默认为1。
秩rank模型中隐藏因子的个数默认是10。即特征向量的维度。
implicitPrefs显式偏好信息-false隐式偏好信息-true默认false(显示) 。 电商场景中 购买、点击、分享都是隐式反馈。
alpha隐式反馈时的置信度参数默认是1.0。只用于隐式的偏好数据。
setMaxIter(10)最大迭代次数,设置太大发生java.lang.StackOverflowError。建议范围 10 20。 超过20比较容易失败。
coldStartStrategy: 预测时冷启动策略。默认是nan, 可以选择 drop。
ratings spark.sql(selectuser_acct, user_id, main_sku_id, item_id, ratingfrom dmb_dev.dmb_dev_als_model_rating_matrix).repartition(3600)
train_data, test_data ratings.randomSplit([0.9, 0.1], seed4226)
train_data.cache()
als ALS() \.setImplicitPrefs(True) \.setAlpha(0.7) \.setMaxIter(20) \.setRank(10) \.setRegParam(0.01) \.setNumBlocks(30) \.setUserCol(user_id) \.setItemCol(item_id) \.setRatingCol(rating) \.setColdStartStrategy(drop)
print(als.explainParams())als_model als.fit(train_data)
als_model.write().overwrite().save(model_save_path)# 训练集合所有用户U的向量表示
candidate_user_factors als_model.userFactors.withColumnRenamed(id, user_id)\.join(train_data.select(user_acct, user_id).dropDuplicates(), [user_id])\.withColumn(bin_group, expr(round(rand(),1)))
candidate_user_factors.cache()
candidate_user_factors.write.format(orc).mode(overwrite)\.saveAsTable(dev.dev_als_model_all_trained_users_factor_result)
train_data.unpersist()# query用户的向量表示
target_user_factors spark.sql(selectuser_acct, user_idfrom dev.dev_wdy_als_seed_users_tablegroup by user_acct, user_id).join(candidate_user_factors, [user_acct, user_id])
target_user_factors.cache()
target_user_factors.write.format(orc).mode(overwrite)\.saveAsTable(dev.dev_als_model_seed_users_factor)# 候选用户向量表示
search_user_factors candidate_user_factors.join(target_user_factors,candidate_user_factors[user_acct] target_user_factors[user_acct],left_outer)\.where(isnull(target_user_factors[user_acct]))\.select(candidate_user_factors[user_acct], candidate_user_factors[user_id],candidate_user_factors[features], candidate_user_factors[bin_group])
search_user_factors.write.format(orc).mode(overwrite)\.saveAsTable(dev.dev_als_model_candidate_users_factor)
candidate_user_factors.unpersist()
target_user_factors.unpersist()2、通过Faiss快速实现向量TopK相似检索
如果没有装faiss可以选择安装CPU/GPU版本 pip install faiss-cpu
关于faiss的使用说明可以参考向量数据库入坑指南聊聊来自元宇宙大厂 Meta 的相似度检索技术 Faiss - 知乎 faiss来自facebook 开源 Meta Research · GitHub的github库为GitHub - facebookresearch/faiss: A library for efficient similarity search and clustering of dense vectors.
根据业务需求的查询速度、精准度要求来选择合适的Faiss TopK向量查询方法。 # 判断 npy文件是否存在不存在则执行以下操作否则跳过此步骤直接读取文件。
user_embedding spark.sql(select features[0],features[1],features[2],features[3],features[4],features[5],features[6],features[7],features[8],features[9] from dev.dev_als_model_candidate_users_factorwhere bin_group0.1).toPandas()# 量级500W内执行顺利再大的量级容易内存溢出失败。
np.save(user_embedding_01.npy, np.array(user_embedding, orderC))user_embedding np.load(user_embedding_01.npy)
print(user_embedding data sample:, user_embedding[:3])
print(user embedding shape, user_embedding.shape)
dimension user_embedding.shape[1]
nums_user user_embedding.shape[0]faiss.normalize_L2(user_embedding)
index faiss.IndexFlatIP(dimension)
index.add(user_embedding)
print(index is trained:, index.is_trained)
print(index n total:, index.ntotal)# 判断文件是否存在如果存在则直接读取否则先下载保存到本地。
## 这里k30 或更大时查询易失败。 k20 查询耗时久但会成功大约3小时。 k10时
k 5
query1 spark.sql(select features[0],features[1],features[2],features[3],features[4],features[5],features[6],features[7],features[8],features[9] from dev.dev_als_model_seed_users_factor).toPandas()
np.save(query.npy, np.array(query1, orderC))
query np.load(query.npy)
print(query shape:, query.shape)# 查询
t0 time.time()
Deg, Ind index.search(query, k)
t1 time.time()
print(平均耗时 %7.3f min % ((t1 - t0)/60))# 保存索引
faiss.write_index(index, faiss_01.index)
np.save(Ind_01.npy, Ind)
np.save(Deg_01.npy, Deg)res []
for i in range(query.shape[0]):q_vector query[i]r_list Ind[i]for j in range(len(r_list)):r_vector user_embedding[r_list][j]sim Deg[i][j]res.append(([float(v) for v in r_vector], float(sim)))res spark.createDataFrame(res, [recommend_vector, similarity]).repartition(10)
res.cache()res.write.format(orc).mode(overwrite)\.saveAsTable(dev.dev_als_model_recommend_vector_result)user_embedding spark.sql(select *from dev.dev_als_model_candidate_users_factorwhere bin_group0.1)
res.join(user_embedding, res[recommend_vector] user_embedding[features])\.write.format(orc).mode(overwrite)\.saveAsTable(dev.dev_als_model_recommend_user_pin_result)3、通过I2I2U 或者 I2U2U来获得用户扩量结果
上述实现的是U2U的扩量方法使用的是User-Factor向量表示。第一个U来自于业务营销目标I下的历史已购人群。即这是一个I2U2U的扩量方法。 I (目标商品)--- U(历史购买) --- U(TopK相似) 。
当然也可以通过使用Item-Factor向量表示实现 I2I2U即 I (目标商品)--- I(TopK相似) --- U(历史购买) 这样来做商品相似召回实现用户的扩量。
基于实验效果或历史数据的验证来选择使用哪种方法投产。
4、算法设计框架总结
可以看到这个算法设计框架其实是 Embedding Faiss 即用户/商品的向量表示 Faiss快速向量相似检索 的设计模式。
那么第一部分的ALS模型当然可以替换成任何一种可以效果更好的Embedding算法模型比如BERT 、Transformer等深度学习模型。而第二部分Faiss的查询可以保持不动只要替换查询数据源就可以了。当然也可以将其优化成GPU的或更快速的查询方式以满足线上业务的需求。
但整体的算法设计框架是不变的Embedding向量化 Faiss相似检索。
Done.