推荐系统(五)—— Embedding 表示学习
Chen Kai BOSS

在推荐系统中,如何将用户和物品表示为计算机能够理解和计算的向量,是一个核心问题。 Embedding(嵌入)技术正是解决这一问题的关键。从早期的协同过滤到如今的深度学习推荐系统, Embedding 技术经历了从简单到复杂、从浅层到深层的演进过程。

本文将深入探讨推荐系统中 Embedding 表示学习的理论基础与实践方法。我们会从最基础的 Embedding 概念出发,逐步深入到 Item2Vec 、 Node2Vec 、 Two-Tower 模型、 DSSM 等经典算法,并详细讲解负采样策略、 ANN 近邻检索技术,以及如何评估 Embedding 的质量。每个算法都会配有完整的代码实现,帮助读者真正理解和掌握这些技术。

Embedding 基础理论

什么是 Embedding

Embedding 是将高维稀疏的离散对象(如用户 ID 、物品 ID 、类别等)映射到低维稠密的连续向量空间中的技术。在推荐系统中,这些向量能够捕获对象之间的语义相似性:相似的物品在向量空间中距离更近,不相似的物品距离更远。

形式上,给定一个离散对象集合 , Embedding 函数 将每个对象 映射到一个 维向量 ,其中 通常远小于

Embedding 的优势

维度压缩:原始的特征空间可能是百万甚至千万维的(每个 ID 对应一维),而 Embedding 将其压缩到几十到几百维,大大降低了存储和计算成本。

语义捕获:通过训练, Embedding 能够学习到对象之间的潜在关系。例如,经常被同一用户点击的物品,其 Embedding 向量会更接近。

泛化能力:对于训练集中未出现的新物品,可以通过其属性或其他信息初始化 Embedding,然后通过少量数据快速学习。

可计算性:向量空间中的相似度计算(如余弦相似度、内积)比原始特征空间中的计算更高效。

Embedding 学习的基本原理

Embedding 学习的基本思路:相似的对象应该有相似的 Embedding 向量。这个"相似性"的定义取决于具体的任务和算法:

  • 协同过滤:被同一用户交互过的物品相似
  • 序列推荐:在序列中相邻的物品相似
  • 图结构:在图中有边连接的节点相似
  • 语义匹配:在语义空间中匹配的查询和文档相似

损失函数与优化目标

大多数 Embedding 学习算法都可以归结为优化以下形式的损失函数:

$$

L = _{(i,j) } (f(_i, j), y{ij}) + () $$

其中:

  • 是正样本对集合(相似的对象对)
  • 是相似度函数(如内积、余弦相似度)
  • 是标签(通常为 1 表示相似, 0 表示不相似)
  • 是损失函数(如交叉熵、均方误差)
  • 是正则化项
  • 是所有对象的 Embedding 矩阵

Item2Vec 与 Word2Vec 类比

Word2Vec 回顾

Word2Vec 是 Mikolov 等人在 2013 年提出的词向量学习算法,它通过"上下文预测词"或"词预测上下文"的方式学习词的 Embedding 。 Word2Vec 有两种架构:

Skip-gram:给定中心词,预测上下文词 CBOW( Continuous Bag of Words):给定上下文词,预测中心词

Skip-gram 的目标函数为:

$$

L = {t=1}^{T} {-c j c, j } p(w_{t+j} | w_t) $$

其中 通过 softmax 计算:

$$

p(w_{t+j} | w_t) = $$

Item2Vec 的核心思想

Item2Vec 将 Word2Vec 的思想迁移到推荐系统:将用户的行为序列看作句子,将物品看作词。如果两个物品经常在同一个用户的序列中出现(即"共现"),那么它们的 Embedding 应该相似。

Item2Vec 算法流程

  1. 构建序列:将每个用户的历史交互记录按时间排序,形成物品序列
  2. 定义上下文:对于序列中的每个物品,将其前后 个物品作为上下文
  3. 训练模型:使用 Skip-gram 或 CBOW 架构,学习物品的 Embedding

Item2Vec 完整实现

问题背景

在推荐系统中,如何学习物品的有效表示是一个核心问题。传统的协同过滤方法(如矩阵分解)需要显式的用户-物品交互矩阵,但在实际应用中,我们往往只有用户的行为序列数据(如点击序列、购买序列)。这些序列数据蕴含着丰富的物品共现信息:如果两个物品经常在同一个用户的行为序列中出现,它们很可能具有相似的性质或满足相似的用户需求。然而,如何从这些序列数据中提取物品的语义表示,并捕获物品之间的相似性,是一个挑战。

解决思路

Item2Vec 借鉴了自然语言处理中 Word2Vec 的思想,将推荐问题转化为序列建模问题。核心洞察是:用户的行为序列可以类比为自然语言中的句子,每个物品类比为一个词。如果两个物品在序列中经常共现(出现在相近的位置),它们的 Embedding 向量应该相似。具体而言, Item2Vec 采用 Skip-gram 架构:对于序列中的每个中心物品,模型学习预测其上下文物品(窗口内的其他物品)。通过最大化正样本对的相似度、最小化负样本对的相似度,模型能够学习到物品的向量表示,使得语义相似的物品在向量空间中距离更近。

设计考虑

在实现 Item2Vec 时,有几个关键的设计决策需要考虑:

  1. 两套 Embedding 矩阵:与 Word2Vec 类似,我们使用两套独立的 Embedding 矩阵——中心物品 Embedding 和上下文物品 Embedding 。这种设计提供了更大的模型容量,让模型能够更灵活地学习不同角色的表示。最终我们只使用中心物品 Embedding 作为物品的最终表示。
  2. 负采样策略:直接计算所有物品的 softmax 在物品数量很大时计算开销巨大。负采样技术通过随机采样少量负样本(通常 5-20 个)来近似完整的 softmax,大幅提升训练效率。负采样分布采用频率的 3/4 次幂,既避免高频词被过度采样,又给低频词一定的学习机会。
  3. 窗口大小选择:上下文窗口大小决定了模型考虑多远的共现关系。窗口太小(如 1-2)只能捕获局部共现,窗口太大(如 10+)可能引入噪声。通常窗口大小设置为 3-5,在捕获共现关系和计算效率之间取得平衡。
  4. 数值稳定性:内积计算可能产生很大的值,导致 sigmoid 函数溢出。需要在计算损失前对得分进行裁剪( clamp),限制在合理范围内(如[-10, 10])。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import defaultdict
from tqdm import tqdm

class Item2Vec(nn.Module):
"""
Item2Vec 模型实现
基于 Skip-gram 架构,通过预测上下文物品来学习物品的向量表示

核心思想:如果两个物品经常在同一个用户的行为序列中出现(共现),
那么它们的 Embedding 向量应该相似
"""
def __init__(self, vocab_size, embedding_dim):
"""
初始化 Item2Vec 模型

参数:
vocab_size: 物品词汇表大小(即物品总数)
embedding_dim: Embedding 向量的维度
"""
super(Item2Vec, self).__init__()
self.vocab_size = vocab_size
self.embedding_dim = embedding_dim

# 中心词(物品)的 Embedding 矩阵
# 这是我们最终要使用的物品表示
self.center_embeddings = nn.Embedding(vocab_size, embedding_dim)

# 上下文词(物品)的 Embedding 矩阵
# 这是一个辅助的 Embedding,用于训练过程
# Word2Vec 论文建议使用两套独立的 Embedding 可以提高训练效果
self.context_embeddings = nn.Embedding(vocab_size, embedding_dim)

# 初始化中心词 Embedding:使用小范围的均匀分布
# 这样可以避免初始值过大导致的梯度爆炸问题
self.center_embeddings.weight.data.uniform_(-0.5 / embedding_dim, 0.5 / embedding_dim)

# 初始化上下文词 Embedding:全部置零
# 上下文 Embedding 从零开始学习可以提供更好的训练稳定性
self.context_embeddings.weight.data.zero_()

def forward(self, center_words, context_words, neg_words):
"""
前向传播:计算损失函数

使用负采样( Negative Sampling)技术简化 softmax 计算:
- 对于每个正样本对(中心词-上下文词),最大化它们的相似度
- 对于每个负样本对(中心词-负样本词),最小化它们的相似度

参数:
center_words: [batch_size] 中心物品 ID
context_words: [batch_size] 上下文物品 ID(正样本)
neg_words: [batch_size, num_neg] 负采样物品 ID

返回:
loss: 标量,负对数似然损失
"""
# 获取中心词的 Embedding 向量
# 形状: [batch_size, embedding_dim]
center_emb = self.center_embeddings(center_words)

# 获取正样本上下文词的 Embedding 向量
# 形状: [batch_size, embedding_dim]
context_emb = self.context_embeddings(context_words)

# 获取负样本的 Embedding 向量
# 形状: [batch_size, num_neg, embedding_dim]
neg_emb = self.context_embeddings(neg_words)

# 计算正样本得分:中心词与上下文词的内积
# 内积越大,表示两个物品越相似
# 形状: [batch_size]
pos_score = torch.sum(center_emb * context_emb, dim=1)

# 限制得分范围,防止数值溢出
# 这是一个重要的数值稳定性技巧
pos_score = torch.clamp(pos_score, max=10, min=-10)

# 计算负样本得分:中心词与每个负样本的内积
# 使用 bmm (batch matrix multiplication) 高效计算
# neg_emb: [batch_size, num_neg, embedding_dim]
# center_emb.unsqueeze(2): [batch_size, embedding_dim, 1]
# 结果形状: [batch_size, num_neg]
neg_score = torch.bmm(neg_emb, center_emb.unsqueeze(2)).squeeze(2)

# 同样限制负样本得分范围
neg_score = torch.clamp(neg_score, max=10, min=-10)

# 计算正样本损失:希望 sigmoid(pos_score) 接近 1
# 使用 -log(sigmoid(x)) 作为损失函数
# 加上 1e-10 防止 log(0) 导致 NaN
pos_loss = -torch.log(torch.sigmoid(pos_score) + 1e-10)

# 计算负样本损失:希望 sigmoid(-neg_score) 接近 1
# 即希望 sigmoid(neg_score) 接近 0,表示中心词与负样本不相似
# 对所有负样本的损失求和
neg_loss = -torch.log(torch.sigmoid(-neg_score) + 1e-10).sum(dim=1)

# 总损失 = 正样本损失 + 负样本损失,然后对 batch 求平均
return (pos_loss + neg_loss).mean()

def get_embeddings(self):
"""
获取最终的物品 Embedding

通常我们使用中心词 Embedding 作为最终的物品表示,
因为它直接对应于我们要建模的物品

返回:
embeddings: numpy array,形状 [vocab_size, embedding_dim]
"""
return self.center_embeddings.weight.data.cpu().numpy()


class Item2VecTrainer:
"""
Item2Vec 训练器

负责管理整个训练流程,包括:
1. 从用户行为序列生成训练样本
2. 构建负采样分布
3. 执行训练过程
"""

def __init__(self, sequences, vocab_size, embedding_dim=128, window_size=5,
num_negatives=5, batch_size=256, learning_rate=0.01):
"""
初始化训练器

参数:
sequences: 用户行为序列列表,每个序列是物品 ID 的列表
例如: [[0,1,2,3], [1,2,4,5], ...]
vocab_size: 物品词汇表大小(物品总数)
embedding_dim: Embedding 维度
window_size: 上下文窗口大小,表示中心词前后各取多少个词
例如 window_size=2 表示取前后各 2 个词
num_negatives: 负采样数量,每个正样本对应的负样本数
batch_size: 训练批次大小
learning_rate: 学习率
"""
self.sequences = sequences
self.vocab_size = vocab_size
self.embedding_dim = embedding_dim
self.window_size = window_size
self.num_negatives = num_negatives
self.batch_size = batch_size

# 构建词频表(用于负采样)
# 统计每个物品在所有序列中出现的次数
self.word_freq = self._build_word_freq()

# 构建负采样分布
# 根据词频构建采样概率分布,高频词有更高的被采样概率
self.word_dist = self._build_word_distribution()

# 初始化 Item2Vec 模型
self.model = Item2Vec(vocab_size, embedding_dim)

# 使用 Adam 优化器
# Adam 对学习率不敏感,能够自适应调整每个参数的学习率
self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)

# 生成训练样本对
# 将所有序列转换为(中心词,上下文词)对
self.train_pairs = self._generate_training_pairs()

def _build_word_freq(self):
"""
统计词频

遍历所有序列,统计每个物品出现的次数
这个频率将用于构建负采样分布

返回:
word_freq: 字典,{物品 ID: 出现次数}
"""
word_freq = defaultdict(int)
for seq in self.sequences:
for word in seq:
word_freq[word] += 1
return dict(word_freq)

def _build_word_distribution(self):
"""
构建负采样分布(使用幂律分布)

Word2Vec 论文建议使用频率的 3/4 次幂作为采样概率,
这样可以平衡高频词和低频词的采样概率:
- 如果直接使用频率,高频词会被过度采样
- 如果使用均匀分布,低频词会被过度采样
- 使用 0.75 次幂是一个经验性的折中方案

返回:
word_dist: numpy array,每个物品的采样概率
"""
# 构建频率数组,索引对应物品 ID
word_freq_array = np.array([self.word_freq.get(i, 0) for i in range(self.vocab_size)])

# 使用 3/4 次幂( 0.75)
# 这是 Word2Vec 论文的经验设置
word_dist = np.power(word_freq_array, 0.75)

# 归一化为概率分布
word_dist = word_dist / word_dist.sum()

return word_dist

def _generate_training_pairs(self):
"""
生成训练样本对

对每个序列中的每个物品,提取其上下文物品,
形成(中心物品,上下文物品)训练样本对

例如:序列 [0, 1, 2, 3, 4], window_size=2
- 物品 2 的上下文:[0, 1, 3, 4]
- 生成样本对:(2,0), (2,1), (2,3), (2,4)

返回:
pairs: 训练样本对列表 [(中心词, 上下文词), ...]
"""
pairs = []

# 遍历每个用户的行为序列
for seq in self.sequences:
# 遍历序列中的每个物品作为中心词
for i, center_word in enumerate(seq):
# 确定上下文窗口的范围
# start: 窗口起始位置(不能小于 0)
# end: 窗口结束位置(不能超过序列长度)
start = max(0, i - self.window_size)
end = min(len(seq), i + self.window_size + 1)

# 遍历窗口内的所有物品作为上下文词
for j in range(start, end):
if j != i: # 排除中心词本身
context_word = seq[j]
pairs.append((center_word, context_word))

return pairs

def _negative_sampling(self, center_words, num_samples):
"""
负采样

对每个中心词,根据构建的分布随机采样 num_samples 个负样本
负样本是那些不在当前中心词上下文中的物品

参数:
center_words: numpy array,中心词 ID 列表
num_samples: 每个中心词对应的负样本数量

返回:
neg_samples: numpy array,形状 [batch_size, num_samples]
"""
batch_size = len(center_words)
neg_samples = []

