如何使用Spark ALS实现spark 协同过滤 例子

jethai 的BLOG
用户名:jethai
文章数:340
访问量:30187
注册日期:
阅读量:5863
阅读量:12276
阅读量:320696
阅读量:1031675
51CTO推荐博文
用户电影评分数据集下载1) Item-Based,非个性化的,每个人看到的都一样2) User-Based,个性化的,每个人看到的不一样对用户的行为分析得到用户的喜好后,可以根据用户的喜好计算相似用户和物品,然后可以基于相似用户或物品进行推荐。这就是协同过滤中的两个分支了,基于用户的和基于物品的协同过滤。在计算用户之间的相似度时,是将一个用户对所有物品的偏好作为一个向量,而在计算物品之间的相似度时,是将所有用户对某个物品的偏好作为一个向量。求出相似度后,接下来可以求相似邻居了。3)基于模型(ModelCF)&&&&&&&&按照模型,可以分为:&&&&&&&&1)最近邻模型:基于距离的协同过滤算法&&&&& &2)Latent Factor Mode(SVD):基于矩阵分解的模型&&&&& &3)Graph:图模型,社会网络图模型适用场景&&&&&对于一个在线网站,用户的数量往往超过物品的数量,同时物品数据相对稳定,因此计算物品的相似度不但&&&&&计算量小,同时不必频繁更新。但是这种情况只适用于电子商务类型的网站,像新闻类,博客等这类网站的&&&&&系统推荐,情况往往是相反的,物品数量是海量的,而且频繁更新。r语言实现基于物品的协同过滤算法&&&&&&&&#引用plyr包
&&&&library(plyr)
&&&&#读取数据集
&&&&train&-read.table(file="C:/users/Administrator/Desktop/u.data",sep="&")
&&&&train&-train[1:3]
&&&&names(train)&-c("user","item","pref")
&&&&#计算用户列表方法
&&&&usersUnique&-function(){
&&&&&&users&-unique(train$user)
&&&&&&users[order(users)]
&&&&#计算商品列表方法
&&&&itemsUnique&-function(){
&&&&&&items&-unique(train$item)
&&&&&&items[order(items)]
&&&&#&用户列表
&&&&users&-usersUnique()
&&&&#&商品列表
&&&&items&-itemsUnique()&
&&&&#建立商品列表索引
&&&&index&-function(x)&which(items&%in%&x)
&&&&data&-ddply(train,.(user,item,pref),summarize,idx=index(item))&
&&&&#同现矩阵
&&&&cooccurrence&-function(data){
&&&&&&n&-length(items)
&&&&&&co&-matrix(rep(0,n*n),nrow=n)
&&&&&&for(u&in&users){
&&&&idx&-index(data$item[which(data$user==u)])
&&&&m&-merge(idx,idx)
&&&&for(i&in&1:nrow(m)){
&&&&&&co[m$x[i],m$y[i]]=co[m$x[i],m$y[i]]+1
&&&&&&return(co)
&&&&#推荐算法
&&&&recommend&-function(udata=udata,co=coMatrix,num=0){
&&&&&&n&-length(items)
&&&&&&#&all&of&pref
&&&&&&pref&-rep(0,n)
&&&&&&pref[udata$idx]&-udata$pref
&&&&&&#&用户评分矩阵
&&&&&&userx&-matrix(pref,nrow=n)
&&&&&&#&同现矩阵*评分矩阵
&&&&&&r&-co&%*%&userx
&&&&&&#&推荐结果排序&
&&&&&&#&把该用户评分过的商品的推荐值设为0
&&&&&&r[udata$idx]&-0
&&&&&&idx&-order(r,decreasing=TRUE)
&&&&&&topn&-data.frame(user=rep(udata$user[1],length(idx)),item=items[idx],val=r[idx])
&&&&&&topn&-topn[which(topn$val&0),]
&&&&&&#&推荐结果取前num个
&&&&&&if(num&0){
&&&&topn&-head(topn,num)
&&&&&&#返回结果
&&&&&&return(topn)
&&&&#生成同现矩阵
&&&&co&-cooccurrence(data)&
&&&&#计算推荐结果
&&&&recommendation&-data.frame()
&&&&for(i&in&1:length(users)){
&&&&&&udata&-data[which(data$user==users[i]),]
&&&&&&recommendation&-rbind(recommendation,recommend(udata,co,0))&
&&&&}mareduce 实现参考文章:代码下载spark ALS实现Spark mllib里用的是矩阵分解的协同过滤,不是UserBase也不是ItemBase。参考文章:import&org.apache.spark.SparkConf
import&org.apache.spark.mllib.recommendation.{ALS,&MatrixFactorizationModel,&Rating}
import&org.apache.spark.rdd._
import&org.apache.spark.SparkContext
import&scala.io.Source
object&MovieLensALS&{
&&def&main(args:Array[String])&{
&&&&//设置运行环境
&&&&val&sparkConf&=&new&SparkConf().setAppName("MovieLensALS").setMaster("local[5]")
&&&&val&sc&=&new&SparkContext(sparkConf)
&&&&//装载用户评分,该评分由评分器生成(即生成文件personalRatings.txt)
&&&&val&myRatings&=&loadRatings(args(1))
&&&&val&myRatingsRDD&=&sc.parallelize(myRatings,&1)
&&&&//样本数据目录
&&&&val&movielensHomeDir&=&args(0)
&&&&//装载样本评分数据,其中最后一列Timestamp取除10的余数作为key,Rating为值,即(Int,Rating)
&&&&val&ratings&=&sc.textFile(movielensHomeDir&+&"/ratings.dat").map&{
&&&&&&line&=&
&&&&&&&&val&fields&=&line.split("::")
&&&&&&&&//&format:&(timestamp&%&10,&Rating(userId,&movieId,&rating))
&&&&&&&&(fields(3).toLong&%&10,&Rating(fields(0).toInt,&fields(1).toInt,&fields(2).toDouble))
&&&&//装载电影目录对照表(电影ID-&电影标题)
&&&&val&movies&=&sc.textFile(movielensHomeDir&+&"/movies.dat").map&{
&&&&&&line&=&
&&&&&&&&val&fields&=&line.split("::")
&&&&&&&&//&format:&(movieId,&movieName)
&&&&&&&&(fields(0).toInt,&fields(1))
&&&&}.collect().toMap
&&&&//统计有用户数量和电影数量以及用户对电影的评分数目
&&&&val&numRatings&=&ratings.count()
&&&&val&numUsers&=&ratings.map(_._2.user).distinct().count()
&&&&val&numMovies&=&ratings.map(_._2.product).distinct().count()
&&&&println("Got&"&+&numRatings&+&"&ratings&from&"&+&numUsers&+&"&users&"&+&numMovies&+&"&movies")
&&&&//将样本评分表以key值切分成3个部分,分别用于训练&(60%,并加入用户评分),&校验&(20%),&and&测试&(20%)
&&&&//该数据在计算过程中要多次应用到,所以cache到内存
&&&&val&numPartitions&=&4
&&&&val&training&=&ratings.filter(x&=&&x._1&&&6).values.union(myRatingsRDD).repartition(numPartitions).persist()
&&&&val&validation&=&ratings.filter(x&=&&x._1&&=&6&&&&x._1&&&8).values.repartition(numPartitions).persist()
&&&&val&test&=&ratings.filter(x&=&&x._1&&=&8).values.persist()
&&&&val&numTraining&=&training.count()
&&&&val&numValidation&=&validation.count()
&&&&val&numTest&=&test.count()
&&&&println("Training:&"&+&numTraining&+&"&validation:&"&+&numValidation&+&"&test:&"&+&numTest)
&&&&//训练不同参数下的模型,并在校验集中验证,获取最佳参数下的模型
&&&&val&ranks&=&List(8,&12)
&&&&val&lambdas&=&List(0.1,&10.0)
&&&&val&numIters&=&List(10,&20)
&&&&var&bestModel:&Option[MatrixFactorizationModel]&=&None
&&&&var&bestValidationRmse&=&Double.MaxValue
&&&&var&bestRank&=&0
&&&&var&bestLambda&=&-1.0
&&&&var&bestNumIter&=&-1
&&&&for&(rank&&-&&lambda&&-&&numIter&&-&numIters)&{
&&&&&&val&model&=&ALS.train(training,&rank,&numIter,&lambda)
&&&&&&val&validationRmse&=&computeRmse(model,&validation,&numValidation)
&&&&&&println("RMSE(validation)&=&"&+&validationRmse&+&"&for&the&model&trained&with&rank&=&"
&&&&&&&&+&rank&+&",lambda&=&"&+&lambda&+&",and&numIter&=&"&+&numIter&+&".")
&&&&&&if&(validationRmse&&&bestValidationRmse)&{
&&&&&&&&bestModel&=&Some(model)
&&&&&&&&bestValidationRmse&=&validationRmse
&&&&&&&&bestRank&=&rank
&&&&&&&&bestLambda&=&lambda
&&&&&&&&bestNumIter&=&numIter
&&&&//用最佳模型预测测试集的评分,并计算和实际评分之间的均方根误差(RMSE)
&&&&val&testRmse&=&computeRmse(bestModel.get,&test,&numTest)
&&&&println("The&best&model&was&trained&with&rank&=&"&+&bestRank&+&"&and&lambda&=&"&+&bestLambda
&&&&&&+&",&and&numIter&=&"&+&bestNumIter&+&",&and&its&RMSE&on&the&test&set&is&"&+&testRmse&+&".")
&&&&//create&a&naive&baseline&and&compare&it&with&the&best&model
&&&&val&meanRating&=&training.union(validation).map(_.rating).mean()
&&&&val&baselineRmse&=&math.sqrt(test.map(x&=&&(meanRating&-&x.rating)&*&(meanRating&-&x.rating)).reduce(_&+&_)&/&numTest)
&&&&val&improvement&=&(baselineRmse&-&testRmse)&/&baselineRmse&*&100
&&&&println("The&best&model&improves&the&baseline&by&"&+&"%1.2f".format(improvement)&+&"%.")
&&&&//推荐前十部最感兴趣的电影,注意要剔除用户已经评分的电影
&&&&val&myRatedMovieIds&=&myRatings.map(_.product).toSet
&&&&val&candidates&=&sc.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq)
&&&&val&recommendations&=&bestModel.get
&&&&&&.predict(candidates.map((0,&_)))
&&&&&&.collect()
&&&&&&.sortBy(-_.rating)
&&&&&&.take(10)
&&&&var&i&=&1
&&&&println("Movies&recommended&for&you:")
&&&&recommendations.foreach&{&r&=&
&&&&&&println("%2d".format(i)&+&":&"&+&movies(r.product))
&&&&&&i&+=&1
&&&&sc.stop()
&&/**&校验集预测数据和实际数据之间的均方根误差&**/
&&def&computeRmse(model:MatrixFactorizationModel,data:RDD[Rating],n:Long):Double&=&{
&&&&val&predictions:RDD[Rating]&=&model.predict(data.map(x&=&&(x.user,x.product)))
&&&&val&predictionsAndRatings&=&predictions.map{&x&=&((x.user,x.product),x.rating)}
&&&&&&.join(data.map(x&=&&((x.user,x.product),x.rating))).values
&&&&math.sqrt(predictionsAndRatings.map(&x&=&&(x._1&-&x._2)&*&(x._1&-&x._2)).reduce(_+_)/n)
&&/**&装载用户评分文件&personalRatings.txt&**/
&&def&loadRatings(path:String):Seq[Rating]&=&{
&&&&val&lines&=&Source.fromFile(path).getLines()
&&&&val&ratings&=&lines.map{
&&&&&&line&=&
&&&&&&&&val&fields&=&line.split("::")
&&&&&&&&Rating(fields(0).toInt,fields(1).toInt,fields(2).toDouble)
&&&&}.filter(_.rating&&&0.0)
&&&&if(ratings.isEmpty){
&&&&&&sys.error("No&ratings&provided.")
&&&&}else{
&&&&&&ratings.toSeq
}参考文章:本文出自 “” 博客,请务必保留此出处
了这篇文章
类别:┆阅读(0)┆评论(0)出处:/ 转载自
本文主要记录最近一段时间学习和实现Spark MLlib中的协同过滤的一些总结,希望对大家熟悉Spark ALS算法有所帮助。
【】Spark1.4.0中MatrixFactorizationModel提供了recommendForAll方法实现离线批量推荐,见
为了测试简单,在本地以local方式运行Spark,你需要做的是下载编译好的压缩包解压即可,可以参考
测试数据使用
数据集,下载之后解压到data目录。数据的格式请参考README中的说明,需要注意的是ratings.dat中的数据被处理过,
每个用户至少访问了20个商品。
下面的代码均在spark-shell中运行,启动时候可以根据你的机器内存设置JVM参数,例如:
bin/spark-shell --executor-memory 3g --driver-memory 3g --driver-java-options &-Xms2g -Xmx2g -XX:+UseCompressedOops&
这个例子主要演示如何训练数据、评分并计算根均方差。
首先,启动spark-shell,然后引入mllib包,我们需要用到ALS算法类和Rating评分类:
import org.apache.spark.mllib.recommendation.{ALS, Rating}
Spark的日志级别默认为INFO,你可以手动设置为WARN级别,同样先引入log4j依赖:
import org.apache.log4j.{Logger,Level}
然后,运行下面代码:
Logger.getLogger(&org.apache.spark&).setLevel(Level.WARN)
Logger.getLogger(&org.eclipse.jetty.server&).setLevel(Level.OFF)
spark-shell启动成功之后,sc为内置变量,你可以通过它来加载测试数据:
val data = sc.textFile(&data/ml-1m/ratings.dat&)
接下来解析文件内容,获得用户对商品的评分记录:
val ratings = data.map(_.split(&::&) match { case Array(user, item, rate, ts) =&
Rating(user.toInt, item.toInt, rate.toDouble)
}).cache()
查看第一条记录:
scala& ratings.first
res81: org.apache.spark.mllib.recommendation.Rating = Rating(1,)
我们可以统计文件中用户和商品数量:
val users = ratings.map(_.user).distinct()
val products = ratings.map(_.product).distinct()
println(&Got &+ratings.count()+& ratings from &+users.count+& users on &+products.count+& products.&)
可以看到如下输出:
//Got 1000209 ratings from 6040 users on 3706 products.
你可以对评分数据生成训练集和测试集,例如:训练集和测试集比例为8比2:
val splits = ratings.randomSplit(Array(0.8, 0.2), seed = 111l)
val training = splits(0).repartition(numPartitions)
val test = splits(1).repartition(numPartitions)
这里,我们是将评分数据全部当做训练集,并且也为测试集。
接下来调用
ALS.train()方法,进行模型训练:
val rank = 12
val lambda = 0.01
val numIterations = 20
val model = ALS.train(ratings, rank, numIterations, lambda)
训练完后,我们看看model中的用户和商品特征向量:
model.userFeatures
//res82: org.apache.spark.rdd.RDD[(Int, Array[Double])] = users MapPartitionsRDD[400] at mapValues at ALS.scala:218
model.userFeatures.count
//res84: Long = 6040
model.productFeatures
//res85: org.apache.spark.rdd.RDD[(Int, Array[Double])] = products MapPartitionsRDD[401] at mapValues at ALS.scala:222
model.productFeatures.count
//res86: Long = 3706
我们要对比一下预测的结果,注意:我们将
训练集当作测试集来进行对比测试。从训练集中获取用户和商品的映射:
val usersProducts= ratings.map { case Rating(user, product, rate) =&
(user, product)
显然,测试集的记录数等于评分总记录数,验证一下:
usersProducts.count
//Long = 1000209
使用推荐模型对用户商品进行预测评分,得到预测评分的数据集:
var predictions = model.predict(usersProducts).map { case Rating(user, product, rate) =&
((user, product), rate)
查看其记录数:
predictions.count //Long = 1000209
将真实评分数据集与预测评分数据集进行合并,这样得到用户对每一个商品的实际评分和预测评分:
val ratesAndPreds = ratings.map { case Rating(user, product, rate) =&
((user, product), rate)
}.join(predictions)
ratesAndPreds.count
//Long = 1000209
然后计算根均方差:
val rmse= math.sqrt(ratesAndPreds.map { case ((user, product), (r1, r2)) =&
val err = (r1 - r2)
println(s&RMSE = $rmse&)
上面这段代码其实就是
对测试集进行评分预测并计算与实际评分的相似度,这段代码可以抽象为一个方法,如下:
/** Compute RMSE (Root Mean Squared Error). */
def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating]) = {
val usersProducts = data.map { case Rating(user, product, rate) =&
(user, product)
val predictions = model.predict(usersProducts).map { case Rating(user, product, rate) =&
((user, product), rate)
val ratesAndPreds = data.map { case Rating(user, product, rate) =&
((user, product), rate)
}.join(predictions)
math.sqrt(ratesAndPreds.map { case ((user, product), (r1, r2)) =&
val err = (r1 - r2)
除了RMSE指标,我们还可以计算AUC以及Mean average precision at K (MAPK),关于AUC的计算方法,参考
,关于MAPK的计算方法可以参考
一书第四章节内容,或者你可以看本文后面内容。
保存真实评分和预测评分
我们还可以保存用户对商品的真实评分和预测评分记录到本地文件:
ratesAndPreds.sortByKey().repartition(1).sortBy(_._1).map({
case ((user, product), (rate, pred)) =& (user + &,& + product + &,& + rate + &,& + pred)
}).saveAsTextFile(&/tmp/result&)
上面这段代码先按用户排序,然后重新分区确保目标目录中只生成一个文件。如果你重复运行这段代码,则需要先删除目标路径:
import scala.sys.process._
&rm -r /tmp/result&.!
我们还可以对预测的评分结果按用户进行分组并按评分倒排序:
predictions.map { case ((user, product), rate) =&
(user, (product, rate))
}.groupByKey(numPartitions).map{case (user_id,list)=&
(user_id,list.toList.sortBy {case (goods_id,rate)=& - rate})
给一个用户推荐商品
这个例子主要是记录如何给一个或大量用户进行推荐商品,例如,对用户编号为384的用户进行推荐,查出该用户在测试集中评分过的商品。
找出5个用户:
users.take(5)
//Array[Int] = Array(384, , )
查看用户编号为384的用户的预测结果中预测评分排前10的商品:
val userId = users.take(1)(0) //384
val K = 10
val topKRecs = model.recommendProducts(userId, K)
println(topKRecs.mkString(&\n&))
Rating(384,)
Rating(384,129,8.676)
Rating(384,184,8.853)
Rating(384,811,7.284)
Rating(384,)
Rating(384,)
Rating(384,)
Rating(384,)
Rating(384,397,7.967)
Rating(384,97,7.754)
查看该用户的评分记录:
val goodsForUser=ratings.keyBy(_.user).lookup(384)
// Seq[org.apache.spark.mllib.recommendation.Rating] = WrappedArray(Rating(384,), Rating(384,), Rating(384,593,5.0), Rating(384,599,3.0), Rating(384,673,2.0), Rating(384,), Rating(384,), Rating(384,), Rating(384,), Rating(384,204,4.0), Rating(384,), Rating(384,), Rating(384,260,4.0), Rating(384,), Rating(384,), Rating(384,), Rating(384,), Rating(384,), Rating(384,), Rating(384,), Rating(384,), Rating(384,))
productsForUser.size //Int = 22
productsForUser.sortBy(-_.rating).take(10).map(rating =& (rating.product, rating.rating)).foreach(println)
可以看到该用户对22个商品评过分以及浏览的商品是哪些。
我们可以该用户对某一个商品的实际评分和预测评分方差为多少:
val actualRating = productsForUser.take(1)(0)
//actualRating: org.apache.spark.mllib.recommendation.Rating = Rating(384,)
val predictedRating = model.predict(789, actualRating.product)
val predictedRating = model.predict(384, actualRating.product)
//predictedRating: Double = 1.4637
val squaredError = math.pow(predictedRating - actualRating.rating, 2.0)
//squaredError: Double = 0.5075172
如何找出和一个已知商品最相似的商品呢?这里,我们可以使用余弦相似度来计算:
import org.jblas.DoubleMatrix
/* Compute the cosine similarity between two vectors */
def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {
vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
以2055商品为例,计算实际评分和预测评分相似度
val itemId = 2055
val itemFactor = model.productFeatures.lookup(itemId).head
//itemFactor: Array[Double] = Array(0.14, -0.86, -1.4, -0.9518, -0.3333, -0.3022, -0.8916, -0.62415, -0.2258)
val itemVector = new DoubleMatrix(itemFactor)
//itemVector: org.jblas.DoubleMatrix = [0..435731; -0..443828; -1..627457; -0.326453; -0.993985; -0.871032; -0.757889; -0.146219; -0.725426]
cosineSimilarity(itemVector, itemVector)
// res99: Double = 0.9999
找到和该商品最相似的10个商品:
val sims = model.productFeatures.map{ case (id, factor) =&
val factorVector = new DoubleMatrix(factor)
val sim = cosineSimilarity(factorVector, itemVector)
val sortedSims = sims.top(K)(Ordering.by[(Int, Double), Double] { case (id, similarity) =& similarity })
//sortedSims: Array[(Int, Double)] = Array((9), (4), (6), (1), (9), (1), (2), (7), (2), (7))
println(sortedSims.mkString(&\n&))
显然第一个最相似的商品即为该商品本身,即2055,我们可以修改下代码,取前k+1个商品,然后排除第一个:
val sortedSims2 = sims.top(K + 1)(Ordering.by[(Int, Double), Double] { case (id, similarity) =& similarity })
//sortedSims2: Array[(Int, Double)] = Array((9), (4), (6), (1), (9), (1), (2), (7), (2), (7), (4))
sortedSims2.slice(1, 11).map{ case (id, sim) =& (id, sim) }.mkString(&\n&)
接下来,我们可以计算给该用户推荐的前K个商品的平均准确度MAPK,该算法定义如下(该算法是否正确还有待考证):
/* Function to compute average precision given a set of actual and predicted ratings */
// Code for this function is based on: /benhamner/Metrics
def avgPrecisionK(actual: Seq[Int], predicted: Seq[Int], k: Int): Double = {
val predK = predicted.take(k)
var score = 0.0
var numHits = 0.0
for ((p, i) &- predK.zipWithIndex) {
if (actual.contains(p)) {
numHits += 1.0
score += numHits / (i.toDouble + 1.0)
if (actual.isEmpty) {
score / scala.math.min(actual.size, k).toDouble
给该用户推荐的商品为:
val actualProducts = productsForUser.map(_.product)
//actualProducts: Seq[Int] = ArrayBuffer(, 593, 599, 673, , , 204, , 260, , , , , 1304)
给该用户预测的商品为:
val predictedProducts = topKRecs.map(_.product)
//predictedProducts: Array[Int] = Array(, 184, 811, , , 397, 97)
最后的准确度为:
val apk10 = avgPrecisionK(actualProducts, predictedProducts, 10)
// apk10: Double = 0.0
你可以评分记录中获得所有用户然后依次给每个用户推荐:
val users = ratings.map(_.user).distinct()
users.collect.flatMap { user =&
model.recommendProducts(user, 10)
这种方式是遍历内存中的一个集合然后循环调用RDD的操作,运行会比较慢,另外一种方式是直接操作model中的userFeatures和productFeatures,代码如下:
val itemFactors = model.productFeatures.map { case (id, factor) =& factor }.collect()
val itemMatrix = new DoubleMatrix(itemFactors)
println(itemMatrix.rows, itemMatrix.columns)
//(3706,12)
// broadcast the item factor matrix
val imBroadcast = sc.broadcast(itemMatrix)
//获取商品和索引的映射
var idxProducts=model.productFeatures.map { case (prodcut, factor) =& prodcut }.zipWithIndex().map{case (prodcut, idx) =& (idx,prodcut)}.collectAsMap()
val idxProductsBroadcast = sc.broadcast(idxProducts)
val allRecs = model.userFeatures.map{ case (user, array) =&
val userVector = new DoubleMatrix(array)
val scores = imBroadcast.value.mmul(userVector)
val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
//根据索引取对应的商品id
val recommendedProducts = sortedWithId.map(_._2).map{idx=&idxProductsBroadcast.value.get(idx).get}
(user, recommendedProducts)
这种方式其实还不是最优方法,更好的方法可以参考
,当然这篇文章中的代码还可以继续优化一下。我修改后的代码如下,供大家参考:
val productFeatures = model.productFeatures.collect()
var productArray = ArrayBuffer[Int]()
var productFeaturesArray = ArrayBuffer[Array[Double]]()
for ((product, features) &- productFeatures) {
productArray += product
productFeaturesArray += features
val productArrayBroadcast = sc.broadcast(productArray)
val productFeatureMatrixBroadcast = sc.broadcast(new DoubleMatrix(productFeaturesArray.toArray).transpose())
start = System.currentTimeMillis()
val allRecs = model.userFeatures.mapPartitions { iter =&
// Build user feature matrix for jblas
var userFeaturesArray = ArrayBuffer[Array[Double]]()
var userArray = new ArrayBuffer[Int]()
while (iter.hasNext) {
val (user, features) = iter.next()
userArray += user
userFeaturesArray += features
var userFeatureMatrix = new DoubleMatrix(userFeaturesArray.toArray)
var userRecommendationMatrix = userFeatureMatrix.mmul(productFeatureMatrixBroadcast.value)
var productArray=productArrayBroadcast.value
var mappedUserRecommendationArray = new ArrayBuffer[String](params.topk)
// Extract ratings from the matrix
for (i &- 0 until userArray.length) {
var ratingSet =
mutable.TreeSet.empty(Ordering.fromLessThan[(Int,Double)](_._2 & _._2))
for (j &- 0 until productArray.length) {
var rating = (productArray(j), userRecommendationMatrix.get(i,j))
ratingSet += rating
mappedUserRecommendationArray += userArray(i)+&,&+ratingSet.take(params.topk).mkString(&,&)
mappedUserRecommendationArray.iterator
悲哀的是,上面的方法还是不能解决问题,因为矩阵相乘会撑爆集群内存;可喜的是,如果你关注Spark最新动态,你会发现Spark1.4.0中MatrixFactorizationModel提供了
recommendForAll方法实现离线批量推荐,详细说明见
。因为,我使用的Hadoop版本是CDH-5.4.0,其中Spark版本还是1.3.0,所以暂且不能在集群上测试Spark1.4.0中添加的新方法。
如果上面结果跑出来了,就可以验证推荐结果是否正确。还是以384用户为例:
allRecs.lookup(384).head.take(10)
//res50: Array[Int] = Array(, , , , 853, 759)
topKRecs.map(_.product)
//res49: Array[Int] = Array(, , , , 853, 759)
接下来,我们可以计算所有推荐结果的准确度了,首先,得到每个用户评分过的所有商品:
val userProducts = ratings.map{ case Rating(user, product, rating) =& (user, product) }.groupBy(_._1)
然后,预测的商品和实际商品关联求准确度:
// finally, compute the APK for each user, and average them to find MAPK
val MAPK = allRecs.join(userProducts).map{ case (userId, (predicted, actualWithIds)) =&
val actual = actualWithIds.map(_._2).toSeq
avgPrecisionK(actual, predicted, K)
}.reduce(_ + _) / allRecs.count
println(&Mean Average Precision at K = & + MAPK)
//Mean Average Precision at K = 0.260383
其实,我们也可以使用Spark内置的算法计算RMSE和MAE:
// MSE, RMSE and MAE
import org.apache.spark.mllib.evaluation.RegressionMetrics
val predictedAndTrue = ratesAndPreds.map { case ((user, product), (actual, predicted)) =& (actual, predicted) }
val regressionMetrics = new RegressionMetrics(predictedAndTrue)
println(&Mean Squared Error = & + regressionMetrics.meanSquaredError)
println(&Root Mean Squared Error = & + regressionMetrics.rootMeanSquaredError)
// Mean Squared Error = 0.8566
// Root Mean Squared Error = 0.0918
import org.apache.spark.mllib.evaluation.RankingMetrics
val predictedAndTrueForRanking = allRecs.join(userProducts).map{ case (userId, (predicted, actualWithIds)) =&
val actual = actualWithIds.map(_._2)
(predicted.toArray, actual.toArray)
val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking)
println(&Mean Average Precision = & + rankingMetrics.meanAveragePrecision)
// Mean Average Precision = 0.20426
计算推荐2000个商品时的准确度为:
val MAPK2000 = allRecs.join(userProducts).map{ case (userId, (predicted, actualWithIds)) =&
val actual = actualWithIds.map(_._2).toSeq
avgPrecisionK(actual, predicted, 2000)
}.reduce(_ + _) / allRecs.count
println(&Mean Average Precision = & + MAPK2000)
//Mean Average Precision = 0.069083
保存和加载推荐模型
对与实时推荐,我们需要启动一个web server,在启动的时候生成或加载训练模型,然后提供API接口返回推荐接口,需要调用的相关方法为:
save(model: MatrixFactorizationModel, path: String)
load(sc: SparkContext, path: String)
model中的userFeatures和productFeatures也可以保存起来:
val outputDir=&/tmp&
model.userFeatures.map{ case (id, vec) =& id + &\t& + vec.mkString(&,&) }.saveAsTextFile(outputDir + &/userFeatures&)
model.productFeatures.map{ case (id, vec) =& id + &\t& + vec.mkString(&,&) }.saveAsTextFile(outputDir + &/productFeatures&)
本文主要记录如何使用ALS算法实现协同过滤并给用户推荐商品,以上代码在Github仓库中的
如果你想更加深入了解Spark MLlib算法的使用,可以看看
这本电子书并下载书中的源码,本文大部分代码参考自该电子书。
相关 [spark als 协同过滤] 推荐:
JavaChen Blog,作者:
Junez. 本文主要记录最近一段时间学习和实现Spark MLlib中的协同过滤的一些总结,希望对大家熟悉Spark ALS算法有所帮助. 【】Spark1.4.0中MatrixFactorizationModel提供了recommendForAll方法实现离线批量推荐,见
SPARK-3066.
Mahout使用了Taste来提高协同过滤算法的实现,它是一个基于Java实现的可扩展的,高效的推荐引擎. Taste既实现了最基本的基 于用户的和基于内容的推荐算法,同时也提供了扩展接口,使用户可以方便的定义和实现自己的推荐算法. 同时,Taste不仅仅只适用于Java应用程序,它 可以作为内部服务器的一个组件以HTTP和Web Service的形式向外界提供推荐的逻辑.
- JavaChen Blog
本文主要通过Spark官方的例子理解ALS协同过滤算法的原理和编码过程,然后通过对电影进行推荐来熟悉一个完整的推荐过程. 协同过滤常被应用于推荐系统,旨在补充用户-商品关联矩阵中所缺失的部分. MLlib当前支持基于模型的协同过滤,其中用户和商品通过一小组隐语义因子进行表达,并且这些因子也用于预测缺失的元素.
又好一阵子没有写文章了,阿弥陀佛...最近项目中要做理财推荐,所以,回过头来回顾一下协同过滤算法在推荐系统中的应用.
说到推荐系统,大家可能立马会想到协同过滤算法. 本文基于Spark MLlib平台实现一个向用户推荐电影的简单应用. 基于模型的协同过滤应用---电影推荐.
一、协同过滤算法概述.
- CSDN博客推荐文章
今天要讲的主要内容是
协同过滤,即Collaborative Filtering,简称
关于协同过滤的一个最经典的例子就是看电影,有时候不知道哪一部电影是我们喜欢的或者评分比较高的,那.
么通常的做法就是问问周围的朋友,看看最近有什么好的电影推荐. 在问的时候,都习惯于问跟自己口味差不.
- IT技术博客大学习
协同过滤算法是推荐系统中最古老,也是最简单高效的推荐算法. 简单说协同过滤就是根据以往的用户产生的数据分析,对用户的新行为进行匹配分析来给用户推荐用户最有可能感兴趣的内容.
协同过滤算法是为了解决
长尾现象,也就是说推荐系统是为了解决长尾现象而诞生的. 因为在之前在有限的空间(如:书店的书架、服装店的衣架、商店的货架、网页的展示区域)只能摆有限的物品进行展示,造成大量的非热门物品很难进入人们的视野,也就无法产生任何价值.
- 刘思喆@贝吉塔行星
推荐系统在个性化领域有着广泛的应用,从技术上讲涉及概率、抽样、最优化、机器学习、数据挖掘、搜索引擎、自然语言处理等多个领域. 东西太多,我也不准备写连载,今天仅从基本算法这个很小的切入点来聊聊推荐引擎的原理. 推荐引擎(系统)从不同的角度看有不同的划分,比如:. 按照数据的分类:协同过滤、内容过滤、社会化过滤.
- CSDN博客云计算推荐文章
经过3个晚上的翻译,终于把ALS-WR算法的介绍论文翻译完成. 此次翻译目的是加强对ALS-WR算法的理解和练习自己对专业性英文的能力,由于本人英文水平有限并且该算法使用到了多个高数甚至超越高数和线性代数的一些知识,所以如哪里翻译不对或理解有误,望英语强人,数学高人,算法牛人给个纠正,先于此谢过. 原文见:/chapter/10.-3-540-?LI=true#page-1,最好是看英文版的,因为该算法的主要精髓是在那几个数学公式上.
- 简单文本
Spark具有先进的DAG执行引擎,支持cyclic data flow和内存计算. 因此,它的运行速度,在内存中是Hadoop MapReduce的100倍,在磁盘中是10倍. 这样的性能指标,真的让人心动啊. Spark的API更为简单,提供了80个High Level的操作,可以很好地支持并行应用.
坚持分享优质有趣的原创文章,并保留作者信息和版权声明,任何问题请联系:@。

我要回帖

更多关于 python实现协同过滤 的文章

 

随机推荐