推荐系统(十一)—— 对比学习与自监督学习
Chen Kai BOSS

推荐系统在数据稀疏性和冷启动问题上一直面临巨大挑战。传统的监督学习方法严重依赖大量标注数据,而在推荐场景中,用户-物品交互数据往往存在严重的类别不平衡和长尾分布问题。对比学习( Contrastive Learning)和自监督学习( Self-Supervised Learning)的兴起为这些问题提供了新的解决思路。

对比学习的核心思想是通过学习区分相似样本和不相似样本来学习有效的表示,而无需显式的标签。在推荐系统中,可以将同一用户的不同行为视图视为正样本对,将不同用户的行为视为负样本对,从而学习到更好的用户和物品表示。自监督学习则通过设计巧妙的预训练任务,从数据本身挖掘监督信号,显著提升了模型在数据稀疏场景下的表现。

本文将深入探讨对比学习和自监督学习在推荐系统中的应用,涵盖从理论基础到最新实践的完整路径。我们将详细解析 SimCLR 、 SGL 、 XSimGCL 等经典框架,探讨图数据增强策略、序列推荐中的对比学习应用,以及如何利用这些技术解决长尾物品推荐问题。文章将包含丰富的代码实现和实际案例,帮助读者深入理解这些前沿技术的设计思想和实现细节。

自监督学习基础

什么是自监督学习

自监督学习( Self-Supervised Learning)是一种无需人工标注数据就能学习数据表示的方法。它通过设计预测任务,从数据本身生成监督信号,从而学习到有用的特征表示。

在推荐系统中,自监督学习的基本思路:利用数据的内在结构来构造监督信号。例如,可以预测用户的下一个交互物品,或者预测图中被遮挡的边,这些任务都不需要额外的标注数据。

自监督学习与监督学习的区别

监督学习: - 需要大量标注数据(如用户对物品的显式评分) - 标注成本高,数据获取困难 - 容易过拟合到标注数据的分布

自监督学习: - 无需人工标注,利用数据本身的结构 - 可以充分利用大量无标注数据 - 学习到的表示更具泛化能力

自监督学习的核心组件

自监督学习通常包含三个核心组件:

  1. 数据增强( Data Augmentation):通过变换原始数据生成不同的视图
  2. 编码器( Encoder):将增强后的数据映射到表示空间
  3. 对比目标( Contrastive Objective):通过对比不同视图学习表示

推荐系统中的自监督信号

在推荐系统中,可以利用以下自监督信号:

1. 序列顺序信号 - 用户行为序列的时间顺序 - 相邻交互之间的关联性

2. 图结构信号 - 用户-物品二部图的结构 - 邻居节点的相似性

3. 多视图信号 - 同一用户的不同行为子序列 - 同一物品的不同用户交互

SimCLR 框架详解

SimCLR 的核心思想

SimCLR( Simple Framework for Contrastive Learning of Visual Representations)是 Google 在 2020 年提出的对比学习框架。虽然最初用于计算机视觉,但其核心思想可以很好地迁移到推荐系统。

SimCLR 的核心流程可以概括为:

  1. 数据增强:对每个样本应用随机增强,生成两个不同的视图
  2. 编码:使用编码器将增强后的样本映射到表示空间
  3. 投影:通过投影头将表示映射到对比学习空间
  4. 对比损失:最大化同一样本不同视图之间的相似度,最小化不同样本之间的相似度

SimCLR 的数学形式化

给定一个样本 ,我们通过数据增强生成两个视图

$$

x_i = t(x), x_j = t'(x)$$

其中 是随机采样的数据增强操作。

编码器 将增强后的样本映射到表示空间:

投影头 将表示映射到对比学习空间:

对比损失函数( InfoNCE):

𝟙

符号说明:

  • :余弦相似度
    • 取值范围 ,越大表示越相似
  • :温度参数( temperature),控制分布的锐度
    • 通常取值 0.05-0.2
    • 较小的 使分布更"尖锐",更关注困难负样本
  • :批次大小( batch size)
    • :增强后的样本数(每个样本有 2 个视图)
  • 𝟙:指示函数
    • 排除自己(不能和自己对比)

直觉理解: InfoNCE 损失在做什么?

核心思想: "正样本应该相似,负样本应该不同"

1. 分子( Numerator):正样本对的相似度

- 是同一样本的两个增强视图(正样本对) - 我们希望它们的相似度尽可能高 - 例如:同一张图片经过不同裁剪后,应该被识别为同一个对象

2. 分母( Denominator):正样本对 + 所有负样本对的相似度

𝟙 - 包括正样本对()和所有负样本(其他样本的视图) - 我们希望负样本的相似度尽可能低 - 例如:不同的图片应该被区分开

3. 整体损失: Softmax + 负对数

类比:多分类问题

可以把 InfoNCE 理解为一个"从 个样本中找到正样本"的分类问题:

  • 类别数(每个样本的视图都是一个"类别")
  • 正确类别:第 个样本( 的另一个视图)
  • 目标:最大化正确类别的概率

具体例子(推荐系统):

假设我们有 3 个用户的行为数据,批次大小

  • 用户 1 的两个视图:(删除了不同的交互历史)
  • 用户 2 的两个视图:
  • 用户 3 的两个视图: 对于,计算与所有视图的相似度:
对比对象 关系 相似度期望
正样本(同一用户) 高 (如 0.9)
负样本(用户 2) 低 (如 0.1)
负样本(用户 3) 低 (如 0.1)

损失计算(假设):

简化:

如果正样本相似度低(如 0.2):

损失很大,模型会更新参数使正样本更相似!

为什么需要负样本?

如果只有正样本约束: 问题:模型会坍缩( Collapse)

  • 所有样本的表示变成相同的向量(如全为 0 或全为 1)
  • 因为这样所有正样本对的相似度都是 1(最大)
  • 但模型完全没有区分能力!

负样本的作用:

  1. 防止坍缩:强制不同样本的表示要不同
  2. 提升区分能力:学会区分相似和不相似的样本
  3. 学习有意义的表示:正负样本的对比让模型学到数据的本质特征

负样本数量的影响:

  • 太少(如只有 2 个负样本):模型容易坍缩,学不到好的表示
  • 太多(如 10000 个负样本):计算开销大,但效果通常更好
  • SimCLR 的发现:批次大小从 256 增加到 4096,性能显著提升
  • 实际应用:可以使用 Memory Bank 或 Momentum Encoder 来增加负样本数量,不增加计算

SimCLR 在推荐系统中的实现

SimCLR 框架的核心是对比学习:通过最大化同一样本不同增强视图之间的相似度,最小化不同样本之间的相似度,学习有效的表示。在推荐系统中,可以将同一用户的不同行为子序列视为正样本对,将不同用户的行为视为负样本对。

SimCLR 的关键组件: 1. 编码器( Encoder):学习用户/物品的表示,这是最终要使用的特征 2. 投影头( Projection Head):将表示映射到对比学习空间,训练完成后可以丢弃 3. 对比损失( InfoNCE Loss):通过对比正负样本对学习表示

下面我们实现一个适用于推荐系统的 SimCLR 框架。这个实现展示了如何将对比学习应用到推荐场景,学习更好的用户和物品表示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

class SimCLREncoder(nn.Module):
"""
SimCLR 编码器,用于学习用户/物品表示

编码器是 SimCLR 的核心组件,它学习到的表示可以直接用于推荐任务。
设计要点:
1. 多层全连接网络,学习非线性特征变换
2. L2 归一化,确保表示在单位球面上
3. Dropout 防止过拟合

在推荐系统中,编码器可以替换为:
- 图神经网络(如 GCN 、 GraphSAGE)用于图数据
- Transformer 用于序列数据
- 多层 MLP 用于特征数据
"""

def __init__(self, input_dim, hidden_dim, output_dim):
"""
初始化编码器

Args:
input_dim: 输入特征维度,如用户特征维度或物品特征维度
hidden_dim: 隐藏层维度,通常设置为 256-512
output_dim: 输出表示维度,通常设置为 128-256
- 维度越大,表达能力越强,但计算成本也越高
- 需要根据数据规模和计算资源平衡
"""
super(SimCLREncoder, self).__init__()
# 三层全连接网络
self.fc1 = nn.Linear(input_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, hidden_dim)
self.fc3 = nn.Linear(hidden_dim, output_dim)
self.dropout = nn.Dropout(0.2) # Dropout 防止过拟合

def forward(self, x):
"""
编码器的前向传播

Args:
x: 输入特征, shape 为[batch_size, input_dim]

Returns:
encoded: 编码后的表示, shape 为[batch_size, output_dim]
- 经过 L2 归一化,表示在单位球面上
- 可以直接用于计算余弦相似度
"""
# 第一层: input_dim -> hidden_dim
x = F.relu(self.fc1(x))
x = self.dropout(x)

# 第二层: hidden_dim -> hidden_dim
x = F.relu(self.fc2(x))
x = self.dropout(x)

# 第三层: hidden_dim -> output_dim
x = self.fc3(x)

# L2 归一化:确保表示在单位球面上
# 这对于对比学习很重要,因为需要计算余弦相似度
return F.normalize(x, dim=1)