# 为每个负样本位置独立采样
for _ in range(num_samples):
# 从构建的分布中采样
# p=self.word_dist 表示按照预先计算的概率分布采样
sampled = np.random.choice(
self.vocab_size, # 从所有物品中采样
size=batch_size, # 为 batch 中的每个样本采样一个
p=self.word_dist # 使用构建的概率分布
)
neg_samples.append(sampled)

# 转置:从 [num_samples, batch_size] 转为 [batch_size, num_samples]
return np.array(neg_samples).T

def train(self, num_epochs=10):
"""
训练模型

执行多个 epoch 的训练,每个 epoch 中:
1. 打乱训练样本(提高泛化能力)
2. 按 batch 训练
3. 进行负采样
4. 计算损失并更新参数

参数:
num_epochs: 训练轮数

返回:
embeddings: 训练好的物品 Embedding
"""
self.model.train() # 设置为训练模式

for epoch in range(num_epochs):
total_loss = 0
num_batches = 0

# 打乱训练样本
# 每个 epoch 都打乱样本顺序可以提高模型的泛化能力
np.random.shuffle(self.train_pairs)

# 批量训练
# 使用 tqdm 显示训练进度
for i in tqdm(range(0, len(self.train_pairs), self.batch_size),
desc=f"Epoch {epoch+1}/{num_epochs}"):
# 获取当前 batch 的样本
batch_pairs = self.train_pairs[i:i+self.batch_size]

# 提取中心词和上下文词
center_words = torch.LongTensor([p[0] for p in batch_pairs])
context_words = torch.LongTensor([p[1] for p in batch_pairs])

# 执行负采样
# 为每个正样本对采样 num_negatives 个负样本
neg_words = self._negative_sampling(center_words.numpy(), self.num_negatives)
neg_words = torch.LongTensor(neg_words)

# 梯度清零
# PyTorch 默认会累积梯度,需要手动清零
self.optimizer.zero_grad()

# 前向传播:计算损失
loss = self.model(center_words, context_words, neg_words)

# 反向传播:计算梯度
loss.backward()

# 更新参数
self.optimizer.step()

# 累积损失(用于打印)
total_loss += loss.item()
num_batches += 1

# 计算并打印平均损失
avg_loss = total_loss / num_batches
print(f"Epoch {epoch+1}, Average Loss: {avg_loss:.4f}")

# 返回训练好的 Embedding
return self.model.get_embeddings()


# 使用示例
def example_item2vec():
"""
Item2Vec 使用示例

展示如何使用 Item2VecTrainer 训练模型并获取物品 Embedding
"""
# 模拟用户行为序列
# 每个序列代表一个用户按时间顺序的交互历史
# 例如:用户 1 先点击了物品 0,然后是物品 1,接着是 2,依此类推
sequences = [
[0, 1, 2, 3, 4], # 用户 1 的交互序列
[1, 2, 5, 6, 7], # 用户 2 的交互序列
[0, 3, 4, 8, 9], # 用户 3 的交互序列
[2, 5, 6, 10, 11], # 用户 4 的交互序列
]

vocab_size = 12 # 物品总数(物品 ID 从 0 到 11)
embedding_dim = 64 # Embedding 维度

# 创建训练器
trainer = Item2VecTrainer(
sequences=sequences,
vocab_size=vocab_size,
embedding_dim=embedding_dim,
window_size=2, # 上下文窗口大小为 2,即考虑前后各 2 个物品
num_negatives=5, # 每个正样本对应 5 个负样本
batch_size=32, # 批次大小
learning_rate=0.01 # 学习率
)

# 训练模型
embeddings = trainer.train(num_epochs=5)

# 输出结果
print(f"Embedding shape: {embeddings.shape}")
print(f"Item 0 embedding (前 5 维): {embeddings[0][:5]}")

# 计算物品相似度
def cosine_similarity(a, b):
"""
计算两个向量的余弦相似度

余弦相似度 = 向量内积 / (向量模长的乘积)
取值范围 [-1, 1],越接近 1 表示越相似
"""
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

# 查找与物品 0 最相似的物品
item_0_emb = embeddings[0]
similarities = [cosine_similarity(item_0_emb, embeddings[i]) for i in range(vocab_size)]
top_k = np.argsort(similarities)[::-1][:5] # 取相似度最高的 5 个物品
print(f"与物品 0 最相似的 5 个物品: {top_k}")

关键点解读

Item2Vec 的实现包含几个关键组件,每个组件都有其特定的作用:

  1. 序列构建与窗口采样:将用户的历史交互按时间排序形成物品序列,这是模型的基础数据。对于序列中的每个中心物品,我们提取其窗口内的其他物品作为上下文。window_size=2 表示考虑前后各 2 个物品,这样既能捕获局部共现关系,又不会引入过多噪声。窗口大小的选择需要在捕获共现关系和计算效率之间权衡:窗口越大,训练样本越多,但可能引入不相关的共现。
  2. 负采样机制:负采样是 Item2Vec 训练效率的关键。每个正样本对(中心物品-上下文物品)需要对应多个负样本对(中心物品-随机物品)。num_negatives=5 表示每个正样本对应 5 个负样本,这个数量通常设置为 5-20 之间。负样本数量越多,模型对不相似物品的区分能力越强,但训练时间也线性增长。
  3. 两套 Embedding 矩阵:中心物品和上下文物品使用独立的 Embedding 矩阵,这是 Word2Vec 论文中的设计。这种设计提供了更大的模型容量,让模型能够学习不同角色的表示。虽然增加了参数量,但实验表明这种设计能够提升模型效果。最终我们只使用中心物品 Embedding 作为物品的最终表示。
  4. 损失函数设计:使用负对数似然损失,对正样本最大化 sigmoid(内积),对负样本最小化 sigmoid(内积)。这种设计使得相似物品的内积增大,不相似物品的内积减小,从而学习到有意义的 Embedding 。

设计权衡

在 Item2Vec 的实现中,存在多个设计权衡:

  1. 窗口大小 vs 计算效率:窗口越大,能够捕获更远的共现关系,但训练样本数量会急剧增加( O(n ²))。通常窗口大小设置为 3-5,在效果和效率之间取得平衡。
  2. 负采样数量 vs 训练时间:负采样数量越多,模型对负样本的区分能力越强,但每个 batch 的计算时间也线性增长。通常设置为 5-20 个,在效果和效率之间权衡。
  3. Embedding 维度 vs 表达能力:维度越高,模型的表达能力越强,但参数量也越大,且可能过拟合。通常设置为 64-256 维,根据物品数量和计算资源选择。
  4. 频率分布 vs 均匀分布:负采样使用频率的 3/4 次幂分布,既避免高频词被过度采样,又给低频词学习机会。如果使用均匀分布,低频词会被过度采样;如果直接使用频率,高频词会被过度采样。

常见问题

  1. 如何处理冷启动物品? 对于训练集中未出现的新物品, Item2Vec 无法直接学习其 Embedding 。常见的解决方案包括:( 1)使用物品的内容特征(如类别、标签)通过额外的神经网络生成初始 Embedding;( 2)使用相似物品的 Embedding 作为初始化;( 3)在训练时为新物品分配随机初始化的 Embedding,通过少量交互快速学习。
  2. 数值溢出问题:内积计算可能产生很大的值(特别是 Embedding 未归一化时),导致 sigmoid 函数溢出。代码中使用 torch.clamp 将得分限制在 [-10, 10] 范围内,这是一个重要的数值稳定性技巧。
  3. 负样本重复问题:随机负采样可能采样到正样本(即上下文窗口内的物品)。严格来说应该排除正样本,但在物品数量很大时(如百万级),采样到正样本的概率很小(通常<0.1%),可以忽略。如果物品数量较小,可以在采样时排除正样本。
  4. 序列长度不一致:实际应用中用户序列长度差异很大,从几个物品到数百个物品不等。常见的处理方式包括:( 1)截断:只保留最近 N 个物品(如最近 50 个);( 2)填充:将短序列补齐到固定长度;( 3)动态批处理:将相似长度的序列放在同一个 batch 中。
  5. 如何处理时间信息? Item2Vec 忽略了序列中的时间信息,将窗口内的所有物品同等对待。如果需要考虑时间衰减,可以在损失函数中为不同位置的上下文物品赋予不同的权重,距离中心物品越远权重越小。

使用示例

下面的示例展示了如何使用 Item2Vec 训练模型并获取物品 Embedding:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 准备用户行为序列数据
sequences = [
[0, 1, 2, 3, 4], # 用户 1 的交互序列
[1, 2, 5, 6, 7], # 用户 2 的交互序列
[0, 3, 4, 8, 9], # 用户 3 的交互序列
]

# 创建训练器并训练模型
trainer = Item2VecTrainer(
sequences=sequences,
vocab_size=12,
embedding_dim=64,
window_size=2,
num_negatives=5,
batch_size=32,
learning_rate=0.01
)

# 训练模型(通常需要 5-20 个 epoch)
embeddings = trainer.train(num_epochs=10)

# 使用 Embedding 进行相似物品推荐
item_emb = embeddings[0] # 获取物品 0 的 Embedding
similarities = [cosine_similarity(item_emb, embeddings[i]) for i in range(12)]
top_k_items = np.argsort(similarities)[::-1][:5] # 找到最相似的 5 个物品

在实际应用中, Item2Vec 通常作为特征提取器,学习到的 Embedding 可以用于:( 1)相似物品推荐;( 2)作为下游模型的输入特征;( 3)物品聚类和可视化分析。

Item2Vec 的优缺点

优点

  • 简单直观,易于理解和实现
  • 不需要额外的特征信息,只需要用户行为序列
  • 能够捕获物品之间的共现关系

缺点

  • 忽略了用户信息,所有用户共享同一套物品 Embedding
  • 对于新物品(冷启动)效果较差
  • 序列中的位置信息利用不充分(只考虑窗口内的共现)

Node2Vec 图嵌入

图嵌入的基本概念

在推荐系统中,用户和物品可以构成一个二部图( Bipartite Graph):用户和物品是两类节点,交互行为是边。图嵌入的目标是将图中的节点映射到低维向量空间,使得图中相似的节点(如连接相似、结构相似)在向量空间中距离更近。

Node2Vec 算法原理

Node2Vec 是 Grover 和 Leskovec 在 2016 年提出的图嵌入算法,它通过有偏随机游走( Biased Random Walk)来生成节点的上下文,然后使用 Skip-gram 学习节点的 Embedding 。

有偏随机游走: Node2Vec 的关键创新在于定义了一种灵活的游走策略,通过参数 控制游走的偏向性:

  • ( return parameter):控制回到上一个节点的概率
  • ( in-out parameter):控制走向远离起始节点的概率

从节点 游走到下一个节点 的未归一化概率为:

其中 是节点 (上一个节点)到节点 的最短距离。

Node2Vec 完整实现

问题背景

在推荐系统中,用户和物品之间的交互可以自然地建模为图结构:用户和物品是两类节点,交互行为是边。这种图结构蕴含着丰富的语义信息:连接相似的用户(如都购买了相同物品)可能具有相似的兴趣;结构相似的物品(如被相似用户购买)可能具有相似的性质。然而,如何从图结构中学习节点(用户和物品)的有效表示,使得相似节点在向量空间中距离更近,是一个挑战。传统的图嵌入方法(如 DeepWalk)使用标准的随机游走生成节点序列,但这种方法无法灵活地控制游走策略,难以同时捕获图的局部结构(如社区结构)和全局结构(如节点角色)。

解决思路

Node2Vec 通过引入有偏随机游走( Biased Random Walk)来解决这个问题。基本思路:通过两个参数 p 和 q 控制游走策略,使得游走过程能够在深度优先搜索( DFS)和广度优先搜索( BFS)之间灵活切换。具体而言, p( return parameter)控制返回上一个节点的概率, q( in-out parameter)控制探索远离起始节点的概率。当 p 较小、 q 较大时,游走倾向于 BFS,能够捕获局部社区结构;当 p 较大、 q 较小时,游走倾向于 DFS,能够捕获全局结构。通过这种灵活的游走策略, Node2Vec 能够生成多样化的节点序列,然后使用 Skip-gram 学习节点的 Embedding,使得结构相似或连接相似的节点在向量空间中距离更近。

设计考虑

在实现 Node2Vec 时,需要考虑以下几个关键设计:

  1. 有偏游走策略:游走概率的计算需要考虑上一个节点、当前节点和候选节点的关系。对于每个候选节点,需要计算其与上一个节点的最短距离,然后根据距离赋予不同的权重。这种设计使得游走过程能够灵活地在局部探索和全局探索之间平衡。
  2. 图预处理:为了加速游走生成,需要预先计算并存储每个节点的邻居信息。这样可以避免在训练时重复查询图结构,显著提高游走生成的速度。对于大规模图,这种预处理是必要的。
  3. 游走序列生成:对每个节点执行多次游走(通常 10-50 次),每次游走生成长度固定的序列(通常 50-100)。游走次数越多,能够捕获的图结构信息越丰富,但计算开销也越大。
  4. Skip-gram 训练:将生成的游走序列看作"句子",使用 Skip-gram 学习节点 Embedding 。这与 Item2Vec 的训练过程类似,但数据来源不同: Item2Vec 使用用户行为序列, Node2Vec 使用图上的游走序列。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
import numpy as np
import networkx as nx
import torch
import torch.nn as nn
import torch.optim as optim
from collections import defaultdict
from tqdm import tqdm
import random

class Node2Vec:
"""
Node2Vec 图嵌入算法

核心创新:有偏随机游走( Biased Random Walk)
- 参数 p 控制返回上一个节点的概率( return parameter)
- 参数 q 控制探索远离起点的概率( in-out parameter)
- p=q=1 时退化为标准的随机游走
"""
def __init__(self, graph, dimensions=128, walk_length=80, num_walks=10,
p=1.0, q=1.0, window_size=10, num_negatives=5):
"""
初始化 Node2Vec

参数:
graph: NetworkX 图对象(可以是有向图或无向图)
dimensions: Embedding 维度
walk_length: 每次游走的长度(生成的节点序列长度)
num_walks: 每个节点作为起点的游走次数
p: return parameter,控制返回上一个节点的倾向
- p < 1: 倾向于返回上一个节点(类似 BFS,关注局部结构)
- p > 1: 不倾向返回(类似 DFS,关注全局结构)
q: in-out parameter,控制探索新节点的倾向
- q < 1: 倾向于探索远离起点的节点( DFS)
- q > 1: 倾向于探索起点附近的节点( BFS)
window_size: Skip-gram 的上下文窗口大小
num_negatives: 负采样数量
"""
self.graph = graph
self.dimensions = dimensions
self.walk_length = walk_length
self.num_walks = num_walks
self.p = p
self.q = q
self.window_size = window_size
self.num_negatives = num_negatives

# 预处理图:为每个节点存储邻居信息
# 这样可以避免重复查询,提高游走速度
self._preprocess_graph()

# 生成游走序列
# 这是 Node2Vec 的核心:通过有偏游走捕获图结构
self.walks = self._generate_walks()

# 构建训练样本
# 将游走序列转换为(中心节点,上下文节点)对
self.train_pairs = self._generate_training_pairs()

# 构建节点频率分布(用于负采样)
self.node_freq = self._build_node_freq()
self.node_dist = self._build_node_distribution()

