推荐算法原理详解
约 1987 字大约 7 分钟
recommendationalgorithms
2025-09-15
推荐系统是信息过滤的核心技术,广泛应用于电商、视频、音乐、新闻等场景。本文系统介绍从经典协同过滤到深度学习推荐模型的演进路线。
推荐系统概览
协同过滤(Collaborative Filtering)
协同过滤的核心假设是"物以类聚,人以群分"——相似的用户喜欢相似的物品。
用户协同过滤(User-based CF)
找到与目标用户兴趣相似的用户群体,将这些相似用户喜欢但目标用户未接触过的物品推荐给目标用户。
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
# User-item rating matrix used by the CF examples below.
# Rows: users, columns: items, value: rating (0 means "not rated").
ratings = np.array([
[5, 3, 0, 1, 4],
[4, 0, 0, 1, 5],
[1, 1, 0, 5, 0],
[0, 0, 5, 4, 0],
[0, 3, 4, 0, 3],
])
def user_based_cf(ratings, target_user, k=2):
    """Recommend unrated items for ``target_user`` via user-based CF.

    Parameters
    ----------
    ratings : np.ndarray
        (num_users, num_items) rating matrix; 0 marks "not rated".
    target_user : int
        Row index of the user to recommend for.
    k : int
        Number of nearest-neighbour users to aggregate.

    Returns
    -------
    list[tuple[int, float]]
        (item_index, predicted_score) pairs, highest score first.
    """
    # Pairwise cosine similarity between user rows, computed with NumPy
    # (equivalent to sklearn's cosine_similarity; zero-norm rows map to 0
    # similarity because their dot product with anything is 0).
    norms = np.linalg.norm(ratings, axis=1)
    safe_norms = np.where(norms == 0, 1.0, norms)
    user_sim = (ratings @ ratings.T) / np.outer(safe_norms, safe_norms)
    # A user must not be their own nearest neighbour.
    np.fill_diagonal(user_sim, 0)

    # Indices of the k most similar users, best first.
    similar_users = np.argsort(user_sim[target_user])[::-1][:k]

    # Predict a score for every item the target user has not rated yet,
    # as a similarity-weighted average of the neighbours' ratings.
    predictions = {}
    for item in range(ratings.shape[1]):
        if ratings[target_user][item] == 0:
            numerator = sum(
                user_sim[target_user][u] * ratings[u][item]
                for u in similar_users
                if ratings[u][item] > 0
            )
            denominator = sum(
                abs(user_sim[target_user][u])
                for u in similar_users
                if ratings[u][item] > 0
            )
            if denominator > 0:
                predictions[item] = numerator / denominator
    return sorted(predictions.items(), key=lambda x: x[1], reverse=True)
# Demo: recommend items for user 0 using the toy rating matrix above.
recommendations = user_based_cf(ratings, target_user=0)
print("推荐物品:", recommendations)

物品协同过滤(Item-based CF)
计算物品之间的相似度,将用户已喜欢物品的相似物品推荐给用户。工业界更常用,因为物品数量通常比活跃用户少,且物品相似度更稳定。
def item_based_cf(ratings, target_user, k=2):
    """Recommend unrated items for ``target_user`` via item-based CF.

    Parameters
    ----------
    ratings : np.ndarray
        (num_users, num_items) rating matrix; 0 marks "not rated".
    target_user : int
        Row index of the user to recommend for.
    k : int
        Number of most-similar rated items used per prediction.

    Returns
    -------
    list[tuple[int, float]]
        (item_index, predicted_score) pairs, highest score first.
    """
    # Cosine similarity between item columns (i.e. between the vectors of
    # users who rated each item), computed with NumPy; zero-norm columns
    # get 0 similarity. Equivalent to cosine_similarity(ratings.T).
    norms = np.linalg.norm(ratings.T, axis=1)
    safe_norms = np.where(norms == 0, 1.0, norms)
    item_sim = (ratings.T @ ratings) / np.outer(safe_norms, safe_norms)
    np.fill_diagonal(item_sim, 0)

    # Items the target user has already rated.
    rated_items = np.where(ratings[target_user] > 0)[0]

    # For each unrated item, average the user's own ratings of the k
    # most similar already-rated items, weighted by similarity.
    predictions = {}
    for item in range(ratings.shape[1]):
        if ratings[target_user][item] == 0:
            sim_scores = [(i, item_sim[item][i]) for i in rated_items]
            sim_scores.sort(key=lambda x: x[1], reverse=True)
            top_k = sim_scores[:k]
            numerator = sum(sim * ratings[target_user][i] for i, sim in top_k)
            denominator = sum(abs(sim) for _, sim in top_k)
            if denominator > 0:
                predictions[item] = numerator / denominator
    return sorted(predictions.items(), key=lambda x: x[1], reverse=True)
    return sorted(predictions.items(), key=lambda x: x[1], reverse=True)