class ProjectionHead(nn.Module):
"""
SimCLR 投影头

投影头将编码器的表示映射到对比学习空间。
设计要点:
1. 两层 MLP,学习非线性映射
2. L2 归一化,确保投影后的表示在单位球面上
3. 训练完成后可以丢弃,只使用编码器的输出

为什么需要投影头?
- 编码器的表示空间可能不适合直接进行对比学习
- 投影头提供了一个额外的非线性变换,提升对比学习的效果
- 实验表明,使用投影头比直接使用编码器输出效果更好
"""

def __init__(self, input_dim, hidden_dim, output_dim):
"""
初始化投影头

Args:
input_dim: 输入维度(编码器的输出维度)
hidden_dim: 隐藏层维度,通常设置为 256-512
output_dim: 投影后的维度,通常设置为 64-128
- 维度小于编码器输出维度,起到降维作用
"""
super(ProjectionHead, self).__init__()
self.fc1 = nn.Linear(input_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, output_dim)

def forward(self, x):
"""
投影头的前向传播

Args:
x: 编码器的输出, shape 为[batch_size, input_dim]

Returns:
projected: 投影后的表示, shape 为[batch_size, output_dim]
- 经过 L2 归一化,用于对比学习
"""
# 第一层: input_dim -> hidden_dim
x = F.relu(self.fc1(x))
# 第二层: hidden_dim -> output_dim
x = self.fc2(x)
# L2 归一化
return F.normalize(x, dim=1)

class SimCLR(nn.Module):
"""
SimCLR 完整模型

SimCLR 通过对比学习学习有效的表示,核心流程:
1. 数据增强:对每个样本生成两个不同的增强视图
2. 编码:使用编码器将增强后的样本映射到表示空间
3. 投影:使用投影头将表示映射到对比学习空间
4. 对比损失:最大化同一样本不同视图的相似度,最小化不同样本的相似度
"""

def __init__(self, input_dim, encoder_dim=128, projection_dim=64,
hidden_dim=256, temperature=0.07):
"""
初始化 SimCLR 模型

Args:
input_dim: 输入特征维度
encoder_dim: 编码器输出维度,通常设置为 128-256
projection_dim: 投影头输出维度,通常设置为 64-128
hidden_dim: 隐藏层维度,通常设置为 256-512
temperature: 温度参数,控制对比损失的锐度
- 较小的温度(如 0.07)产生更尖锐的分布
- 有助于模型学习更细粒度的特征
- 通常设置为 0.05-0.2 之间
"""
super(SimCLR, self).__init__()
self.encoder = SimCLREncoder(input_dim, hidden_dim, encoder_dim)
self.projection = ProjectionHead(encoder_dim, hidden_dim, projection_dim)
self.temperature = temperature

def forward(self, x1, x2):
"""
SimCLR 的前向传播

Args:
x1: 第一个增强视图, shape 为[batch_size, input_dim]
- 例如:用户特征经过 dropout 增强
x2: 第二个增强视图, shape 为[batch_size, input_dim]
- 例如:用户特征经过噪声增强

Returns:
h1, h2: 编码器的输出(用于下游任务)
shape: [batch_size, encoder_dim]
z1, z2: 投影头的输出(用于对比学习)
shape: [batch_size, projection_dim]
"""
# 第一步:编码
# 使用共享的编码器对两个增强视图进行编码
h1 = self.encoder(x1) # [batch_size, encoder_dim]
h2 = self.encoder(x2) # [batch_size, encoder_dim]

# 第二步:投影
# 使用投影头将编码后的表示映射到对比学习空间
z1 = self.projection(h1) # [batch_size, projection_dim]
z2 = self.projection(h2) # [batch_size, projection_dim]

return h1, h2, z1, z2

def compute_loss(self, z1, z2):
"""
计算对比损失( InfoNCE Loss)

InfoNCE 损失的核心思想:
- 对于每个样本,最大化它与同一样本不同视图的相似度
- 最小化它与不同样本的相似度
- 通过 Softmax 和交叉熵实现

数学形式:
L = -log(exp(sim(z_i, z_j) / τ) / Σ_k exp(sim(z_i, z_k) / τ))
其中 z_i 和 z_j 是同一样本的不同视图, z_k 是其他样本

Args:
z1: 第一个增强视图的投影表示, shape 为[batch_size, projection_dim]
z2: 第二个增强视图的投影表示, shape 为[batch_size, projection_dim]

Returns:
loss: 对比损失(标量)
"""
batch_size = z1.size(0)

# 第一步:拼接所有表示
# z shape: [2*batch_size, projection_dim]
# 前 batch_size 个是 z1,后 batch_size 个是 z2
z = torch.cat([z1, z2], dim=0)

# 第二步:计算相似度矩阵
# sim_matrix shape: [2*batch_size, 2*batch_size]
# sim_matrix[i][j] = z[i] · z[j] / temperature
# 由于 z 已经 L2 归一化, z[i] · z[j]就是余弦相似度
sim_matrix = torch.matmul(z, z.T) / self.temperature

# 第三步:创建标签
# 对于每个样本 i,它的正样本是:
# - 如果 i < batch_size,正样本是 i + batch_size( z2 中对应的样本)
# - 如果 i >= batch_size,正样本是 i - batch_size( z1 中对应的样本)
# 其他样本都是负样本
labels = torch.arange(batch_size).to(z.device)
# 扩展标签:前 batch_size 个样本的正样本是后 batch_size 个,反之亦然
labels = torch.cat([labels + batch_size, labels], dim=0)

# 第四步:计算交叉熵损失
# 对于每个样本,将其与正样本的相似度最大化,与负样本的相似度最小化
# 这通过 Softmax 和交叉熵实现
loss = F.cross_entropy(sim_matrix, labels)

return loss

# 数据增强函数
def augment_user_features(user_features, method='dropout'):
"""用户特征增强

Args:
user_features: 用户特征 [batch_size, feature_dim]
method: 增强方法 ('dropout', 'noise', 'mask')
"""
if method == 'dropout':
# 随机丢弃部分特征
mask = torch.rand_like(user_features) > 0.1
return user_features * mask
elif method == 'noise':
# 添加高斯噪声
noise = torch.randn_like(user_features) * 0.1
return user_features + noise
elif method == 'mask':
# 随机掩码部分特征
mask = torch.rand_like(user_features) > 0.2
return user_features * mask
else:
return user_features

# 训练示例
def train_simclr(model, dataloader, optimizer, device):
"""训练 SimCLR 模型"""
model.train()
total_loss = 0

for batch_idx, (user_features, item_features) in enumerate(dataloader):
user_features = user_features.to(device)
item_features = item_features.to(device)

# 生成两个增强视图
user_features_1 = augment_user_features(user_features, 'dropout')
user_features_2 = augment_user_features(user_features, 'noise')

# 前向传播
h1, h2, z1, z2 = model(user_features_1, user_features_2)

# 计算损失
loss = model.compute_loss(z1, z2)

# 反向传播
optimizer.zero_grad()
loss.backward()
optimizer.step()

total_loss += loss.item()

if batch_idx % 100 == 0:
print(f'Batch {batch_idx}, Loss: {loss.item():.4f}')

return total_loss / len(dataloader)

SimCLR 的关键设计要点

1. 数据增强的重要性 - SimCLR 发现数据增强是成功的关键 - 在推荐系统中,可以通过特征丢弃、噪声添加、子序列采样等方式进行增强

2. 投影头的作用 - 投影头将表示映射到对比学习空间 - 训练完成后可以丢弃投影头,只使用编码器的输出

3. 温度参数的影响 - 较小的温度参数(如 0.07)会产生更尖锐的分布 - 有助于模型学习更细粒度的特征

4. 大批次训练 - SimCLR 需要大批次(如 4096)才能获得足够的负样本 - 在资源受限时可以使用内存库( Memory Bank)或动量编码器

SGL:自监督图对比学习

SGL 框架概述

SGL( Self-supervised Graph Learning)是华为在 2021 年提出的用于推荐系统的自监督图对比学习框架。它将对比学习的思想应用到用户-物品二部图上,通过图数据增强和对比学习来提升推荐性能。

SGL 的核心创新在于: 1. 图数据增强策略:设计了节点丢弃、边丢弃、子图采样等增强方法 2. 多视图对比学习:从同一图的不同增强视图学习一致表示 3. 联合训练:将对比学习损失与推荐任务损失联合优化

SGL 的图数据增强策略

SGL 提出了三种图数据增强策略:

1. 节点丢弃( Node Dropout) 随机丢弃图中的部分节点及其连接的边:

$$

G_1 = (V_1, E_1), V_1 V, E_1 = {(u, i) E | u, i V_1} $$

2. 边丢弃( Edge Dropout) 随机丢弃图中的部分边:

$$

G_2 = (V, E_2), E_2 E$$

3. 随机游走( Random Walk) 通过随机游走采样子图:

$$

G_3 = (G, =L)$$

SGL 的模型架构

SGL 使用图神经网络(如 LightGCN)作为编码器:

其中 分别表示用户 和物品 的邻居集合。

SGL 的完整实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.data import Data
from torch_geometric.nn import MessagePassing
import numpy as np
from scipy.sparse import coo_matrix

class LightGCNLayer(MessagePassing):
"""LightGCN 图卷积层"""

