在推荐系统中,如何将用户和物品表示为计算机能够理解和计算的向量,是一个核心问题。
Embedding(嵌入)技术正是解决这一问题的关键。从早期的协同过滤到如今的深度学习推荐系统,
Embedding 技术经历了从简单到复杂、从浅层到深层的演进过程。
本文将深入探讨推荐系统中 Embedding
表示学习的理论基础与实践方法。我们会从最基础的 Embedding
概念出发,逐步深入到 Item2Vec 、 Node2Vec 、 Two-Tower 模型、 DSSM
等经典算法,并详细讲解负采样策略、 ANN 近邻检索技术,以及如何评估
Embedding
的质量。每个算法都会配有完整的代码实现,帮助读者真正理解和掌握这些技术。
Embedding 基础理论
什么是 Embedding
Embedding 是将高维稀疏的离散对象(如用户 ID 、物品 ID
、类别等)映射到低维稠密的连续向量空间中的技术。在推荐系统中,这些向量能够捕获对象之间的语义相似性:相似的物品在向量空间中距离更近,不相似的物品距离更远。
形式上,给定一个离散对象集合 , Embedding
函数 将每个对象 映射到一个 维向量 ,其中 通常远小于 。
Embedding 的优势
维度压缩 :原始的特征空间可能是百万甚至千万维的(每个
ID 对应一维),而 Embedding
将其压缩到几十到几百维,大大降低了存储和计算成本。
语义捕获 :通过训练, Embedding
能够学习到对象之间的潜在关系。例如,经常被同一用户点击的物品,其
Embedding 向量会更接近。
泛化能力 :对于训练集中未出现的新物品,可以通过其属性或其他信息初始化
Embedding,然后通过少量数据快速学习。
可计算性 :向量空间中的相似度计算(如余弦相似度、内积)比原始特征空间中的计算更高效。
Embedding 学习的基本原理
Embedding 学习的基本思路:相似的对象应该有相似的 Embedding
向量 。这个"相似性"的定义取决于具体的任务和算法:
协同过滤 :被同一用户交互过的物品相似
序列推荐 :在序列中相邻的物品相似
图结构 :在图中有边连接的节点相似
语义匹配 :在语义空间中匹配的查询和文档相似
损失函数与优化目标
大多数 Embedding 学习算法都可以归结为优化以下形式的损失函数:
$$
L = _{(i,j) } (f(_i, j), y {ij}) + () $$
其中:
是正样本对集合(相似的对象对)
是相似度函数(如内积、余弦相似度)
是标签(通常为 1 表示相似, 0 表示不相似)
是损失函数(如交叉熵、均方误差)
是正则化项
是所有对象的
Embedding 矩阵
Item2Vec 与 Word2Vec 类比
Word2Vec 回顾
Word2Vec 是 Mikolov 等人在 2013
年提出的词向量学习算法,它通过"上下文预测词"或"词预测上下文"的方式学习词的
Embedding 。 Word2Vec 有两种架构:
Skip-gram :给定中心词,预测上下文词 CBOW(
Continuous Bag of Words) :给定上下文词,预测中心词
Skip-gram 的目标函数为:
$$
L = {t=1}^{T} {-c j c, j } p(w_{t+j} | w_t) $$
其中 通过 softmax 计算:
$$
p(w_{t+j} | w_t) = $$
Item2Vec 的核心思想
Item2Vec 将 Word2Vec
的思想迁移到推荐系统:将用户的行为序列看作句子,将物品看作词 。如果两个物品经常在同一个用户的序列中出现(即"共现"),那么它们的
Embedding 应该相似。
Item2Vec 算法流程
构建序列 :将每个用户的历史交互记录按时间排序,形成物品序列
定义上下文 :对于序列中的每个物品,将其前后
个物品作为上下文
训练模型 :使用 Skip-gram 或 CBOW 架构,学习物品的
Embedding
Item2Vec 完整实现
问题背景
在推荐系统中,如何学习物品的有效表示是一个核心问题。传统的协同过滤方法(如矩阵分解)需要显式的用户-物品交互矩阵,但在实际应用中,我们往往只有用户的行为序列数据(如点击序列、购买序列)。这些序列数据蕴含着丰富的物品共现信息:如果两个物品经常在同一个用户的行为序列中出现,它们很可能具有相似的性质或满足相似的用户需求。然而,如何从这些序列数据中提取物品的语义表示,并捕获物品之间的相似性,是一个挑战。
解决思路
Item2Vec 借鉴了自然语言处理中 Word2Vec
的思想,将推荐问题转化为序列建模问题。核心洞察是:用户的行为序列可以类比为自然语言中的句子,每个物品类比为一个词。如果两个物品在序列中经常共现(出现在相近的位置),它们的
Embedding 向量应该相似。具体而言, Item2Vec 采用 Skip-gram
架构:对于序列中的每个中心物品,模型学习预测其上下文物品(窗口内的其他物品)。通过最大化正样本对的相似度、最小化负样本对的相似度,模型能够学习到物品的向量表示,使得语义相似的物品在向量空间中距离更近。
设计考虑
在实现 Item2Vec 时,有几个关键的设计决策需要考虑:
两套 Embedding 矩阵 :与 Word2Vec
类似,我们使用两套独立的 Embedding 矩阵——中心物品 Embedding 和上下文物品
Embedding
。这种设计提供了更大的模型容量,让模型能够更灵活地学习不同角色的表示。最终我们只使用中心物品
Embedding 作为物品的最终表示。
负采样策略 :直接计算所有物品的 softmax
在物品数量很大时计算开销巨大。负采样技术通过随机采样少量负样本(通常
5-20 个)来近似完整的 softmax,大幅提升训练效率。负采样分布采用频率的
3/4 次幂,既避免高频词被过度采样,又给低频词一定的学习机会。
窗口大小选择 :上下文窗口大小决定了模型考虑多远的共现关系。窗口太小(如
1-2)只能捕获局部共现,窗口太大(如
10+)可能引入噪声。通常窗口大小设置为
3-5,在捕获共现关系和计算效率之间取得平衡。
数值稳定性 :内积计算可能产生很大的值,导致 sigmoid
函数溢出。需要在计算损失前对得分进行裁剪(
clamp),限制在合理范围内(如[-10, 10])。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 import numpy as npimport torchimport torch.nn as nnimport torch.optim as optimfrom collections import defaultdictfrom tqdm import tqdmclass Item2Vec (nn.Module): """ Item2Vec 模型实现 基于 Skip-gram 架构,通过预测上下文物品来学习物品的向量表示 核心思想:如果两个物品经常在同一个用户的行为序列中出现(共现), 那么它们的 Embedding 向量应该相似 """ def __init__ (self, vocab_size, embedding_dim ): """ 初始化 Item2Vec 模型 参数: vocab_size: 物品词汇表大小(即物品总数) embedding_dim: Embedding 向量的维度 """ super (Item2Vec, self).__init__() self.vocab_size = vocab_size self.embedding_dim = embedding_dim self.center_embeddings = nn.Embedding(vocab_size, embedding_dim) self.context_embeddings = nn.Embedding(vocab_size, embedding_dim) self.center_embeddings.weight.data.uniform_(-0.5 / embedding_dim, 0.5 / embedding_dim) self.context_embeddings.weight.data.zero_() def forward (self, center_words, context_words, neg_words ): """ 前向传播:计算损失函数 使用负采样( Negative Sampling)技术简化 softmax 计算: - 对于每个正样本对(中心词-上下文词),最大化它们的相似度 - 对于每个负样本对(中心词-负样本词),最小化它们的相似度 参数: center_words: [batch_size] 中心物品 ID context_words: [batch_size] 上下文物品 ID(正样本) neg_words: [batch_size, num_neg] 负采样物品 ID 返回: loss: 标量,负对数似然损失 """ center_emb = self.center_embeddings(center_words) context_emb = self.context_embeddings(context_words) neg_emb = self.context_embeddings(neg_words) pos_score = torch.sum (center_emb * context_emb, dim=1 ) pos_score = torch.clamp(pos_score, max =10 , min =-10 ) neg_score = torch.bmm(neg_emb, center_emb.unsqueeze(2 )).squeeze(2 ) neg_score = torch.clamp(neg_score, max =10 , min =-10 ) pos_loss = -torch.log(torch.sigmoid(pos_score) + 1e-10 ) neg_loss = -torch.log(torch.sigmoid(-neg_score) + 1e-10 ).sum (dim=1 ) return (pos_loss + neg_loss).mean() def get_embeddings (self ): """ 获取最终的物品 Embedding 通常我们使用中心词 Embedding 作为最终的物品表示, 因为它直接对应于我们要建模的物品 返回: embeddings: numpy array,形状 [vocab_size, embedding_dim] """ return self.center_embeddings.weight.data.cpu().numpy() class Item2VecTrainer : """ Item2Vec 训练器 负责管理整个训练流程,包括: 1. 从用户行为序列生成训练样本 2. 构建负采样分布 3. 执行训练过程 """ def __init__ (self, sequences, vocab_size, embedding_dim=128 , window_size=5 , num_negatives=5 , batch_size=256 , learning_rate=0.01 ): """ 初始化训练器 参数: sequences: 用户行为序列列表,每个序列是物品 ID 的列表 例如: [[0,1,2,3], [1,2,4,5], ...] vocab_size: 物品词汇表大小(物品总数) embedding_dim: Embedding 维度 window_size: 上下文窗口大小,表示中心词前后各取多少个词 例如 window_size=2 表示取前后各 2 个词 num_negatives: 负采样数量,每个正样本对应的负样本数 batch_size: 训练批次大小 learning_rate: 学习率 """ self.sequences = sequences self.vocab_size = vocab_size self.embedding_dim = embedding_dim self.window_size = window_size self.num_negatives = num_negatives self.batch_size = batch_size self.word_freq = self._build_word_freq() self.word_dist = self._build_word_distribution() self.model = Item2Vec(vocab_size, embedding_dim) self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate) self.train_pairs = self._generate_training_pairs() def _build_word_freq (self ): """ 统计词频 遍历所有序列,统计每个物品出现的次数 这个频率将用于构建负采样分布 返回: word_freq: 字典,{物品 ID: 出现次数} """ word_freq = defaultdict(int ) for seq in self.sequences: for word in seq: word_freq[word] += 1 return dict (word_freq) def _build_word_distribution (self ): """ 构建负采样分布(使用幂律分布) Word2Vec 论文建议使用频率的 3/4 次幂作为采样概率, 这样可以平衡高频词和低频词的采样概率: - 如果直接使用频率,高频词会被过度采样 - 如果使用均匀分布,低频词会被过度采样 - 使用 0.75 次幂是一个经验性的折中方案 返回: word_dist: numpy array,每个物品的采样概率 """ word_freq_array = np.array([self.word_freq.get(i, 0 ) for i in range (self.vocab_size)]) word_dist = np.power(word_freq_array, 0.75 ) word_dist = word_dist / word_dist.sum () return word_dist def _generate_training_pairs (self ): """ 生成训练样本对 对每个序列中的每个物品,提取其上下文物品, 形成(中心物品,上下文物品)训练样本对 例如:序列 [0, 1, 2, 3, 4], window_size=2 - 物品 2 的上下文:[0, 1, 3, 4] - 生成样本对:(2,0), (2,1), (2,3), (2,4) 返回: pairs: 训练样本对列表 [(中心词, 上下文词), ...] """ pairs = [] for seq in self.sequences: for i, center_word in enumerate (seq): start = max (0 , i - self.window_size) end = min (len (seq), i + self.window_size + 1 ) for j in range (start, end): if j != i: context_word = seq[j] pairs.append((center_word, context_word)) return pairs def _negative_sampling (self, center_words, num_samples ): """ 负采样 对每个中心词,根据构建的分布随机采样 num_samples 个负样本 负样本是那些不在当前中心词上下文中的物品 参数: center_words: numpy array,中心词 ID 列表 num_samples: 每个中心词对应的负样本数量 返回: neg_samples: numpy array,形状 [batch_size, num_samples] """ batch_size = len (center_words) neg_samples = [] for _ in range (num_samples): sampled = np.random.choice( self.vocab_size, size=batch_size, p=self.word_dist ) neg_samples.append(sampled) return np.array(neg_samples).T def train (self, num_epochs=10 ): """ 训练模型 执行多个 epoch 的训练,每个 epoch 中: 1. 打乱训练样本(提高泛化能力) 2. 按 batch 训练 3. 进行负采样 4. 计算损失并更新参数 参数: num_epochs: 训练轮数 返回: embeddings: 训练好的物品 Embedding """ self.model.train() for epoch in range (num_epochs): total_loss = 0 num_batches = 0 np.random.shuffle(self.train_pairs) for i in tqdm(range (0 , len (self.train_pairs), self.batch_size), desc=f"Epoch {epoch+1 } /{num_epochs} " ): batch_pairs = self.train_pairs[i:i+self.batch_size] center_words = torch.LongTensor([p[0 ] for p in batch_pairs]) context_words = torch.LongTensor([p[1 ] for p in batch_pairs]) neg_words = self._negative_sampling(center_words.numpy(), self.num_negatives) neg_words = torch.LongTensor(neg_words) self.optimizer.zero_grad() loss = self.model(center_words, context_words, neg_words) loss.backward() self.optimizer.step() total_loss += loss.item() num_batches += 1 avg_loss = total_loss / num_batches print (f"Epoch {epoch+1 } , Average Loss: {avg_loss:.4 f} " ) return self.model.get_embeddings() def example_item2vec (): """ Item2Vec 使用示例 展示如何使用 Item2VecTrainer 训练模型并获取物品 Embedding """ sequences = [ [0 , 1 , 2 , 3 , 4 ], [1 , 2 , 5 , 6 , 7 ], [0 , 3 , 4 , 8 , 9 ], [2 , 5 , 6 , 10 , 11 ], ] vocab_size = 12 embedding_dim = 64 trainer = Item2VecTrainer( sequences=sequences, vocab_size=vocab_size, embedding_dim=embedding_dim, window_size=2 , num_negatives=5 , batch_size=32 , learning_rate=0.01 ) embeddings = trainer.train(num_epochs=5 ) print (f"Embedding shape: {embeddings.shape} " ) print (f"Item 0 embedding (前 5 维): {embeddings[0 ][:5 ]} " ) def cosine_similarity (a, b ): """ 计算两个向量的余弦相似度 余弦相似度 = 向量内积 / (向量模长的乘积) 取值范围 [-1, 1],越接近 1 表示越相似 """ return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)) item_0_emb = embeddings[0 ] similarities = [cosine_similarity(item_0_emb, embeddings[i]) for i in range (vocab_size)] top_k = np.argsort(similarities)[::-1 ][:5 ] print (f"与物品 0 最相似的 5 个物品: {top_k} " )
关键点解读
Item2Vec 的实现包含几个关键组件,每个组件都有其特定的作用:
序列构建与窗口采样 :将用户的历史交互按时间排序形成物品序列,这是模型的基础数据。对于序列中的每个中心物品,我们提取其窗口内的其他物品作为上下文。window_size=2
表示考虑前后各 2
个物品,这样既能捕获局部共现关系,又不会引入过多噪声。窗口大小的选择需要在捕获共现关系和计算效率之间权衡:窗口越大,训练样本越多,但可能引入不相关的共现。
负采样机制 :负采样是 Item2Vec
训练效率的关键。每个正样本对(中心物品-上下文物品)需要对应多个负样本对(中心物品-随机物品)。num_negatives=5
表示每个正样本对应 5 个负样本,这个数量通常设置为 5-20
之间。负样本数量越多,模型对不相似物品的区分能力越强,但训练时间也线性增长。
两套 Embedding 矩阵 :中心物品和上下文物品使用独立的
Embedding 矩阵,这是 Word2Vec
论文中的设计。这种设计提供了更大的模型容量,让模型能够学习不同角色的表示。虽然增加了参数量,但实验表明这种设计能够提升模型效果。最终我们只使用中心物品
Embedding 作为物品的最终表示。
损失函数设计 :使用负对数似然损失,对正样本最大化
sigmoid(内积),对负样本最小化
sigmoid(内积)。这种设计使得相似物品的内积增大,不相似物品的内积减小,从而学习到有意义的
Embedding 。
设计权衡
在 Item2Vec 的实现中,存在多个设计权衡:
窗口大小 vs
计算效率 :窗口越大,能够捕获更远的共现关系,但训练样本数量会急剧增加(
O(n ²))。通常窗口大小设置为 3-5,在效果和效率之间取得平衡。
负采样数量 vs
训练时间 :负采样数量越多,模型对负样本的区分能力越强,但每个
batch 的计算时间也线性增长。通常设置为 5-20
个,在效果和效率之间权衡。
Embedding 维度 vs
表达能力 :维度越高,模型的表达能力越强,但参数量也越大,且可能过拟合。通常设置为
64-256 维,根据物品数量和计算资源选择。
频率分布 vs 均匀分布 :负采样使用频率的 3/4
次幂分布,既避免高频词被过度采样,又给低频词学习机会。如果使用均匀分布,低频词会被过度采样;如果直接使用频率,高频词会被过度采样。
常见问题
如何处理冷启动物品? 对于训练集中未出现的新物品,
Item2Vec 无法直接学习其 Embedding 。常见的解决方案包括:(
1)使用物品的内容特征(如类别、标签)通过额外的神经网络生成初始
Embedding;( 2)使用相似物品的 Embedding 作为初始化;(
3)在训练时为新物品分配随机初始化的
Embedding,通过少量交互快速学习。
数值溢出问题 :内积计算可能产生很大的值(特别是
Embedding 未归一化时),导致 sigmoid 函数溢出。代码中使用
torch.clamp 将得分限制在 [-10, 10]
范围内,这是一个重要的数值稳定性技巧。
负样本重复问题 :随机负采样可能采样到正样本(即上下文窗口内的物品)。严格来说应该排除正样本,但在物品数量很大时(如百万级),采样到正样本的概率很小(通常<0.1%),可以忽略。如果物品数量较小,可以在采样时排除正样本。
序列长度不一致 :实际应用中用户序列长度差异很大,从几个物品到数百个物品不等。常见的处理方式包括:(
1)截断:只保留最近 N 个物品(如最近 50 个);(
2)填充:将短序列补齐到固定长度;(
3)动态批处理:将相似长度的序列放在同一个 batch 中。
如何处理时间信息? Item2Vec
忽略了序列中的时间信息,将窗口内的所有物品同等对待。如果需要考虑时间衰减,可以在损失函数中为不同位置的上下文物品赋予不同的权重,距离中心物品越远权重越小。
使用示例
下面的示例展示了如何使用 Item2Vec 训练模型并获取物品 Embedding:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 sequences = [ [0 , 1 , 2 , 3 , 4 ], [1 , 2 , 5 , 6 , 7 ], [0 , 3 , 4 , 8 , 9 ], ] trainer = Item2VecTrainer( sequences=sequences, vocab_size=12 , embedding_dim=64 , window_size=2 , num_negatives=5 , batch_size=32 , learning_rate=0.01 ) embeddings = trainer.train(num_epochs=10 ) item_emb = embeddings[0 ] similarities = [cosine_similarity(item_emb, embeddings[i]) for i in range (12 )] top_k_items = np.argsort(similarities)[::-1 ][:5 ]
在实际应用中, Item2Vec 通常作为特征提取器,学习到的 Embedding
可以用于:( 1)相似物品推荐;( 2)作为下游模型的输入特征;(
3)物品聚类和可视化分析。
Item2Vec 的优缺点
优点 :
简单直观,易于理解和实现
不需要额外的特征信息,只需要用户行为序列
能够捕获物品之间的共现关系
缺点 :
忽略了用户信息,所有用户共享同一套物品 Embedding
对于新物品(冷启动)效果较差
序列中的位置信息利用不充分(只考虑窗口内的共现)
Node2Vec 图嵌入
图嵌入的基本概念
在推荐系统中,用户和物品可以构成一个二部图( Bipartite
Graph):用户和物品是两类节点,交互行为是边。图嵌入的目标是将图中的节点映射到低维向量空间,使得图中相似的节点(如连接相似、结构相似)在向量空间中距离更近。
Node2Vec 算法原理
Node2Vec 是 Grover 和 Leskovec 在 2016
年提出的图嵌入算法,它通过有偏随机游走( Biased Random
Walk)来生成节点的上下文,然后使用 Skip-gram 学习节点的 Embedding 。
有偏随机游走 : Node2Vec
的关键创新在于定义了一种灵活的游走策略,通过参数 和
控制游走的偏向性:
( return parameter):控制回到上一个节点的概率
( in-out parameter):控制走向远离起始节点的概率
从节点 游走到下一个节点 的未归一化概率为:
其中 是节点 (上一个节点)到节点 的最短距离。
Node2Vec 完整实现
问题背景
在推荐系统中,用户和物品之间的交互可以自然地建模为图结构:用户和物品是两类节点,交互行为是边。这种图结构蕴含着丰富的语义信息:连接相似的用户(如都购买了相同物品)可能具有相似的兴趣;结构相似的物品(如被相似用户购买)可能具有相似的性质。然而,如何从图结构中学习节点(用户和物品)的有效表示,使得相似节点在向量空间中距离更近,是一个挑战。传统的图嵌入方法(如
DeepWalk)使用标准的随机游走生成节点序列,但这种方法无法灵活地控制游走策略,难以同时捕获图的局部结构(如社区结构)和全局结构(如节点角色)。
解决思路
Node2Vec 通过引入有偏随机游走( Biased Random
Walk)来解决这个问题。基本思路:通过两个参数 p 和 q
控制游走策略,使得游走过程能够在深度优先搜索( DFS)和广度优先搜索(
BFS)之间灵活切换。具体而言, p( return
parameter)控制返回上一个节点的概率, q( in-out
parameter)控制探索远离起始节点的概率。当 p 较小、 q 较大时,游走倾向于
BFS,能够捕获局部社区结构;当 p 较大、 q 较小时,游走倾向于
DFS,能够捕获全局结构。通过这种灵活的游走策略, Node2Vec
能够生成多样化的节点序列,然后使用 Skip-gram 学习节点的
Embedding,使得结构相似或连接相似的节点在向量空间中距离更近。
设计考虑
在实现 Node2Vec 时,需要考虑以下几个关键设计:
有偏游走策略 :游走概率的计算需要考虑上一个节点、当前节点和候选节点的关系。对于每个候选节点,需要计算其与上一个节点的最短距离,然后根据距离赋予不同的权重。这种设计使得游走过程能够灵活地在局部探索和全局探索之间平衡。
图预处理 :为了加速游走生成,需要预先计算并存储每个节点的邻居信息。这样可以避免在训练时重复查询图结构,显著提高游走生成的速度。对于大规模图,这种预处理是必要的。
游走序列生成 :对每个节点执行多次游走(通常 10-50
次),每次游走生成长度固定的序列(通常
50-100)。游走次数越多,能够捕获的图结构信息越丰富,但计算开销也越大。
Skip-gram 训练 :将生成的游走序列看作"句子",使用
Skip-gram 学习节点 Embedding 。这与 Item2Vec
的训练过程类似,但数据来源不同: Item2Vec 使用用户行为序列, Node2Vec
使用图上的游走序列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 import numpy as npimport networkx as nximport torchimport torch.nn as nnimport torch.optim as optimfrom collections import defaultdictfrom tqdm import tqdmimport randomclass Node2Vec : """ Node2Vec 图嵌入算法 核心创新:有偏随机游走( Biased Random Walk) - 参数 p 控制返回上一个节点的概率( return parameter) - 参数 q 控制探索远离起点的概率( in-out parameter) - p=q=1 时退化为标准的随机游走 """ def __init__ (self, graph, dimensions=128 , walk_length=80 , num_walks=10 , p=1.0 , q=1.0 , window_size=10 , num_negatives=5 ): """ 初始化 Node2Vec 参数: graph: NetworkX 图对象(可以是有向图或无向图) dimensions: Embedding 维度 walk_length: 每次游走的长度(生成的节点序列长度) num_walks: 每个节点作为起点的游走次数 p: return parameter,控制返回上一个节点的倾向 - p < 1: 倾向于返回上一个节点(类似 BFS,关注局部结构) - p > 1: 不倾向返回(类似 DFS,关注全局结构) q: in-out parameter,控制探索新节点的倾向 - q < 1: 倾向于探索远离起点的节点( DFS) - q > 1: 倾向于探索起点附近的节点( BFS) window_size: Skip-gram 的上下文窗口大小 num_negatives: 负采样数量 """ self.graph = graph self.dimensions = dimensions self.walk_length = walk_length self.num_walks = num_walks self.p = p self.q = q self.window_size = window_size self.num_negatives = num_negatives self._preprocess_graph() self.walks = self._generate_walks() self.train_pairs = self._generate_training_pairs() self.node_freq = self._build_node_freq() self.node_dist = self._build_node_distribution() def _preprocess_graph (self ): """ 预处理图:为每个节点存储邻居信息 将邻居信息提前存储在字典中,避免训练时重复查询图结构, 显著提高游走生成的速度 """ self.neighbors = {} for node in self.graph.nodes(): self.neighbors[node] = list (self.graph.neighbors(node)) def _generate_walks (self ): """ 生成有偏随机游走序列 对每个节点,执行 num_walks 次游走,每次游走长度为 walk_length 总共生成 num_nodes * num_walks 条游走序列 返回: walks: 游走序列列表,每个序列是节点 ID 的列表 """ walks = [] nodes = list (self.graph.nodes()) for _ in range (self.num_walks): random.shuffle(nodes) for node in nodes: walk = self._node2vec_walk(node) walks.append(walk) return walks def _node2vec_walk (self, start_node ): """ 从起始节点开始的有偏随机游走 这是 Node2Vec 的核心算法:通过参数 p 和 q 控制游走的偏向性 游走策略: 1. 第一步:从起点随机选择一个邻居 2. 后续步骤:根据 p 和 q 参数计算每个邻居的权重,按权重采样 权重计算规则(设当前在节点 v,上一个节点是 t,候选下一个节点是 x): - 如果 x == t(返回上一个节点):权重 = 1/p - 如果 x 与 t 相邻(距离为 1):权重 = 1 - 如果 x 不与 t 相邻(距离为 2):权重 = 1/q 参数: start_node: 起始节点 ID 返回: walk: 游走序列(节点 ID 列表) """ walk = [start_node] if len (self.neighbors[start_node]) == 0 : return walk first_step = random.choice(self.neighbors[start_node]) walk.append(first_step) for _ in range (self.walk_length - 2 ): current = walk[-1 ] prev = walk[-2 ] neighbors = self.neighbors[current] if len (neighbors) == 0 : break weights = [] for neighbor in neighbors: if neighbor == prev: weight = 1.0 / self.p elif neighbor in self.neighbors[prev]: weight = 1.0 else : weight = 1.0 / self.q weights.append(weight) weights = np.array(weights) weights = weights / weights.sum () next_node = np.random.choice(neighbors, p=weights) walk.append(next_node) return walk def _generate_training_pairs (self ): """ 从游走序列生成训练样本对 与 Item2Vec 类似,将游走序列中的节点对转换为训练样本 每个节点作为中心节点,其窗口内的节点作为上下文节点 """ pairs = [] for walk in self.walks: for i, center_node in enumerate (walk): start = max (0 , i - self.window_size) end = min (len (walk), i + self.window_size + 1 ) for j in range (start, end): if j != i: context_node = walk[j] pairs.append((center_node, context_node)) return pairs def _build_node_freq (self ): """ 统计节点频率 遍历所有游走序列,统计每个节点出现的次数 用于构建负采样分布 """ node_freq = defaultdict(int ) for walk in self.walks: for node in walk: node_freq[node] += 1 return dict (node_freq) def _build_node_distribution (self ): """ 构建负采样分布 同样使用 0.75 次幂的频率分布 """ num_nodes = len (self.graph.nodes()) node_freq_array = np.array([ self.node_freq.get(i, 0 ) for i in range (num_nodes) ]) node_dist = np.power(node_freq_array + 1e-8 , 0.75 ) node_dist = node_dist / node_dist.sum () return node_dist def _negative_sampling (self, center_nodes, num_samples ): """ 负采样:为每个中心节点采样负样本节点 """ batch_size = len (center_nodes) num_nodes = len (self.graph.nodes()) neg_samples = [] for _ in range (num_samples): sampled = np.random.choice( num_nodes, size=batch_size, p=self.node_dist ) neg_samples.append(sampled) return np.array(neg_samples).T def train (self, batch_size=256 , num_epochs=10 , learning_rate=0.01 ): """ 训练 Node2Vec 模型 创建节点到索引的映射,然后使用 Skip-gram 训练节点 Embedding """ num_nodes = len (self.graph.nodes()) self.node_to_idx = {node: idx for idx, node in enumerate (self.graph.nodes())} self.idx_to_node = {idx: node for node, idx in self.node_to_idx.items()} train_pairs_idx = [ (self.node_to_idx[p[0 ]], self.node_to_idx[p[1 ]]) for p in self.train_pairs ] model = Node2VecModel(num_nodes, self.dimensions) optimizer = optim.Adam(model.parameters(), lr=learning_rate) model.train() for epoch in range (num_epochs): total_loss = 0 num_batches = 0 np.random.shuffle(train_pairs_idx) for i in tqdm(range (0 , len (train_pairs_idx), batch_size), desc=f"Epoch {epoch+1 } /{num_epochs} " ): batch_pairs = train_pairs_idx[i:i+batch_size] center_nodes = torch.LongTensor([p[0 ] for p in batch_pairs]) context_nodes = torch.LongTensor([p[1 ] for p in batch_pairs]) neg_nodes = self._negative_sampling( center_nodes.numpy(), self.num_negatives ) neg_nodes = torch.LongTensor(neg_nodes) optimizer.zero_grad() loss = model(center_nodes, context_nodes, neg_nodes) loss.backward() optimizer.step() total_loss += loss.item() num_batches += 1 avg_loss = total_loss / num_batches print (f"Epoch {epoch+1 } , Average Loss: {avg_loss:.4 f} " ) embeddings = model.get_embeddings() self.node_embeddings = { self.idx_to_node[i]: embeddings[i] for i in range (num_nodes) } return self.node_embeddings class Node2VecModel (nn.Module): """ Node2Vec 的神经网络模型 与 Item2Vec 的结构完全相同,只是语义上是节点而非物品 """ def __init__ (self, num_nodes, embedding_dim ): super (Node2VecModel, self).__init__() self.num_nodes = num_nodes self.embedding_dim = embedding_dim self.center_embeddings = nn.Embedding(num_nodes, embedding_dim) self.context_embeddings = nn.Embedding(num_nodes, embedding_dim) self.center_embeddings.weight.data.uniform_(-0.5 / embedding_dim, 0.5 / embedding_dim) self.context_embeddings.weight.data.zero_() def forward (self, center_nodes, context_nodes, neg_nodes ): """ 前向传播:与 Item2Vec 完全相同的损失函数 """ center_emb = self.center_embeddings(center_nodes) context_emb = self.context_embeddings(context_nodes) neg_emb = self.context_embeddings(neg_nodes) pos_score = torch.sum (center_emb * context_emb, dim=1 ) pos_score = torch.clamp(pos_score, max =10 , min =-10 ) neg_score = torch.bmm(neg_emb, center_emb.unsqueeze(2 )).squeeze(2 ) neg_score = torch.clamp(neg_score, max =10 , min =-10 ) pos_loss = -torch.log(torch.sigmoid(pos_score) + 1e-10 ) neg_loss = -torch.log(torch.sigmoid(-neg_score) + 1e-10 ).sum (dim=1 ) return (pos_loss + neg_loss).mean() def get_embeddings (self ): """获取节点 Embedding""" return self.center_embeddings.weight.data.cpu().numpy() def example_node2vec (): """ Node2Vec 使用示例 构建一个用户-物品二部图,然后学习节点的 Embedding """ G = nx.Graph() users = [0 , 1 , 2 , 3 , 4 ] items = list (range (10 , 20 )) edges = [ (0 , 10 ), (0 , 11 ), (0 , 12 ), (1 , 11 ), (1 , 12 ), (1 , 13 ), (2 , 12 ), (2 , 13 ), (2 , 14 ), (3 , 13 ), (3 , 14 ), (3 , 15 ), (4 , 14 ), (4 , 15 ), (4 , 16 ), ] G.add_edges_from(edges) node2vec = Node2Vec( graph=G, dimensions=64 , walk_length=20 , num_walks=10 , p=1.0 , q=1.0 , window_size=5 , num_negatives=5 ) embeddings = node2vec.train( batch_size=128 , num_epochs=10 , learning_rate=0.01 ) print (f"Embedding for node 0: {embeddings[0 ][:5 ]} " ) print (f"Embedding for node 10: {embeddings[10 ][:5 ]} " )
关键点解读
Node2Vec 的实现包含几个关键组件,每个组件都有其特定的作用:
有偏随机游走机制 :这是 Node2Vec
的核心创新。游走概率的计算需要考虑上一个节点、当前节点和候选节点的关系。对于每个候选节点,根据其与上一个节点的最短距离赋予不同的权重:如果候选节点是上一个节点(距离为
0),权重为 1/p;如果候选节点与上一个节点相邻(距离为 1),权重为
1;如果候选节点与上一个节点不相邻(距离为 2),权重为 1/q 。通过调整 p
和 q,可以控制游走策略: p 小(如 0.5)时倾向返回上一个节点,类似
BFS,能够捕获局部社区结构; q 小(如 0.5)时倾向探索远离起点的节点,类似
DFS,能够捕获全局结构。
图预处理优化 :为了加速游走生成,代码预先计算并存储每个节点的邻居信息。这样可以避免在训练时重复查询图结构,显著提高游走生成的速度。对于大规模图(百万级节点),这种预处理是必要的,可以将游走生成时间从数小时降低到数分钟。
游走序列生成 :对每个节点执行多次游走(num_walks,通常
10-50 次),每次游走生成长度固定的序列(walk_length,通常
50-100)。游走次数越多,能够捕获的图结构信息越丰富,但计算开销也越大。通常
num_walks * walk_length
的乘积决定了生成的序列数量,需要在效果和效率之间权衡。
Skip-gram 训练 :将生成的游走序列看作"句子",使用
Skip-gram 学习节点 Embedding 。这与 Item2Vec
的训练过程类似,但数据来源不同: Item2Vec 使用用户行为序列, Node2Vec
使用图上的游走序列。通过最大化正样本对的相似度、最小化负样本对的相似度,模型能够学习到节点的向量表示。
设计权衡
在 Node2Vec 的实现中,存在多个设计权衡:
游走策略 vs
计算复杂度 :有偏游走需要计算每个候选节点与上一个节点的最短距离,这在稠密图中计算开销较大。对于大规模图,可以使用近似方法(如只考虑
2 跳邻居)来加速计算。
游走数量 vs 训练时间 :num_walks 和
walk_length 的乘积决定了生成的序列数量。数量越多,
Embedding 质量越好,但训练时间也线性增长。通常设置
num_walks=10-50,walk_length=50-100,在效果和效率之间取得平衡。
p 和 q
参数选择 :这两个参数控制游走策略,没有通用的最优值,需要根据具体任务调整。对于同质性强的图(如社交网络),倾向于使用较小的
p 和较大的 q;对于结构等价性强的图(如知识图谱),倾向于使用较大的 p
和较小的 q 。建议从 p=1, q=1 开始,然后尝试不同的组合。
窗口大小 vs 上下文范围 : Skip-gram
的窗口大小决定了模型考虑多远的上下文。窗口越大,能够捕获更远的共现关系,但训练样本也会急剧增加。通常窗口大小设置为
5-10,在捕获共现关系和计算效率之间权衡。
常见问题
如何处理有向图? Node2Vec
默认支持无向图,对于有向图,需要修改游走策略:只考虑出边邻居,忽略入边邻居。这样可以保持游走的方向性,但可能丢失一些结构信息。
如何处理加权图?
对于加权图,可以在游走概率计算中考虑边的权重:权重越大的边,被选择的概率越高。这样可以使得游走更倾向于沿着重要边进行,捕获更重要的结构信息。
如何处理大规模图?
对于百万级甚至千万级节点的大规模图,完整的游走生成可能非常耗时。可以采用以下优化策略:(
1)并行化游走生成,利用多核 CPU 或 GPU 加速;(
2)使用采样方法,只对部分节点进行游走;( 3)使用近似方法,如只考虑 2
跳邻居。
如何选择 Embedding 维度? Embedding
维度影响模型的表达能力和计算开销。维度越高,模型的表达能力越强,但参数量也越大,且可能过拟合。通常设置为
64-256
维,根据节点数量和计算资源选择。对于大规模图,可以使用较小的维度(如
64-128)以节省内存。
如何处理新节点? 对于训练集中未出现的新节点,
Node2Vec 无法直接学习其 Embedding 。常见的解决方案包括:(
1)使用节点的特征(如度、邻居特征)通过额外的神经网络生成初始
Embedding;( 2)使用相似节点的 Embedding 作为初始化;(
3)在训练时为新节点分配随机初始化的
Embedding,通过少量交互快速学习。
使用示例
下面的示例展示了如何使用 Node2Vec 训练模型并获取节点 Embedding:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 G = nx.Graph() edges = [(0 , 10 ), (0 , 11 ), (1 , 11 ), (1 , 12 ), ...] G.add_edges_from(edges) node2vec = Node2Vec( graph=G, dimensions=64 , walk_length=20 , num_walks=10 , p=1.0 , q=1.0 , window_size=5 , num_negatives=5 ) embeddings = node2vec.train(batch_size=128 , num_epochs=10 ) user_emb = embeddings[0 ] similarities = [cosine_similarity(user_emb, embeddings[i]) for i in range (num_nodes)] top_k_users = np.argsort(similarities)[::-1 ][:5 ]
在实际应用中, Node2Vec 通常用于:( 1)学习用户和物品的
Embedding,用于相似度计算和推荐;( 2)作为下游模型的输入特征;(
3)节点聚类和可视化分析;( 4)图结构分析和社区发现。
节点索引映射 : NetworkX 的节点 ID
可以是任意类型(字符串、整数等),但 PyTorch 的 Embedding
层需要连续的整数索引,所以需要建立映射。
可能的坑 :
内存占用 :游走序列会占用大量内存。如果图很大,考虑分批生成游走或使用流式处理。
孤立节点 :如果图中有孤立节点(没有邻居),游走会立即结束。需要在构图时注意处理。
有向图 vs 无向图 : Node2Vec
可以用于有向图和无向图。推荐系统中的用户-物品图通常是无向图。
Node2Vec 参数调优
p 和 q 的选择 :
小、 小:倾向于深度优先搜索( DFS),捕获结构相似性
大、 大:倾向于广度优先搜索(
BFS),捕获同质性(连接相似性)
:标准的随机游走
walk_length 和 num_walks :
walk_length:每次游走的长度,通常设置为 40-100
num_walks:每个节点开始的游走次数,通常设置为
10-20
Two-Tower 模型详解
Two-Tower 架构概述
Two-Tower(双塔)模型是推荐系统中广泛使用的架构,它将用户和物品分别通过两个独立的"塔"(
Tower)网络映射到同一个向量空间,然后通过向量相似度(如内积、余弦相似度)计算匹配分数。
架构特点 :
用户塔( User Tower) :输入用户特征,输出用户
Embedding
物品塔( Item Tower) :输入物品特征,输出物品
Embedding
相似度计算 :用户 Embedding 和物品 Embedding
的内积或余弦相似度
Two-Tower 的数学形式
给定用户特征
和物品特征 , Two-Tower
模型计算:
$$
s(u, i) = _u^T _i $$
其中 和 分别是用户塔和物品塔的网络, 和 是各自的参数。
Two-Tower 完整实现
问题背景
在推荐系统中,需要计算用户对物品的兴趣分数,以决定是否向用户推荐某个物品。传统的协同过滤方法(如矩阵分解)需要显式的用户-物品交互矩阵,但在实际应用中,我们往往有丰富的用户特征(如年龄、性别、历史行为)和物品特征(如类别、标签、内容描述)。如何利用这些特征来学习用户和物品的表示,并高效地计算匹配分数,是一个挑战。此外,在实际的推荐系统中,需要在毫秒级时间内为每个用户从百万甚至千万级物品库中检索出最相关的物品,这就要求模型能够支持高效的相似度检索。
解决思路
Two-Tower(双塔)模型通过将用户和物品分别通过两个独立的神经网络("塔")映射到同一个低维向量空间,然后通过向量相似度(如内积、余弦相似度)计算匹配分数。这种设计的核心优势在于:(
1)用户塔和物品塔可以独立设计,分别处理不同类型的特征;(
2)训练完成后,可以预先计算所有物品的 Embedding
并建立索引,在推理时只需要计算用户 Embedding,然后通过高效的向量检索(如
FAISS)找到最相似的物品,大幅提升检索效率;(
3)两个塔的参数可以独立更新,训练过程更加灵活。
设计考虑
在实现 Two-Tower 模型时,需要考虑以下几个关键设计:
塔的网络结构 :用户塔和物品塔通常采用多层全连接网络,每层包含线性变换、批归一化、激活函数和
Dropout
。网络深度和宽度需要根据特征维度和数据规模调整:特征维度高、数据量大时可以使用更深的网络;特征维度低、数据量小时可以使用较浅的网络以避免过拟合。
Embedding
归一化 :为了使用余弦相似度,通常对用户和物品的 Embedding 进行
L2 归一化。归一化后的内积等价于余弦相似度,取值范围为 [-1,
1]。归一化还可以稳定训练过程,避免 Embedding 的模长过大。
损失函数选择 : Two-Tower
模型可以使用多种损失函数:( 1)二元交叉熵损失(
BCE),适用于点击率预估任务;( 2)对比学习损失(如
InfoNCE),适用于学习通用表示;( 3)排序损失(如
BPR),适用于学习相对排序。选择哪种损失函数取决于具体的任务目标。
负采样策略 :在训练时,需要为每个正样本采样负样本。负采样策略影响模型的学习效果:随机负采样简单但可能不够困难;困难负采样(
hard negative
mining)能够提升模型对困难样本的区分能力,但计算开销更大。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 import torchimport torch.nn as nnimport torch.nn.functional as Fimport numpy as npfrom torch.utils.data import Dataset, DataLoaderclass UserTower (nn.Module): """用户塔网络""" def __init__ (self, user_feature_dim, embedding_dim, hidden_dims=[256 , 128 ] ): super (UserTower, self).__init__() layers = [] input_dim = user_feature_dim for hidden_dim in hidden_dims: layers.append(nn.Linear(input_dim, hidden_dim)) layers.append(nn.BatchNorm1d(hidden_dim)) layers.append(nn.ReLU()) layers.append(nn.Dropout(0.2 )) input_dim = hidden_dim layers.append(nn.Linear(input_dim, embedding_dim)) self.network = nn.Sequential(*layers) def forward (self, user_features ): """ user_features: [batch_size, user_feature_dim] 返回: [batch_size, embedding_dim] """ return self.network(user_features) class ItemTower (nn.Module): """物品塔网络""" def __init__ (self, item_feature_dim, embedding_dim, hidden_dims=[256 , 128 ] ): super (ItemTower, self).__init__() layers = [] input_dim = item_feature_dim for hidden_dim in hidden_dims: layers.append(nn.Linear(input_dim, hidden_dim)) layers.append(nn.BatchNorm1d(hidden_dim)) layers.append(nn.ReLU()) layers.append(nn.Dropout(0.2 )) input_dim = hidden_dim layers.append(nn.Linear(input_dim, embedding_dim)) self.network = nn.Sequential(*layers) def forward (self, item_features ): """ item_features: [batch_size, item_feature_dim] 返回: [batch_size, embedding_dim] """ return self.network(item_features) class TwoTowerModel (nn.Module): """Two-Tower 模型""" def __init__ (self, user_feature_dim, item_feature_dim, embedding_dim=128 , user_hidden_dims=[256 , 128 ], item_hidden_dims=[256 , 128 ] ): super (TwoTowerModel, self).__init__() self.user_tower = UserTower(user_feature_dim, embedding_dim, user_hidden_dims) self.item_tower = ItemTower(item_feature_dim, embedding_dim, item_hidden_dims) self.normalize = True def forward (self, user_features, item_features ): """ user_features: [batch_size, user_feature_dim] item_features: [batch_size, item_feature_dim] 返回: [batch_size] 相似度分数 """ user_emb = self.user_tower(user_features) item_emb = self.item_tower(item_features) if self.normalize: user_emb = F.normalize(user_emb, p=2 , dim=1 ) item_emb = F.normalize(item_emb, p=2 , dim=1 ) score = torch.sum (user_emb * item_emb, dim=1 ) return score def get_user_embedding (self, user_features ): """获取用户 Embedding""" user_emb = self.user_tower(user_features) if self.normalize: user_emb = F.normalize(user_emb, p=2 , dim=1 ) return user_emb def get_item_embedding (self, item_features ): """获取物品 Embedding""" item_emb = self.item_tower(item_features) if self.normalize: item_emb = F.normalize(item_emb, p=2 , dim=1 ) return item_emb class RecommendationDataset (Dataset ): """推荐系统数据集""" def __init__ (self, user_features, item_features, labels ): """ user_features: [num_samples, user_feature_dim] item_features: [num_samples, item_feature_dim] labels: [num_samples] 0 或 1 """ self.user_features = torch.FloatTensor(user_features) self.item_features = torch.FloatTensor(item_features) self.labels = torch.FloatTensor(labels) def __len__ (self ): return len (self.labels) def __getitem__ (self, idx ): return { 'user_features' : self.user_features[idx], 'item_features' : self.item_features[idx], 'label' : self.labels[idx] } def train_two_tower (model, train_loader, num_epochs=10 , learning_rate=0.001 , device='cpu' ): """训练 Two-Tower 模型""" model = model.to(device) criterion = nn.BCEWithLogitsLoss() optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate) model.train() for epoch in range (num_epochs): total_loss = 0 num_batches = 0 for batch in train_loader: user_features = batch['user_features' ].to(device) item_features = batch['item_features' ].to(device) labels = batch['label' ].to(device) optimizer.zero_grad() scores = model(user_features, item_features) loss = criterion(scores, labels) loss.backward() optimizer.step() total_loss += loss.item() num_batches += 1 avg_loss = total_loss / num_batches print (f"Epoch {epoch+1 } /{num_epochs} , Average Loss: {avg_loss:.4 f} " ) def example_two_tower (): """Two-Tower 模型使用示例""" num_samples = 10000 user_feature_dim = 50 item_feature_dim = 40 embedding_dim = 128 user_features = np.random.randn(num_samples, user_feature_dim) item_features = np.random.randn(num_samples, item_feature_dim) labels = (np.random.rand(num_samples) > 0.5 ).astype(float ) dataset = RecommendationDataset(user_features, item_features, labels) train_loader = DataLoader(dataset, batch_size=256 , shuffle=True ) model = TwoTowerModel( user_feature_dim=user_feature_dim, item_feature_dim=item_feature_dim, embedding_dim=embedding_dim, user_hidden_dims=[256 , 128 ], item_hidden_dims=[256 , 128 ] ) train_two_tower(model, train_loader, num_epochs=10 , learning_rate=0.001 ) sample_user_features = torch.FloatTensor(user_features[:10 ]) sample_item_features = torch.FloatTensor(item_features[:10 ]) user_embeddings = model.get_user_embedding(sample_user_features) item_embeddings = model.get_item_embedding(sample_item_features) print (f"User embeddings shape: {user_embeddings.shape} " ) print (f"Item embeddings shape: {item_embeddings.shape} " )
关键点解读
Two-Tower 模型的实现包含几个关键组件,每个组件都有其特定的作用:
双塔架构设计 :用户塔和物品塔是两个独立的神经网络,分别处理用户特征和物品特征。这种设计使得两个塔可以独立设计,分别处理不同类型的特征。例如,用户塔可以处理用户画像、历史行为等特征,物品塔可以处理物品属性、内容特征等。两个塔的输出维度必须相同,这样才能计算相似度。
Embedding 归一化 :代码中对用户和物品的 Embedding
进行 L2 归一化,这使得内积等价于余弦相似度,取值范围为 [-1,
1]。归一化还可以稳定训练过程,避免 Embedding
的模长过大导致梯度爆炸。在实际应用中,归一化是 Two-Tower
模型的标准做法。
网络结构设计 :每个塔采用多层全连接网络,每层包含线性变换、批归一化、
ReLU 激活和 Dropout 。批归一化可以加速训练并稳定梯度, Dropout
可以防止过拟合。网络深度和宽度需要根据特征维度和数据规模调整:特征维度高、数据量大时可以使用更深的网络;特征维度低、数据量小时可以使用较浅的网络。
损失函数选择 :代码中使用二元交叉熵损失(
BCEWithLogitsLoss),适用于点击率预估任务。对于其他任务,可以使用不同的损失函数:对比学习任务可以使用
InfoNCE 损失,排序任务可以使用 BPR 损失。
设计权衡
在 Two-Tower 模型的实现中,存在多个设计权衡:
网络深度 vs
计算效率 :网络越深,模型的表达能力越强,但计算开销也越大。在实际应用中,通常使用
2-4
层全连接网络,在效果和效率之间取得平衡。对于大规模推荐系统,计算效率至关重要,因为需要在毫秒级时间内完成推理。
Embedding 维度 vs 表达能力 : Embedding
维度越高,模型的表达能力越强,但参数量也越大,且检索效率会降低(向量检索的时间复杂度与维度相关)。通常
Embedding 维度设置为 64-256 维,根据物品数量和计算资源选择。
归一化 vs 灵活性 : L2
归一化使得内积等价于余弦相似度,简化了相似度计算,但限制了模型的灵活性。如果不需要归一化,可以移除归一化操作,使用原始的内积或点积。
特征工程 vs 端到端学习 : Two-Tower
模型需要手工设计特征,特征质量直接影响模型效果。相比之下,端到端的深度学习方法(如深度协同过滤)可以自动学习特征表示,但需要更多的数据和计算资源。
常见问题
如何处理稀疏特征? 对于稀疏特征(如用户 ID 、物品
ID),可以使用 Embedding 层将其映射到稠密向量。 Embedding
层的维度通常设置为 16-64 维,根据特征数量和数据规模调整。
如何处理多模态特征?
对于多模态特征(如文本、图像),可以使用不同的编码器:文本特征可以使用
CNN 或 Transformer,图像特征可以使用 CNN
。然后将不同模态的特征拼接或融合,输入到塔网络中。
如何提升检索效率? Two-Tower
模型的核心优势在于支持高效的向量检索。训练完成后,可以预先计算所有物品的
Embedding 并建立索引(如 FAISS),在推理时只需要计算用户
Embedding,然后通过向量检索找到最相似的物品。这种方法可以将检索时间从秒级降低到毫秒级。
如何处理冷启动问题?
对于新用户或新物品,可以使用内容特征(如用户画像、物品属性)通过塔网络生成初始
Embedding 。如果内容特征不足,可以使用相似用户或相似物品的 Embedding
作为初始化。
如何选择负采样策略?
负采样策略影响模型的学习效果。随机负采样简单但可能不够困难;困难负采样(
hard negative
mining)能够提升模型对困难样本的区分能力,但计算开销更大。通常采用混合策略:大部分使用随机负采样,小部分使用困难负采样。
使用示例
下面的示例展示了如何使用 Two-Tower 模型进行训练和推理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 user_features = ... item_features = ... labels = ... dataset = RecommendationDataset(user_features, item_features, labels) train_loader = DataLoader(dataset, batch_size=256 , shuffle=True ) model = TwoTowerModel( user_feature_dim=50 , item_feature_dim=40 , embedding_dim=128 , user_hidden_dims=[256 , 128 ], item_hidden_dims=[256 , 128 ] ) train_two_tower(model, train_loader, num_epochs=10 ) user_emb = model.get_user_embedding(user_features) item_emb = model.get_item_embedding(item_features) scores = torch.sum (user_emb * item_emb, dim=1 ) import faissitem_emb_np = item_emb.cpu().numpy() index = faiss.IndexFlatIP(128 ) index.add(item_emb_np) user_emb_np = user_emb[0 :1 ].cpu().numpy() distances, indices = index.search(user_emb_np, k=10 )
在实际应用中, Two-Tower 模型通常用于:(
1)大规模推荐系统的召回阶段,通过向量检索快速找到候选物品;(
2)广告推荐,通过用户和广告的 Embedding 计算匹配分数;(
3)搜索推荐,通过查询和物品的 Embedding 进行语义匹配。
DSSM 深度语义匹配
DSSM 算法原理
DSSM( Deep Structured Semantic Models)是微软在 2013
年提出的深度语义匹配模型,最初用于信息检索中的查询-文档匹配。在推荐系统中,
DSSM 可以用于用户查询与物品的语义匹配。
DSSM
的基本思路:将查询和文档分别通过深度神经网络映射到同一个语义空间,然后计算它们的相似度 。
DSSM 网络结构
DSSM 的网络结构包括:
Term Hashing :将文本转换为 n-gram 特征
Word Hashing :将 n-gram 特征映射到低维向量
多层全连接网络 :学习语义表示
相似度计算 :使用余弦相似度
DSSM 完整实现
问题背景
在信息检索和推荐系统中,如何计算查询( query)和文档(
document)之间的语义相似度是一个核心问题。传统的基于关键词匹配的方法(如
TF-IDF 、
BM25)只能捕获字面匹配,无法理解语义相似性。例如,查询"如何学习机器学习"和文档"机器学习入门教程"在语义上高度相关,但关键词重叠度很低。此外,在实际应用中,查询和文档往往以文本形式存在,如何将文本转换为数值特征,并通过深度神经网络学习语义表示,是一个挑战。
解决思路
DSSM( Deep Structured Semantic
Models)通过将查询和文档分别通过深度神经网络映射到同一个低维语义空间,然后计算它们的余弦相似度。核心创新在于:(
1)使用 Word
Hashing(词哈希)技术将文本转换为固定维度的特征向量,避免了传统词袋模型的高维稀疏问题;(
2)使用多层全连接网络学习语义表示,能够捕获文本的深层语义信息;(
3)通过最大化相关查询-文档对的相似度、最小化不相关对的相似度来训练模型,使得语义相似的查询和文档在向量空间中距离更近。
设计考虑
在实现 DSSM 时,需要考虑以下几个关键设计:
Word Hashing 技术 : DSSM 使用 n-gram(通常
n=3)将文本转换为特征向量。对于每个
n-gram,使用哈希函数将其映射到固定大小的特征空间(通常 50000
维)。这种方法避免了传统词袋模型的高维稀疏问题,同时能够处理未登录词(
OOV)问题。
网络结构设计 : DSSM 使用多层全连接网络,每层使用
Tanh 激活函数。网络深度通常为 3-4 层,每层维度逐渐减小(如 300 → 300 →
128)。最后一层输出固定维度的 Embedding(通常 128 维),并进行 L2
归一化。
双塔架构 :查询塔和文档塔是两个独立的网络,但结构相同。这种设计使得查询和文档可以有不同的输入特征维度,但输出维度必须相同才能计算相似度。
损失函数选择 : DSSM
通常使用对比学习损失或排序损失。对于每个查询,选择相关的文档作为正样本,不相关的文档作为负样本,最大化正样本的相似度、最小化负样本的相似度。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 import torchimport torch.nn as nnimport torch.nn.functional as Fimport numpy as npfrom collections import defaultdictclass WordHashing : """Word Hashing:将文本转换为 n-gram 特征""" def __init__ (self, n=3 ): self.n = n def ngram_hash (self, text ): """将文本转换为 n-gram 哈希特征""" text = text.lower() ngrams = [] for i in range (len (text) - self.n + 1 ): ngram = text[i:i+self.n] ngrams.append(ngram) return ngrams def text_to_features (self, text, vocab_size=50000 ): """将文本转换为特征向量(使用简单的哈希)""" ngrams = self.ngram_hash(text) features = np.zeros(vocab_size) for ngram in ngrams: idx = hash (ngram) % vocab_size features[idx] += 1 return features class DSSM (nn.Module): """DSSM 深度语义匹配模型""" def __init__ (self, input_dim, hidden_dims=[300 , 300 , 128 ], embedding_dim=128 ): super (DSSM, self).__init__() layers = [] current_dim = input_dim for hidden_dim in hidden_dims: layers.append(nn.Linear(current_dim, hidden_dim)) layers.append(nn.Tanh()) current_dim = hidden_dim self.network = nn.Sequential(*layers) self.output_layer = nn.Linear(current_dim, embedding_dim) def forward (self, x ): """ x: [batch_size, input_dim] 返回: [batch_size, embedding_dim] """ hidden = self.network(x) output = self.output_layer(hidden) output = F.normalize(output, p=2 , dim=1 ) return output class DSSMModel (nn.Module): """DSSM 双塔模型(查询塔和文档塔)""" def __init__ (self, query_input_dim, doc_input_dim, query_hidden_dims=[300 , 300 , 128 ], doc_hidden_dims=[300 , 300 , 128 ], embedding_dim=128 ): super (DSSMModel, self).__init__() self.query_tower = DSSM(query_input_dim, query_hidden_dims, embedding_dim) self.doc_tower = DSSM(doc_input_dim, doc_hidden_dims, embedding_dim) def forward (self, query_features, doc_features ): """ query_features: [batch_size, query_input_dim] doc_features: [batch_size, doc_input_dim] 返回: [batch_size] 相似度分数(余弦相似度) """ query_emb = self.query_tower(query_features) doc_emb = self.doc_tower(doc_features) similarity = torch.sum (query_emb * doc_emb, dim=1 ) return similarity def get_query_embedding (self, query_features ): """获取查询 Embedding""" return self.query_tower(query_features) def get_doc_embedding (self, doc_features ): """获取文档 Embedding""" return self.doc_tower(doc_features) def train_dssm (model, train_loader, num_epochs=10 , learning_rate=0.001 , device='cpu' ): """训练 DSSM 模型""" model = model.to(device) criterion = nn.BCEWithLogitsLoss() optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate) model.train() for epoch in range (num_epochs): total_loss = 0 num_batches = 0 for batch in train_loader: query_features = batch['query_features' ].to(device) doc_features = batch['doc_features' ].to(device) labels = batch['label' ].to(device) optimizer.zero_grad() scores = model(query_features, doc_features) loss = criterion(scores, labels) loss.backward() optimizer.step() total_loss += loss.item() num_batches += 1 avg_loss = total_loss / num_batches print (f"Epoch {epoch+1 } /{num_epochs} , Average Loss: {avg_loss:.4 f} " )
使用示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 def example_dssm (): """DSSM 使用示例""" num_samples = 10000 query_input_dim = 50000 doc_input_dim = 50000 query_features = np.random.rand(num_samples, query_input_dim) doc_features = np.random.rand(num_samples, doc_input_dim) query_features = query_features / (query_features.sum (axis=1 , keepdims=True ) + 1e-8 ) doc_features = doc_features / (doc_features.sum (axis=1 , keepdims=True ) + 1e-8 ) labels = (np.random.rand(num_samples) > 0.5 ).astype(float ) class DSSMDataset : def__init__(self, query_features, doc_features, labels): self.query_features = torch.FloatTensor(query_features) self.doc_features = torch.FloatTensor(doc_features) self.labels = torch.FloatTensor(labels) def__len__(self): return len (self.labels) def__getitem__(self, idx): return { 'query_features' : self.query_features[idx], 'doc_features' : self.doc_features[idx], 'label' : self.labels[idx] } dataset = DSSMDataset(query_features, doc_features, labels) train_loader = DataLoader(dataset, batch_size=256 , shuffle=True ) model = DSSMModel( query_input_dim=query_input_dim, doc_input_dim=doc_input_dim, query_hidden_dims=[300 , 300 , 128 ], doc_hidden_dims=[300 , 300 , 128 ], embedding_dim=128 ) train_dssm(model, train_loader, num_epochs=10 , learning_rate=0.001 )
关键点解读
DSSM 的实现包含几个关键组件,每个组件都有其特定的作用:
Word Hashing 技术 :这是 DSSM
的核心创新之一。传统的词袋模型需要维护一个巨大的词汇表,对于新词或未登录词(
OOV)无法处理。 Word Hashing 通过 n-gram(通常
n=3)将文本转换为固定维度的特征向量,使用哈希函数将每个 n-gram
映射到固定大小的特征空间(通常 50000
维)。这种方法避免了高维稀疏问题,同时能够处理任意文本,包括未登录词。
多层全连接网络 : DSSM
使用多层全连接网络学习语义表示,每层使用 Tanh 激活函数。网络深度通常为
3-4 层,每层维度逐渐减小(如 300 → 300 →
128)。这种设计使得模型能够学习文本的深层语义信息,将高维稀疏的 n-gram
特征映射到低维稠密的语义空间。
L2 归一化 :代码中对输出的 Embedding 进行 L2
归一化,这使得内积等价于余弦相似度,取值范围为 [-1,
1]。归一化还可以稳定训练过程,避免 Embedding 的模长过大。
双塔架构 :查询塔和文档塔是两个独立的网络,但结构相同。这种设计使得查询和文档可以有不同的输入特征维度,但输出维度必须相同才能计算相似度。训练时,通过最大化相关查询-文档对的相似度、最小化不相关对的相似度来学习参数。
设计权衡
在 DSSM 的实现中,存在多个设计权衡:
Word Hashing vs 词嵌入 : Word Hashing
避免了词汇表问题,但可能引入哈希冲突。现代方法通常使用预训练的词嵌入(如
Word2Vec 、 BERT),能够更好地捕获语义信息,但需要维护词汇表。
全连接网络 vs CNN/RNN : DSSM
使用全连接网络,简单但可能无法很好地捕获序列信息。 CDSSM 使用 CNN
能够捕获局部特征和序列信息,但计算开销更大。对于长文本,可以使用 RNN 或
Transformer 更好地捕获序列依赖。
特征维度 vs 计算效率 : Word Hashing
的特征维度(通常 50000
维)很大,但可以通过稀疏表示和哈希技巧加速计算。现代方法通常使用预训练的词嵌入(通常
300-768 维),维度更小但需要维护词汇表。
损失函数选择 : DSSM
可以使用多种损失函数:二元交叉熵损失适用于点击率预估,对比学习损失适用于学习通用表示,排序损失适用于学习相对排序。选择哪种损失函数取决于具体的任务目标。
常见问题
如何处理哈希冲突? Word Hashing
可能引入哈希冲突,即不同的 n-gram
映射到同一个特征位置。虽然冲突概率较低(对于 50000
维特征空间),但可能影响模型效果。可以使用更大的特征空间或使用多个哈希函数来减少冲突。
如何处理长文本? DSSM 的 Word Hashing 对所有 n-gram
一视同仁,可能无法很好地处理长文本。对于长文本,可以:(
1)截断或采样部分 n-gram;( 2)使用 CNN 或 RNN 捕获序列信息;(
3)使用注意力机制关注重要部分。
如何提升检索效率? 与 Two-Tower 模型类似, DSSM
也支持高效的向量检索。训练完成后,可以预先计算所有文档的 Embedding
并建立索引(如 FAISS),在推理时只需要计算查询
Embedding,然后通过向量检索找到最相似的文档。
如何处理多语言? Word Hashing
对于不同语言可能需要不同的 n-gram 大小。对于中文,通常使用字符级别的
n-gram;对于英文,通常使用词级别的 n-gram
。现代方法通常使用多语言预训练模型(如
mBERT),能够更好地处理多语言场景。
使用示例
下面的示例展示了如何使用 DSSM 进行训练和推理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 word_hashing = WordHashing(n=3 ) query_text = "how to learn machine learning" query_features = word_hashing.text_to_features(query_text, vocab_size=50000 ) model = DSSMModel( query_input_dim=50000 , doc_input_dim=50000 , query_hidden_dims=[300 , 300 , 128 ], doc_hidden_dims=[300 , 300 , 128 ], embedding_dim=128 ) query_emb = model.get_query_embedding(query_features) doc_emb = model.get_doc_embedding(doc_features) similarity = torch.sum (query_emb * doc_emb, dim=1 )
在实际应用中, DSSM 通常用于:( 1)搜索引擎中的查询-文档匹配;(
2)推荐系统中的用户查询-物品匹配;( 3)问答系统中的问题-答案匹配。
DSSM 的改进: CDSSM
CDSSM( Convolutional
DSSM)使用卷积神经网络替代全连接网络,能够更好地捕获局部特征和序列信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 class CDSSM (nn.Module): """CDSSM:使用 CNN 的 DSSM""" def __init__ (self, input_dim, embedding_dim=128 , num_filters=300 , filter_sizes=[1 , 2 , 3 ] ): super (CDSSM, self).__init__() self.convs = nn.ModuleList([ nn.Conv1d(input_dim, num_filters, kernel_size=fs) for fs in filter_sizes ]) self.fc = nn.Linear(len (filter_sizes) * num_filters, embedding_dim) def forward (self, x ): """ x: [batch_size, seq_len, input_dim] 返回: [batch_size, embedding_dim] """ x = x.transpose(1 , 2 ) conv_outputs = [] for conv in self.convs: conv_out = F.relu(conv(x)) pooled = F.max_pool1d(conv_out, kernel_size=conv_out.size(2 )) conv_outputs.append(pooled.squeeze(2 )) concatenated = torch.cat(conv_outputs, dim=1 ) output = self.fc(concatenated) output = F.normalize(output, p=2 , dim=1 ) return output
YouTube 双塔召回
YouTube 推荐系统架构
YouTube 的推荐系统采用多阶段架构:
候选生成( Candidate
Generation) :从百万级物品中快速召回数百个候选
排序( Ranking) :对候选物品进行精细排序
重排( Re-ranking) :考虑多样性、新鲜度等因素
双塔模型主要用于候选生成 阶段,需要快速从海量物品中召回相关物品。
YouTube 双塔模型设计
用户塔输入特征 :
用户观看历史(最近 N 个视频的 Embedding 平均)
用户搜索历史
用户画像特征(地理位置、设备等)
用户行为统计(观看时长、点击率等)
物品塔输入特征 :
视频 ID Embedding
视频类别 Embedding
视频统计特征(观看次数、点赞数等)
视频内容特征(标题、描述等)
YouTube 双塔完整实现
问题背景
YouTube
作为全球最大的视频平台,每天需要为数亿用户推荐数十亿个视频。传统的协同过滤方法无法满足如此大规模的需求,因为计算用户-物品相似度矩阵需要巨大的计算和存储开销。此外,
YouTube
需要处理多种类型的特征:用户观看历史、搜索历史、用户画像、视频属性、视频统计等,如何有效地融合这些特征并学习用户和视频的表示,是一个挑战。更重要的是,
YouTube
需要在毫秒级时间内从百万级视频库中召回数百个候选视频,这就要求模型必须支持高效的向量检索。
解决思路
YouTube
双塔模型通过将用户和视频分别通过两个独立的神经网络("塔")映射到同一个低维向量空间,然后通过向量相似度计算匹配分数。核心设计包括:(
1)用户塔融合多种用户特征(观看历史、搜索历史、用户画像、行为统计),通过平均池化处理变长序列,然后通过全连接网络学习用户
Embedding;( 2)视频塔融合多种视频特征(视频 ID
、类别、统计特征、内容特征),通过 Embedding 层和全连接网络学习视频
Embedding;( 3)训练完成后,预先计算所有视频的 Embedding
并建立索引,在推理时只需要计算用户 Embedding,然后通过高效的向量检索(如
FAISS)找到最相似的视频,大幅提升检索效率。
设计考虑
在实现 YouTube 双塔模型时,需要考虑以下几个关键设计:
历史序列处理 :用户观看历史是变长序列,需要将其转换为固定维度的特征。
YouTube 使用平均池化对历史视频的 Embedding
进行聚合,简单有效。也可以使用注意力机制或 RNN/Transformer
更好地捕获序列信息,但计算开销更大。
特征融合策略 :用户塔和视频塔都需要融合多种类型的特征(稀疏特征、稠密特征、序列特征)。通常使用
Embedding 层处理稀疏特征(如视频 ID
、类别),使用全连接网络处理稠密特征(如统计特征),然后拼接或加权融合。
训练样本构造 : YouTube 使用曝光样本(
impression)作为训练数据,即用户看到但未点击的视频作为负样本,点击的视频作为正样本。这种构造方式更符合实际推荐场景,但可能导致样本分布不平衡问题。
负采样策略 :由于正样本(点击)远少于负样本(未点击),需要采用负采样策略。
YouTube 使用随机负采样,简单但可能不够困难。也可以使用困难负采样( hard
negative mining)提升模型效果,但计算开销更大。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 import torchimport torch.nn as nnimport torch.nn.functional as Fimport numpy as npclass YouTubeUserTower (nn.Module): """YouTube 用户塔""" def __init__ (self, video_embedding_dim=64 , category_embedding_dim=32 , user_feature_dim=100 , embedding_dim=128 ): super (YouTubeUserTower, self).__init__() self.max_history_length = 50 self.video_embedding_dim = video_embedding_dim self.user_fc = nn.Sequential( nn.Linear(user_feature_dim, 128 ), nn.ReLU(), nn.Linear(128 , 64 ) ) self.fusion = nn.Sequential( nn.Linear(video_embedding_dim + 64 , 256 ), nn.ReLU(), nn.Dropout(0.2 ), nn.Linear(256 , embedding_dim) ) def forward (self, user_features, watch_history ): """ user_features: [batch_size, user_feature_dim] watch_history: [batch_size, max_history_length, video_embedding_dim] 返回: [batch_size, embedding_dim] """ batch_size = user_features.size(0 ) user_emb = self.user_fc(user_features) history_mask = (watch_history.sum (dim=2 ) != 0 ).float () history_sum = watch_history.sum (dim=1 ) history_count = history_mask.sum (dim=1 , keepdim=True ) + 1e-8 history_avg = history_sum / history_count combined = torch.cat([history_avg, user_emb], dim=1 ) output = self.fusion(combined) output = F.normalize(output, p=2 , dim=1 ) return output class YouTubeItemTower (nn.Module): """YouTube 物品塔""" def __init__ (self, num_videos=1000000 , num_categories=1000 , video_embedding_dim=64 , category_embedding_dim=32 , item_feature_dim=50 , embedding_dim=128 ): super (YouTubeItemTower, self).__init__() self.video_embedding = nn.Embedding(num_videos, video_embedding_dim) self.category_embedding = nn.Embedding(num_categories, category_embedding_dim) self.item_fc = nn.Sequential( nn.Linear(item_feature_dim, 128 ), nn.ReLU(), nn.Linear(128 , 64 ) ) self.fusion = nn.Sequential( nn.Linear(video_embedding_dim + category_embedding_dim + 64 , 256 ), nn.ReLU(), nn.Dropout(0.2 ), nn.Linear(256 , embedding_dim) ) def forward (self, video_ids, category_ids, item_features ): """ video_ids: [batch_size] category_ids: [batch_size] item_features: [batch_size, item_feature_dim] 返回: [batch_size, embedding_dim] """ video_emb = self.video_embedding(video_ids) category_emb = self.category_embedding(category_ids) item_emb = self.item_fc(item_features) combined = torch.cat([video_emb, category_emb, item_emb], dim=1 ) output = self.fusion(combined) output = F.normalize(output, p=2 , dim=1 ) return output class YouTubeTwoTower (nn.Module): """YouTube 双塔模型""" def __init__ (self, num_videos=1000000 , num_categories=1000 , video_embedding_dim=64 , category_embedding_dim=32 , user_feature_dim=100 , item_feature_dim=50 , embedding_dim=128 ): super (YouTubeTwoTower, self).__init__() self.user_tower = YouTubeUserTower( video_embedding_dim=video_embedding_dim, category_embedding_dim=category_embedding_dim, user_feature_dim=user_feature_dim, embedding_dim=embedding_dim ) self.item_tower = YouTubeItemTower( num_videos=num_videos, num_categories=num_categories, video_embedding_dim=video_embedding_dim, category_embedding_dim=category_embedding_dim, item_feature_dim=item_feature_dim, embedding_dim=embedding_dim ) def forward (self, user_features, watch_history, video_ids, category_ids, item_features ): """ 返回: [batch_size] 相似度分数 """ user_emb = self.user_tower(user_features, watch_history) item_emb = self.item_tower(video_ids, category_ids, item_features) score = torch.sum (user_emb * item_emb, dim=1 ) return score def get_user_embedding (self, user_features, watch_history ): """获取用户 Embedding(用于召回)""" return self.user_tower(user_features, watch_history) def get_item_embedding (self, video_ids, category_ids, item_features ): """获取物品 Embedding(用于索引)""" return self.item_tower(video_ids, category_ids, item_features)
关键点解读
YouTube
双塔模型的实现包含几个关键组件,每个组件都有其特定的作用:
用户塔设计 :用户塔融合多种用户特征,包括观看历史、搜索历史、用户画像和行为统计。观看历史是变长序列,代码使用平均池化对历史视频的
Embedding
进行聚合,简单有效。用户画像和行为统计通过全连接网络处理,然后与历史
Embedding 拼接,通过融合层学习用户 Embedding
。这种设计能够同时捕获用户的长期兴趣(历史)和短期偏好(画像、统计)。
视频塔设计 :视频塔融合多种视频特征,包括视频 ID
、类别、统计特征和内容特征。视频 ID 和类别使用 Embedding
层处理,统计特征通过全连接网络处理,然后拼接并通过融合层学习视频
Embedding 。这种设计能够同时捕获视频的语义信息( ID
、类别)和统计信息(观看次数、点赞数等)。
特征融合策略 :代码使用拼接(
concatenation)方式融合不同特征,然后通过全连接网络学习融合表示。也可以使用加权融合或注意力机制,但拼接方式简单有效,是常用的做法。
L2 归一化 :代码中对用户和视频的 Embedding 进行
L2 归一化,这使得内积等价于余弦相似度,取值范围为 [-1,
1]。归一化还可以稳定训练过程,避免 Embedding 的模长过大。
设计权衡
在 YouTube 双塔模型的实现中,存在多个设计权衡:
历史序列处理 vs
计算效率 :平均池化简单高效,但可能丢失序列信息。使用注意力机制或
RNN/Transformer
能够更好地捕获序列信息,但计算开销更大。对于大规模推荐系统,平均池化是常用的折中方案。
特征融合 vs
模型复杂度 :拼接方式简单但可能引入冗余信息。使用注意力机制或门控机制能够更好地融合特征,但模型复杂度更高。对于大规模推荐系统,拼接方式是常用的折中方案。
负采样策略 vs
训练效果 :随机负采样简单但可能不够困难;批量内负采样高效但负样本质量可能不高;困难负采样能够提升模型效果,但计算开销更大。通常采用混合策略:大部分使用批量内负采样,小部分使用困难负采样。
Embedding 维度 vs 检索效率 : Embedding
维度越高,模型的表达能力越强,但检索效率会降低(向量检索的时间复杂度与维度相关)。通常
Embedding 维度设置为 64-256 维,根据视频数量和计算资源选择。
常见问题
如何处理新用户或新视频?
对于新用户,可以使用用户画像和行为统计特征通过用户塔生成初始 Embedding
。对于新视频,可以使用视频类别和内容特征通过视频塔生成初始 Embedding
。如果特征不足,可以使用相似用户或相似视频的 Embedding
作为初始化。
如何提升召回效果? YouTube
双塔模型的核心优势在于支持高效的向量检索。训练完成后,可以预先计算所有视频的
Embedding 并建立索引(如 FAISS),在推理时只需要计算用户
Embedding,然后通过向量检索找到最相似的视频。为了提升召回效果,可以:(
1)使用更大的 Embedding 维度;( 2)使用困难负采样训练;(
3)使用多任务学习同时优化多个目标。
如何处理长尾视频? 长尾视频(观看次数少)的
Embedding
可能不够准确。可以使用内容特征(如标题、描述)通过额外的网络生成初始
Embedding,或者使用相似视频的 Embedding 作为初始化。
如何平衡准确率和多样性? YouTube
双塔模型主要优化准确率,可能推荐相似度高的视频导致多样性不足。可以在召回后使用重排(
re-ranking)模块考虑多样性、新鲜度等因素,或者使用多目标优化同时优化准确率和多样性。
使用示例
下面的示例展示了如何使用 YouTube 双塔模型进行训练和推理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 model = YouTubeTwoTower( num_videos=1000000 , num_categories=1000 , video_embedding_dim=64 , category_embedding_dim=32 , user_feature_dim=100 , item_feature_dim=50 , embedding_dim=128 ) user_emb = model.get_user_embedding(user_features, watch_history) video_emb = model.get_item_embedding(video_ids, category_ids, item_features) scores = torch.sum (user_emb * video_emb, dim=1 ) import faissvideo_emb_np = video_emb.cpu().numpy() index = faiss.IndexFlatIP(128 ) index.add(video_emb_np) user_emb_np = user_emb[0 :1 ].cpu().numpy() distances, indices = index.search(user_emb_np, k=100 )
在实际应用中, YouTube 双塔模型通常用于:(
1)大规模推荐系统的召回阶段,通过向量检索快速找到候选视频;(
2)视频搜索,通过查询和视频的 Embedding 进行语义匹配;(
3)相似视频推荐,通过视频 Embedding 找到相似视频。
YouTube 双塔的训练策略
负采样策略 :
随机负采样 :从所有视频中随机采样
批量内负采样 :在同一个 batch
中,其他样本的视频作为负样本(更高效)
困难负采样 :选择模型预测分数较高的负样本(需要额外训练)
训练目标 : 使用 softmax 交叉熵损失:
$$
L = -} $$
其中 是正样本, 是负样本。
负采样策略详解
负采样的必要性
在推荐系统中,正样本(用户-物品交互)通常只占所有可能的用户-物品对的很小一部分。如果使用所有负样本训练,会导致:
计算成本过高 :需要计算所有负样本的损失
类别不平衡 :负样本远多于正样本
训练效率低 :大部分负样本对模型学习没有帮助
负采样通过只选择一部分负样本进行训练,解决了这些问题。
随机负采样
最简单的负采样策略是随机采样:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def random_negative_sampling (positive_items, all_items, num_negatives ): """ 随机负采样 positive_items: 正样本物品列表 all_items: 所有物品集合 num_negatives: 每个正样本对应的负样本数量 """ negative_items = [] for pos_item in positive_items: candidates = list (set (all_items) - {pos_item}) negs = np.random.choice(candidates, size=num_negatives, replace=False ) negative_items.append(negs) return negative_items
基于频率的负采样
根据物品的流行度(频率)进行采样,流行物品更容易被采样为负样本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 def frequency_based_negative_sampling (positive_items, item_freq, num_negatives, alpha=0.75 ): """ 基于频率的负采样 item_freq: 物品频率字典 alpha: 频率的幂次(通常为 0.75) """ items = list (item_freq.keys()) frequencies = np.array([item_freq[item] for item in items]) probs = np.power(frequencies, alpha) probs = probs / probs.sum () negative_items = [] for pos_item in positive_items: mask = np.array([item != pos_item for item in items]) filtered_items = np.array(items)[mask] filtered_probs = probs[mask] filtered_probs = filtered_probs / filtered_probs.sum () negs = np.random.choice( filtered_items, size=num_negatives, p=filtered_probs, replace=False ) negative_items.append(negs) return negative_items
批量内负采样
在同一个 batch 内,其他样本的物品可以作为负样本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def in_batch_negative_sampling (batch_items ): """ 批量内负采样 batch_items: [batch_size] 当前 batch 的物品 ID 返回: [batch_size, batch_size-1] 每个样本的负样本 """ batch_size = len (batch_items) negative_items = [] for i in range (batch_size): pos_item = batch_items[i] negs = [batch_items[j] for j in range (batch_size) if j != i] negative_items.append(negs) return np.array(negative_items)
困难负采样( Hard Negative
Sampling)
选择模型预测分数较高的负样本,这些样本更难区分,有助于提升模型性能:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 def hard_negative_sampling (model, user_features, positive_items, candidate_items, num_negatives, k=10 ): """ 困难负采样 k: 先采样 k 倍候选,然后选择分数最高的 """ extended_candidates = np.random.choice( candidate_items, size=min (len (candidate_items), num_negatives * k), replace=False ) with torch.no_grad(): user_emb = model.get_user_embedding(user_features) item_embeddings = model.get_item_embedding(extended_candidates) scores = torch.matmul(user_emb, item_embeddings.T) scores = scores.squeeze(0 ).cpu().numpy() top_indices = np.argsort(scores)[-num_negatives:] hard_negatives = extended_candidates[top_indices] return hard_negatives
混合负采样策略
结合多种采样策略:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 class MixedNegativeSampling : """混合负采样策略""" def __init__ (self, item_freq, alpha=0.75 ): self.item_freq = item_freq self.alpha = alpha self._build_distribution() def _build_distribution (self ): """构建采样分布""" items = list (self.item_freq.keys()) frequencies = np.array([self.item_freq[item] for item in items]) probs = np.power(frequencies, self.alpha) self.distribution = probs / probs.sum () self.items = np.array(items) def sample (self, positive_items, num_negatives, strategy='mixed' ): """ strategy: 'random', 'frequency', 'mixed' """ if strategy == 'random' : return self._random_sample(positive_items, num_negatives) elif strategy == 'frequency' : return self._frequency_sample(positive_items, num_negatives) else : return self._mixed_sample(positive_items, num_negatives) def _random_sample (self, positive_items, num_negatives ): """随机采样""" negative_items = [] for pos_item in positive_items: mask = self.items != pos_item candidates = self.items[mask] negs = np.random.choice(candidates, size=num_negatives, replace=False ) negative_items.append(negs) return np.array(negative_items) def _frequency_sample (self, positive_items, num_negatives ): """频率采样""" negative_items = [] for pos_item in positive_items: mask = self.items != pos_item candidates = self.items[mask] probs = self.distribution[mask] probs = probs / probs.sum () negs = np.random.choice(candidates, size=num_negatives, p=probs, replace=False ) negative_items.append(negs) return np.array(negative_items) def _mixed_sample (self, positive_items, num_negatives ): """混合采样:一半随机,一半频率""" num_random = num_negatives // 2 num_freq = num_negatives - num_random random_negs = self._random_sample(positive_items, num_random) freq_negs = self._frequency_sample(positive_items, num_freq) return np.concatenate([random_negs, freq_negs], axis=1 )
ANN 近邻检索
ANN 检索的必要性
在推荐系统的召回阶段,需要从百万甚至千万级物品中快速找到与用户
Embedding 最相似的物品。如果使用暴力搜索( Brute Force),计算复杂度为
,其中 是物品数量, 是 Embedding
维度,这在大规模场景下是不可接受的。
近似最近邻( Approximate Nearest Neighbor,
ANN)检索通过牺牲一定的精度来换取速度,能够在毫秒级时间内完成检索。
FAISS 库使用
问题背景
在推荐系统的召回阶段,需要从百万甚至千万级物品中快速找到与用户
Embedding 最相似的物品。传统的暴力搜索( Brute Force)需要计算用户
Embedding 与所有物品 Embedding 的相似度,计算复杂度为 ,其中 是物品数量, 是 Embedding
维度。对于百万级物品库,暴力搜索需要数秒甚至数十秒,无法满足实时推荐的需求。此外,随着物品数量的增长,计算时间线性增长,这在大规模场景下是不可接受的。
解决思路
近似最近邻( Approximate Nearest Neighbor,
ANN)检索通过牺牲一定的精度来换取速度,能够在毫秒级时间内完成检索。
FAISS( Facebook AI Similarity Search)是 Facebook
开源的向量相似度检索库,提供了多种索引类型,包括:( 1) Flat
Index:暴力搜索,精度 100%但速度慢;( 2) IVF( Inverted File
Index):通过聚类将向量分组,只搜索相关聚类,速度更快;( 3) HNSW(
Hierarchical Navigable Small World):基于图的索引,速度快且精度高;(
4) Product
Quantization:通过向量压缩减少内存占用。通过选择合适的索引类型和参数,可以在精度和速度之间取得平衡。
设计考虑
在实现 FAISS 索引时,需要考虑以下几个关键设计:
索引类型选择 : Flat Index
精度最高但速度最慢,适用于小规模场景(<100 万); IVF
索引通过聚类加速检索,适用于中等规模场景( 100 万-1000 万); HNSW
索引速度快且精度高,适用于大规模场景(>1000
万)。选择哪种索引类型取决于物品数量、精度要求和计算资源。
距离度量选择 : FAISS 支持多种距离度量,包括 L2
距离和内积( IP)。对于归一化的
Embedding,内积等价于余弦相似度,是推荐系统中常用的度量。选择哪种度量取决于
Embedding 是否归一化。
索引参数调优 : IVF 索引需要设置聚类数量(
nlist)和搜索时检查的聚类数量( nprobe)。 nlist
越大,检索精度越高但构建时间越长; nprobe
越大,检索精度越高但检索时间越长。需要在精度和速度之间权衡。
内存和持久化 :大规模索引可能占用大量内存,需要考虑内存限制。
FAISS
支持索引的保存和加载,可以将索引持久化到磁盘,避免每次重新构建。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 import faissimport numpy as npclass FAISSIndex : """基于 FAISS 的向量检索""" def __init__ (self, dimension, index_type='L2' ): """ dimension: Embedding 维度 index_type: 'L2' 或 'IP'(内积) """ self.dimension = dimension if index_type == 'L2' : self.index = faiss.IndexFlatL2(dimension) elif index_type == 'IP' : self.index = faiss.IndexFlatIP(dimension) else : raise ValueError(f"Unknown index type: {index_type} " ) self.index_type = index_type def add (self, vectors ): """ 添加向量到索引 vectors: [N, dimension] numpy array """ if self.index_type == 'IP' : faiss.normalize_L2(vectors) self.index.add(vectors.astype('float32' )) def search (self, query_vectors, k=10 ): """ 搜索最相似的 k 个向量 query_vectors: [M, dimension] numpy array 返回: (distances, indices) """ if self.index_type == 'IP' : query_vectors = query_vectors.copy().astype('float32' ) faiss.normalize_L2(query_vectors) distances, indices = self.index.search(query_vectors.astype('float32' ), k) return distances, indices def example_faiss (): """FAISS 使用示例""" dimension = 128 num_items = 1000000 num_queries = 100 item_embeddings = np.random.randn(num_items, dimension).astype('float32' ) index = FAISSIndex(dimension, index_type='IP' ) index.add(item_embeddings) query_embeddings = np.random.randn(num_queries, dimension).astype('float32' ) distances, indices = index.search(query_embeddings, k=10 ) print (f"Query 0 top-10 items: {indices[0 ]} " ) print (f"Distances: {distances[0 ]} " ) class IVFIndex : """IVF( Inverted File)索引:更快的检索速度""" def __init__ (self, dimension, nlist=100 , nprobe=10 , index_type='L2' ): """ nlist: 聚类中心数量 nprobe: 搜索时检查的聚类数量 """ self.dimension = dimension quantizer = faiss.IndexFlatL2(dimension) self.index = faiss.IndexIVFFlat(quantizer, dimension, nlist) self.index.nprobe = nprobe self.index_type = index_type self.is_trained = False def train (self, vectors ): """训练索引(需要先训练才能添加向量)""" if self.index_type == 'IP' : vectors = vectors.copy().astype('float32' ) faiss.normalize_L2(vectors) self.index.train(vectors.astype('float32' )) self.is_trained = True def add (self, vectors ): """添加向量""" if not self.is_trained: raise ValueError("Index must be trained before adding vectors" ) if self.index_type == 'IP' : vectors = vectors.copy().astype('float32' ) faiss.normalize_L2(vectors) self.index.add(vectors.astype('float32' )) def search (self, query_vectors, k=10 ): """搜索""" if self.index_type == 'IP' : query_vectors = query_vectors.copy().astype('float32' ) faiss.normalize_L2(query_vectors) distances, indices = self.index.search(query_vectors.astype('float32' ), k) return distances, indices
关键点解读
FAISS 索引的实现包含几个关键组件,每个组件都有其特定的作用:
索引类型选择 : Flat Index 使用暴力搜索,精度
100%但速度最慢,适用于小规模场景(<100 万物品)。 IVF Index
通过聚类将向量分组,只搜索相关聚类,速度更快但需要先训练索引。 HNSW
Index 基于图的索引,速度快且精度高,适用于大规模场景(>1000
万物品)。
距离度量选择 : FAISS 支持 L2 距离和内积(
IP)两种度量。对于归一化的
Embedding,内积等价于余弦相似度,是推荐系统中常用的度量。代码中根据索引类型自动处理归一化,确保内积计算的正确性。
索引训练 : IVF
索引需要先训练才能添加向量。训练过程使用 K-means
聚类算法将向量分组,训练时间与物品数量和聚类数量相关。通常训练一次后可以多次使用,训练好的索引可以保存到磁盘。
检索参数调优 : IVF 索引的 nprobe
参数控制搜索时检查的聚类数量。 nprobe
越大,检索精度越高但检索时间越长。通常设置
nprobe=10-100,在精度和速度之间权衡。
设计权衡
在 FAISS 索引的实现中,存在多个设计权衡:
精度 vs 速度 : Flat Index 精度最高但速度最慢; IVF
Index 通过聚类加速检索,精度略降但速度大幅提升; HNSW Index
速度快且精度高,但构建时间较长。选择哪种索引类型取决于物品数量、精度要求和计算资源。
内存 vs 速度 : Product Quantization(
PQ)通过向量压缩减少内存占用,但检索精度会降低。对于内存受限的场景,可以使用
IVFPQ 索引,在内存和精度之间权衡。
构建时间 vs 检索时间 : HNSW
索引构建时间较长,但检索速度最快; IVF 索引构建时间中等,检索速度较快;
Flat Index
无需构建,但检索时间最长。如果索引需要频繁更新,应该选择构建时间短的索引类型。
常见问题
如何选择索引类型? 对于小规模场景(<100
万),使用 Flat Index;对于中等规模场景( 100 万-1000 万),使用 IVF
Index;对于大规模场景(>1000 万),使用 HNSW Index
。如果内存受限,可以使用 IVFPQ 索引。
如何调优索引参数? IVF 索引的 nlist
参数控制聚类数量,通常设置为 sqrt(N),其中 N 是物品数量。 nprobe
参数控制搜索时检查的聚类数量,通常设置为 nlist 的 1/10 到 1/100
。可以通过实验找到精度和速度的平衡点。
如何处理索引更新? FAISS
支持增量添加向量,但删除向量需要重建索引。对于频繁更新的场景,可以使用支持删除的索引类型(如
HNSW),或者定期重建索引。
如何持久化索引? FAISS
支持索引的保存和加载,可以将索引持久化到磁盘。保存时使用
faiss.write_index(),加载时使用
faiss.read_index()。持久化可以避免每次重新构建索引,大幅提升启动速度。
使用示例
下面的示例展示了如何使用 FAISS 索引进行向量检索:
1 2 3 4 5 6 7 8 9 10 11 12 index = FAISSIndex(dimension=128 , index_type='IP' ) index.add(item_embeddings) distances, indices = index.search(query_embeddings, k=10 ) ivf_index = IVFIndex(dimension=128 , nlist=1000 , nprobe=50 ) ivf_index.train(item_embeddings) ivf_index.add(item_embeddings) distances, indices = ivf_index.search(query_embeddings, k=10 )
在实际应用中, FAISS 索引通常用于:(
1)大规模推荐系统的召回阶段,通过向量检索快速找到候选物品;(
2)相似物品推荐,通过物品 Embedding 找到相似物品;(
3)用户画像匹配,通过用户 Embedding 找到相似用户。
Annoy 库使用
Annoy( Approximate Nearest Neighbors Oh Yeah)是 Spotify 开源的 ANN
库,基于随机投影树。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 from annoy import AnnoyIndexclass AnnoyIndexWrapper : """Annoy 索引封装""" def __init__ (self, dimension, metric='angular' , n_trees=10 ): """ dimension: Embedding 维度 metric: 'angular'(余弦相似度)或 'euclidean'(欧氏距离) n_trees: 树的数量(越多越准确,但构建时间越长) """ self.dimension = dimension self.metric = metric self.index = AnnoyIndex(dimension, metric) self.n_trees = n_trees self.item_count = 0 def add_item (self, vector ): """添加一个向量""" self.index.add_item(self.item_count, vector) self.item_count += 1 def build (self ): """构建索引""" self.index.build(self.n_trees) def search (self, query_vector, k=10 ): """搜索最相似的 k 个向量""" indices = self.index.get_nns_by_vector(query_vector, k) return indices def save (self, filepath ): """保存索引""" self.index.save(filepath) def load (self, filepath ): """加载索引""" self.index.load(filepath) def example_annoy (): """Annoy 使用示例""" dimension = 128 num_items = 1000000 index = AnnoyIndexWrapper(dimension, metric='angular' , n_trees=50 ) for i in range (num_items): vector = np.random.randn(dimension).astype('float32' ) vector = vector / (np.linalg.norm(vector) + 1e-8 ) index.add_item(vector) index.build() query_vector = np.random.randn(dimension).astype('float32' ) query_vector = query_vector / (np.linalg.norm(query_vector) + 1e-8 ) top_k = index.search(query_vector, k=10 ) print (f"Top-10 items: {top_k} " )
HNSW 算法
HNSW( Hierarchical Navigable Small World)是一种基于图的 ANN
算法,在精度和速度之间取得了很好的平衡。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 import hnswlibclass HNSWIndex : """HNSW 索引""" def __init__ (self, dimension, metric='cosine' , max_elements=1000000 , ef_construction=200 , M=16 ): """ dimension: Embedding 维度 metric: 'cosine', 'l2', 'ip' max_elements: 最大元素数量 ef_construction: 构建时的候选数量 M: 每个节点的最大连接数 """ self.dimension = dimension self.index = hnswlib.Index(space=metric, dim=dimension) self.index.init_index(max_elements=max_elements, ef_construction=ef_construction, M=M) self.item_count = 0 def add (self, vectors ): """批量添加向量""" num_vectors = len (vectors) self.index.add_items(vectors.astype('float32' ), np.arange(self.item_count, self.item_count + num_vectors)) self.item_count += num_vectors def search (self, query_vectors, k=10 , ef_search=None ): """ 搜索 ef_search: 搜索时的候选数量(越大越准确,但越慢) """ if ef_search is None : ef_search = max (k * 2 , 50 ) self.index.set_ef(ef_search) labels, distances = self.index.knn_query(query_vectors.astype('float32' ), k=k) return distances, labels def save (self, filepath ): """保存索引""" self.index.save_index(filepath) def load (self, filepath ): """加载索引""" self.index.load_index(filepath) def example_hnsw (): """HNSW 使用示例""" dimension = 128 num_items = 1000000 num_queries = 100 item_embeddings = np.random.randn(num_items, dimension).astype('float32' ) norms = np.linalg.norm(item_embeddings, axis=1 , keepdims=True ) item_embeddings = item_embeddings / (norms + 1e-8 ) index = HNSWIndex(dimension, metric='cosine' , max_elements=num_items) index.add(item_embeddings) query_embeddings = np.random.randn(num_queries, dimension).astype('float32' ) query_norms = np.linalg.norm(query_embeddings, axis=1 , keepdims=True ) query_embeddings = query_embeddings / (query_norms + 1e-8 ) distances, indices = index.search(query_embeddings, k=10 , ef_search=100 ) print (f"Query 0 top-10 items: {indices[0 ]} " ) print (f"Distances: {distances[0 ]} " )
ANN 索引选择指南
FAISS :
优点:功能丰富,支持多种索引类型,速度快
缺点:需要编译,依赖较多
适用场景:大规模生产环境
Annoy :
优点:简单易用,支持持久化,内存占用小
缺点:构建索引较慢,精度略低于 HNSW
适用场景:中小规模场景,需要持久化的场景
HNSW :
优点:精度高,速度快,内存占用适中
缺点:构建时间较长
适用场景:对精度要求较高的场景
Embedding 质量评估
评估指标
1. 相似度分布
检查 Embedding 的相似度分布是否合理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def evaluate_similarity_distribution (embeddings, sample_size=10000 ): """评估相似度分布""" num_items = len (embeddings) indices = np.random.choice(num_items, size=sample_size, replace=False ) similarities = [] for i in range (len (indices)): for j in range (i+1 , len (indices)): sim = np.dot(embeddings[indices[i]], embeddings[indices[j]]) similarities.append(sim) similarities = np.array(similarities) print (f"Mean similarity: {similarities.mean():.4 f} " ) print (f"Std similarity: {similarities.std():.4 f} " ) print (f"Min similarity: {similarities.min ():.4 f} " ) print (f"Max similarity: {similarities.max ():.4 f} " ) return similarities
2. 召回率( Recall@K)
在测试集上评估召回率:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 def evaluate_recall_at_k (user_embeddings, item_embeddings, test_pairs, k=10 ): """ 评估 Recall@K test_pairs: [(user_id, item_id), ...] 测试集正样本对 """ recalls = [] for user_id, true_item_id in test_pairs: user_emb = user_embeddings[user_id] similarities = np.dot(item_embeddings, user_emb) top_k_indices = np.argsort(similarities)[::-1 ][:k] recall = 1.0 if true_item_id in top_k_indices else 0.0 recalls.append(recall) return np.mean(recalls)
3. 覆盖率( Coverage)
评估推荐结果的多样性:
1 2 3 4 5 6 7 8 9 10 11 12 def evaluate_coverage (recommendations, num_items ): """ 评估覆盖率 recommendations: [[item_id, ...], ...] 每个用户的推荐列表 num_items: 物品总数 """ recommended_items = set () for rec_list in recommendations: recommended_items.update(rec_list) coverage = len (recommended_items) / num_items return coverage
4. 新颖性( Novelty)
评估推荐物品的新颖性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def evaluate_novelty (recommendations, item_popularity ): """ 评估新颖性 item_popularity: {item_id: popularity_score} """ novelty_scores = [] for rec_list in recommendations: scores = [item_popularity.get(item_id, 0 ) for item_id in rec_list] novelty = 1.0 - np.mean(scores) novelty_scores.append(novelty) return np.mean(novelty_scores)
Embedding 可视化
使用 t-SNE 或 UMAP 将高维 Embedding 降维到 2D 进行可视化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from sklearn.manifold import TSNEimport matplotlib.pyplot as pltdef visualize_embeddings (embeddings, labels=None , title="Embedding Visualization" ): """可视化 Embedding""" tsne = TSNE(n_components=2 , random_state=42 , perplexity=30 ) embeddings_2d = tsne.fit_transform(embeddings) plt.figure(figsize=(10 , 8 )) if labels is not None : scatter = plt.scatter(embeddings_2d[:, 0 ], embeddings_2d[:, 1 ], c=labels, cmap='viridis' , alpha=0.6 ) plt.colorbar(scatter) else : plt.scatter(embeddings_2d[:, 0 ], embeddings_2d[:, 1 ], alpha=0.6 ) plt.title(title) plt.xlabel("t-SNE 1" ) plt.ylabel("t-SNE 2" ) plt.tight_layout() plt.show()
在线评估
A/B
测试 :将用户分为两组,一组使用旧模型,一组使用新模型,比较关键指标(
CTR 、停留时间等)。
实时监控 :监控 Embedding
的统计特性(均值、方差、相似度分布等),发现异常及时告警。
完整代码实现示例
端到端的推荐系统实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 import torchimport torch.nn as nnimport numpy as npfrom sklearn.model_selection import train_test_splitimport faissclass CompleteRecommendationSystem : """完整的推荐系统实现""" def __init__ (self, num_users, num_items, embedding_dim=128 ): self.num_users = num_users self.num_items = num_items self.embedding_dim = embedding_dim self.model = TwoTowerModel( user_feature_dim=50 , item_feature_dim=40 , embedding_dim=embedding_dim ) self.item_index = None self.item_embeddings = None def train (self, train_data, num_epochs=10 , batch_size=256 , learning_rate=0.001 ): """训练模型""" dataset = RecommendationDataset( user_features=np.array([d[0 ] for d in train_data]), item_features=np.array([d[1 ] for d in train_data]), labels=np.array([d[2 ] for d in train_data]) ) train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True ) train_two_tower(self.model, train_loader, num_epochs, learning_rate) def build_index (self, all_item_features ): """构建物品索引""" self.model.eval () with torch.no_grad(): item_features_tensor = torch.FloatTensor(all_item_features) self.item_embeddings = self.model.get_item_embedding(item_features_tensor) self.item_embeddings = self.item_embeddings.numpy() self.item_index = FAISSIndex(self.embedding_dim, index_type='IP' ) self.item_index.add(self.item_embeddings) def recommend (self, user_features, k=10 ): """为用户推荐物品""" self.model.eval () with torch.no_grad(): user_features_tensor = torch.FloatTensor(user_features).unsqueeze(0 ) user_emb = self.model.get_user_embedding(user_features_tensor) user_emb = user_emb.numpy() distances, indices = self.item_index.search(user_emb, k=k) return indices[0 ], distances[0 ] def evaluate (self, test_data, k=10 ): """评估模型""" recalls = [] for user_features, item_features, label in test_data: if label == 1 : self.model.eval () with torch.no_grad(): user_emb = self.model.get_user_embedding( torch.FloatTensor(user_features).unsqueeze(0 ) ).numpy() _, top_k_indices = self.item_index.search(user_emb, k=k) top_k_indices = top_k_indices[0 ] item_emb = self.model.get_item_embedding( torch.FloatTensor(item_features).unsqueeze(0 ) ).numpy() similarity = np.dot(user_emb[0 ], item_emb[0 ]) recalls.append(1.0 ) return np.mean(recalls) if recalls else 0.0
常见问题与解答
Q1: Embedding 维度如何选择?
A : Embedding
维度的选择需要在表达能力和计算效率之间平衡:
小维度(
32-64) :计算快,内存占用小,但表达能力有限
中等维度( 128-256) :平衡点,适合大多数场景
大维度(
512+) :表达能力强,但计算和存储成本高
建议从 128
维开始,根据实际效果调整。可以通过实验不同维度下的召回率来选择。
Q2: 如何处理冷启动问题?
A : 冷启动问题可以通过以下方式缓解:
内容特征 :使用物品的内容特征(类别、描述等)初始化
Embedding
迁移学习 :从相似物品或类别中迁移 Embedding
多任务学习 :同时学习 Embedding 和内容理解任务
探索策略 :对新物品给予更高的曝光机会
Q3: 用户
Embedding 和物品 Embedding 是否需要在同一空间?
A : 是的,为了计算相似度,用户和物品的 Embedding
必须在同一向量空间中。 Two-Tower
模型通过共享相同的输出维度来保证这一点。
Q4: 如何更新 Embedding?
A : 有几种更新策略:
全量重训 :定期使用所有数据重新训练
增量更新 :使用新数据微调模型
在线学习 :实时更新
Embedding(需要谨慎,可能不稳定)
Q5: 负采样数量如何选择?
A : 负采样数量通常选择:
小规模 : 5-10 个负样本
中大规模 : 10-50 个负样本
超大规模 : 50-100+ 个负样本
可以通过实验选择最优值。注意负采样数量增加会提升训练时间。
Q6: Two-Tower
模型和深度协同过滤的区别?
A :
Two-Tower :用户和物品分别通过独立的网络,适合特征丰富的场景
深度协同过滤 :通常将用户 ID 和物品 ID 直接
Embedding,然后拼接输入网络
Two-Tower 更适合利用丰富的特征信息。
Q7: 如何加速训练?
A : 加速训练的方法:
批量内负采样 :减少负采样计算
混合精度训练 :使用 FP16 减少内存和加速
分布式训练 :多 GPU/多机训练
梯度累积 :模拟更大的 batch size
Q8: Embedding 是否需要归一化?
A : 取决于相似度计算方式:
内积 :如果使用内积计算相似度,建议归一化(等价于余弦相似度)
欧氏距离 :不需要归一化
余弦相似度 :必须归一化
归一化还能稳定训练过程。
Q9: 如何处理序列信息?
A : 处理序列信息的方法:
平均池化 :对历史序列的 Embedding 求平均
RNN/LSTM :使用循环神经网络处理序列
Transformer :使用注意力机制捕获序列依赖
Graph Neural Network :将序列建模为图
Q10: 如何评估 Embedding 质量?
A : 可以从多个维度评估:
离线指标 : Recall@K 、 NDCG@K 、覆盖率等
相似度分布 :检查相似度分布是否合理
可视化 :使用 t-SNE 可视化 Embedding 分布
在线指标 : CTR 、停留时间、转化率等
Q11: FAISS 、 Annoy 、 HNSW
如何选择?
A :
FAISS :功能最全,适合生产环境,需要编译
Annoy :最简单,适合中小规模,支持持久化
HNSW :精度和速度平衡最好,适合对精度要求高的场景
建议先尝试 HNSW,如果满足需求就使用;如果需要更多功能,考虑 FAISS
。
Q12: 如何处理多模态特征?
A : 多模态特征可以通过以下方式融合:
早期融合 :在输入层拼接所有特征
晚期融合 :每个模态单独处理,最后融合 Embedding
注意力机制 :学习不同模态的权重
Q13: Embedding
可以用于其他任务吗?
A : 可以! Embedding 可以用于:
相似物品推荐 :基于物品 Embedding 找相似物品
用户聚类 :基于用户 Embedding 进行用户分群
异常检测 :检测 Embedding 异常的物品或用户
可视化分析 :理解用户和物品的分布
Q14: 如何解决 Embedding
的稀疏性问题?
A :
正则化 : L2 正则化防止过拟合
Dropout :在训练时随机丢弃部分特征
特征工程 :提取更有意义的特征
预训练 :使用大规模数据预训练 Embedding
Q15: 训练时出现 NaN 怎么办?
A : NaN 通常由以下原因引起:
学习率过大 :降低学习率
梯度爆炸 :使用梯度裁剪
数值不稳定 :检查输入数据是否有异常值
损失函数 :在损失函数中添加小的 epsilon 防止
log(0)
总结
Embedding
表示学习是推荐系统的核心技术之一。本文从基础理论出发,详细介绍了
Item2Vec 、 Node2Vec 、 Two-Tower 、 DSSM
等经典算法,并提供了完整的代码实现。同时,我们还深入探讨了负采样策略、
ANN 检索技术以及 Embedding 质量评估方法。
在实际应用中,需要根据具体场景选择合适的算法和参数。建议:
从小规模开始 :先用小规模数据验证算法和流程
迭代优化 :根据评估结果不断调整模型和参数
关注工程细节 :负采样、索引构建、特征工程等细节对效果影响很大
持续监控 :建立完善的监控体系,及时发现和解决问题
希望本文能够帮助读者深入理解推荐系统中的 Embedding
技术,并在实际项目中应用这些方法。