def _preprocess_graph(self):
"""
预处理图:为每个节点存储邻居信息

将邻居信息提前存储在字典中,避免训练时重复查询图结构,
显著提高游走生成的速度
"""
self.neighbors = {}
for node in self.graph.nodes():
# 获取节点的所有邻居并转为列表
self.neighbors[node] = list(self.graph.neighbors(node))

def _generate_walks(self):
"""
生成有偏随机游走序列

对每个节点,执行 num_walks 次游走,每次游走长度为 walk_length
总共生成 num_nodes * num_walks 条游走序列

返回:
walks: 游走序列列表,每个序列是节点 ID 的列表
"""
walks = []
nodes = list(self.graph.nodes())

# 执行 num_walks 轮游走
for _ in range(self.num_walks):
# 每轮都打乱节点顺序,增加随机性
random.shuffle(nodes)

# 从每个节点开始一次游走
for node in nodes:
walk = self._node2vec_walk(node)
walks.append(walk)

return walks

def _node2vec_walk(self, start_node):
"""
从起始节点开始的有偏随机游走

这是 Node2Vec 的核心算法:通过参数 p 和 q 控制游走的偏向性

游走策略:
1. 第一步:从起点随机选择一个邻居
2. 后续步骤:根据 p 和 q 参数计算每个邻居的权重,按权重采样

权重计算规则(设当前在节点 v,上一个节点是 t,候选下一个节点是 x):
- 如果 x == t(返回上一个节点):权重 = 1/p
- 如果 x 与 t 相邻(距离为 1):权重 = 1
- 如果 x 不与 t 相邻(距离为 2):权重 = 1/q

参数:
start_node: 起始节点 ID

返回:
walk: 游走序列(节点 ID 列表)
"""
walk = [start_node]

# 如果起点没有邻居,直接返回
if len(self.neighbors[start_node]) == 0:
return walk

# 第一步:随机选择一个邻居
# 第一步没有"上一个节点",所以只能随机选择
first_step = random.choice(self.neighbors[start_node])
walk.append(first_step)

# 后续步骤:根据 p 和 q 进行有偏选择
for _ in range(self.walk_length - 2):
current = walk[-1] # 当前节点
prev = walk[-2] # 上一个节点

# 获取当前节点的邻居
neighbors = self.neighbors[current]
if len(neighbors) == 0:
break # 如果没有邻居,游走结束

# 计算每个邻居的权重
weights = []
for neighbor in neighbors:
if neighbor == prev:
# 返回上一个节点:权重 = 1/p
weight = 1.0 / self.p
elif neighbor in self.neighbors[prev]:
# 邻居与上一个节点相邻(距离为 1):权重 = 1
weight = 1.0
else:
# 邻居与上一个节点不相邻(距离为 2):权重 = 1/q
weight = 1.0 / self.q
weights.append(weight)

# 归一化权重为概率分布
weights = np.array(weights)
weights = weights / weights.sum()

# 根据权重采样下一个节点
next_node = np.random.choice(neighbors, p=weights)
walk.append(next_node)

return walk

def _generate_training_pairs(self):
"""
从游走序列生成训练样本对

与 Item2Vec 类似,将游走序列中的节点对转换为训练样本
每个节点作为中心节点,其窗口内的节点作为上下文节点
"""
pairs = []
for walk in self.walks:
for i, center_node in enumerate(walk):
# 确定上下文窗口
start = max(0, i - self.window_size)
end = min(len(walk), i + self.window_size + 1)

for j in range(start, end):
if j != i:
context_node = walk[j]
pairs.append((center_node, context_node))

return pairs

def _build_node_freq(self):
"""
统计节点频率

遍历所有游走序列,统计每个节点出现的次数
用于构建负采样分布
"""
node_freq = defaultdict(int)
for walk in self.walks:
for node in walk:
node_freq[node] += 1
return dict(node_freq)

def _build_node_distribution(self):
"""
构建负采样分布

同样使用 0.75 次幂的频率分布
"""
num_nodes = len(self.graph.nodes())
node_freq_array = np.array([
self.node_freq.get(i, 0) for i in range(num_nodes)
])
# 加上 1e-8 防止除零
node_dist = np.power(node_freq_array + 1e-8, 0.75)
node_dist = node_dist / node_dist.sum()
return node_dist

def _negative_sampling(self, center_nodes, num_samples):
"""
负采样:为每个中心节点采样负样本节点
"""
batch_size = len(center_nodes)
num_nodes = len(self.graph.nodes())

neg_samples = []
for _ in range(num_samples):
sampled = np.random.choice(
num_nodes,
size=batch_size,
p=self.node_dist
)
neg_samples.append(sampled)

return np.array(neg_samples).T

def train(self, batch_size=256, num_epochs=10, learning_rate=0.01):
"""
训练 Node2Vec 模型

创建节点到索引的映射,然后使用 Skip-gram 训练节点 Embedding
"""
num_nodes = len(self.graph.nodes())

# 创建节点到索引的映射
# 因为神经网络的 Embedding 层需要连续的整数索引
self.node_to_idx = {node: idx for idx, node in enumerate(self.graph.nodes())}
self.idx_to_node = {idx: node for node, idx in self.node_to_idx.items()}

# 将节点对转换为索引对
train_pairs_idx = [
(self.node_to_idx[p[0]], self.node_to_idx[p[1]])
for p in self.train_pairs
]

# 初始化模型
model = Node2VecModel(num_nodes, self.dimensions)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

model.train()

for epoch in range(num_epochs):
total_loss = 0
num_batches = 0

# 打乱训练样本
np.random.shuffle(train_pairs_idx)

for i in tqdm(range(0, len(train_pairs_idx), batch_size),
desc=f"Epoch {epoch+1}/{num_epochs}"):
batch_pairs = train_pairs_idx[i:i+batch_size]

center_nodes = torch.LongTensor([p[0] for p in batch_pairs])
context_nodes = torch.LongTensor([p[1] for p in batch_pairs])

# 负采样
neg_nodes = self._negative_sampling(
center_nodes.numpy(),
self.num_negatives
)
neg_nodes = torch.LongTensor(neg_nodes)

# 前向传播
optimizer.zero_grad()
loss = model(center_nodes, context_nodes, neg_nodes)

# 反向传播
loss.backward()
optimizer.step()

total_loss += loss.item()
num_batches += 1

avg_loss = total_loss / num_batches
print(f"Epoch {epoch+1}, Average Loss: {avg_loss:.4f}")

# 获取 Embedding
embeddings = model.get_embeddings()

# 构建节点到 Embedding 的映射
self.node_embeddings = {
self.idx_to_node[i]: embeddings[i]
for i in range(num_nodes)
}

return self.node_embeddings


class Node2VecModel(nn.Module):
"""
Node2Vec 的神经网络模型

与 Item2Vec 的结构完全相同,只是语义上是节点而非物品
"""
def __init__(self, num_nodes, embedding_dim):
super(Node2VecModel, self).__init__()
self.num_nodes = num_nodes
self.embedding_dim = embedding_dim

# 中心节点和上下文节点的 Embedding
self.center_embeddings = nn.Embedding(num_nodes, embedding_dim)
self.context_embeddings = nn.Embedding(num_nodes, embedding_dim)

# 初始化
self.center_embeddings.weight.data.uniform_(-0.5 / embedding_dim, 0.5 / embedding_dim)
self.context_embeddings.weight.data.zero_()

def forward(self, center_nodes, context_nodes, neg_nodes):
"""
前向传播:与 Item2Vec 完全相同的损失函数
"""
center_emb = self.center_embeddings(center_nodes)
context_emb = self.context_embeddings(context_nodes)
neg_emb = self.context_embeddings(neg_nodes)

pos_score = torch.sum(center_emb * context_emb, dim=1)
pos_score = torch.clamp(pos_score, max=10, min=-10)

neg_score = torch.bmm(neg_emb, center_emb.unsqueeze(2)).squeeze(2)
neg_score = torch.clamp(neg_score, max=10, min=-10)

pos_loss = -torch.log(torch.sigmoid(pos_score) + 1e-10)
neg_loss = -torch.log(torch.sigmoid(-neg_score) + 1e-10).sum(dim=1)

return (pos_loss + neg_loss).mean()

def get_embeddings(self):
"""获取节点 Embedding"""
return self.center_embeddings.weight.data.cpu().numpy()


# 使用示例
def example_node2vec():
"""
Node2Vec 使用示例

构建一个用户-物品二部图,然后学习节点的 Embedding
"""
# 创建一个简单的二部图(用户-物品图)
G = nx.Graph()

# 添加用户节点( 0-4)
users = [0, 1, 2, 3, 4]
# 添加物品节点( 10-19)
items = list(range(10, 20))

# 添加边:用户-物品交互
# 例如:用户 0 与物品 10 、 11 、 12 有交互
edges = [
(0, 10), (0, 11), (0, 12),
(1, 11), (1, 12), (1, 13),
(2, 12), (2, 13), (2, 14),
(3, 13), (3, 14), (3, 15),
(4, 14), (4, 15), (4, 16),
]

G.add_edges_from(edges)

# 创建 Node2Vec 实例
node2vec = Node2Vec(
graph=G,
dimensions=64, # Embedding 维度
walk_length=20, # 每次游走 20 步
num_walks=10, # 每个节点作为起点游走 10 次
p=1.0, # return parameter
q=1.0, # in-out parameter
window_size=5, # 上下文窗口大小
num_negatives=5 # 负采样数量
)

# 训练
embeddings = node2vec.train(
batch_size=128,
num_epochs=10,
learning_rate=0.01
)

print(f"Embedding for node 0: {embeddings[0][:5]}")
print(f"Embedding for node 10: {embeddings[10][:5]}")

关键点解读

Node2Vec 的实现包含几个关键组件,每个组件都有其特定的作用:

  1. 有偏随机游走机制:这是 Node2Vec 的核心创新。游走概率的计算需要考虑上一个节点、当前节点和候选节点的关系。对于每个候选节点,根据其与上一个节点的最短距离赋予不同的权重:如果候选节点是上一个节点(距离为 0),权重为 1/p;如果候选节点与上一个节点相邻(距离为 1),权重为 1;如果候选节点与上一个节点不相邻(距离为 2),权重为 1/q 。通过调整 p 和 q,可以控制游走策略: p 小(如 0.5)时倾向返回上一个节点,类似 BFS,能够捕获局部社区结构; q 小(如 0.5)时倾向探索远离起点的节点,类似 DFS,能够捕获全局结构。
  2. 图预处理优化:为了加速游走生成,代码预先计算并存储每个节点的邻居信息。这样可以避免在训练时重复查询图结构,显著提高游走生成的速度。对于大规模图(百万级节点),这种预处理是必要的,可以将游走生成时间从数小时降低到数分钟。
  3. 游走序列生成:对每个节点执行多次游走(num_walks,通常 10-50 次),每次游走生成长度固定的序列(walk_length,通常 50-100)。游走次数越多,能够捕获的图结构信息越丰富,但计算开销也越大。通常 num_walks * walk_length 的乘积决定了生成的序列数量,需要在效果和效率之间权衡。
  4. Skip-gram 训练:将生成的游走序列看作"句子",使用 Skip-gram 学习节点 Embedding 。这与 Item2Vec 的训练过程类似,但数据来源不同: Item2Vec 使用用户行为序列, Node2Vec 使用图上的游走序列。通过最大化正样本对的相似度、最小化负样本对的相似度,模型能够学习到节点的向量表示。

设计权衡

在 Node2Vec 的实现中,存在多个设计权衡:

  1. 游走策略 vs 计算复杂度:有偏游走需要计算每个候选节点与上一个节点的最短距离,这在稠密图中计算开销较大。对于大规模图,可以使用近似方法(如只考虑 2 跳邻居)来加速计算。
  2. 游走数量 vs 训练时间num_walkswalk_length 的乘积决定了生成的序列数量。数量越多, Embedding 质量越好,但训练时间也线性增长。通常设置 num_walks=10-50walk_length=50-100,在效果和效率之间取得平衡。
  3. p 和 q 参数选择:这两个参数控制游走策略,没有通用的最优值,需要根据具体任务调整。对于同质性强的图(如社交网络),倾向于使用较小的 p 和较大的 q;对于结构等价性强的图(如知识图谱),倾向于使用较大的 p 和较小的 q 。建议从 p=1, q=1 开始,然后尝试不同的组合。
  4. 窗口大小 vs 上下文范围: Skip-gram 的窗口大小决定了模型考虑多远的上下文。窗口越大,能够捕获更远的共现关系,但训练样本也会急剧增加。通常窗口大小设置为 5-10,在捕获共现关系和计算效率之间权衡。

常见问题

  1. 如何处理有向图? Node2Vec 默认支持无向图,对于有向图,需要修改游走策略:只考虑出边邻居,忽略入边邻居。这样可以保持游走的方向性,但可能丢失一些结构信息。
  2. 如何处理加权图? 对于加权图,可以在游走概率计算中考虑边的权重:权重越大的边,被选择的概率越高。这样可以使得游走更倾向于沿着重要边进行,捕获更重要的结构信息。
  3. 如何处理大规模图? 对于百万级甚至千万级节点的大规模图,完整的游走生成可能非常耗时。可以采用以下优化策略:( 1)并行化游走生成,利用多核 CPU 或 GPU 加速;( 2)使用采样方法,只对部分节点进行游走;( 3)使用近似方法,如只考虑 2 跳邻居。
  4. 如何选择 Embedding 维度? Embedding 维度影响模型的表达能力和计算开销。维度越高,模型的表达能力越强,但参数量也越大,且可能过拟合。通常设置为 64-256 维,根据节点数量和计算资源选择。对于大规模图,可以使用较小的维度(如 64-128)以节省内存。
  5. 如何处理新节点? 对于训练集中未出现的新节点, Node2Vec 无法直接学习其 Embedding 。常见的解决方案包括:( 1)使用节点的特征(如度、邻居特征)通过额外的神经网络生成初始 Embedding;( 2)使用相似节点的 Embedding 作为初始化;( 3)在训练时为新节点分配随机初始化的 Embedding,通过少量交互快速学习。

使用示例

下面的示例展示了如何使用 Node2Vec 训练模型并获取节点 Embedding:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 构建用户-物品二部图
G = nx.Graph()
edges = [(0, 10), (0, 11), (1, 11), (1, 12), ...] # 用户-物品交互边
G.add_edges_from(edges)

# 创建 Node2Vec 实例并训练
node2vec = Node2Vec(
graph=G,
dimensions=64,
walk_length=20,
num_walks=10,
p=1.0, # return parameter
q=1.0, # in-out parameter
window_size=5,
num_negatives=5
)

# 训练模型
embeddings = node2vec.train(batch_size=128, num_epochs=10)

# 使用 Embedding 进行相似节点推荐
user_emb = embeddings[0] # 获取用户 0 的 Embedding
similarities = [cosine_similarity(user_emb, embeddings[i]) for i in range(num_nodes)]
top_k_users = np.argsort(similarities)[::-1][:5] # 找到最相似的 5 个用户