def __init__(self):
super(LightGCNLayer, self).__init__(aggr='add')

def forward(self, x, edge_index):
"""前向传播

Args:
x: 节点特征 [num_nodes, embedding_dim]
edge_index: 边索引 [2, num_edges]
"""
return self.propagate(edge_index, x=x)

def message(self, x_j):
"""消息传递"""
return x_j

class LightGCN(nn.Module):
"""LightGCN 编码器"""

def __init__(self, num_users, num_items, embedding_dim=64, num_layers=3):
super(LightGCN, self).__init__()
self.num_users = num_users
self.num_items = num_items
self.num_layers = num_layers

# 用户和物品嵌入
self.user_embedding = nn.Embedding(num_users, embedding_dim)
self.item_embedding = nn.Embedding(num_items, embedding_dim)

# 初始化嵌入
nn.init.normal_(self.user_embedding.weight, std=0.1)
nn.init.normal_(self.item_embedding.weight, std=0.1)

# 图卷积层
self.convs = nn.ModuleList([LightGCNLayer() for _ in range(num_layers)])

def forward(self, edge_index):
"""前向传播

Args:
edge_index: 边索引 [2, num_edges]
"""
# 获取所有节点嵌入
user_emb = self.user_embedding.weight
item_emb = self.item_embedding.weight
x = torch.cat([user_emb, item_emb], dim=0)

# 多层图卷积
all_embeddings = [x]
for conv in self.convs:
x = conv(x, edge_index)
all_embeddings.append(x)

# 平均池化
final_embeddings = torch.mean(torch.stack(all_embeddings), dim=0)

# 分离用户和物品嵌入
user_emb_final = final_embeddings[:self.num_users]
item_emb_final = final_embeddings[self.num_users:]

return user_emb_final, item_emb_final

class GraphAugmentation:
"""图数据增强"""

@staticmethod
def node_dropout(edge_index, num_users, num_items, drop_rate=0.1):
"""节点丢弃增强"""
num_nodes = num_users + num_items
num_drop = int(num_nodes * drop_rate)

# 随机选择要丢弃的节点
drop_nodes = torch.randperm(num_nodes)[:num_drop]
drop_nodes_set = set(drop_nodes.tolist())

# 过滤边:保留两端都不在丢弃集合中的边
mask = torch.ones(edge_index.size(1), dtype=torch.bool)
for i in range(edge_index.size(1)):
if edge_index[0, i].item() in drop_nodes_set or \
edge_index[1, i].item() in drop_nodes_set:
mask[i] = False

return edge_index[:, mask]

@staticmethod
def edge_dropout(edge_index, drop_rate=0.1):
"""边丢弃增强"""
num_edges = edge_index.size(1)
num_drop = int(num_edges * drop_rate)

# 随机选择要丢弃的边
keep_mask = torch.ones(num_edges, dtype=torch.bool)
drop_indices = torch.randperm(num_edges)[:num_drop]
keep_mask[drop_indices] = False

return edge_index[:, keep_mask]

@staticmethod
def random_walk_subgraph(edge_index, num_users, num_items,
walk_length=10, num_walks=5):
"""随机游走子图采样(简化实现)"""
# 这里使用边丢弃作为简化实现
# 实际应用中可以使用更复杂的随机游走算法
return GraphAugmentation.edge_dropout(edge_index, drop_rate=0.2)

class SGL(nn.Module):
"""SGL 模型"""

def __init__(self, num_users, num_items, embedding_dim=64,
num_layers=3, temperature=0.2):
super(SGL, self).__init__()
self.encoder = LightGCN(num_users, num_items, embedding_dim, num_layers)
self.temperature = temperature

def forward(self, edge_index):
"""前向传播"""
return self.encoder(edge_index)

def compute_contrastive_loss(self, z1, z2):
"""计算对比损失

Args:
z1: 第一个视图的表示 [num_nodes, embedding_dim]
z2: 第二个视图的表示 [num_nodes, embedding_dim]
"""
# 归一化
z1 = F.normalize(z1, dim=1)
z2 = F.normalize(z2, dim=1)

# 计算相似度矩阵
sim_matrix = torch.matmul(z1, z2.T) / self.temperature

# 对角线元素是正样本对
labels = torch.arange(z1.size(0)).to(z1.device)

# 计算交叉熵损失
loss = F.cross_entropy(sim_matrix, labels)

return loss

def compute_bpr_loss(self, user_emb, item_emb, users, pos_items, neg_items):
"""计算 BPR 损失"""
user_emb_selected = user_emb[users]
pos_emb = item_emb[pos_items]
neg_emb = item_emb[neg_items]

pos_scores = torch.sum(user_emb_selected * pos_emb, dim=1)
neg_scores = torch.sum(user_emb_selected * neg_emb, dim=1)

loss = -torch.mean(torch.log(torch.sigmoid(pos_scores - neg_scores) + 1e-10))

return loss

def train_sgl(model, train_loader, optimizer, device, lambda_cl=0.1):
"""训练 SGL 模型"""
model.train()
total_loss = 0

for batch_idx, batch in enumerate(train_loader):
edge_index = batch.edge_index.to(device)
users = batch.users.to(device)
pos_items = batch.pos_items.to(device)
neg_items = batch.neg_items.to(device)

# 图数据增强
edge_index_1 = GraphAugmentation.edge_dropout(edge_index, drop_rate=0.1)
edge_index_2 = GraphAugmentation.node_dropout(
edge_index, model.encoder.num_users,
model.encoder.num_items, drop_rate=0.1
)

# 获取两个视图的表示
user_emb_1, item_emb_1 = model(edge_index_1)
user_emb_2, item_emb_2 = model(edge_index_2)

# 拼接用户和物品嵌入
z1 = torch.cat([user_emb_1, item_emb_1], dim=0)
z2 = torch.cat([user_emb_2, item_emb_2], dim=0)

# 计算对比损失
contrastive_loss = model.compute_contrastive_loss(z1, z2)

# 计算 BPR 损失(使用第一个视图)
bpr_loss = model.compute_bpr_loss(
user_emb_1, item_emb_1, users, pos_items, neg_items
)

# 联合损失
loss = bpr_loss + lambda_cl * contrastive_loss

# 反向传播
optimizer.zero_grad()
loss.backward()
optimizer.step()

total_loss += loss.item()

if batch_idx % 100 == 0:
print(f'Batch {batch_idx}, BPR Loss: {bpr_loss.item():.4f}, '
f'Contrastive Loss: {contrastive_loss.item():.4f}, '
f'Total Loss: {loss.item():.4f}')

return total_loss / len(train_loader)

SGL 的优势与局限

优势: 1. 有效利用图结构信息 2. 通过对比学习提升表示质量 3. 在稀疏数据上表现优异

局限: 1. 计算开销较大(需要多次前向传播) 2. 增强策略的选择需要调优 3. 对超参数(如温度参数、增强率)敏感

RecDCL:推荐系统中的数据增强对比学习

RecDCL 框架介绍

RecDCL( Recommendation with Data Augmentation Contrastive Learning)专注于推荐系统中的数据增强策略设计。与 SGL 不同, RecDCL 更关注如何设计有效的数据增强方法来提升对比学习效果。

RecDCL 的数据增强策略

RecDCL 提出了多种针对推荐系统的数据增强方法:

1. 特征掩码( Feature Masking) 随机掩码部分特征维度:

2. 特征噪声( Feature Noise) 添加高斯噪声:

3. 序列裁剪( Sequence Cropping) 对用户行为序列进行随机裁剪:

4. 序列重排( Sequence Reordering) 随机重排序列中的部分元素:

RecDCL 的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

class RecDCLAugmentation:
"""RecDCL 数据增强策略"""

@staticmethod
def feature_masking(features, mask_rate=0.2):
"""特征掩码"""
batch_size, feature_dim = features.shape
mask = torch.rand(batch_size, feature_dim).to(features.device) > mask_rate
return features * mask

@staticmethod
def feature_noise(features, noise_std=0.1):
"""特征噪声"""
noise = torch.randn_like(features) * noise_std
return features + noise

@staticmethod
def sequence_cropping(sequences, min_len=5, max_len=None):
"""序列裁剪"""
if max_len is None:
max_len = sequences.size(1)

batch_size = sequences.size(0)
cropped_seqs = []

for i in range(batch_size):
seq_len = (sequences[i] != 0).sum().item() # 假设 0 是 padding
if seq_len <= min_len:
cropped_seqs.append(sequences[i])
else:
crop_len = np.random.randint(min_len, min(seq_len, max_len) + 1)
start_idx = np.random.randint(0, seq_len - crop_len + 1)
cropped_seq = torch.zeros_like(sequences[i])
cropped_seq[:crop_len] = sequences[i, start_idx:start_idx+crop_len]
cropped_seqs.append(cropped_seq)

return torch.stack(cropped_seqs)

@staticmethod
def sequence_shuffle(sequences):
"""序列重排"""
batch_size = sequences.size(0)
shuffled_seqs = []

for i in range(batch_size):
seq = sequences[i]
# 找到非零元素
non_zero_mask = seq != 0
non_zero_indices = torch.nonzero(non_zero_mask).squeeze()