矩阵分解(Matrix Factorization)
将用户-物品评分矩阵分解为两个低秩矩阵的乘积,学习用户和物品的隐因子表示。
SVD(Singular Value Decomposition)
$R \approx U \Sigma V^T$
from scipy.sparse.linalg import svds

# Truncated SVD: keep only the top-3 singular values/vectors.
U, sigma, Vt = svds(ratings.astype(float), k=3)
sigma_diag = np.diag(sigma)

# Reconstruct (approximate) the full rating matrix from the factors.
predicted_ratings = U @ sigma_diag @ Vt
print("预测评分:\n", np.round(predicted_ratings, 2))

ALS(Alternating Least Squares)
ALS 交替固定用户矩阵和物品矩阵,求解另一个矩阵,适合大规模分布式计算。
# PySpark ALS example.
# NOTE(review): training_df / test_df are assumed to be predefined Spark
# DataFrames with user_id / item_id / rating columns — not shown here.
from pyspark.ml.recommendation import ALS
als = ALS(
maxIter=10,
regParam=0.1,
rank=50, # latent-factor dimensionality
userCol="user_id",
itemCol="item_id",
ratingCol="rating",
coldStartStrategy="drop", # cold-start handling
implicitPrefs=False, # explicit feedback
)
model = als.fit(training_df)
predictions = model.transform(test_df)
# Generate top-10 recommendations for every user
user_recs = model.recommendForAllUsers(10)

基于内容的推荐(Content-Based)
利用物品的内容特征(文本描述、标签、属性)计算物品相似度,推荐与用户历史偏好相似的物品。
from sklearn.feature_extraction.text import TfidfVectorizer
# Item text descriptions (item_id -> free-text description)
items = {
0: "action movie with car chases and explosions",
1: "romantic comedy about love in paris",
2: "science fiction movie with space exploration",
3: "action thriller with spy and conspiracy",
4: "romantic drama about family relationships",
}
# TF-IDF feature extraction over the item descriptions
tfidf = TfidfVectorizer(stop_words='english')
item_features = tfidf.fit_transform(list(items.values()))
# Pairwise item similarity in TF-IDF space
item_similarity = cosine_similarity(item_features)
def content_based_recommend(user_history, item_similarity, k=3):
    """Recommend items similar to a user's interaction history.

    Parameters
    ----------
    user_history : iterable[tuple[int, float]]
        (item_id, rating) pairs the user has interacted with.
    item_similarity : np.ndarray
        (num_items, num_items) pairwise item-similarity matrix.
    k : int
        Number of recommendations to return.

    Returns
    -------
    list[tuple[int, float]]
        Top-k (item_id, score) pairs, highest score first.
    """
    # Aggregate similarity vectors weighted by the user's ratings.
    scores = np.zeros(item_similarity.shape[0])
    for item_id, rating in user_history:
        scores += rating * item_similarity[item_id]
    # Never re-recommend something the user already interacted with.
    interacted = {item_id for item_id, _ in user_history}
    recommendations = [
        (i, scores[i]) for i in range(len(scores)) if i not in interacted
    ]
    return sorted(recommendations, key=lambda x: x[1], reverse=True)[:k]
    return sorted(recommendations, key=lambda x: x[1], reverse=True)[:k]