在实际应用中, Node2Vec 通常用于:( 1)学习用户和物品的 Embedding,用于相似度计算和推荐;( 2)作为下游模型的输入特征;( 3)节点聚类和可视化分析;( 4)图结构分析和社区发现。

  • 节点索引映射: NetworkX 的节点 ID 可以是任意类型(字符串、整数等),但 PyTorch 的 Embedding 层需要连续的整数索引,所以需要建立映射。

可能的坑

  1. 内存占用:游走序列会占用大量内存。如果图很大,考虑分批生成游走或使用流式处理。
  2. 孤立节点:如果图中有孤立节点(没有邻居),游走会立即结束。需要在构图时注意处理。
  3. 有向图 vs 无向图: Node2Vec 可以用于有向图和无向图。推荐系统中的用户-物品图通常是无向图。

Node2Vec 参数调优

p 和 q 的选择

  • 小、 小:倾向于深度优先搜索( DFS),捕获结构相似性
  • 大、 大:倾向于广度优先搜索( BFS),捕获同质性(连接相似性)
  • :标准的随机游走

walk_length 和 num_walks

  • walk_length:每次游走的长度,通常设置为 40-100
  • num_walks:每个节点开始的游走次数,通常设置为 10-20

Two-Tower 模型详解

Two-Tower 架构概述

Two-Tower(双塔)模型是推荐系统中广泛使用的架构,它将用户和物品分别通过两个独立的"塔"( Tower)网络映射到同一个向量空间,然后通过向量相似度(如内积、余弦相似度)计算匹配分数。

架构特点

  • 用户塔( User Tower):输入用户特征,输出用户 Embedding
  • 物品塔( Item Tower):输入物品特征,输出物品 Embedding
  • 相似度计算:用户 Embedding 和物品 Embedding 的内积或余弦相似度

Two-Tower 的数学形式

给定用户特征 和物品特征 , Two-Tower 模型计算:

$$

s(u, i) = _u^T _i $$

其中 分别是用户塔和物品塔的网络, 是各自的参数。

Two-Tower 完整实现

问题背景

在推荐系统中,需要计算用户对物品的兴趣分数,以决定是否向用户推荐某个物品。传统的协同过滤方法(如矩阵分解)需要显式的用户-物品交互矩阵,但在实际应用中,我们往往有丰富的用户特征(如年龄、性别、历史行为)和物品特征(如类别、标签、内容描述)。如何利用这些特征来学习用户和物品的表示,并高效地计算匹配分数,是一个挑战。此外,在实际的推荐系统中,需要在毫秒级时间内为每个用户从百万甚至千万级物品库中检索出最相关的物品,这就要求模型能够支持高效的相似度检索。

解决思路

Two-Tower(双塔)模型通过将用户和物品分别通过两个独立的神经网络("塔")映射到同一个低维向量空间,然后通过向量相似度(如内积、余弦相似度)计算匹配分数。这种设计的核心优势在于:( 1)用户塔和物品塔可以独立设计,分别处理不同类型的特征;( 2)训练完成后,可以预先计算所有物品的 Embedding 并建立索引,在推理时只需要计算用户 Embedding,然后通过高效的向量检索(如 FAISS)找到最相似的物品,大幅提升检索效率;( 3)两个塔的参数可以独立更新,训练过程更加灵活。

设计考虑

在实现 Two-Tower 模型时,需要考虑以下几个关键设计:

  1. 塔的网络结构:用户塔和物品塔通常采用多层全连接网络,每层包含线性变换、批归一化、激活函数和 Dropout 。网络深度和宽度需要根据特征维度和数据规模调整:特征维度高、数据量大时可以使用更深的网络;特征维度低、数据量小时可以使用较浅的网络以避免过拟合。
  2. Embedding 归一化:为了使用余弦相似度,通常对用户和物品的 Embedding 进行 L2 归一化。归一化后的内积等价于余弦相似度,取值范围为 [-1, 1]。归一化还可以稳定训练过程,避免 Embedding 的模长过大。
  3. 损失函数选择: Two-Tower 模型可以使用多种损失函数:( 1)二元交叉熵损失( BCE),适用于点击率预估任务;( 2)对比学习损失(如 InfoNCE),适用于学习通用表示;( 3)排序损失(如 BPR),适用于学习相对排序。选择哪种损失函数取决于具体的任务目标。
  4. 负采样策略:在训练时,需要为每个正样本采样负样本。负采样策略影响模型的学习效果:随机负采样简单但可能不够困难;困难负采样( hard negative mining)能够提升模型对困难样本的区分能力,但计算开销更大。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torch.utils.data import Dataset, DataLoader

class UserTower(nn.Module):
"""用户塔网络"""
def __init__(self, user_feature_dim, embedding_dim, hidden_dims=[256, 128]):
super(UserTower, self).__init__()

layers = []
input_dim = user_feature_dim

# 构建隐藏层
for hidden_dim in hidden_dims:
layers.append(nn.Linear(input_dim, hidden_dim))
layers.append(nn.BatchNorm1d(hidden_dim))
layers.append(nn.ReLU())
layers.append(nn.Dropout(0.2))
input_dim = hidden_dim

# 输出层
layers.append(nn.Linear(input_dim, embedding_dim))

self.network = nn.Sequential(*layers)

def forward(self, user_features):
"""
user_features: [batch_size, user_feature_dim]
返回: [batch_size, embedding_dim]
"""
return self.network(user_features)


class ItemTower(nn.Module):
"""物品塔网络"""
def __init__(self, item_feature_dim, embedding_dim, hidden_dims=[256, 128]):
super(ItemTower, self).__init__()

layers = []
input_dim = item_feature_dim

# 构建隐藏层
for hidden_dim in hidden_dims:
layers.append(nn.Linear(input_dim, hidden_dim))
layers.append(nn.BatchNorm1d(hidden_dim))
layers.append(nn.ReLU())
layers.append(nn.Dropout(0.2))
input_dim = hidden_dim

# 输出层
layers.append(nn.Linear(input_dim, embedding_dim))

self.network = nn.Sequential(*layers)

def forward(self, item_features):
"""
item_features: [batch_size, item_feature_dim]
返回: [batch_size, embedding_dim]
"""
return self.network(item_features)


class TwoTowerModel(nn.Module):
"""Two-Tower 模型"""
def __init__(self, user_feature_dim, item_feature_dim, embedding_dim=128,
user_hidden_dims=[256, 128], item_hidden_dims=[256, 128]):
super(TwoTowerModel, self).__init__()

self.user_tower = UserTower(user_feature_dim, embedding_dim, user_hidden_dims)
self.item_tower = ItemTower(item_feature_dim, embedding_dim, item_hidden_dims)

# L2 归一化(可选,用于余弦相似度)
self.normalize = True

def forward(self, user_features, item_features):
"""
user_features: [batch_size, user_feature_dim]
item_features: [batch_size, item_feature_dim]
返回: [batch_size] 相似度分数
"""
user_emb = self.user_tower(user_features)
item_emb = self.item_tower(item_features)

if self.normalize:
user_emb = F.normalize(user_emb, p=2, dim=1)
item_emb = F.normalize(item_emb, p=2, dim=1)

# 计算内积(余弦相似度,如果已归一化)
score = torch.sum(user_emb * item_emb, dim=1)

return score

def get_user_embedding(self, user_features):
"""获取用户 Embedding"""
user_emb = self.user_tower(user_features)
if self.normalize:
user_emb = F.normalize(user_emb, p=2, dim=1)
return user_emb

def get_item_embedding(self, item_features):
"""获取物品 Embedding"""
item_emb = self.item_tower(item_features)
if self.normalize:
item_emb = F.normalize(item_emb, p=2, dim=1)
return item_emb


class RecommendationDataset(Dataset):
"""推荐系统数据集"""
def __init__(self, user_features, item_features, labels):
"""
user_features: [num_samples, user_feature_dim]
item_features: [num_samples, item_feature_dim]
labels: [num_samples] 0 或 1
"""
self.user_features = torch.FloatTensor(user_features)
self.item_features = torch.FloatTensor(item_features)
self.labels = torch.FloatTensor(labels)

def __len__(self):
return len(self.labels)

def __getitem__(self, idx):
return {
'user_features': self.user_features[idx],
'item_features': self.item_features[idx],
'label': self.labels[idx]
}


def train_two_tower(model, train_loader, num_epochs=10, learning_rate=0.001, device='cpu'):
"""训练 Two-Tower 模型"""
model = model.to(device)
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

model.train()

for epoch in range(num_epochs):
total_loss = 0
num_batches = 0

for batch in train_loader:
user_features = batch['user_features'].to(device)
item_features = batch['item_features'].to(device)
labels = batch['label'].to(device)

# 前向传播
optimizer.zero_grad()
scores = model(user_features, item_features)
loss = criterion(scores, labels)

# 反向传播
loss.backward()
optimizer.step()

total_loss += loss.item()
num_batches += 1

avg_loss = total_loss / num_batches
print(f"Epoch {epoch+1}/{num_epochs}, Average Loss: {avg_loss:.4f}")


# 使用示例
def example_two_tower():
"""Two-Tower 模型使用示例"""
# 模拟数据
num_samples = 10000
user_feature_dim = 50
item_feature_dim = 40
embedding_dim = 128

# 生成随机特征
user_features = np.random.randn(num_samples, user_feature_dim)
item_features = np.random.randn(num_samples, item_feature_dim)

# 生成标签(简单的相似度规则)
labels = (np.random.rand(num_samples) > 0.5).astype(float)

# 创建数据集
dataset = RecommendationDataset(user_features, item_features, labels)
train_loader = DataLoader(dataset, batch_size=256, shuffle=True)

# 创建模型
model = TwoTowerModel(
user_feature_dim=user_feature_dim,
item_feature_dim=item_feature_dim,
embedding_dim=embedding_dim,
user_hidden_dims=[256, 128],
item_hidden_dims=[256, 128]
)

# 训练
train_two_tower(model, train_loader, num_epochs=10, learning_rate=0.001)

# 获取 Embedding
sample_user_features = torch.FloatTensor(user_features[:10])
sample_item_features = torch.FloatTensor(item_features[:10])

user_embeddings = model.get_user_embedding(sample_user_features)
item_embeddings = model.get_item_embedding(sample_item_features)

print(f"User embeddings shape: {user_embeddings.shape}")
print(f"Item embeddings shape: {item_embeddings.shape}")

关键点解读

Two-Tower 模型的实现包含几个关键组件,每个组件都有其特定的作用:

  1. 双塔架构设计:用户塔和物品塔是两个独立的神经网络,分别处理用户特征和物品特征。这种设计使得两个塔可以独立设计,分别处理不同类型的特征。例如,用户塔可以处理用户画像、历史行为等特征,物品塔可以处理物品属性、内容特征等。两个塔的输出维度必须相同,这样才能计算相似度。
  2. Embedding 归一化:代码中对用户和物品的 Embedding 进行 L2 归一化,这使得内积等价于余弦相似度,取值范围为 [-1, 1]。归一化还可以稳定训练过程,避免 Embedding 的模长过大导致梯度爆炸。在实际应用中,归一化是 Two-Tower 模型的标准做法。
  3. 网络结构设计:每个塔采用多层全连接网络,每层包含线性变换、批归一化、 ReLU 激活和 Dropout 。批归一化可以加速训练并稳定梯度, Dropout 可以防止过拟合。网络深度和宽度需要根据特征维度和数据规模调整:特征维度高、数据量大时可以使用更深的网络;特征维度低、数据量小时可以使用较浅的网络。
  4. 损失函数选择:代码中使用二元交叉熵损失( BCEWithLogitsLoss),适用于点击率预估任务。对于其他任务,可以使用不同的损失函数:对比学习任务可以使用 InfoNCE 损失,排序任务可以使用 BPR 损失。

设计权衡

在 Two-Tower 模型的实现中,存在多个设计权衡:

  1. 网络深度 vs 计算效率:网络越深,模型的表达能力越强,但计算开销也越大。在实际应用中,通常使用 2-4 层全连接网络,在效果和效率之间取得平衡。对于大规模推荐系统,计算效率至关重要,因为需要在毫秒级时间内完成推理。
  2. Embedding 维度 vs 表达能力: Embedding 维度越高,模型的表达能力越强,但参数量也越大,且检索效率会降低(向量检索的时间复杂度与维度相关)。通常 Embedding 维度设置为 64-256 维,根据物品数量和计算资源选择。
  3. 归一化 vs 灵活性: L2 归一化使得内积等价于余弦相似度,简化了相似度计算,但限制了模型的灵活性。如果不需要归一化,可以移除归一化操作,使用原始的内积或点积。
  4. 特征工程 vs 端到端学习: Two-Tower 模型需要手工设计特征,特征质量直接影响模型效果。相比之下,端到端的深度学习方法(如深度协同过滤)可以自动学习特征表示,但需要更多的数据和计算资源。

常见问题

  1. 如何处理稀疏特征? 对于稀疏特征(如用户 ID 、物品 ID),可以使用 Embedding 层将其映射到稠密向量。 Embedding 层的维度通常设置为 16-64 维,根据特征数量和数据规模调整。
  2. 如何处理多模态特征? 对于多模态特征(如文本、图像),可以使用不同的编码器:文本特征可以使用 CNN 或 Transformer,图像特征可以使用 CNN 。然后将不同模态的特征拼接或融合,输入到塔网络中。
  3. 如何提升检索效率? Two-Tower 模型的核心优势在于支持高效的向量检索。训练完成后,可以预先计算所有物品的 Embedding 并建立索引(如 FAISS),在推理时只需要计算用户 Embedding,然后通过向量检索找到最相似的物品。这种方法可以将检索时间从秒级降低到毫秒级。
  4. 如何处理冷启动问题? 对于新用户或新物品,可以使用内容特征(如用户画像、物品属性)通过塔网络生成初始 Embedding 。如果内容特征不足,可以使用相似用户或相似物品的 Embedding 作为初始化。
  5. 如何选择负采样策略? 负采样策略影响模型的学习效果。随机负采样简单但可能不够困难;困难负采样( hard negative mining)能够提升模型对困难样本的区分能力,但计算开销更大。通常采用混合策略:大部分使用随机负采样,小部分使用困难负采样。

使用示例

下面的示例展示了如何使用 Two-Tower 模型进行训练和推理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 准备数据
user_features = ... # [num_samples, user_feature_dim]
item_features = ... # [num_samples, item_feature_dim]
labels = ... # [num_samples] 0 或 1

# 创建数据集和数据加载器
dataset = RecommendationDataset(user_features, item_features, labels)
train_loader = DataLoader(dataset, batch_size=256, shuffle=True)

# 创建模型
model = TwoTowerModel(
user_feature_dim=50,
item_feature_dim=40,
embedding_dim=128,
user_hidden_dims=[256, 128],
item_hidden_dims=[256, 128]
)

# 训练模型
train_two_tower(model, train_loader, num_epochs=10)

# 推理:计算用户对物品的兴趣分数
user_emb = model.get_user_embedding(user_features)
item_emb = model.get_item_embedding(item_features)
scores = torch.sum(user_emb * item_emb, dim=1) # 内积(余弦相似度)