if len(non_zero_indices) > 1:
# 随机重排非零元素
shuffled_indices = non_zero_indices[torch.randperm(len(non_zero_indices))]
shuffled_seq = torch.zeros_like(seq)
shuffled_seq[:len(shuffled_indices)] = seq[shuffled_indices]
shuffled_seqs.append(shuffled_seq)
else:
shuffled_seqs.append(seq)

return torch.stack(shuffled_seqs)

class RecDCLEncoder(nn.Module):
"""RecDCL 编码器"""

def __init__(self, input_dim, hidden_dims, output_dim, dropout=0.2):
super(RecDCLEncoder, self).__init__()
layers = []
prev_dim = input_dim

for hidden_dim in hidden_dims:
layers.append(nn.Linear(prev_dim, hidden_dim))
layers.append(nn.ReLU())
layers.append(nn.Dropout(dropout))
prev_dim = hidden_dim

layers.append(nn.Linear(prev_dim, output_dim))
self.network = nn.Sequential(*layers)

def forward(self, x):
return F.normalize(self.network(x), dim=1)

class RecDCL(nn.Module):
"""RecDCL 模型"""

def __init__(self, input_dim, encoder_dim=128, hidden_dims=[256, 256],
projection_dim=64, temperature=0.07):
super(RecDCL, self).__init__()
self.encoder = RecDCLEncoder(input_dim, hidden_dims, encoder_dim)
self.projection = nn.Sequential(
nn.Linear(encoder_dim, hidden_dims[0]),
nn.ReLU(),
nn.Linear(hidden_dims[0], projection_dim)
)
self.temperature = temperature

def forward(self, x1, x2):
"""前向传播"""
h1 = self.encoder(x1)
h2 = self.encoder(x2)

z1 = F.normalize(self.projection(h1), dim=1)
z2 = F.normalize(self.projection(h2), dim=1)

return h1, h2, z1, z2

def compute_loss(self, z1, z2):
"""计算对比损失"""
batch_size = z1.size(0)

# 拼接
z = torch.cat([z1, z2], dim=0)

# 相似度矩阵
sim_matrix = torch.matmul(z, z.T) / self.temperature

# 掩码:排除自身
mask = torch.eye(2 * batch_size, dtype=torch.bool).to(z.device)
sim_matrix = sim_matrix.masked_fill(mask, float('-inf'))

# 标签:同一样本的不同视图
labels = torch.arange(batch_size).to(z.device)
labels = torch.cat([labels + batch_size, labels], dim=0)

# 损失
loss = F.cross_entropy(sim_matrix, labels)

return loss

# 训练函数
def train_recdcl(model, dataloader, optimizer, device, aug_method='masking'):
"""训练 RecDCL 模型"""
model.train()
total_loss = 0

for batch_idx, (features, labels) in enumerate(dataloader):
features = features.to(device)

# 数据增强
if aug_method == 'masking':
features_1 = RecDCLAugmentation.feature_masking(features, mask_rate=0.2)
features_2 = RecDCLAugmentation.feature_masking(features, mask_rate=0.2)
elif aug_method == 'noise':
features_1 = RecDCLAugmentation.feature_noise(features, noise_std=0.1)
features_2 = RecDCLAugmentation.feature_noise(features, noise_std=0.1)
else:
features_1 = features
features_2 = features

# 前向传播
h1, h2, z1, z2 = model(features_1, features_2)

# 计算损失
loss = model.compute_loss(z1, z2)

# 反向传播
optimizer.zero_grad()
loss.backward()
optimizer.step()

total_loss += loss.item()

if batch_idx % 100 == 0:
print(f'Batch {batch_idx}, Loss: {loss.item():.4f}')

return total_loss / len(dataloader)

RCL:对比推荐学习

RCL 框架概述

RCL( Recommendation Contrastive Learning)是一个专门为推荐系统设计的对比学习框架。它通过设计合适的正负样本对构造策略,学习更好的用户和物品表示。

RCL 的正负样本构造

RCL 的核心在于如何构造正负样本对:

正样本对: 1. 同一用户的不同行为视图 2. 同一物品的不同用户交互 3. 用户-物品交互对

负样本对: 1. 不同用户的行为 2. 不同物品的特征 3. 未交互的用户-物品对

RCL 的损失函数设计

RCL 使用改进的对比损失函数:

其中: - 是用户表示 - 是正样本物品表示 - 是负样本物品表示 - 是负样本数量

RCL 的完整实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

class RCLEncoder(nn.Module):
"""RCL 编码器"""

def __init__(self, num_users, num_items, embedding_dim=64,
hidden_dims=[128, 128], dropout=0.2):
super(RCLEncoder, self).__init__()
self.num_users = num_users
self.num_items = num_items

# 用户和物品嵌入
self.user_embedding = nn.Embedding(num_users, embedding_dim)
self.item_embedding = nn.Embedding(num_items, embedding_dim)

# 特征提取网络
layers = []
prev_dim = embedding_dim
for hidden_dim in hidden_dims:
layers.append(nn.Linear(prev_dim, hidden_dim))
layers.append(nn.ReLU())
layers.append(nn.Dropout(dropout))
prev_dim = hidden_dim

self.user_network = nn.Sequential(*layers)
self.item_network = nn.Sequential(*layers)

# 初始化
nn.init.normal_(self.user_embedding.weight, std=0.1)
nn.init.normal_(self.item_embedding.weight, std=0.1)

def forward(self, user_ids, item_ids):
"""前向传播"""
user_emb = self.user_embedding(user_ids)
item_emb = self.item_embedding(item_ids)

user_repr = F.normalize(self.user_network(user_emb), dim=1)
item_repr = F.normalize(self.item_network(item_emb), dim=1)

return user_repr, item_repr

class RCL(nn.Module):
"""RCL 模型"""

def __init__(self, num_users, num_items, embedding_dim=64,
hidden_dims=[128, 128], temperature=0.2):
super(RCL, self).__init__()
self.encoder = RCLEncoder(num_users, num_items, embedding_dim, hidden_dims)
self.temperature = temperature

def forward(self, user_ids, item_ids):
"""前向传播"""
return self.encoder(user_ids, item_ids)

def compute_contrastive_loss(self, user_repr, pos_item_repr, neg_item_repr):
"""计算对比损失

Args:
user_repr: 用户表示 [batch_size, dim]
pos_item_repr: 正样本物品表示 [batch_size, dim]
neg_item_repr: 负样本物品表示 [batch_size, num_neg, dim]
"""
batch_size = user_repr.size(0)
num_neg = neg_item_repr.size(1)

# 正样本相似度
pos_sim = torch.sum(user_repr * pos_item_repr, dim=1) / self.temperature
pos_sim = pos_sim.unsqueeze(1) # [batch_size, 1]

# 负样本相似度
user_repr_expanded = user_repr.unsqueeze(1) # [batch_size, 1, dim]
neg_sim = torch.sum(user_repr_expanded * neg_item_repr, dim=2) / self.temperature
# neg_sim: [batch_size, num_neg]

# 拼接
logits = torch.cat([pos_sim, neg_sim], dim=1) # [batch_size, 1+num_neg]

# 标签:第一个是正样本
labels = torch.zeros(batch_size, dtype=torch.long).to(user_repr.device)

# 交叉熵损失
loss = F.cross_entropy(logits, labels)

return loss

def predict(self, user_ids, item_ids):
"""预测用户对物品的评分"""
user_repr, item_repr = self.forward(user_ids, item_ids)
scores = torch.sum(user_repr * item_repr, dim=1)
return scores

def sample_negative_items(user_items, num_items, num_neg=1):
"""采样负样本物品"""
batch_size = user_items.size(0)
neg_items = []

for i in range(batch_size):
pos_items = set(user_items[i].cpu().numpy())
neg_candidates = []

while len(neg_candidates) < num_neg:
candidate = np.random.randint(0, num_items)
if candidate not in pos_items:
neg_candidates.append(candidate)

neg_items.append(neg_candidates)

return torch.tensor(neg_items, dtype=torch.long)

def train_rcl(model, train_loader, optimizer, device, num_neg=5):
"""训练 RCL 模型"""
model.train()
total_loss = 0

for batch_idx, batch in enumerate(train_loader):
user_ids = batch['user_ids'].to(device)
pos_item_ids = batch['item_ids'].to(device)
user_items = batch['user_items'].to(device) # 用户交互过的物品集合

# 采样负样本
neg_item_ids = sample_negative_items(
user_items, model.encoder.num_items, num_neg
).to(device)

# 获取表示
user_repr, pos_item_repr = model(user_ids, pos_item_ids)

# 获取负样本表示
neg_item_repr_list = []
for i in range(num_neg):
neg_repr = model.encoder.item_embedding(neg_item_ids[:, i])
neg_repr = F.normalize(model.encoder.item_network(neg_repr), dim=1)
neg_item_repr_list.append(neg_repr)
neg_item_repr = torch.stack(neg_item_repr_list, dim=1)

# 计算损失
loss = model.compute_contrastive_loss(user_repr, pos_item_repr, neg_item_repr)

# 反向传播
optimizer.zero_grad()
loss.backward()
optimizer.step()

total_loss += loss.item()

if batch_idx % 100 == 0:
print(f'Batch {batch_idx}, Loss: {loss.item():.4f}')

return total_loss / len(train_loader)