深度学习推荐模型
NCF(Neural Collaborative Filtering)
用神经网络替代矩阵分解中的内积操作,增强表达能力。
import torch
import torch.nn as nn
class NCF(nn.Module):
    """Neural Collaborative Filtering: a GMF branch plus an MLP branch.

    Replaces the inner product of matrix factorization with a learned
    fusion of an element-wise-product (GMF) branch and an MLP branch.
    """

    def __init__(self, num_users, num_items, embedding_dim=64,
                 mlp_layers=(128, 64, 32)):
        # NOTE: the default is a tuple — the original list default was a
        # mutable default argument shared across calls.
        super().__init__()
        # GMF branch: its own user/item embeddings, combined multiplicatively.
        self.user_emb_gmf = nn.Embedding(num_users, embedding_dim)
        self.item_emb_gmf = nn.Embedding(num_items, embedding_dim)
        # MLP branch: separate embeddings, concatenated then fed to an MLP.
        self.user_emb_mlp = nn.Embedding(num_users, embedding_dim)
        self.item_emb_mlp = nn.Embedding(num_items, embedding_dim)
        mlp = []
        input_dim = embedding_dim * 2
        for hidden_dim in mlp_layers:
            mlp.append(nn.Linear(input_dim, hidden_dim))
            mlp.append(nn.ReLU())
            mlp.append(nn.Dropout(0.2))
            input_dim = hidden_dim
        self.mlp = nn.Sequential(*mlp)
        # Fusion layer: concatenated GMF and MLP outputs -> scalar logit.
        self.output = nn.Linear(embedding_dim + mlp_layers[-1], 1)

    def forward(self, user_ids, item_ids):
        """Score (user, item) pairs.

        user_ids, item_ids: (batch,) integer index tensors.
        Returns a (batch,) tensor of sigmoid probabilities in (0, 1)
        (squeeze() collapses the trailing singleton dimension).
        """
        # GMF: element-wise product of user and item embeddings.
        gmf_user = self.user_emb_gmf(user_ids)
        gmf_item = self.item_emb_gmf(item_ids)
        gmf_out = gmf_user * gmf_item
        # MLP: concatenate embeddings and pass through the hidden stack.
        mlp_user = self.user_emb_mlp(user_ids)
        mlp_item = self.item_emb_mlp(item_ids)
        mlp_input = torch.cat([mlp_user, mlp_item], dim=-1)
        mlp_out = self.mlp(mlp_input)
        # Fuse both branches and squash to a probability.
        combined = torch.cat([gmf_out, mlp_out], dim=-1)
        return torch.sigmoid(self.output(combined)).squeeze()
        return torch.sigmoid(self.output(combined)).squeeze()

DeepFM
结合 FM(Factorization Machine)的二阶特征交叉和 DNN 的高阶特征交叉。
class DeepFM(nn.Module):
    """DeepFM: FM second-order feature crosses + a DNN for higher-order ones."""

    def __init__(self, field_dims, embed_dim=16, mlp_dims=(256, 128, 64)):
        # NOTE: the default is a tuple — the original list default was a
        # mutable default argument shared across calls.
        super().__init__()
        self.num_fields = len(field_dims)
        # One embedding table per categorical field (shared by FM and DNN).
        self.embeddings = nn.ModuleList([
            nn.Embedding(dim, embed_dim) for dim in field_dims
        ])
        # FM first-order (linear) term: one scalar weight per feature value.
        self.linear = nn.ModuleList([
            nn.Embedding(dim, 1) for dim in field_dims
        ])
        self.bias = nn.Parameter(torch.zeros(1))
        # DNN tower over the concatenated field embeddings.
        dnn = []
        input_dim = self.num_fields * embed_dim
        for hidden_dim in mlp_dims:
            dnn.append(nn.Linear(input_dim, hidden_dim))
            dnn.append(nn.BatchNorm1d(hidden_dim))
            dnn.append(nn.ReLU())
            dnn.append(nn.Dropout(0.3))
            input_dim = hidden_dim
        dnn.append(nn.Linear(input_dim, 1))
        self.dnn = nn.Sequential(*dnn)

    def forward(self, x):
        """x: (batch, num_fields) integer feature indices -> (batch,) scores."""
        # First-order term: sum of per-field scalar weights, shape (batch, 1).
        linear_out = sum(
            self.linear[i](x[:, i]) for i in range(self.num_fields)
        )
        # Field embeddings stacked to (batch, num_fields, embed_dim).
        embeds = [self.embeddings[i](x[:, i]) for i in range(self.num_fields)]
        embed_stack = torch.stack(embeds, dim=1)
        # FM second-order term via the (sum)^2 - sum-of-squares identity.
        sum_square = embed_stack.sum(dim=1).pow(2)
        square_sum = embed_stack.pow(2).sum(dim=1)
        fm_out = 0.5 * (sum_square - square_sum).sum(dim=1, keepdim=True)
        # DNN term over the flattened embeddings.
        dnn_input = embed_stack.view(x.size(0), -1)
        dnn_out = self.dnn(dnn_input)
        # Combine bias + first-order + FM + DNN, then squash.
        output = self.bias + linear_out + fm_out + dnn_out
        return torch.sigmoid(output).squeeze()
        return torch.sigmoid(output).squeeze()