# 检索:找到与用户最相似的物品(使用 FAISS)
import faiss
item_emb_np = item_emb.cpu().numpy()
index = faiss.IndexFlatIP(128) # 内积索引
index.add(item_emb_np)
user_emb_np = user_emb[0:1].cpu().numpy()
distances, indices = index.search(user_emb_np, k=10) # 找到最相似的 10 个物品

在实际应用中, Two-Tower 模型通常用于:( 1)大规模推荐系统的召回阶段,通过向量检索快速找到候选物品;( 2)广告推荐,通过用户和广告的 Embedding 计算匹配分数;( 3)搜索推荐,通过查询和物品的 Embedding 进行语义匹配。

DSSM 深度语义匹配

DSSM 算法原理

DSSM( Deep Structured Semantic Models)是微软在 2013 年提出的深度语义匹配模型,最初用于信息检索中的查询-文档匹配。在推荐系统中, DSSM 可以用于用户查询与物品的语义匹配。

DSSM 的基本思路:将查询和文档分别通过深度神经网络映射到同一个语义空间,然后计算它们的相似度

DSSM 网络结构

DSSM 的网络结构包括:

  1. Term Hashing:将文本转换为 n-gram 特征
  2. Word Hashing:将 n-gram 特征映射到低维向量
  3. 多层全连接网络:学习语义表示
  4. 相似度计算:使用余弦相似度

DSSM 完整实现

问题背景

在信息检索和推荐系统中,如何计算查询( query)和文档( document)之间的语义相似度是一个核心问题。传统的基于关键词匹配的方法(如 TF-IDF 、 BM25)只能捕获字面匹配,无法理解语义相似性。例如,查询"如何学习机器学习"和文档"机器学习入门教程"在语义上高度相关,但关键词重叠度很低。此外,在实际应用中,查询和文档往往以文本形式存在,如何将文本转换为数值特征,并通过深度神经网络学习语义表示,是一个挑战。

解决思路

DSSM( Deep Structured Semantic Models)通过将查询和文档分别通过深度神经网络映射到同一个低维语义空间,然后计算它们的余弦相似度。核心创新在于:( 1)使用 Word Hashing(词哈希)技术将文本转换为固定维度的特征向量,避免了传统词袋模型的高维稀疏问题;( 2)使用多层全连接网络学习语义表示,能够捕获文本的深层语义信息;( 3)通过最大化相关查询-文档对的相似度、最小化不相关对的相似度来训练模型,使得语义相似的查询和文档在向量空间中距离更近。

设计考虑

在实现 DSSM 时,需要考虑以下几个关键设计:

  1. Word Hashing 技术: DSSM 使用 n-gram(通常 n=3)将文本转换为特征向量。对于每个 n-gram,使用哈希函数将其映射到固定大小的特征空间(通常 50000 维)。这种方法避免了传统词袋模型的高维稀疏问题,同时能够处理未登录词( OOV)问题。
  2. 网络结构设计: DSSM 使用多层全连接网络,每层使用 Tanh 激活函数。网络深度通常为 3-4 层,每层维度逐渐减小(如 300 → 300 → 128)。最后一层输出固定维度的 Embedding(通常 128 维),并进行 L2 归一化。
  3. 双塔架构:查询塔和文档塔是两个独立的网络,但结构相同。这种设计使得查询和文档可以有不同的输入特征维度,但输出维度必须相同才能计算相似度。
  4. 损失函数选择: DSSM 通常使用对比学习损失或排序损失。对于每个查询,选择相关的文档作为正样本,不相关的文档作为负样本,最大化正样本的相似度、最小化负样本的相似度。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from collections import defaultdict

class WordHashing:
"""Word Hashing:将文本转换为 n-gram 特征"""
def __init__(self, n=3):
self.n = n # n-gram 大小

def ngram_hash(self, text):
"""将文本转换为 n-gram 哈希特征"""
text = text.lower()
ngrams = []
for i in range(len(text) - self.n + 1):
ngram = text[i:i+self.n]
ngrams.append(ngram)
return ngrams

def text_to_features(self, text, vocab_size=50000):
"""将文本转换为特征向量(使用简单的哈希)"""
ngrams = self.ngram_hash(text)
# 使用简单的哈希函数映射到固定大小的特征空间
features = np.zeros(vocab_size)
for ngram in ngrams:
idx = hash(ngram) % vocab_size
features[idx] += 1
return features

class DSSM(nn.Module):
"""DSSM 深度语义匹配模型"""
def __init__(self, input_dim, hidden_dims=[300, 300, 128], embedding_dim=128):
super(DSSM, self).__init__()

layers = []
current_dim = input_dim

# 构建隐藏层
for hidden_dim in hidden_dims:
layers.append(nn.Linear(current_dim, hidden_dim))
layers.append(nn.Tanh())
current_dim = hidden_dim

self.network = nn.Sequential(*layers)

# 输出层
self.output_layer = nn.Linear(current_dim, embedding_dim)

def forward(self, x):
"""
x: [batch_size, input_dim]
返回: [batch_size, embedding_dim]
"""
hidden = self.network(x)
output = self.output_layer(hidden)
# L2 归一化
output = F.normalize(output, p=2, dim=1)
return output

class DSSMModel(nn.Module):
"""DSSM 双塔模型(查询塔和文档塔)"""
def __init__(self, query_input_dim, doc_input_dim,
query_hidden_dims=[300, 300, 128],
doc_hidden_dims=[300, 300, 128],
embedding_dim=128):
super(DSSMModel, self).__init__()

self.query_tower = DSSM(query_input_dim, query_hidden_dims, embedding_dim)
self.doc_tower = DSSM(doc_input_dim, doc_hidden_dims, embedding_dim)

def forward(self, query_features, doc_features):
"""
query_features: [batch_size, query_input_dim]
doc_features: [batch_size, doc_input_dim]
返回: [batch_size] 相似度分数(余弦相似度)
"""
query_emb = self.query_tower(query_features)
doc_emb = self.doc_tower(doc_features)

# 计算余弦相似度(已经归一化,所以是内积)
similarity = torch.sum(query_emb * doc_emb, dim=1)

return similarity

def get_query_embedding(self, query_features):
"""获取查询 Embedding"""
return self.query_tower(query_features)

def get_doc_embedding(self, doc_features):
"""获取文档 Embedding"""
return self.doc_tower(doc_features)

def train_dssm(model, train_loader, num_epochs=10, learning_rate=0.001, device='cpu'):
"""训练 DSSM 模型"""
model = model.to(device)
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

model.train()

for epoch in range(num_epochs):
total_loss = 0
num_batches = 0

for batch in train_loader:
query_features = batch['query_features'].to(device)
doc_features = batch['doc_features'].to(device)
labels = batch['label'].to(device)

optimizer.zero_grad()
scores = model(query_features, doc_features)
loss = criterion(scores, labels)

loss.backward()
optimizer.step()

total_loss += loss.item()
num_batches += 1

avg_loss = total_loss / num_batches
print(f"Epoch {epoch+1}/{num_epochs}, Average Loss: {avg_loss:.4f}")

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def example_dssm():
"""DSSM 使用示例"""
# 模拟查询和文档特征
num_samples = 10000
query_input_dim = 50000 # Word Hashing 后的特征维度
doc_input_dim = 50000

# 生成随机特征(模拟 Word Hashing 的输出)
query_features = np.random.rand(num_samples, query_input_dim)
doc_features = np.random.rand(num_samples, doc_input_dim)

# 归一化(模拟词频特征)
query_features = query_features / (query_features.sum(axis=1, keepdims=True) + 1e-8)
doc_features = doc_features / (doc_features.sum(axis=1, keepdims=True) + 1e-8)

# 生成标签
labels = (np.random.rand(num_samples) > 0.5).astype(float)

# 创建数据集
class DSSMDataset:
def__init__(self, query_features, doc_features, labels):
self.query_features = torch.FloatTensor(query_features)
self.doc_features = torch.FloatTensor(doc_features)
self.labels = torch.FloatTensor(labels)

def__len__(self):
return len(self.labels)

def__getitem__(self, idx):
return {
'query_features': self.query_features[idx],
'doc_features': self.doc_features[idx],
'label': self.labels[idx]
}

dataset = DSSMDataset(query_features, doc_features, labels)
train_loader = DataLoader(dataset, batch_size=256, shuffle=True)

# 创建模型
model = DSSMModel(
query_input_dim=query_input_dim,
doc_input_dim=doc_input_dim,
query_hidden_dims=[300, 300, 128],
doc_hidden_dims=[300, 300, 128],
embedding_dim=128
)

# 训练
train_dssm(model, train_loader, num_epochs=10, learning_rate=0.001)

关键点解读

DSSM 的实现包含几个关键组件,每个组件都有其特定的作用:

  1. Word Hashing 技术:这是 DSSM 的核心创新之一。传统的词袋模型需要维护一个巨大的词汇表,对于新词或未登录词( OOV)无法处理。 Word Hashing 通过 n-gram(通常 n=3)将文本转换为固定维度的特征向量,使用哈希函数将每个 n-gram 映射到固定大小的特征空间(通常 50000 维)。这种方法避免了高维稀疏问题,同时能够处理任意文本,包括未登录词。
  2. 多层全连接网络: DSSM 使用多层全连接网络学习语义表示,每层使用 Tanh 激活函数。网络深度通常为 3-4 层,每层维度逐渐减小(如 300 → 300 → 128)。这种设计使得模型能够学习文本的深层语义信息,将高维稀疏的 n-gram 特征映射到低维稠密的语义空间。
  3. L2 归一化:代码中对输出的 Embedding 进行 L2 归一化,这使得内积等价于余弦相似度,取值范围为 [-1, 1]。归一化还可以稳定训练过程,避免 Embedding 的模长过大。
  4. 双塔架构:查询塔和文档塔是两个独立的网络,但结构相同。这种设计使得查询和文档可以有不同的输入特征维度,但输出维度必须相同才能计算相似度。训练时,通过最大化相关查询-文档对的相似度、最小化不相关对的相似度来学习参数。

设计权衡

在 DSSM 的实现中,存在多个设计权衡:

  1. Word Hashing vs 词嵌入: Word Hashing 避免了词汇表问题,但可能引入哈希冲突。现代方法通常使用预训练的词嵌入(如 Word2Vec 、 BERT),能够更好地捕获语义信息,但需要维护词汇表。
  2. 全连接网络 vs CNN/RNN: DSSM 使用全连接网络,简单但可能无法很好地捕获序列信息。 CDSSM 使用 CNN 能够捕获局部特征和序列信息,但计算开销更大。对于长文本,可以使用 RNN 或 Transformer 更好地捕获序列依赖。
  3. 特征维度 vs 计算效率: Word Hashing 的特征维度(通常 50000 维)很大,但可以通过稀疏表示和哈希技巧加速计算。现代方法通常使用预训练的词嵌入(通常 300-768 维),维度更小但需要维护词汇表。
  4. 损失函数选择: DSSM 可以使用多种损失函数:二元交叉熵损失适用于点击率预估,对比学习损失适用于学习通用表示,排序损失适用于学习相对排序。选择哪种损失函数取决于具体的任务目标。

常见问题

  1. 如何处理哈希冲突? Word Hashing 可能引入哈希冲突,即不同的 n-gram 映射到同一个特征位置。虽然冲突概率较低(对于 50000 维特征空间),但可能影响模型效果。可以使用更大的特征空间或使用多个哈希函数来减少冲突。
  2. 如何处理长文本? DSSM 的 Word Hashing 对所有 n-gram 一视同仁,可能无法很好地处理长文本。对于长文本,可以:( 1)截断或采样部分 n-gram;( 2)使用 CNN 或 RNN 捕获序列信息;( 3)使用注意力机制关注重要部分。
  3. 如何提升检索效率? 与 Two-Tower 模型类似, DSSM 也支持高效的向量检索。训练完成后,可以预先计算所有文档的 Embedding 并建立索引(如 FAISS),在推理时只需要计算查询 Embedding,然后通过向量检索找到最相似的文档。
  4. 如何处理多语言? Word Hashing 对于不同语言可能需要不同的 n-gram 大小。对于中文,通常使用字符级别的 n-gram;对于英文,通常使用词级别的 n-gram 。现代方法通常使用多语言预训练模型(如 mBERT),能够更好地处理多语言场景。

使用示例

下面的示例展示了如何使用 DSSM 进行训练和推理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# Word Hashing:将文本转换为特征向量
word_hashing = WordHashing(n=3)
query_text = "how to learn machine learning"
query_features = word_hashing.text_to_features(query_text, vocab_size=50000)

# 创建模型
model = DSSMModel(
query_input_dim=50000,
doc_input_dim=50000,
query_hidden_dims=[300, 300, 128],
doc_hidden_dims=[300, 300, 128],
embedding_dim=128
)

# 训练模型(使用对比学习损失)
# ... 训练代码 ...

# 推理:计算查询和文档的相似度
query_emb = model.get_query_embedding(query_features)
doc_emb = model.get_doc_embedding(doc_features)
similarity = torch.sum(query_emb * doc_emb, dim=1) # 余弦相似度

在实际应用中, DSSM 通常用于:( 1)搜索引擎中的查询-文档匹配;( 2)推荐系统中的用户查询-物品匹配;( 3)问答系统中的问题-答案匹配。

DSSM 的改进: CDSSM

CDSSM( Convolutional DSSM)使用卷积神经网络替代全连接网络,能够更好地捕获局部特征和序列信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class CDSSM(nn.Module):
"""CDSSM:使用 CNN 的 DSSM"""
def __init__(self, input_dim, embedding_dim=128,
num_filters=300, filter_sizes=[1, 2, 3]):
super(CDSSM, self).__init__()

self.convs = nn.ModuleList([
nn.Conv1d(input_dim, num_filters, kernel_size=fs)
for fs in filter_sizes
])

self.fc = nn.Linear(len(filter_sizes) * num_filters, embedding_dim)

def forward(self, x):
"""
x: [batch_size, seq_len, input_dim]
返回: [batch_size, embedding_dim]
"""
# 转换为 [batch_size, input_dim, seq_len]
x = x.transpose(1, 2)

# 卷积 + 池化
conv_outputs = []
for conv in self.convs:
conv_out = F.relu(conv(x)) # [batch_size, num_filters, new_seq_len]
pooled = F.max_pool1d(conv_out, kernel_size=conv_out.size(2)) # [batch_size, num_filters, 1]
conv_outputs.append(pooled.squeeze(2))

# 拼接
concatenated = torch.cat(conv_outputs, dim=1) # [batch_size, len(filter_sizes)*num_filters]

# 全连接层
output = self.fc(concatenated)
output = F.normalize(output, p=2, dim=1)

return output

YouTube 双塔召回

YouTube 推荐系统架构

YouTube 的推荐系统采用多阶段架构:

  1. 候选生成( Candidate Generation):从百万级物品中快速召回数百个候选
  2. 排序( Ranking):对候选物品进行精细排序
  3. 重排( Re-ranking):考虑多样性、新鲜度等因素

双塔模型主要用于候选生成阶段,需要快速从海量物品中召回相关物品。

YouTube 双塔模型设计