图数据增强策略详解

图数据增强的重要性

在推荐系统中,用户-物品交互可以表示为二部图。图数据增强是图对比学习的关键,它通过生成图的不同视图来构造正样本对。

常见的图数据增强方法

1. 节点级增强

节点丢弃是最直接的增强方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
def node_dropout_augmentation(edge_index, num_nodes, drop_rate=0.1):
"""节点丢弃增强"""
num_drop = int(num_nodes * drop_rate)
drop_nodes = torch.randperm(num_nodes)[:num_drop]
drop_nodes_set = set(drop_nodes.tolist())

mask = torch.ones(edge_index.size(1), dtype=torch.bool)
for i in range(edge_index.size(1)):
if edge_index[0, i].item() in drop_nodes_set or \
edge_index[1, i].item() in drop_nodes_set:
mask[i] = False

return edge_index[:, mask]

2. 边级增强

边丢弃保留所有节点,但随机删除部分边:

1
2
3
4
5
6
def edge_dropout_augmentation(edge_index, drop_rate=0.1):
"""边丢弃增强"""
num_edges = edge_index.size(1)
num_keep = int(num_edges * (1 - drop_rate))
keep_indices = torch.randperm(num_edges)[:num_keep]
return edge_index[:, keep_indices]

3. 子图采样

通过随机游走或 BFS 采样子图:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def random_walk_subgraph(edge_index, start_nodes, walk_length=10):
"""随机游走子图采样"""
# 构建邻接表
adj_dict = {}
for i in range(edge_index.size(1)):
src, dst = edge_index[0, i].item(), edge_index[1, i].item()
if src not in adj_dict:
adj_dict[src] = []
adj_dict[src].append(dst)

# 随机游走
visited_nodes = set()
sampled_edges = []

for start_node in start_nodes:
current = start_node
visited_nodes.add(current)

for _ in range(walk_length):
if current not in adj_dict or len(adj_dict[current]) == 0:
break

next_node = np.random.choice(adj_dict[current])
visited_nodes.add(next_node)
sampled_edges.append((current, next_node))
current = next_node

# 构建新的边索引
if sampled_edges:
sampled_edge_index = torch.tensor(list(zip(*sampled_edges))).T
return sampled_edge_index
else:
return edge_index

4. 特征增强

对节点特征进行增强:

1
2
3
4
5
6
7
8
9
10
11
12
def feature_augmentation(node_features, method='dropout', drop_rate=0.1):
"""节点特征增强"""
if method == 'dropout':
mask = torch.rand_like(node_features) > drop_rate
return node_features * mask
elif method == 'noise':
noise = torch.randn_like(node_features) * 0.1
return node_features + noise
elif method == 'normalize':
return F.normalize(node_features, dim=1)
else:
return node_features

增强策略的组合使用

在实际应用中,可以组合多种增强策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class CombinedGraphAugmentation:
"""组合图数据增强"""

def __init__(self, node_drop_rate=0.1, edge_drop_rate=0.1,
feature_drop_rate=0.1):
self.node_drop_rate = node_drop_rate
self.edge_drop_rate = edge_drop_rate
self.feature_drop_rate = feature_drop_rate

def augment(self, edge_index, node_features, num_nodes):
"""应用多种增强策略"""
# 1. 边丢弃
edge_index_aug = edge_dropout_augmentation(
edge_index, self.edge_drop_rate
)

# 2. 节点丢弃
edge_index_aug = node_dropout_augmentation(
edge_index_aug, num_nodes, self.node_drop_rate
)

# 3. 特征增强
node_features_aug = feature_augmentation(
node_features, 'dropout', self.feature_drop_rate
)

return edge_index_aug, node_features_aug

XSimGCL:极简设计的对比学习

XSimGCL 的设计理念

XSimGCL( eXtreme Simple Graph Contrastive Learning)是 2022 年提出的极简图对比学习框架。基本思路:最简单的设计往往最有效

XSimGCL 的主要创新: 1. 去除了复杂的图数据增强:直接使用原始图结构 2. 简化了对比学习目标:使用更直接的对比损失 3. 减少了计算开销:只需要一次前向传播

XSimGCL 的架构

XSimGCL 使用 LightGCN 作为编码器,但去除了数据增强步骤:

关键创新在于:使用不同层的表示作为不同的视图,而不是通过数据增强生成视图。

XSimGCL 的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import torch
import torch.nn as nn
import torch.nn.functional as F

class XSimGCL(nn.Module):
"""XSimGCL 模型"""

def __init__(self, num_users, num_items, embedding_dim=64,
num_layers=3, temperature=0.2):
super(XSimGCL, self).__init__()
self.num_users = num_users
self.num_items = num_items
self.num_layers = num_layers
self.temperature = temperature

# 嵌入层
self.user_embedding = nn.Embedding(num_users, embedding_dim)
self.item_embedding = nn.Embedding(num_items, embedding_dim)

# 初始化
nn.init.normal_(self.user_embedding.weight, std=0.1)
nn.init.normal_(self.item_embedding.weight, std=0.1)

def forward(self, edge_index):
"""前向传播"""
user_emb = self.user_embedding.weight
item_emb = self.item_embedding.weight
x = torch.cat([user_emb, item_emb], dim=0)

# 存储每一层的表示
all_embeddings = [x]

# 图卷积
for _ in range(self.num_layers):
x = self._propagate(edge_index, x)
all_embeddings.append(x)

# 最终表示(平均池化)
final_embedding = torch.mean(torch.stack(all_embeddings), dim=0)

user_emb_final = final_embedding[:self.num_users]
item_emb_final = final_embedding[self.num_users:]

return user_emb_final, item_emb_final, all_embeddings

def _propagate(self, edge_index, x):
"""消息传播"""
row, col = edge_index
deg = torch.zeros(x.size(0)).to(x.device)
deg.index_add_(0, row, torch.ones(row.size(0)).to(x.device))
deg = torch.sqrt(deg.clamp(min=1))

# 归一化
x_norm = x / deg.unsqueeze(1)

# 聚合
out = torch.zeros_like(x)
out.index_add_(0, col, x_norm[row])

# 归一化
deg_col = torch.zeros(x.size(0)).to(x.device)
deg_col.index_add_(0, col, torch.ones(col.size(0)).to(x.device))
deg_col = torch.sqrt(deg_col.clamp(min=1))
out = out / deg_col.unsqueeze(1)

return out

def compute_contrastive_loss(self, embeddings_list):
"""计算对比损失

使用不同层的表示作为不同的视图
"""
if len(embeddings_list) < 2:
return torch.tensor(0.0).to(embeddings_list[0].device)

# 选择两个不同层的表示
idx1, idx2 = 0, len(embeddings_list) - 1
z1 = F.normalize(embeddings_list[idx1], dim=1)
z2 = F.normalize(embeddings_list[idx2], dim=1)

# 计算相似度矩阵
sim_matrix = torch.matmul(z1, z2.T) / self.temperature

# 对角线是正样本对
labels = torch.arange(z1.size(0)).to(z1.device)

# 交叉熵损失
loss = F.cross_entropy(sim_matrix, labels)

return loss

def compute_bpr_loss(self, user_emb, item_emb, users, pos_items, neg_items):
"""计算 BPR 损失"""
user_emb_selected = user_emb[users]
pos_emb = item_emb[pos_items]
neg_emb = item_emb[neg_items]

pos_scores = torch.sum(user_emb_selected * pos_emb, dim=1)
neg_scores = torch.sum(user_emb_selected * neg_emb, dim=1)

loss = -torch.mean(torch.log(torch.sigmoid(pos_scores - neg_scores) + 1e-10))

return loss

def train_xsimgcl(model, train_loader, optimizer, device, lambda_cl=0.1):
"""训练 XSimGCL 模型"""
model.train()
total_loss = 0

for batch_idx, batch in enumerate(train_loader):
edge_index = batch.edge_index.to(device)
users = batch.users.to(device)
pos_items = batch.pos_items.to(device)
neg_items = batch.neg_items.to(device)

# 前向传播(只需要一次)
user_emb, item_emb, embeddings_list = model(edge_index)

# 对比损失(使用不同层的表示)
contrastive_loss = model.compute_contrastive_loss(embeddings_list)

# BPR 损失
bpr_loss = model.compute_bpr_loss(user_emb, item_emb, users, pos_items, neg_items)

# 联合损失
loss = bpr_loss + lambda_cl * contrastive_loss

# 反向传播
optimizer.zero_grad()
loss.backward()
optimizer.step()

total_loss += loss.item()

if batch_idx % 100 == 0:
print(f'Batch {batch_idx}, BPR Loss: {bpr_loss.item():.4f}, '
f'Contrastive Loss: {contrastive_loss.item():.4f}, '
f'Total Loss: {loss.item():.4f}')

return total_loss / len(train_loader)

XSimGCL 的优势

  1. 计算效率高:不需要多次前向传播
  2. 实现简单:代码量少,易于理解和实现
  3. 效果优异:在多个数据集上达到或超过 SGL 的效果
  4. 超参数少:减少了需要调优的超参数

序列推荐中的对比学习

序列推荐的特殊性

