【实习项目总结】

1. 项目简介

无线短视频推荐项目,负责无线端用户的短视频推荐,也就是给手机端用户推荐短视频。主要用到了item-based协同过滤的思想,为用户提供候选短视频推荐集合,然后再利用预训练的LR模型返回候选推荐集合的最终排序,推给用户。

2. 推荐系统分类

感谢@奔波的梦想 的总结。推荐算法大致可以分为三类:基于内容的推荐算法、协同过滤推荐算法和基于知识的推荐算法。
基于内容的推荐算法,原理是用户喜欢和自己关注过的Item在内容上类似的Item,比如你看了哈利波特I,基于内容的推荐算法发现哈利波特II-VI,与你以前观看的在内容上面(共有很多关键词)有很大关联性,就把后者推荐给你,这种方法可以避免Item的冷启动问题(冷启动:如果一个Item从没有被关注过,其他推荐算法则很少会去推荐,但是基于内容的推荐算法可以分析Item之间的关系,实现推荐),弊端在于推荐的Item可能会重复,典型的就是新闻推荐,如果你看了一则关于MH370的新闻,很可能推荐的新闻和你浏览过的,内容一致;另外一个弊端则是对于一些多媒体的推荐(比如音乐、电影、图片等)由于很难提内容特征,则很难进行推荐,一种解决方式则是人工给这些Item打标签。
协同过滤算法,原理是用户喜欢那些具有相似兴趣的用户喜欢过的商品,比如你的朋友喜欢电影哈利波特I,那么就会推荐给你,这是最简单的基于用户的协同过滤算法(user-based collaboratIve filtering),还有一种是基于Item的协同过滤算法(item-based collaborative filtering),这两种方法都是将用户的所有数据读入到内存中进行运算的,因此成为Memory-based Collaborative Filtering,另一种则是Model-based collaborative filtering,包括Aspect Model,pLSA,LDA,聚类,SVD,Matrix Factorization等,这种方法训练过程比较长,但是训练完成后,推荐过程比较快。
​ 最后一种方法是基于知识的推荐算法,也有人将这种方法归为基于内容的推荐,这种方法比较典型的是构建领域本体,或者是建立一定的规则,进行推荐。

item-based 和 user-based 协同过滤的比较:

我们再来回顾一下item-base CF算法的特点

  • 物品数明显小于用户数的场合,否则物品相似度矩阵计算代价很大
  • 适合长尾物品丰富,用户个性化需求强的领域
  • 对新用户友好,对新物品不友好,因为物品相似度矩阵不需要很强的实时性
  • 利用用户历史行为做推荐解释,比较令用户信服

所以item-base挺适合做电影的推荐。当用户浏览某个电影的时候,我们可以推荐给他类似的电影,或者根据用户以前的观影记录,推荐他感兴趣的电影。

3. 项目流程

  1. 根据集群中7天用户-视频score数据,计算视频两两之间的余弦相似度
  2. 为每个视频选取与之相似度>0.5且最接近的top40个视频
  3. 根据用户7天之内看过的视频,将相似视频merge进来,根据看过视频的score和相似视频的余弦相似度乘积,对看过的所有视频的所有相似视频打分,取top50作为用户的推荐视频候选集(去掉已经看过的视频)
  4. 根据线上抽取的特征和用户、视频15天线下特征输入预训练的LR得到候选集合的点击率排序,作为最终的推荐顺序。

4. CF核心思路

* user1     user2
* item1  score11   score21 (X)
* item2  score12   score22 (Y)
* --------------------------------
* sim(item1,item2) = XY / math.sqrt(XX) * math.sqrt(YY)
* XY= score11 * score12 + score21 * score22
* XX = score11 * score11 + score21 * score21
* YY = score12 * score12 + score22 * score22

已知一周内,用户观看视频的数据,形式为RDD[(uid,(aid,score))]

按照uid做一次join操作,就可以得到RDD[((aid1,aid2),(score11,score22))],即用户对以及同时看过两者的用户对其的score

接下来就可以计算出aid对的余弦相似度了

然后根据余弦相似度为每个aid选出最相近的top40相似ad

5. 优化方向

暂时就想到了一个:

将用户对电影的score做时间衰减

之后继续补充

6. 核心代码