用户塔输入特征

  • 用户观看历史(最近 N 个视频的 Embedding 平均)
  • 用户搜索历史
  • 用户画像特征(地理位置、设备等)
  • 用户行为统计(观看时长、点击率等)

物品塔输入特征

  • 视频 ID Embedding
  • 视频类别 Embedding
  • 视频统计特征(观看次数、点赞数等)
  • 视频内容特征(标题、描述等)

YouTube 双塔完整实现

问题背景

YouTube 作为全球最大的视频平台,每天需要为数亿用户推荐数十亿个视频。传统的协同过滤方法无法满足如此大规模的需求,因为计算用户-物品相似度矩阵需要巨大的计算和存储开销。此外, YouTube 需要处理多种类型的特征:用户观看历史、搜索历史、用户画像、视频属性、视频统计等,如何有效地融合这些特征并学习用户和视频的表示,是一个挑战。更重要的是, YouTube 需要在毫秒级时间内从百万级视频库中召回数百个候选视频,这就要求模型必须支持高效的向量检索。

解决思路

YouTube 双塔模型通过将用户和视频分别通过两个独立的神经网络("塔")映射到同一个低维向量空间,然后通过向量相似度计算匹配分数。核心设计包括:( 1)用户塔融合多种用户特征(观看历史、搜索历史、用户画像、行为统计),通过平均池化处理变长序列,然后通过全连接网络学习用户 Embedding;( 2)视频塔融合多种视频特征(视频 ID 、类别、统计特征、内容特征),通过 Embedding 层和全连接网络学习视频 Embedding;( 3)训练完成后,预先计算所有视频的 Embedding 并建立索引,在推理时只需要计算用户 Embedding,然后通过高效的向量检索(如 FAISS)找到最相似的视频,大幅提升检索效率。

设计考虑

在实现 YouTube 双塔模型时,需要考虑以下几个关键设计:

  1. 历史序列处理:用户观看历史是变长序列,需要将其转换为固定维度的特征。 YouTube 使用平均池化对历史视频的 Embedding 进行聚合,简单有效。也可以使用注意力机制或 RNN/Transformer 更好地捕获序列信息,但计算开销更大。
  2. 特征融合策略:用户塔和视频塔都需要融合多种类型的特征(稀疏特征、稠密特征、序列特征)。通常使用 Embedding 层处理稀疏特征(如视频 ID 、类别),使用全连接网络处理稠密特征(如统计特征),然后拼接或加权融合。
  3. 训练样本构造: YouTube 使用曝光样本( impression)作为训练数据,即用户看到但未点击的视频作为负样本,点击的视频作为正样本。这种构造方式更符合实际推荐场景,但可能导致样本分布不平衡问题。
  4. 负采样策略:由于正样本(点击)远少于负样本(未点击),需要采用负采样策略。 YouTube 使用随机负采样,简单但可能不够困难。也可以使用困难负采样( hard negative mining)提升模型效果,但计算开销更大。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

class YouTubeUserTower(nn.Module):
"""YouTube 用户塔"""
def __init__(self,
video_embedding_dim=64,
category_embedding_dim=32,
user_feature_dim=100,
embedding_dim=128):
super(YouTubeUserTower, self).__init__()

# 视频历史 Embedding(假设最多 50 个历史视频)
self.max_history_length = 50
self.video_embedding_dim = video_embedding_dim

# 用户特征处理
self.user_fc = nn.Sequential(
nn.Linear(user_feature_dim, 128),
nn.ReLU(),
nn.Linear(128, 64)
)

# 最终融合层
self.fusion = nn.Sequential(
nn.Linear(video_embedding_dim + 64, 256),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(256, embedding_dim)
)

def forward(self, user_features, watch_history):
"""
user_features: [batch_size, user_feature_dim]
watch_history: [batch_size, max_history_length, video_embedding_dim]
返回: [batch_size, embedding_dim]
"""
batch_size = user_features.size(0)

# 处理用户特征
user_emb = self.user_fc(user_features) # [batch_size, 64]

# 处理观看历史:平均池化
# watch_history 中可能包含 padding,需要 mask
history_mask = (watch_history.sum(dim=2) != 0).float() # [batch_size, max_history_length]
history_sum = watch_history.sum(dim=1) # [batch_size, video_embedding_dim]
history_count = history_mask.sum(dim=1, keepdim=True) + 1e-8
history_avg = history_sum / history_count # [batch_size, video_embedding_dim]

# 融合
combined = torch.cat([history_avg, user_emb], dim=1) # [batch_size, video_embedding_dim + 64]
output = self.fusion(combined)

# L2 归一化
output = F.normalize(output, p=2, dim=1)

return output

class YouTubeItemTower(nn.Module):
"""YouTube 物品塔"""
def __init__(self,
num_videos=1000000,
num_categories=1000,
video_embedding_dim=64,
category_embedding_dim=32,
item_feature_dim=50,
embedding_dim=128):
super(YouTubeItemTower, self).__init__()

# 视频 ID Embedding
self.video_embedding = nn.Embedding(num_videos, video_embedding_dim)

# 类别 Embedding
self.category_embedding = nn.Embedding(num_categories, category_embedding_dim)

# 物品特征处理
self.item_fc = nn.Sequential(
nn.Linear(item_feature_dim, 128),
nn.ReLU(),
nn.Linear(128, 64)
)

# 融合层
self.fusion = nn.Sequential(
nn.Linear(video_embedding_dim + category_embedding_dim + 64, 256),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(256, embedding_dim)
)

def forward(self, video_ids, category_ids, item_features):
"""
video_ids: [batch_size]
category_ids: [batch_size]
item_features: [batch_size, item_feature_dim]
返回: [batch_size, embedding_dim]
"""
# 获取 Embedding
video_emb = self.video_embedding(video_ids) # [batch_size, video_embedding_dim]
category_emb = self.category_embedding(category_ids) # [batch_size, category_embedding_dim]
item_emb = self.item_fc(item_features) # [batch_size, 64]

# 融合
combined = torch.cat([video_emb, category_emb, item_emb], dim=1)
output = self.fusion(combined)

# L2 归一化
output = F.normalize(output, p=2, dim=1)

return output

class YouTubeTwoTower(nn.Module):
"""YouTube 双塔模型"""
def __init__(self,
num_videos=1000000,
num_categories=1000,
video_embedding_dim=64,
category_embedding_dim=32,
user_feature_dim=100,
item_feature_dim=50,
embedding_dim=128):
super(YouTubeTwoTower, self).__init__()

self.user_tower = YouTubeUserTower(
video_embedding_dim=video_embedding_dim,
category_embedding_dim=category_embedding_dim,
user_feature_dim=user_feature_dim,
embedding_dim=embedding_dim
)

self.item_tower = YouTubeItemTower(
num_videos=num_videos,
num_categories=num_categories,
video_embedding_dim=video_embedding_dim,
category_embedding_dim=category_embedding_dim,
item_feature_dim=item_feature_dim,
embedding_dim=embedding_dim
)

def forward(self, user_features, watch_history, video_ids, category_ids, item_features):
"""
返回: [batch_size] 相似度分数
"""
user_emb = self.user_tower(user_features, watch_history)
item_emb = self.item_tower(video_ids, category_ids, item_features)

# 计算内积(余弦相似度)
score = torch.sum(user_emb * item_emb, dim=1)

return score

def get_user_embedding(self, user_features, watch_history):
"""获取用户 Embedding(用于召回)"""
return self.user_tower(user_features, watch_history)

def get_item_embedding(self, video_ids, category_ids, item_features):
"""获取物品 Embedding(用于索引)"""
return self.item_tower(video_ids, category_ids, item_features)

关键点解读

YouTube 双塔模型的实现包含几个关键组件,每个组件都有其特定的作用:

  1. 用户塔设计:用户塔融合多种用户特征,包括观看历史、搜索历史、用户画像和行为统计。观看历史是变长序列,代码使用平均池化对历史视频的 Embedding 进行聚合,简单有效。用户画像和行为统计通过全连接网络处理,然后与历史 Embedding 拼接,通过融合层学习用户 Embedding 。这种设计能够同时捕获用户的长期兴趣(历史)和短期偏好(画像、统计)。

  2. 视频塔设计:视频塔融合多种视频特征,包括视频 ID 、类别、统计特征和内容特征。视频 ID 和类别使用 Embedding 层处理,统计特征通过全连接网络处理,然后拼接并通过融合层学习视频 Embedding 。这种设计能够同时捕获视频的语义信息( ID 、类别)和统计信息(观看次数、点赞数等)。

  3. 特征融合策略:代码使用拼接( concatenation)方式融合不同特征,然后通过全连接网络学习融合表示。也可以使用加权融合或注意力机制,但拼接方式简单有效,是常用的做法。

  4. L2 归一化:代码中对用户和视频的 Embedding 进行 L2 归一化,这使得内积等价于余弦相似度,取值范围为 [-1, 1]。归一化还可以稳定训练过程,避免 Embedding 的模长过大。

设计权衡

在 YouTube 双塔模型的实现中,存在多个设计权衡:

  1. 历史序列处理 vs 计算效率:平均池化简单高效,但可能丢失序列信息。使用注意力机制或 RNN/Transformer 能够更好地捕获序列信息,但计算开销更大。对于大规模推荐系统,平均池化是常用的折中方案。

  2. 特征融合 vs 模型复杂度:拼接方式简单但可能引入冗余信息。使用注意力机制或门控机制能够更好地融合特征,但模型复杂度更高。对于大规模推荐系统,拼接方式是常用的折中方案。

  3. 负采样策略 vs 训练效果:随机负采样简单但可能不够困难;批量内负采样高效但负样本质量可能不高;困难负采样能够提升模型效果,但计算开销更大。通常采用混合策略:大部分使用批量内负采样,小部分使用困难负采样。

  4. Embedding 维度 vs 检索效率: Embedding 维度越高,模型的表达能力越强,但检索效率会降低(向量检索的时间复杂度与维度相关)。通常 Embedding 维度设置为 64-256 维,根据视频数量和计算资源选择。

常见问题

  1. 如何处理新用户或新视频? 对于新用户,可以使用用户画像和行为统计特征通过用户塔生成初始 Embedding 。对于新视频,可以使用视频类别和内容特征通过视频塔生成初始 Embedding 。如果特征不足,可以使用相似用户或相似视频的 Embedding 作为初始化。

  2. 如何提升召回效果? YouTube 双塔模型的核心优势在于支持高效的向量检索。训练完成后,可以预先计算所有视频的 Embedding 并建立索引(如 FAISS),在推理时只需要计算用户 Embedding,然后通过向量检索找到最相似的视频。为了提升召回效果,可以:( 1)使用更大的 Embedding 维度;( 2)使用困难负采样训练;( 3)使用多任务学习同时优化多个目标。

  3. 如何处理长尾视频? 长尾视频(观看次数少)的 Embedding 可能不够准确。可以使用内容特征(如标题、描述)通过额外的网络生成初始 Embedding,或者使用相似视频的 Embedding 作为初始化。

  4. 如何平衡准确率和多样性? YouTube 双塔模型主要优化准确率,可能推荐相似度高的视频导致多样性不足。可以在召回后使用重排( re-ranking)模块考虑多样性、新鲜度等因素,或者使用多目标优化同时优化准确率和多样性。

使用示例

下面的示例展示了如何使用 YouTube 双塔模型进行训练和推理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 创建模型
model = YouTubeTwoTower(
num_videos=1000000,
num_categories=1000,
video_embedding_dim=64,
category_embedding_dim=32,
user_feature_dim=100,
item_feature_dim=50,
embedding_dim=128
)

# 训练模型(使用批量内负采样)
# ... 训练代码 ...

# 推理:计算用户对视频的兴趣分数
user_emb = model.get_user_embedding(user_features, watch_history)
video_emb = model.get_item_embedding(video_ids, category_ids, item_features)
scores = torch.sum(user_emb * video_emb, dim=1) # 内积(余弦相似度)

# 召回:找到与用户最相似的视频(使用 FAISS)
import faiss
video_emb_np = video_emb.cpu().numpy()
index = faiss.IndexFlatIP(128) # 内积索引
index.add(video_emb_np)
user_emb_np = user_emb[0:1].cpu().numpy()
distances, indices = index.search(user_emb_np, k=100) # 找到最相似的 100 个视频

在实际应用中, YouTube 双塔模型通常用于:( 1)大规模推荐系统的召回阶段,通过向量检索快速找到候选视频;( 2)视频搜索,通过查询和视频的 Embedding 进行语义匹配;( 3)相似视频推荐,通过视频 Embedding 找到相似视频。

YouTube 双塔的训练策略

负采样策略

  • 随机负采样:从所有视频中随机采样
  • 批量内负采样:在同一个 batch 中,其他样本的视频作为负样本(更高效)
  • 困难负采样:选择模型预测分数较高的负样本(需要额外训练)

训练目标: 使用 softmax 交叉熵损失:

$$

L = -} $$

其中 是正样本, 是负样本。

负采样策略详解

负采样的必要性

在推荐系统中,正样本(用户-物品交互)通常只占所有可能的用户-物品对的很小一部分。如果使用所有负样本训练,会导致:

  1. 计算成本过高:需要计算所有负样本的损失
  2. 类别不平衡:负样本远多于正样本
  3. 训练效率低:大部分负样本对模型学习没有帮助

负采样通过只选择一部分负样本进行训练,解决了这些问题。

随机负采样

最简单的负采样策略是随机采样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def random_negative_sampling(positive_items, all_items, num_negatives):
"""
随机负采样
positive_items: 正样本物品列表
all_items: 所有物品集合
num_negatives: 每个正样本对应的负样本数量
"""
negative_items = []
for pos_item in positive_items:
# 从所有物品中随机采样,排除正样本
candidates = list(set(all_items) - {pos_item})
negs = np.random.choice(candidates, size=num_negatives, replace=False)
negative_items.append(negs)
return negative_items

基于频率的负采样

根据物品的流行度(频率)进行采样,流行物品更容易被采样为负样本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def frequency_based_negative_sampling(positive_items, item_freq, num_negatives, alpha=0.75):
"""
基于频率的负采样
item_freq: 物品频率字典
alpha: 频率的幂次(通常为 0.75)
"""
# 构建采样分布
items = list(item_freq.keys())
frequencies = np.array([item_freq[item] for item in items])
# 使用 alpha 次幂
probs = np.power(frequencies, alpha)
probs = probs / probs.sum()

negative_items = []
for pos_item in positive_items:
# 排除正样本
mask = np.array([item != pos_item for item in items])
filtered_items = np.array(items)[mask]
filtered_probs = probs[mask]
filtered_probs = filtered_probs / filtered_probs.sum()

negs = np.random.choice(
filtered_items,
size=num_negatives,
p=filtered_probs,
replace=False
)
negative_items.append(negs)

return negative_items

批量内负采样

在同一个 batch 内,其他样本的物品可以作为负样本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def in_batch_negative_sampling(batch_items):
"""
批量内负采样
batch_items: [batch_size] 当前 batch 的物品 ID
返回: [batch_size, batch_size-1] 每个样本的负样本
"""
batch_size = len(batch_items)
negative_items = []