序列推荐需要考虑用户行为的时间顺序,这与图推荐有所不同。在序列推荐中,对比学习可以通过以下方式应用:

  1. 序列增强:对用户行为序列进行增强
  2. 时间对比:利用时间信息构造正负样本
  3. 上下文对比:利用序列上下文信息

序列数据增强策略

1. 序列裁剪( Crop)

1
2
3
4
5
6
def sequence_crop(sequence, crop_ratio=0.8):
"""序列裁剪"""
seq_len = len(sequence)
crop_len = int(seq_len * crop_ratio)
start_idx = np.random.randint(0, seq_len - crop_len + 1)
return sequence[start_idx:start_idx + crop_len]

2. 序列掩码( Mask)

1
2
3
4
5
6
7
8
9
10
11
def sequence_mask(sequence, mask_ratio=0.2):
"""序列掩码"""
seq_len = len(sequence)
num_mask = int(seq_len * mask_ratio)
mask_indices = np.random.choice(seq_len, num_mask, replace=False)

masked_sequence = sequence.copy()
for idx in mask_indices:
masked_sequence[idx] = 0 # 假设 0 是 mask token

return masked_sequence

3. 序列重排( Reorder)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def sequence_reorder(sequence, reorder_ratio=0.2):
"""序列重排"""
seq_len = len(sequence)
num_reorder = int(seq_len * reorder_ratio)

if num_reorder < 2:
return sequence

reorder_indices = np.random.choice(seq_len, num_reorder, replace=False)
reordered_sequence = sequence.copy()
reordered_values = reordered_sequence[reorder_indices]
np.random.shuffle(reordered_values)
reordered_sequence[reorder_indices] = reordered_values

return reordered_sequence

序列对比学习模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

class SequenceEncoder(nn.Module):
"""序列编码器(使用 Transformer)"""

def __init__(self, num_items, embedding_dim=64, num_layers=2,
num_heads=2, dropout=0.2):
super(SequenceEncoder, self).__init__()
self.item_embedding = nn.Embedding(num_items + 1, embedding_dim, padding_idx=0)
self.pos_embedding = nn.Embedding(200, embedding_dim) # 最大序列长度

encoder_layer = nn.TransformerEncoderLayer(
d_model=embedding_dim,
nhead=num_heads,
dim_feedforward=embedding_dim * 4,
dropout=dropout,
batch_first=True
)
self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)

self.dropout = nn.Dropout(dropout)

def forward(self, sequences):
"""前向传播

Args:
sequences: 序列 [batch_size, seq_len]
"""
batch_size, seq_len = sequences.shape

# 位置编码
positions = torch.arange(seq_len).unsqueeze(0).repeat(batch_size, 1).to(sequences.device)

# 嵌入
item_emb = self.item_embedding(sequences)
pos_emb = self.pos_embedding(positions)

# 添加位置编码
x = item_emb + pos_emb
x = self.dropout(x)

# Transformer 编码
# 创建 padding mask
mask = (sequences == 0)
x = self.transformer(x, src_key_padding_mask=mask)

# 取最后一个非 padding 位置的表示
lengths = (sequences != 0).sum(dim=1)
output = x[torch.arange(batch_size), lengths - 1]

return F.normalize(output, dim=1)

class ContrastiveSequentialRecommendation(nn.Module):
"""序列推荐对比学习模型"""

def __init__(self, num_items, embedding_dim=64, num_layers=2,
num_heads=2, temperature=0.2):
super(ContrastiveSequentialRecommendation, self).__init__()
self.encoder = SequenceEncoder(num_items, embedding_dim, num_layers, num_heads)
self.temperature = temperature

def forward(self, sequences):
"""前向传播"""
return self.encoder(sequences)

def compute_contrastive_loss(self, z1, z2):
"""计算对比损失"""
batch_size = z1.size(0)

# 拼接
z = torch.cat([z1, z2], dim=0)

# 相似度矩阵
sim_matrix = torch.matmul(z, z.T) / self.temperature

# 掩码对角线
mask = torch.eye(2 * batch_size, dtype=torch.bool).to(z.device)
sim_matrix = sim_matrix.masked_fill(mask, float('-inf'))

# 标签
labels = torch.arange(batch_size).to(z.device)
labels = torch.cat([labels + batch_size, labels], dim=0)

# 损失
loss = F.cross_entropy(sim_matrix, labels)

return loss

def predict_next_item(self, sequences, candidate_items):
"""预测下一个物品"""
user_repr = self.encoder(sequences) # [batch_size, embedding_dim]
item_emb = self.encoder.item_embedding(candidate_items) # [batch_size, num_candidates, embedding_dim]
item_emb = F.normalize(item_emb, dim=2)

scores = torch.bmm(user_repr.unsqueeze(1), item_emb.transpose(1, 2)).squeeze(1)
return scores

# 序列增强函数
def augment_sequence(sequence, method='crop'):
"""序列增强"""
if method == 'crop':
return sequence_crop(sequence, crop_ratio=0.8)
elif method == 'mask':
return sequence_mask(sequence, mask_ratio=0.2)
elif method == 'reorder':
return sequence_reorder(sequence, reorder_ratio=0.2)
else:
return sequence

def train_contrastive_sequential(model, train_loader, optimizer, device):
"""训练序列对比学习模型"""
model.train()
total_loss = 0

for batch_idx, sequences in enumerate(train_loader):
sequences = sequences.to(device)

# 序列增强
sequences_1 = torch.stack([
torch.tensor(augment_sequence(seq.cpu().numpy(), 'crop'))
for seq in sequences
]).to(device)

sequences_2 = torch.stack([
torch.tensor(augment_sequence(seq.cpu().numpy(), 'mask'))
for seq in sequences
]).to(device)

# 前向传播
z1 = model(sequences_1)
z2 = model(sequences_2)

# 计算损失
loss = model.compute_contrastive_loss(z1, z2)

# 反向传播
optimizer.zero_grad()
loss.backward()
optimizer.step()

total_loss += loss.item()

if batch_idx % 100 == 0:
print(f'Batch {batch_idx}, Loss: {loss.item():.4f}')

return total_loss / len(train_loader)

长尾物品推荐

长尾问题的挑战

推荐系统中的长尾问题是指:少数热门物品占据了大部分交互,而大量长尾物品只有很少的交互。这导致:

  1. 数据稀疏:长尾物品的交互数据极少
  2. 表示学习困难:难以学习到有效的物品表示
  3. 推荐偏差:模型倾向于推荐热门物品

对比学习如何解决长尾问题

对比学习通过以下方式帮助解决长尾问题:

  1. 数据增强:为长尾物品生成更多训练样本
  2. 表示学习:学习更好的物品表示,即使数据稀疏
  3. 负样本挖掘:通过对比学习更好地区分相似物品

长尾物品推荐的对比学习框架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from collections import Counter

class LongTailContrastiveLearning(nn.Module):
"""长尾物品对比学习模型"""

def __init__(self, num_users, num_items, embedding_dim=64,
temperature=0.2, alpha=0.5):
super(LongTailContrastiveLearning, self).__init__()
self.num_users = num_users
self.num_items = num_items
self.temperature = temperature
self.alpha = alpha # 长尾物品权重

# 嵌入层
self.user_embedding = nn.Embedding(num_users, embedding_dim)
self.item_embedding = nn.Embedding(num_items, embedding_dim)

# 初始化
nn.init.normal_(self.user_embedding.weight, std=0.1)
nn.init.normal_(self.item_embedding.weight, std=0.1)

# 物品流行度(用于加权)
self.item_popularity = None

def set_item_popularity(self, item_popularity):
"""设置物品流行度"""
self.item_popularity = item_popularity

def forward(self, user_ids, item_ids):
"""前向传播"""
user_emb = F.normalize(self.user_embedding(user_ids), dim=1)
item_emb = F.normalize(self.item_embedding(item_ids), dim=1)
return user_emb, item_emb

def compute_weighted_contrastive_loss(self, user_emb, pos_item_emb,
neg_item_emb, pos_items):
"""加权对比损失(对长尾物品给予更高权重)"""
batch_size = user_emb.size(0)
num_neg = neg_item_emb.size(1)

# 正样本相似度
pos_sim = torch.sum(user_emb * pos_item_emb, dim=1) / self.temperature

# 负样本相似度
user_emb_expanded = user_emb.unsqueeze(1)
neg_sim = torch.sum(user_emb_expanded * neg_item_emb, dim=2) / self.temperature

# 拼接
logits = torch.cat([pos_sim.unsqueeze(1), neg_sim], dim=1)

# 标签
labels = torch.zeros(batch_size, dtype=torch.long).to(user_emb.device)

# 计算权重(长尾物品权重更高)
if self.item_popularity is not None:
weights = torch.ones(batch_size).to(user_emb.device)
for i, item_id in enumerate(pos_items):
pop = self.item_popularity[item_id.item()]
# 流行度越低,权重越高
weights[i] = 1.0 / (pop + 1.0) * self.alpha + (1 - self.alpha)
else:
weights = torch.ones(batch_size).to(user_emb.device)

# 加权交叉熵损失
loss = F.cross_entropy(logits, labels, reduction='none')
loss = (loss * weights).mean()

return loss

def augment_long_tail_items(self, item_ids, item_popularity, threshold=100):
"""长尾物品增强

为长尾物品生成更多训练样本
"""
augmented_items = []