DIN(Deep Interest Network)
DIN 引入注意力机制,根据候选物品动态激活用户历史行为中的相关兴趣。
class DIN(nn.Module):
    """Deep Interest Network: attention over the user's behaviour history,
    activated differently for each candidate item."""

    def __init__(self, num_items, embed_dim=32):
        super().__init__()
        self.item_embedding = nn.Embedding(num_items, embed_dim)
        # Attention MLP scoring each (candidate, history-item) interaction.
        self.attention = nn.Sequential(
            nn.Linear(embed_dim * 4, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
        )
        # Prediction MLP over [user interest, candidate] features.
        self.mlp = nn.Sequential(
            nn.Linear(embed_dim * 2, 128),
            nn.ReLU(),
            nn.Linear(128, 1),
        )

    def forward(self, candidate_item, history_items, history_mask):
        """
        candidate_item: (batch,) candidate item indices to score.
        history_items: (batch, max_history_len) past item indices (padded).
        history_mask: (batch, max_history_len); 0 marks padding positions.
        Returns a (batch,) tensor of sigmoid scores.

        NOTE(review): a row whose mask is all zeros softmaxes over all
        -inf and yields NaN — callers are assumed to supply at least one
        valid history item per row; confirm against the data pipeline.
        """
        candidate_emb = self.item_embedding(candidate_item)  # (batch, dim)
        history_emb = self.item_embedding(history_items)     # (batch, len, dim)
        # Attention input: candidate, history, their difference and product.
        candidate_expand = candidate_emb.unsqueeze(1).expand_as(history_emb)
        attn_input = torch.cat([
            candidate_expand, history_emb,
            candidate_expand - history_emb,
            candidate_expand * history_emb
        ], dim=-1)
        attn_score = self.attention(attn_input).squeeze(-1)
        # Mask padding positions before the softmax.
        attn_score = attn_score.masked_fill(history_mask == 0, float('-inf'))
        attn_weight = torch.softmax(attn_score, dim=-1).unsqueeze(-1)
        # Attention-weighted sum of history = interest w.r.t. this candidate.
        user_interest = (attn_weight * history_emb).sum(dim=1)
        # Predict from [interest, candidate].
        combined = torch.cat([user_interest, candidate_emb], dim=-1)
        return torch.sigmoid(self.mlp(combined)).squeeze()
        return torch.sigmoid(self.mlp(combined)).squeeze()

冷启动问题
| 策略 | 适用场景 | 方法 |
|---|---|---|
| 热门推荐 | 新用户 | 推荐全局热门物品 |
| 内容特征 | 新物品 | 基于物品属性推荐 |
| 用户画像 | 新用户 | 基于注册信息推荐 |
| 探索性推荐 | 通用 | 多臂老虎机动态探索 |
| 跨域迁移 | 新平台 | 从其他平台迁移偏好 |
评估指标
def precision_at_k(actual, predicted, k):
    """Precision@K: fraction of the top-k recommendations that are relevant.

    actual: collection of ground-truth relevant item ids.
    predicted: ranked list of recommended item ids, best first.
    k: cutoff; must be > 0 (divides by k even if fewer items were predicted,
       per the standard definition).
    """
    pred_k = predicted[:k]
    return len(set(pred_k) & set(actual)) / k
def ndcg_at_k(actual, predicted, k):
    """NDCG@K: normalized discounted cumulative gain with binary relevance.

    actual: collection of ground-truth relevant item ids.
    predicted: ranked list of recommended item ids, best first.
    Returns 0 when there are no relevant items (idcg == 0).
    """
    # Gain is 1 for a hit at rank i, discounted by log2(i + 2).
    dcg = sum(
        1 / np.log2(i + 2) for i, item in enumerate(predicted[:k]) if item in actual
    )
    # Ideal DCG: all relevant items packed at the top of the list.
    idcg = sum(1 / np.log2(i + 2) for i in range(min(len(actual), k)))
    return dcg / idcg if idcg > 0 else 0
def mean_average_precision(actual_list, predicted_list, k):
    """MAP@K: mean over users of average precision at cutoff k.

    actual_list: per-user collections of relevant item ids.
    predicted_list: per-user ranked recommendation lists, best first.
    Returns the mean AP across users (np.float64).
    """
    aps = []
    for actual, predicted in zip(actual_list, predicted_list):
        hits = 0
        precision_sum = 0
        for i, item in enumerate(predicted[:k]):
            if item in actual:
                hits += 1
                # Precision at this hit's rank (1-based).
                precision_sum += hits / (i + 1)
        # Normalize by the best achievable hit count; users with no
        # relevant items contribute 0.
        ap = precision_sum / min(len(actual), k) if actual else 0
        aps.append(ap)
    return np.mean(aps)
    return np.mean(aps)

总结
推荐算法从经典的协同过滤演进到深度学习时代,核心趋势是更精确的特征交叉(DeepFM)、更细粒度的用户兴趣建模(DIN)以及多目标优化。工业推荐系统通常采用多阶段架构(召回 -> 粗排 -> 精排 -> 重排),结合多种算法的优势。评估指标应与业务目标对齐,离线指标(NDCG、MAP)需要通过在线 A/B 测试验证。
贡献者
更新日志
2026/3/14 13:09
查看所有更新日志
9f6c2-feat: organize wiki content and refresh site setup于