for i in range(batch_size):
# 当前样本的物品
pos_item = batch_items[i]
# 其他样本的物品作为负样本
negs = [batch_items[j] for j in range(batch_size) if j != i]
negative_items.append(negs)

return np.array(negative_items)

困难负采样( Hard Negative Sampling)

选择模型预测分数较高的负样本,这些样本更难区分,有助于提升模型性能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def hard_negative_sampling(model, user_features, positive_items, candidate_items, 
num_negatives, k=10):
"""
困难负采样
k: 先采样 k 倍候选,然后选择分数最高的
"""
# 扩展候选集
extended_candidates = np.random.choice(
candidate_items,
size=min(len(candidate_items), num_negatives * k),
replace=False
)

# 计算分数
with torch.no_grad():
user_emb = model.get_user_embedding(user_features)
item_embeddings = model.get_item_embedding(extended_candidates)

scores = torch.matmul(user_emb, item_embeddings.T) # [1, len(candidates)]
scores = scores.squeeze(0).cpu().numpy()

# 选择分数最高的 num_negatives 个
top_indices = np.argsort(scores)[-num_negatives:]
hard_negatives = extended_candidates[top_indices]

return hard_negatives

混合负采样策略

结合多种采样策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class MixedNegativeSampling:
"""混合负采样策略"""
def __init__(self, item_freq, alpha=0.75):
self.item_freq = item_freq
self.alpha = alpha
self._build_distribution()

def _build_distribution(self):
"""构建采样分布"""
items = list(self.item_freq.keys())
frequencies = np.array([self.item_freq[item] for item in items])
probs = np.power(frequencies, self.alpha)
self.distribution = probs / probs.sum()
self.items = np.array(items)

def sample(self, positive_items, num_negatives, strategy='mixed'):
"""
strategy: 'random', 'frequency', 'mixed'
"""
if strategy == 'random':
return self._random_sample(positive_items, num_negatives)
elif strategy == 'frequency':
return self._frequency_sample(positive_items, num_negatives)
else: # mixed
return self._mixed_sample(positive_items, num_negatives)

def _random_sample(self, positive_items, num_negatives):
"""随机采样"""
negative_items = []
for pos_item in positive_items:
mask = self.items != pos_item
candidates = self.items[mask]
negs = np.random.choice(candidates, size=num_negatives, replace=False)
negative_items.append(negs)
return np.array(negative_items)

def _frequency_sample(self, positive_items, num_negatives):
"""频率采样"""
negative_items = []
for pos_item in positive_items:
mask = self.items != pos_item
candidates = self.items[mask]
probs = self.distribution[mask]
probs = probs / probs.sum()
negs = np.random.choice(candidates, size=num_negatives, p=probs, replace=False)
negative_items.append(negs)
return np.array(negative_items)

def _mixed_sample(self, positive_items, num_negatives):
"""混合采样:一半随机,一半频率"""
num_random = num_negatives // 2
num_freq = num_negatives - num_random

random_negs = self._random_sample(positive_items, num_random)
freq_negs = self._frequency_sample(positive_items, num_freq)

return np.concatenate([random_negs, freq_negs], axis=1)

ANN 近邻检索

ANN 检索的必要性

在推荐系统的召回阶段,需要从百万甚至千万级物品中快速找到与用户 Embedding 最相似的物品。如果使用暴力搜索( Brute Force),计算复杂度为 ,其中 是物品数量, 是 Embedding 维度,这在大规模场景下是不可接受的。

近似最近邻( Approximate Nearest Neighbor, ANN)检索通过牺牲一定的精度来换取速度,能够在毫秒级时间内完成检索。

FAISS 库使用

问题背景

在推荐系统的召回阶段,需要从百万甚至千万级物品中快速找到与用户 Embedding 最相似的物品。传统的暴力搜索( Brute Force)需要计算用户 Embedding 与所有物品 Embedding 的相似度,计算复杂度为 ,其中 是物品数量, 是 Embedding 维度。对于百万级物品库,暴力搜索需要数秒甚至数十秒,无法满足实时推荐的需求。此外,随着物品数量的增长,计算时间线性增长,这在大规模场景下是不可接受的。

解决思路

近似最近邻( Approximate Nearest Neighbor, ANN)检索通过牺牲一定的精度来换取速度,能够在毫秒级时间内完成检索。 FAISS( Facebook AI Similarity Search)是 Facebook 开源的向量相似度检索库,提供了多种索引类型,包括:( 1) Flat Index:暴力搜索,精度 100%但速度慢;( 2) IVF( Inverted File Index):通过聚类将向量分组,只搜索相关聚类,速度更快;( 3) HNSW( Hierarchical Navigable Small World):基于图的索引,速度快且精度高;( 4) Product Quantization:通过向量压缩减少内存占用。通过选择合适的索引类型和参数,可以在精度和速度之间取得平衡。

设计考虑

在实现 FAISS 索引时,需要考虑以下几个关键设计:

  1. 索引类型选择: Flat Index 精度最高但速度最慢,适用于小规模场景(<100 万); IVF 索引通过聚类加速检索,适用于中等规模场景( 100 万-1000 万); HNSW 索引速度快且精度高,适用于大规模场景(>1000 万)。选择哪种索引类型取决于物品数量、精度要求和计算资源。
  2. 距离度量选择: FAISS 支持多种距离度量,包括 L2 距离和内积( IP)。对于归一化的 Embedding,内积等价于余弦相似度,是推荐系统中常用的度量。选择哪种度量取决于 Embedding 是否归一化。
  3. 索引参数调优: IVF 索引需要设置聚类数量( nlist)和搜索时检查的聚类数量( nprobe)。 nlist 越大,检索精度越高但构建时间越长; nprobe 越大,检索精度越高但检索时间越长。需要在精度和速度之间权衡。
  4. 内存和持久化:大规模索引可能占用大量内存,需要考虑内存限制。 FAISS 支持索引的保存和加载,可以将索引持久化到磁盘,避免每次重新构建。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import faiss
import numpy as np

class FAISSIndex:
"""基于 FAISS 的向量检索"""
def __init__(self, dimension, index_type='L2'):
"""
dimension: Embedding 维度
index_type: 'L2' 或 'IP'(内积)
"""
self.dimension = dimension

if index_type == 'L2':
# L2 距离索引
self.index = faiss.IndexFlatL2(dimension)
elif index_type == 'IP':
# 内积索引(需要归一化)
self.index = faiss.IndexFlatIP(dimension)
else:
raise ValueError(f"Unknown index type: {index_type}")

self.index_type = index_type

def add(self, vectors):
"""
添加向量到索引
vectors: [N, dimension] numpy array
"""
if self.index_type == 'IP':
# 内积索引需要归一化
faiss.normalize_L2(vectors)

self.index.add(vectors.astype('float32'))

def search(self, query_vectors, k=10):
"""
搜索最相似的 k 个向量
query_vectors: [M, dimension] numpy array
返回: (distances, indices)
"""
if self.index_type == 'IP':
# 归一化查询向量
query_vectors = query_vectors.copy().astype('float32')
faiss.normalize_L2(query_vectors)

distances, indices = self.index.search(query_vectors.astype('float32'), k)
return distances, indices


# 使用示例
def example_faiss():
"""FAISS 使用示例"""
dimension = 128
num_items = 1000000
num_queries = 100

# 生成物品 Embedding
item_embeddings = np.random.randn(num_items, dimension).astype('float32')

# 创建索引(使用内积,适合归一化的 Embedding)
index = FAISSIndex(dimension, index_type='IP')
index.add(item_embeddings)

# 生成查询向量
query_embeddings = np.random.randn(num_queries, dimension).astype('float32')

# 搜索
distances, indices = index.search(query_embeddings, k=10)

print(f"Query 0 top-10 items: {indices[0]}")
print(f"Distances: {distances[0]}")


# 使用更高效的索引( IVF)
class IVFIndex:
"""IVF( Inverted File)索引:更快的检索速度"""
def __init__(self, dimension, nlist=100, nprobe=10, index_type='L2'):
"""
nlist: 聚类中心数量
nprobe: 搜索时检查的聚类数量
"""
self.dimension = dimension
quantizer = faiss.IndexFlatL2(dimension)
self.index = faiss.IndexIVFFlat(quantizer, dimension, nlist)
self.index.nprobe = nprobe
self.index_type = index_type
self.is_trained = False

def train(self, vectors):
"""训练索引(需要先训练才能添加向量)"""
if self.index_type == 'IP':
vectors = vectors.copy().astype('float32')
faiss.normalize_L2(vectors)

self.index.train(vectors.astype('float32'))
self.is_trained = True

def add(self, vectors):
"""添加向量"""
if not self.is_trained:
raise ValueError("Index must be trained before adding vectors")

if self.index_type == 'IP':
vectors = vectors.copy().astype('float32')
faiss.normalize_L2(vectors)

self.index.add(vectors.astype('float32'))

def search(self, query_vectors, k=10):
"""搜索"""
if self.index_type == 'IP':
query_vectors = query_vectors.copy().astype('float32')
faiss.normalize_L2(query_vectors)

distances, indices = self.index.search(query_vectors.astype('float32'), k)
return distances, indices

关键点解读

FAISS 索引的实现包含几个关键组件,每个组件都有其特定的作用:

  1. 索引类型选择: Flat Index 使用暴力搜索,精度 100%但速度最慢,适用于小规模场景(<100 万物品)。 IVF Index 通过聚类将向量分组,只搜索相关聚类,速度更快但需要先训练索引。 HNSW Index 基于图的索引,速度快且精度高,适用于大规模场景(>1000 万物品)。
  2. 距离度量选择: FAISS 支持 L2 距离和内积( IP)两种度量。对于归一化的 Embedding,内积等价于余弦相似度,是推荐系统中常用的度量。代码中根据索引类型自动处理归一化,确保内积计算的正确性。
  3. 索引训练: IVF 索引需要先训练才能添加向量。训练过程使用 K-means 聚类算法将向量分组,训练时间与物品数量和聚类数量相关。通常训练一次后可以多次使用,训练好的索引可以保存到磁盘。
  4. 检索参数调优: IVF 索引的 nprobe 参数控制搜索时检查的聚类数量。 nprobe 越大,检索精度越高但检索时间越长。通常设置 nprobe=10-100,在精度和速度之间权衡。

设计权衡

在 FAISS 索引的实现中,存在多个设计权衡:

  1. 精度 vs 速度: Flat Index 精度最高但速度最慢; IVF Index 通过聚类加速检索,精度略降但速度大幅提升; HNSW Index 速度快且精度高,但构建时间较长。选择哪种索引类型取决于物品数量、精度要求和计算资源。
  2. 内存 vs 速度: Product Quantization( PQ)通过向量压缩减少内存占用,但检索精度会降低。对于内存受限的场景,可以使用 IVFPQ 索引,在内存和精度之间权衡。
  3. 构建时间 vs 检索时间: HNSW 索引构建时间较长,但检索速度最快; IVF 索引构建时间中等,检索速度较快; Flat Index 无需构建,但检索时间最长。如果索引需要频繁更新,应该选择构建时间短的索引类型。

常见问题

  1. 如何选择索引类型? 对于小规模场景(<100 万),使用 Flat Index;对于中等规模场景( 100 万-1000 万),使用 IVF Index;对于大规模场景(>1000 万),使用 HNSW Index 。如果内存受限,可以使用 IVFPQ 索引。
  2. 如何调优索引参数? IVF 索引的 nlist 参数控制聚类数量,通常设置为 sqrt(N),其中 N 是物品数量。 nprobe 参数控制搜索时检查的聚类数量,通常设置为 nlist 的 1/10 到 1/100 。可以通过实验找到精度和速度的平衡点。
  3. 如何处理索引更新? FAISS 支持增量添加向量,但删除向量需要重建索引。对于频繁更新的场景,可以使用支持删除的索引类型(如 HNSW),或者定期重建索引。
  4. 如何持久化索引? FAISS 支持索引的保存和加载,可以将索引持久化到磁盘。保存时使用 faiss.write_index(),加载时使用 faiss.read_index()。持久化可以避免每次重新构建索引,大幅提升启动速度。

使用示例

下面的示例展示了如何使用 FAISS 索引进行向量检索:

1
2
3
4
5
6
7
8
9
10
11
12
# 创建索引(使用内积,适合归一化的 Embedding)
index = FAISSIndex(dimension=128, index_type='IP')
index.add(item_embeddings) # 添加物品 Embedding

# 搜索最相似的物品
distances, indices = index.search(query_embeddings, k=10)

# 使用 IVF 索引加速检索(适用于大规模场景)
ivf_index = IVFIndex(dimension=128, nlist=1000, nprobe=50)
ivf_index.train(item_embeddings) # 先训练
ivf_index.add(item_embeddings) # 再添加
distances, indices = ivf_index.search(query_embeddings, k=10)

在实际应用中, FAISS 索引通常用于:( 1)大规模推荐系统的召回阶段,通过向量检索快速找到候选物品;( 2)相似物品推荐,通过物品 Embedding 找到相似物品;( 3)用户画像匹配,通过用户 Embedding 找到相似用户。

Annoy 库使用

Annoy( Approximate Nearest Neighbors Oh Yeah)是 Spotify 开源的 ANN 库,基于随机投影树。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from annoy import AnnoyIndex

class AnnoyIndexWrapper:
"""Annoy 索引封装"""
def __init__(self, dimension, metric='angular', n_trees=10):
"""
dimension: Embedding 维度
metric: 'angular'(余弦相似度)或 'euclidean'(欧氏距离)
n_trees: 树的数量(越多越准确,但构建时间越长)
"""
self.dimension = dimension
self.metric = metric
self.index = AnnoyIndex(dimension, metric)
self.n_trees = n_trees
self.item_count = 0

def add_item(self, vector):
"""添加一个向量"""
self.index.add_item(self.item_count, vector)
self.item_count += 1

def build(self):
"""构建索引"""
self.index.build(self.n_trees)

def search(self, query_vector, k=10):
"""搜索最相似的 k 个向量"""
indices = self.index.get_nns_by_vector(query_vector, k)
return indices

def save(self, filepath):
"""保存索引"""
self.index.save(filepath)

def load(self, filepath):
"""加载索引"""
self.index.load(filepath)


# 使用示例
def example_annoy():
"""Annoy 使用示例"""
dimension = 128
num_items = 1000000

# 创建索引
index = AnnoyIndexWrapper(dimension, metric='angular', n_trees=50)

# 添加物品 Embedding
for i in range(num_items):
vector = np.random.randn(dimension).astype('float32')
# 归一化(用于余弦相似度)
vector = vector / (np.linalg.norm(vector) + 1e-8)
index.add_item(vector)

# 构建索引
index.build()

# 搜索
query_vector = np.random.randn(dimension).astype('float32')
query_vector = query_vector / (np.linalg.norm(query_vector) + 1e-8)

top_k = index.search(query_vector, k=10)
print(f"Top-10 items: {top_k}")

HNSW 算法

HNSW( Hierarchical Navigable Small World)是一种基于图的 ANN 算法,在精度和速度之间取得了很好的平衡。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import hnswlib