for item_id in item_ids:
pop = item_popularity[item_id.item()]

# 如果是长尾物品,进行增强
if pop < threshold:
# 添加噪声到嵌入空间
augmented_items.append(item_id)
# 可以添加更多增强策略
else:
augmented_items.append(item_id)

return torch.stack(augmented_items)

def compute_item_popularity(train_data):
"""计算物品流行度"""
item_counter = Counter()
for user_items in train_data:
item_counter.update(user_items)

return item_counter

def train_long_tail_model(model, train_loader, optimizer, device,
item_popularity, num_neg=5):
"""训练长尾物品推荐模型"""
model.train()
model.set_item_popularity(item_popularity)
total_loss = 0

for batch_idx, batch in enumerate(train_loader):
user_ids = batch['user_ids'].to(device)
pos_item_ids = batch['item_ids'].to(device)

# 采样负样本(可以针对长尾物品进行特殊采样)
neg_item_ids = sample_negative_items_with_long_tail(
pos_item_ids, model.num_items, item_popularity, num_neg
).to(device)

# 长尾物品增强
pos_item_ids_aug = model.augment_long_tail_items(
pos_item_ids, item_popularity
)

# 获取表示
user_emb, pos_item_emb = model(user_ids, pos_item_ids_aug)

# 获取负样本表示
neg_item_emb_list = []
for i in range(num_neg):
neg_emb = model.item_embedding(neg_item_ids[:, i])
neg_emb = F.normalize(neg_emb, dim=1)
neg_item_emb_list.append(neg_emb)
neg_item_emb = torch.stack(neg_item_emb_list, dim=1)

# 计算加权对比损失
loss = model.compute_weighted_contrastive_loss(
user_emb, pos_item_emb, neg_item_emb, pos_item_ids
)

# 反向传播
optimizer.zero_grad()
loss.backward()
optimizer.step()

total_loss += loss.item()

if batch_idx % 100 == 0:
print(f'Batch {batch_idx}, Loss: {loss.item():.4f}')

return total_loss / len(train_loader)

def sample_negative_items_with_long_tail(pos_items, num_items, item_popularity,
num_neg=5, long_tail_ratio=0.3):
"""针对长尾物品的负样本采样"""
batch_size = pos_items.size(0)
num_long_tail = int(num_neg * long_tail_ratio)

neg_items = []

for i in range(batch_size):
neg_candidates = []

# 采样长尾物品作为负样本
long_tail_items = [item for item, pop in item_popularity.items()
if pop < 100]
if len(long_tail_items) > 0:
long_tail_samples = np.random.choice(
long_tail_items, min(num_long_tail, len(long_tail_items)),
replace=False
)
neg_candidates.extend(long_tail_samples.tolist())

# 采样其他负样本
remaining = num_neg - len(neg_candidates)
if remaining > 0:
other_samples = np.random.randint(0, num_items, remaining)
neg_candidates.extend(other_samples.tolist())

neg_items.append(neg_candidates[:num_neg])

return torch.tensor(neg_items, dtype=torch.long)

完整代码实现示例

端到端的推荐系统对比学习框架

下面我们提供一个完整的、可以直接运行的推荐系统对比学习框架:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import pandas as pd

# ==================== 数据准备 ====================

class RecommendationDataset(Dataset):
"""推荐系统数据集"""

def __init__(self, user_ids, item_ids, ratings=None):
self.user_ids = torch.tensor(user_ids, dtype=torch.long)
self.item_ids = torch.tensor(item_ids, dtype=torch.long)
self.ratings = torch.tensor(ratings, dtype=torch.float) if ratings is not None else None

def __len__(self):
return len(self.user_ids)

def __getitem__(self, idx):
if self.ratings is not None:
return {
'user_id': self.user_ids[idx],
'item_id': self.item_ids[idx],
'rating': self.ratings[idx]
}
else:
return {
'user_id': self.user_ids[idx],
'item_id': self.item_ids[idx]
}

def load_data(file_path):
"""加载数据"""
df = pd.read_csv(file_path)
return df['user_id'].values, df['item_id'].values, df['rating'].values

# ==================== 模型定义 ====================

class ContrastiveRecommendationModel(nn.Module):
"""对比学习推荐模型"""

def __init__(self, num_users, num_items, embedding_dim=64,
hidden_dims=[128, 128], temperature=0.2, dropout=0.2):
super(ContrastiveRecommendationModel, self).__init__()
self.num_users = num_users
self.num_items = num_items
self.temperature = temperature

# 嵌入层
self.user_embedding = nn.Embedding(num_users, embedding_dim)
self.item_embedding = nn.Embedding(num_items, embedding_dim)

# 特征提取网络
self.user_network = self._build_network(embedding_dim, hidden_dims, dropout)
self.item_network = self._build_network(embedding_dim, hidden_dims, dropout)

# 投影头
self.user_projection = nn.Linear(hidden_dims[-1], embedding_dim)
self.item_projection = nn.Linear(hidden_dims[-1], embedding_dim)

# 初始化
nn.init.normal_(self.user_embedding.weight, std=0.1)
nn.init.normal_(self.item_embedding.weight, std=0.1)

def _build_network(self, input_dim, hidden_dims, dropout):
layers = []
prev_dim = input_dim
for hidden_dim in hidden_dims:
layers.append(nn.Linear(prev_dim, hidden_dim))
layers.append(nn.ReLU())
layers.append(nn.Dropout(dropout))
prev_dim = hidden_dim
return nn.Sequential(*layers)

def forward(self, user_ids, item_ids):
"""前向传播"""
user_emb = self.user_embedding(user_ids)
item_emb = self.item_embedding(item_ids)

user_repr = self.user_network(user_emb)
item_repr = self.item_network(item_emb)

user_proj = F.normalize(self.user_projection(user_repr), dim=1)
item_proj = F.normalize(self.item_projection(item_repr), dim=1)

return user_proj, item_proj, user_repr, item_repr

def compute_contrastive_loss(self, z1, z2):
"""计算对比损失"""
batch_size = z1.size(0)
z = torch.cat([z1, z2], dim=0)

sim_matrix = torch.matmul(z, z.T) / self.temperature
mask = torch.eye(2 * batch_size, dtype=torch.bool).to(z.device)
sim_matrix = sim_matrix.masked_fill(mask, float('-inf'))

labels = torch.arange(batch_size).to(z.device)
labels = torch.cat([labels + batch_size, labels], dim=0)

return F.cross_entropy(sim_matrix, labels)

def predict(self, user_ids, item_ids):
"""预测评分"""
_, _, user_repr, item_repr = self.forward(user_ids, item_ids)
scores = torch.sum(user_repr * item_repr, dim=1)
return scores

# ==================== 数据增强 ====================

def augment_user_item_pair(user_ids, item_ids, method='dropout'):
"""用户-物品对增强"""
if method == 'dropout':
# 随机丢弃部分样本
mask = torch.rand(len(user_ids)) > 0.1
return user_ids[mask], item_ids[mask]
elif method == 'shuffle':
# 随机打乱
indices = torch.randperm(len(user_ids))
return user_ids[indices], item_ids[indices]
else:
return user_ids, item_ids

# ==================== 训练函数 ====================

def train_epoch(model, train_loader, optimizer, device, lambda_cl=0.1):
"""训练一个 epoch"""
model.train()
total_loss = 0

for batch_idx, batch in enumerate(train_loader):
user_ids = batch['user_id'].to(device)
item_ids = batch['item_id'].to(device)

# 数据增强
user_ids_1, item_ids_1 = augment_user_item_pair(user_ids, item_ids, 'dropout')
user_ids_2, item_ids_2 = augment_user_item_pair(user_ids, item_ids, 'shuffle')

# 前向传播
user_proj_1, item_proj_1, _, _ = model(user_ids_1, item_ids_1)
user_proj_2, item_proj_2, _, _ = model(user_ids_2, item_ids_2)

# 对比损失
contrastive_loss = model.compute_contrastive_loss(
torch.cat([user_proj_1, item_proj_1], dim=0),
torch.cat([user_proj_2, item_proj_2], dim=0)
)

# BPR 损失(简化版)
user_proj, item_proj, user_repr, item_repr = model(user_ids, item_ids)
pos_scores = torch.sum(user_repr * item_repr, dim=1)

# 采样负样本
neg_item_ids = torch.randint(0, model.num_items, (len(user_ids),)).to(device)
_, _, _, neg_item_repr = model(user_ids, neg_item_ids)
neg_scores = torch.sum(user_repr * neg_item_repr, dim=1)

bpr_loss = -torch.mean(torch.log(torch.sigmoid(pos_scores - neg_scores) + 1e-10))

# 总损失
loss = bpr_loss + lambda_cl * contrastive_loss

# 反向传播
optimizer.zero_grad()
loss.backward()
optimizer.step()

total_loss += loss.item()

return total_loss / len(train_loader)

# ==================== 评估函数 ====================

def evaluate(model, test_loader, device, k=10):
"""评估模型( Hit Rate @ K)"""
model.eval()
hits = 0
total = 0

with torch.no_grad():
for batch in test_loader:
user_ids = batch['user_id'].to(device)
item_ids = batch['item_id'].to(device)

# 预测
scores = model.predict(user_ids, item_ids)