package model
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame}
import org.apache.spark.sql.hive.HiveContext
import util.MqManager
/**
* Created by dengxing on 2017/7/18.
*/
object CF {
/** 基于dt时间获取原始数据源
*
* @param sc SparkContext
* @param table 转换的hive表
* @param day 获取当前日期的数据
* @return 原始数据的dataFrame
*/
def getResource(sc: SparkContext, table: String, day: String) = {
val hiveContext = new HiveContext(sc)
import hiveContext.sql
val resource = sql("select "
+ "uid,"
+ "aid,"
+ "cnt"
+ " from " + table + " where dt ='" + day + "'")
resource
}
/**
* 分布式计算余弦相似度
* --------------------------------
* user1 user2
* item1 score11 score21 (X)
* item2 score12 score22 (Y)
* --------------------------------
* sim(item1,item2) = XY / math.sqrt(XX) * math.sqrt(YY)
* XY= score11 * score12 + score21 * score22
* XX = score11 * score11 + score21 * score21
* YY = score12 * score12 + score22 * score22
*
* @param resource
* @return RDD[(item1,item2,sim)]
*/
def getCosineSimilarity(resource: DataFrame): RDD[(String, (String, Double))] = {
val rating = resource.map {
row => {
val uid = row.getString(0)
val aid = row.getString(1)
val score = row.getString(2).toDouble
(uid, aid, score)
}
}
//RDD[(uid,(aid,score))]
val user_item_score = rating.map(f => (f._1, (f._2, f._3)))
/*
* 提取每个用户有过行为的item键值对,即
* RDD[((aid1,aid2),(score11,score22))]
*/
val item_score_pair = user_item_score.join(user_item_score)
.map(f => ((f._2._1._1, f._2._2._1), (f._2._1._2, f._2._2._2)))
/*
* 提取同一对item,所有的用户评分向量的点积,即XY 及 XX 及 YY
* RDD[((aid1,aid2),score11 * score12 + score21 * score22)]
* 及 RDD[((aid1,aid1),score11 * score11 + score21 * score21)]
* 及 RDD[((aid2,aid2),score12 * score12 + score22 * score22)]
*/
val item_pair_ALL = item_score_pair.map(f => (f._1, f._2._1 * f._2._2)).reduceByKey(_ + _)
/*
* 提取每个item,所有用户的自向量的点积,即XX或YY
* RDD[((aid1,aid1),score11 * score11 + score21 * score21)]
* 或 RDD[((aid2,aid2),score12 * score12 + score22 * score22)]
*/
val item_pair_XX_YY = item_pair_ALL.filter(f => f._1._1 == f._1._2)
/*
* 提取每个item,所有用户的非自向量的点积,即XY
* RDD[((aid1,aid2),score11 * score12 + score21 * score22)]
*/
val item_pair_XY = item_pair_ALL.filter(f => f._1._1 != f._1._2)
/*
* 提取item_pair_XX_YY中的item及XX或YY
* RDD[(aid1,score11 * score11 + score21 * score21)]
* 或 RDD[(aid2,score12 * score12 + score22 * score22)]
*/
val item_XX_YY = item_pair_XX_YY.map(f => (f._1._1, f._2))
/*
* 转化item_pair_XY为(aid1,((aid1,aid2,XY),XX)))
* RDD[(aid1,((aid1,aid2,score11 * score12 + score21 * score22),score11 * score11 + score21 * score21)))]
*/
val item_XY_XX = item_pair_XY.map(f => (f._1._1, (f._1._1, f._1._2, f._2))).join(item_XX_YY)
/*
* 转为item_XY_XX为(aid2,((aid1,aid2,XY,XX),YY))
* RDD[(aid2,((aid1,aid2,score11 * score12 + score21 * score22,score11 * score11 + score21 * score21),score12 * score12 + score22 * score22))]
*/
val item_XY_XX_YY = item_XY_XX.map(f => (f._2._1._2, (f._2._1._1, f._2._1._2, f._2._1._3, f._2._2))).join(item_XX_YY)
/*
* 提取item_XY_XX_YY中的(aid1,aid2,XY,XX,YY))
* RDD[(aid1,aid2,score11 * score12 + score21 * score22,score11 * score11 + score21 * score21,score12 * score12 + score22 * score22)]
*/
val item_pair_XY_XX_YY = item_XY_XX_YY.map(f => (f._2._1._1, f._2._1._2, f._2._1._3, f._2._1._4, f._2._2))
/*
* 转化item_pair_XY_XX_YY为(aid1,aid2,XY / math.sqrt(XX * YY))
* RDD[(aid1,aid2,score11 * score12 + score21 * score22 / math.sqrt((score11 * score11 + score21 * score21)*(score12 * score12 + score22 * score22))]
*/
val item_pair_sim = item_pair_XY_XX_YY.map(f => (f._1, (f._2, f._3 / math.sqrt(f._4 * f._5))))
item_pair_sim
}
/**
* 基于item相似度矩阵为user生成topN推荐列表
*
* @param resource
* @param item_sim_bd
* @param topN
* @return RDD[(user,List[(item,score)])]
*/
def recommend(resource: DataFrame, item_sim_bd: Broadcast[scala.collection.Map[String, List[(String, Double)]]], topN: Int = 50) = {
val user_item_score = resource.map(
row => {
val uid = row.getString(0)
val aid = row.getString(1)
val score = row.getString(2).toDouble
((uid, aid), score)
}
)
/*
* 提取item_sim_user_score为((user,item2),sim * score)
* RDD[(user,item2),sim * score]
*/
val user_item_simscore = user_item_score.flatMap(
f => {
val items_sim = item_sim_bd.value.getOrElse(f._1._2, List(("0", 0.0)))
for (w <- items_sim) yield ((f._1._1, w._1), w._2 * f._2)
}).filter(_._2 > 0.03)
/*
* 聚合user_item_simscore为 (user,(item2,sim1 * score1 + sim2 * score2))
* 假设user观看过两个item,评分分别为score1和score2,item2是与user观看过的两个item相似的item,相似度分别为sim1,sim2
* RDD[(user,item2),sim1 * score1 + sim2 * score2))]
*/
val user_item_rank = user_item_simscore.reduceByKey(_ + _, 1000)
/*
* 过滤用户已看过的item,并对user_item_rank基于user聚合
* RDD[(user,CompactBuffer((item2,rank2),(item3,rank3)...))]
*/
val user_items_ranks = user_item_rank.subtractByKey(user_item_score).map(f => (f._1._1, (f._1._2, f._2))).groupByKey(1000)
/*
* 对user_items_ranks基于rank降序排序,并提取topN,其中包括用户已观看过的item
* RDD[(user,ArrayBuffer((item2,rank2),...,(itemN,rankN)))]
*/
val user_items_ranks_desc = user_items_ranks.map(f => {
val item_rank_list = f._2.toList
val item_rank_desc = item_rank_list.sortWith((x, y) => x._2 > y._2)
(f._1, item_rank_desc.take(topN))
})
user_items_ranks_desc
}
/**
* json 编码格式1:用于CF离线推荐结果
*
* @param recTopN 离线推荐结果
*/
def encodeToJson1(recTopN: (String, List[(String, Double)])) = {
val mtype = "lxfs"
val mtype_ = "\"" + "mtype" + "\"" + ":" + "\"" + mtype + "\""
val uid = recTopN._1
val uid_ = "\"" + "uid" + "\"" + ":" + "\"" + uid + "\""
val aid_score = recTopN._2
val aids_ = new StringBuilder().append("\"" + "list" + "\"" + ":[")
for (v <- aid_score) {
val aid = v._1.split("_")(0)
val type_ = v._1.split("_")(1)
val score = v._2
aids_.append("{" + "\"aid\"" + ":" + aid + ",")
aids_.append("\"type\"" + ":" + type_ + ",")
aids_.append("\"score\"" + ":" + score + "},")
}
aids_.deleteCharAt(aids_.length - 1).append("]")
val result = "{" + mtype_ + "," + uid_ + "," + aids_.toString() + "}"
result
}
/**
* json 编码格式2:用于CF相似度矩阵
*
* @param cf_sim CF截断的相似度矩阵
*/
def encodeToJson2(cf_sim: (String, List[(String, Double)])) = {
val mtype = "cf"
val mtype_ = "\"" + "mtype" + "\"" + ":" + "\"" + mtype + "\""
val aid1 = cf_sim._1.split("_")(0)
val aid_ = "\"" + "aid" + "\"" + ":" + "\"" + aid1 + "\""
val aid_score = cf_sim._2
val aids_ = new StringBuilder().append("\"" + "similar" + "\"" + ":[")
for (v <- aid_score) {
val aid2 = v._1.split("_")(0)
val type_ = v._1.split("_")(1)
val score = v._2
aids_.append("{" + "\"aid\"" + ":" + aid2 + ",")
aids_.append("\"type\"" + ":" + type_ + ",")
aids_.append("\"score\"" + ":" + score + "},")
}
aids_.deleteCharAt(aids_.length - 1).append("]")
val result = "{" + mtype_ + "," + aid_ + "," + aids_.toString() + "}"
result
}
def main(args: Array[String]): Unit = {
val table = args(0) //要处理的表
val day = args(1) //当前日期
val sparkConf = new SparkConf().setAppName("Wireless ItemBased Collaborative Filtering")
val sc = new SparkContext(sparkConf)
val resource = getResource(sc, table, day).repartition(1000)
resource.cache()
// 1.计算item相似度矩阵
val item_sim: RDD[(String, (String, Double))] = getCosineSimilarity(resource)
item_sim.cache()
// 2.每个item提取最相近的40个item
val item_sim_rdd = item_sim.filter(f => f._2._2 > 0.05).groupByKey().map(
f => {
val item = f._1
val items_score = f._2.toList
val items_score_desc = items_score.sortWith((x, y) => x._2 > y._2)
(item, items_score_desc.take(40))
}).cache()
// 3.保存cf相似度矩阵到HDFS
item_sim_rdd.map(encodeToJson2(_)).foreachPartition(
f => {
for (m <- f) MqManager.sendMessage(m)
}
)
// 4.广播相似度矩阵
val item_sim_map = item_sim_rdd.collectAsMap()
val item_sim_bd: Broadcast[scala.collection.Map[String, List[(String, Double)]]] = sc.broadcast(item_sim_map)
// 5.为用户生成推荐列表
val recTopN = recommend(resource, item_sim_bd, 50)
/* 6.发送RabbitMQ
* 1> CF相似度矩阵Json 编码并发送
* 2> 用户推荐列表Json 编码并发送
*/
recTopN.map(encodeToJson1(_)).foreachPartition(
f => {
for (m <- f) MqManager.sendMessage(m)
}
)
}
}