class HNSWIndex:
"""HNSW 索引"""
def __init__(self, dimension, metric='cosine', max_elements=1000000,
ef_construction=200, M=16):
"""
dimension: Embedding 维度
metric: 'cosine', 'l2', 'ip'
max_elements: 最大元素数量
ef_construction: 构建时的候选数量
M: 每个节点的最大连接数
"""
self.dimension = dimension
self.index = hnswlib.Index(space=metric, dim=dimension)
self.index.init_index(max_elements=max_elements,
ef_construction=ef_construction,
M=M)
self.item_count = 0

def add(self, vectors):
"""批量添加向量"""
num_vectors = len(vectors)
self.index.add_items(vectors.astype('float32'),
np.arange(self.item_count,
self.item_count + num_vectors))
self.item_count += num_vectors

def search(self, query_vectors, k=10, ef_search=None):
"""
搜索
ef_search: 搜索时的候选数量(越大越准确,但越慢)
"""
if ef_search is None:
ef_search = max(k * 2, 50)

self.index.set_ef(ef_search)
labels, distances = self.index.knn_query(query_vectors.astype('float32'), k=k)
return distances, labels

def save(self, filepath):
"""保存索引"""
self.index.save_index(filepath)

def load(self, filepath):
"""加载索引"""
self.index.load_index(filepath)


# 使用示例
def example_hnsw():
"""HNSW 使用示例"""
dimension = 128
num_items = 1000000
num_queries = 100

# 生成物品 Embedding
item_embeddings = np.random.randn(num_items, dimension).astype('float32')
# 归一化(用于余弦相似度)
norms = np.linalg.norm(item_embeddings, axis=1, keepdims=True)
item_embeddings = item_embeddings / (norms + 1e-8)

# 创建索引
index = HNSWIndex(dimension, metric='cosine', max_elements=num_items)
index.add(item_embeddings)

# 生成查询向量
query_embeddings = np.random.randn(num_queries, dimension).astype('float32')
query_norms = np.linalg.norm(query_embeddings, axis=1, keepdims=True)
query_embeddings = query_embeddings / (query_norms + 1e-8)

# 搜索
distances, indices = index.search(query_embeddings, k=10, ef_search=100)

print(f"Query 0 top-10 items: {indices[0]}")
print(f"Distances: {distances[0]}")

ANN 索引选择指南

FAISS

  • 优点:功能丰富,支持多种索引类型,速度快
  • 缺点:需要编译,依赖较多
  • 适用场景:大规模生产环境

Annoy

  • 优点:简单易用,支持持久化,内存占用小
  • 缺点:构建索引较慢,精度略低于 HNSW
  • 适用场景:中小规模场景,需要持久化的场景

HNSW

  • 优点:精度高,速度快,内存占用适中
  • 缺点:构建时间较长
  • 适用场景:对精度要求较高的场景

Embedding 质量评估

评估指标

1. 相似度分布

检查 Embedding 的相似度分布是否合理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def evaluate_similarity_distribution(embeddings, sample_size=10000):
"""评估相似度分布"""
num_items = len(embeddings)
indices = np.random.choice(num_items, size=sample_size, replace=False)

similarities = []
for i in range(len(indices)):
for j in range(i+1, len(indices)):
sim = np.dot(embeddings[indices[i]], embeddings[indices[j]])
similarities.append(sim)

similarities = np.array(similarities)

print(f"Mean similarity: {similarities.mean():.4f}")
print(f"Std similarity: {similarities.std():.4f}")
print(f"Min similarity: {similarities.min():.4f}")
print(f"Max similarity: {similarities.max():.4f}")

return similarities

2. 召回率( Recall@K)

在测试集上评估召回率:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def evaluate_recall_at_k(user_embeddings, item_embeddings, test_pairs, k=10):
"""
评估 Recall@K
test_pairs: [(user_id, item_id), ...] 测试集正样本对
"""
recalls = []

for user_id, true_item_id in test_pairs:
user_emb = user_embeddings[user_id]

# 计算与所有物品的相似度
similarities = np.dot(item_embeddings, user_emb)

# 获取 top-k
top_k_indices = np.argsort(similarities)[::-1][:k]

# 检查真实物品是否在 top-k 中
recall = 1.0 if true_item_id in top_k_indices else 0.0
recalls.append(recall)

return np.mean(recalls)

3. 覆盖率( Coverage)

评估推荐结果的多样性:

1
2
3
4
5
6
7
8
9
10
11
12
def evaluate_coverage(recommendations, num_items):
"""
评估覆盖率
recommendations: [[item_id, ...], ...] 每个用户的推荐列表
num_items: 物品总数
"""
recommended_items = set()
for rec_list in recommendations:
recommended_items.update(rec_list)

coverage = len(recommended_items) / num_items
return coverage

4. 新颖性( Novelty)

评估推荐物品的新颖性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def evaluate_novelty(recommendations, item_popularity):
"""
评估新颖性
item_popularity: {item_id: popularity_score}
"""
novelty_scores = []

for rec_list in recommendations:
scores = [item_popularity.get(item_id, 0) for item_id in rec_list]
# 新颖性 = 1 - 平均流行度
novelty = 1.0 - np.mean(scores)
novelty_scores.append(novelty)

return np.mean(novelty_scores)

Embedding 可视化

使用 t-SNE 或 UMAP 将高维 Embedding 降维到 2D 进行可视化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt

def visualize_embeddings(embeddings, labels=None, title="Embedding Visualization"):
"""可视化 Embedding"""
# 使用 t-SNE 降维
tsne = TSNE(n_components=2, random_state=42, perplexity=30)
embeddings_2d = tsne.fit_transform(embeddings)

# 绘制
plt.figure(figsize=(10, 8))
if labels is not None:
scatter = plt.scatter(embeddings_2d[:, 0], embeddings_2d[:, 1],
c=labels, cmap='viridis', alpha=0.6)
plt.colorbar(scatter)
else:
plt.scatter(embeddings_2d[:, 0], embeddings_2d[:, 1], alpha=0.6)

plt.title(title)
plt.xlabel("t-SNE 1")
plt.ylabel("t-SNE 2")
plt.tight_layout()
plt.show()

在线评估

A/B 测试:将用户分为两组,一组使用旧模型,一组使用新模型,比较关键指标( CTR 、停留时间等)。

实时监控:监控 Embedding 的统计特性(均值、方差、相似度分布等),发现异常及时告警。

完整代码实现示例

端到端的推荐系统实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import torch
import torch.nn as nn
import numpy as np
from sklearn.model_selection import train_test_split
import faiss

class CompleteRecommendationSystem:
"""完整的推荐系统实现"""

def __init__(self, num_users, num_items, embedding_dim=128):
self.num_users = num_users
self.num_items = num_items
self.embedding_dim = embedding_dim

# 创建 Two-Tower 模型
self.model = TwoTowerModel(
user_feature_dim=50,
item_feature_dim=40,
embedding_dim=embedding_dim
)

# 创建 FAISS 索引
self.item_index = None
self.item_embeddings = None

def train(self, train_data, num_epochs=10, batch_size=256, learning_rate=0.001):
"""训练模型"""
# train_data: [(user_features, item_features, label), ...]

dataset = RecommendationDataset(
user_features=np.array([d[0] for d in train_data]),
item_features=np.array([d[1] for d in train_data]),
labels=np.array([d[2] for d in train_data])
)

train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

train_two_tower(self.model, train_loader, num_epochs, learning_rate)

def build_index(self, all_item_features):
"""构建物品索引"""
# 获取所有物品的 Embedding
self.model.eval()
with torch.no_grad():
item_features_tensor = torch.FloatTensor(all_item_features)
self.item_embeddings = self.model.get_item_embedding(item_features_tensor)
self.item_embeddings = self.item_embeddings.numpy()

# 创建 FAISS 索引
self.item_index = FAISSIndex(self.embedding_dim, index_type='IP')
self.item_index.add(self.item_embeddings)

def recommend(self, user_features, k=10):
"""为用户推荐物品"""
self.model.eval()
with torch.no_grad():
user_features_tensor = torch.FloatTensor(user_features).unsqueeze(0)
user_emb = self.model.get_user_embedding(user_features_tensor)
user_emb = user_emb.numpy()

# 使用 FAISS 搜索
distances, indices = self.item_index.search(user_emb, k=k)

return indices[0], distances[0]

def evaluate(self, test_data, k=10):
"""评估模型"""
recalls = []

for user_features, item_features, label in test_data:
if label == 1: # 只评估正样本
# 获取用户 Embedding
self.model.eval()
with torch.no_grad():
user_emb = self.model.get_user_embedding(
torch.FloatTensor(user_features).unsqueeze(0)
).numpy()

# 搜索 top-k
_, top_k_indices = self.item_index.search(user_emb, k=k)
top_k_indices = top_k_indices[0]

# 计算真实物品的相似度
item_emb = self.model.get_item_embedding(
torch.FloatTensor(item_features).unsqueeze(0)
).numpy()

similarity = np.dot(user_emb[0], item_emb[0])

# 检查是否在 top-k 中
# 这里简化处理,实际应该比较 item_id
recalls.append(1.0) # 简化版本

return np.mean(recalls) if recalls else 0.0

常见问题与解答

Q1: Embedding 维度如何选择?

A: Embedding 维度的选择需要在表达能力和计算效率之间平衡:

  • 小维度( 32-64):计算快,内存占用小,但表达能力有限
  • 中等维度( 128-256):平衡点,适合大多数场景
  • 大维度( 512+):表达能力强,但计算和存储成本高

建议从 128 维开始,根据实际效果调整。可以通过实验不同维度下的召回率来选择。

Q2: 如何处理冷启动问题?

A: 冷启动问题可以通过以下方式缓解:

  1. 内容特征:使用物品的内容特征(类别、描述等)初始化 Embedding
  2. 迁移学习:从相似物品或类别中迁移 Embedding
  3. 多任务学习:同时学习 Embedding 和内容理解任务
  4. 探索策略:对新物品给予更高的曝光机会

Q3: 用户 Embedding 和物品 Embedding 是否需要在同一空间?

A: 是的,为了计算相似度,用户和物品的 Embedding 必须在同一向量空间中。 Two-Tower 模型通过共享相同的输出维度来保证这一点。

Q4: 如何更新 Embedding?

A: 有几种更新策略:

  1. 全量重训:定期使用所有数据重新训练
  2. 增量更新:使用新数据微调模型
  3. 在线学习:实时更新 Embedding(需要谨慎,可能不稳定)

Q5: 负采样数量如何选择?

A: 负采样数量通常选择:

  • 小规模: 5-10 个负样本
  • 中大规模: 10-50 个负样本
  • 超大规模: 50-100+ 个负样本

可以通过实验选择最优值。注意负采样数量增加会提升训练时间。

Q6: Two-Tower 模型和深度协同过滤的区别?

A:

  • Two-Tower:用户和物品分别通过独立的网络,适合特征丰富的场景
  • 深度协同过滤:通常将用户 ID 和物品 ID 直接 Embedding,然后拼接输入网络

Two-Tower 更适合利用丰富的特征信息。

Q7: 如何加速训练?

A: 加速训练的方法:

  1. 批量内负采样:减少负采样计算
  2. 混合精度训练:使用 FP16 减少内存和加速
  3. 分布式训练:多 GPU/多机训练
  4. 梯度累积:模拟更大的 batch size

Q8: Embedding 是否需要归一化?

A: 取决于相似度计算方式:

  • 内积:如果使用内积计算相似度,建议归一化(等价于余弦相似度)
  • 欧氏距离:不需要归一化
  • 余弦相似度:必须归一化

归一化还能稳定训练过程。

Q9: 如何处理序列信息?

A: 处理序列信息的方法:

  1. 平均池化:对历史序列的 Embedding 求平均
  2. RNN/LSTM:使用循环神经网络处理序列
  3. Transformer:使用注意力机制捕获序列依赖
  4. Graph Neural Network:将序列建模为图

Q10: 如何评估 Embedding 质量?

A: 可以从多个维度评估:

  1. 离线指标: Recall@K 、 NDCG@K 、覆盖率等
  2. 相似度分布:检查相似度分布是否合理
  3. 可视化:使用 t-SNE 可视化 Embedding 分布
  4. 在线指标: CTR 、停留时间、转化率等

Q11: FAISS 、 Annoy 、 HNSW 如何选择?

A:

  • FAISS:功能最全,适合生产环境,需要编译
  • Annoy:最简单,适合中小规模,支持持久化
  • HNSW:精度和速度平衡最好,适合对精度要求高的场景

建议先尝试 HNSW,如果满足需求就使用;如果需要更多功能,考虑 FAISS 。

Q12: 如何处理多模态特征?

A: 多模态特征可以通过以下方式融合:

  1. 早期融合:在输入层拼接所有特征
  2. 晚期融合:每个模态单独处理,最后融合 Embedding
  3. 注意力机制:学习不同模态的权重

Q13: Embedding 可以用于其他任务吗?

A: 可以! Embedding 可以用于:

  1. 相似物品推荐:基于物品 Embedding 找相似物品
  2. 用户聚类:基于用户 Embedding 进行用户分群
  3. 异常检测:检测 Embedding 异常的物品或用户
  4. 可视化分析:理解用户和物品的分布

Q14: 如何解决 Embedding 的稀疏性问题?

A:

  1. 正则化: L2 正则化防止过拟合
  2. Dropout:在训练时随机丢弃部分特征
  3. 特征工程:提取更有意义的特征
  4. 预训练:使用大规模数据预训练 Embedding

Q15: 训练时出现 NaN 怎么办?

A: NaN 通常由以下原因引起:

  1. 学习率过大:降低学习率
  2. 梯度爆炸:使用梯度裁剪
  3. 数值不稳定:检查输入数据是否有异常值
  4. 损失函数:在损失函数中添加小的 epsilon 防止 log(0)

总结

Embedding 表示学习是推荐系统的核心技术之一。本文从基础理论出发,详细介绍了 Item2Vec 、 Node2Vec 、 Two-Tower 、 DSSM 等经典算法,并提供了完整的代码实现。同时,我们还深入探讨了负采样策略、 ANN 检索技术以及 Embedding 质量评估方法。

在实际应用中,需要根据具体场景选择合适的算法和参数。建议:

  1. 从小规模开始:先用小规模数据验证算法和流程
  2. 迭代优化:根据评估结果不断调整模型和参数
  3. 关注工程细节:负采样、索引构建、特征工程等细节对效果影响很大
  4. 持续监控:建立完善的监控体系,及时发现和解决问题

希望本文能够帮助读者深入理解推荐系统中的 Embedding 技术,并在实际项目中应用这些方法。

  • 本文标题:推荐系统(五)—— Embedding 表示学习
  • 本文作者:Chen Kai
  • 创建时间:2024-05-22 09:15:00
  • 本文链接:https://www.chenk.top/%E6%8E%A8%E8%8D%90%E7%B3%BB%E7%BB%9F%EF%BC%88%E4%BA%94%EF%BC%89%E2%80%94%E2%80%94-Embedding%E8%A1%A8%E7%A4%BA%E5%AD%A6%E4%B9%A0/
  • 版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
 评论