# 计算 Hit Rate
top_k_items = torch.topk(scores, k).indices
hits += (item_ids.unsqueeze(1) == top_k_items).any(dim=1).sum().item()
total += len(user_ids)

return hits / total

# ==================== 主函数 ====================

def main():
# 超参数
num_users = 1000
num_items = 2000
embedding_dim = 64
hidden_dims = [128, 128]
batch_size = 256
num_epochs = 50
learning_rate = 0.001
temperature = 0.2
lambda_cl = 0.1

# 设备
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 创建模型
model = ContrastiveRecommendationModel(
num_users, num_items, embedding_dim, hidden_dims, temperature
).to(device)

# 优化器
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# 数据加载(这里使用模拟数据)
# 实际应用中应该从文件加载真实数据
train_user_ids = np.random.randint(0, num_users, 10000)
train_item_ids = np.random.randint(0, num_items, 10000)
train_dataset = RecommendationDataset(train_user_ids, train_item_ids)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

test_user_ids = np.random.randint(0, num_users, 1000)
test_item_ids = np.random.randint(0, num_items, 1000)
test_dataset = RecommendationDataset(test_user_ids, test_item_ids)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# 训练
for epoch in range(num_epochs):
train_loss = train_epoch(model, train_loader, optimizer, device, lambda_cl)
hit_rate = evaluate(model, test_loader, device)

print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, '
f'Hit Rate @ 10: {hit_rate:.4f}')

if __name__ == '__main__':
main()

常见问题与解答( Q&A)

Q1: 对比学习和传统推荐方法的主要区别是什么?

A: 对比学习与传统推荐方法的主要区别在于:

  1. 监督信号来源
    • 传统方法:依赖显式反馈(评分)或隐式反馈(点击)
    • 对比学习:从数据本身的结构中挖掘监督信号
  2. 数据利用
    • 传统方法:主要利用用户-物品交互矩阵
    • 对比学习:通过数据增强生成更多训练样本
  3. 表示学习
    • 传统方法:通过预测任务学习表示
    • 对比学习:通过对比相似和不相似样本学习表示
  4. 稀疏数据处理
    • 传统方法:在稀疏数据上表现较差
    • 对比学习:通过对比学习更好地处理稀疏数据

Q2: SimCLR 中的温度参数有什么作用?

A: 温度参数 在对比学习中起到关键作用:

  1. 控制分布的尖锐程度
    • 较小的 (如 0.07)会产生更尖锐的分布,模型更关注困难负样本
    • 较大的 (如 1.0)会产生更平滑的分布,模型对所有样本的关注更均匀
  2. 影响学习难度
    • 较小的 增加学习难度,可能提升模型性能
    • 但过小的 可能导致训练不稳定
  3. 实际应用
    • 在推荐系统中,通常使用 0.1-0.2 的温度参数
    • 需要通过实验找到最适合的值

Q3: SGL 和 XSimGCL 的主要区别是什么?

A: 主要区别如下:

  1. 数据增强
    • SGL:使用图数据增强(节点丢弃、边丢弃等)生成不同视图
    • XSimGCL:不使用数据增强,直接使用不同层的表示作为不同视图
  2. 计算开销
    • SGL:需要多次前向传播(每个视图一次)
    • XSimGCL:只需要一次前向传播
  3. 实现复杂度
    • SGL:需要实现图数据增强策略
    • XSimGCL:实现更简单,代码量更少
  4. 性能
    • 两者在大多数数据集上性能相近
    • XSimGCL 在某些场景下可能略优于 SGL

Q4: 如何选择合适的数据增强策略?

A: 选择数据增强策略需要考虑:

  1. 数据类型
    • 图数据:使用节点丢弃、边丢弃、子图采样
    • 序列数据:使用序列裁剪、掩码、重排
    • 特征数据:使用特征掩码、噪声添加
  2. 增强强度
    • 过强的增强可能破坏数据语义
    • 过弱的增强可能无法提供足够的多样性
    • 通常使用 10%-20% 的增强率
  3. 任务特性
    • 推荐任务:需要保持用户-物品关系的语义
    • 可以组合多种增强策略
  4. 实验验证
    • 通过消融实验找到最佳组合
    • 在不同数据集上验证策略的有效性

Q5: 对比学习如何解决冷启动问题?

A: 对比学习通过以下方式帮助解决冷启动问题:

  1. 数据增强
    • 为新用户/物品生成多个视图
    • 即使交互数据少,也能学习到有效表示
  2. 表示学习
    • 学习到更具泛化能力的表示
    • 相似用户/物品的表示更接近
  3. 迁移学习
    • 可以先用大量数据预训练
    • 再在冷启动场景下微调
  4. 负样本挖掘
    • 通过对比学习更好地利用负样本信息
    • 帮助模型学习到更细粒度的特征

Q6: 对比学习的计算开销大吗?

A: 对比学习的计算开销取决于具体实现:

  1. SimCLR
    • 需要大批次(如 4096)才能获得足够负样本
    • 计算开销较大,但可以通过梯度累积缓解
  2. SGL
    • 需要多次前向传播(每个视图一次)
    • 计算开销是传统方法的 2-3 倍
  3. XSimGCL
    • 只需要一次前向传播
    • 计算开销与传统方法相近
  4. 优化策略
    • 使用内存库( Memory Bank)减少计算
    • 使用动量编码器
    • 减少增强视图的数量

Q7: 如何评估对比学习模型的效果?

A: 评估对比学习模型可以使用以下指标:

  1. 推荐任务指标
    • Hit Rate @ K
    • NDCG @ K
    • Recall @ K
    • Precision @ K
  2. 表示学习指标
    • 表示的可分离性
    • 相似样本的表示距离
    • t-SNE 可视化
  3. 对比学习特定指标
    • 对比损失的收敛情况
    • 正样本对的相似度
    • 负样本对的相似度
  4. 实际业务指标
    • CTR(点击率)
    • 转化率
    • 用户满意度

Q8: 对比学习中的负样本采样策略有哪些?

A: 常见的负样本采样策略:

  1. 随机采样
    • 最简单的方法
    • 从所有未交互物品中随机采样
  2. 困难负样本挖掘
    • 选择与正样本相似但未交互的物品
    • 提升模型的学习难度
  3. 流行度加权采样
    • 根据物品流行度加权采样
    • 平衡热门和长尾物品
  4. 对抗性负样本
    • 使用对抗网络生成负样本
    • 提升模型的鲁棒性
  5. 批次内负样本
    • 使用同一批次内的其他样本作为负样本
    • SimCLR 使用的方法

Q9: 对比学习可以与其他推荐技术结合吗?

A: 可以,对比学习可以与多种技术结合:

  1. 注意力机制
    • 在对比学习中引入注意力
    • 关注重要的特征维度
  2. 图神经网络
    • SGL 就是结合了 GNN 和对比学习
    • 利用图结构信息
  3. 序列模型
    • 结合 Transformer 和对比学习
    • 用于序列推荐
  4. 多任务学习
    • 同时优化推荐任务和对比学习任务
    • 提升模型的泛化能力
  5. 元学习
    • 结合元学习和对比学习
    • 快速适应新场景

Q10: 对比学习在工业界的应用情况如何?

A: 对比学习在工业界的应用:

  1. 应用场景
    • 电商推荐(淘宝、京东等)
    • 视频推荐( YouTube 、抖音等)
    • 音乐推荐( Spotify 、网易云音乐等)
  2. 实际效果
    • 在稀疏数据场景下显著提升效果
    • 特别是在冷启动和长尾物品推荐上
  3. 挑战
    • 计算开销需要优化
    • 超参数调优复杂
    • 需要大量实验验证
  4. 未来方向
    • 更高效的数据增强策略
    • 更好的负样本采样方法
    • 与其他技术的深度融合

总结

本文深入探讨了对比学习和自监督学习在推荐系统中的应用。我们从自监督学习的基础开始,详细介绍了 SimCLR 、 SGL 、 RecDCL 、 RCL 、 XSimGCL 等经典框架,探讨了图数据增强策略、序列推荐中的对比学习应用,以及如何利用这些技术解决长尾物品推荐问题。

对比学习为推荐系统带来了新的思路和方法,特别是在处理数据稀疏性、冷启动和长尾问题方面表现出色。通过合理设计数据增强策略、对比损失函数和训练流程,可以学习到更好的用户和物品表示,从而提升推荐效果。

随着研究的深入和技术的成熟,对比学习在推荐系统中的应用将会越来越广泛。希望本文能够帮助读者深入理解这些前沿技术,并在实际项目中应用它们。

  • 本文标题:推荐系统(十一)—— 对比学习与自监督学习
  • 本文作者:Chen Kai
  • 创建时间:2024-06-21 10:00:00
  • 本文链接:https://www.chenk.top/%E6%8E%A8%E8%8D%90%E7%B3%BB%E7%BB%9F%EF%BC%88%E5%8D%81%E4%B8%80%EF%BC%89%E2%80%94%E2%80%94-%E5%AF%B9%E6%AF%94%E5%AD%A6%E4%B9%A0%E4%B8%8E%E8%87%AA%E7%9B%91%E7%9D%A3%E5%AD%A6%E4%B9%A0/
  • 版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
 评论