推荐系统(十六)—— 工业级架构与最佳实践
Chen Kai

推荐系统从实验室走向生产环境,面临的是完全不同的挑战。单机训练、离线评估的模型在真实流量面前往往不堪一击。工业级推荐系统需要处理每秒百万级的请求、毫秒级的响应延迟、实时特征更新、 A/B 测试分流、模型热更新、异常监控等复杂问题。本文将深入探讨工业推荐系统的完整架构设计,从召回、粗排、精排到重排序的每一层,结合阿里巴巴 EasyRec 、字节跳动 LONGER 等业界最佳实践,提供可落地的工程方案。

工业推荐系统全景

系统架构概览

工业级推荐系统通常采用分层漏斗架构,从海量候选集中逐步筛选出最相关的物品。典型的架构包含以下层次:

  1. 召回层( Recall):从百万级候选集中快速召回数千个相关物品
  2. 粗排层( Coarse Ranking):对召回结果进行初步排序,筛选出数百个候选
  3. 精排层( Fine Ranking):使用复杂模型对候选进行精确打分
  4. 重排序层( Re-ranking):考虑多样性、新颖性、业务规则等因素进行最终调整

代码目的: 实现工业级推荐系统的主流程,展示分层漏斗架构的完整实现。工业级推荐系统通过多阶段逐步筛选,在保证推荐质量的同时满足毫秒级响应要求。

整体思路: 1. 召回层:从百万级候选集中快速召回数千个相关物品,使用多种召回策略(协同过滤、内容、热门等) 2. 粗排层:使用轻量级模型快速筛选到数百个候选,平衡准确性和效率 3. 精排层:使用复杂模型精确打分,考虑用户特征、物品特征、上下文等 4. 重排序层:考虑多样性、新颖性、业务规则等因素进行最终调整

class IndustrialRecommendationSystem:
"""工业级推荐系统主流程"""

def __init__(self):
self.recall_engine = MultiChannelRecall()
self.coarse_ranker = CoarseRanker()
self.fine_ranker = FineRanker()
self.re_ranker = ReRanker()
self.feature_service = FeatureService()

def recommend(self, user_id, context=None):
"""推荐主流程"""
# 1. 召回:从百万级候选集中召回数千个
recall_items = self.recall_engine.recall(user_id, top_k=5000)

# 2. 粗排:快速筛选到数百个
coarse_items = self.coarse_ranker.rank(
recall_items, user_id, top_k=500
)

# 3. 精排:精确打分
fine_items = self.fine_ranker.rank(
coarse_items, user_id, context, top_k=100
)

# 4. 重排序:考虑多样性等
final_items = self.re_ranker.re_rank(
fine_items, user_id, top_k=20
)

return final_items

性能指标与 SLA

工业系统需要明确的性能指标:

  • QPS(每秒查询数):通常需要支持 10K+ QPS
  • P99 延迟:召回层 < 50ms,粗排 < 20ms,精排 < 100ms
  • 可用性: 99.9%+ 可用性,故障自动降级
  • 吞吐量:支持峰值流量 3-5 倍扩容

代码目的: 实现性能监控系统,实时监控推荐系统各阶段的延迟指标,确保满足 SLA 要求。工业级系统需要持续监控 P99 延迟、 QPS 等关键指标,及时发现性能问题。

整体思路: 1. 延迟记录:记录召回、粗排、精排、总延迟等各阶段的耗时 2. P99 监控:计算 P99 延迟,及时发现性能瓶颈 3. 告警机制:当延迟超过阈值时触发告警,支持自动降级

class PerformanceMonitor:
"""性能监控"""

def __init__(self):
self.metrics = {
'recall_latency': [],
'coarse_latency': [],
'fine_latency': [],
'total_latency': []
}

def record_latency(self, stage, latency_ms):
"""记录延迟"""
self.metrics[f'{stage}_latency'].append(latency_ms)

# P99 延迟监控
if stage == 'total':
p99 = np.percentile(self.metrics['total_latency'], 99)
if p99 > 200: # 超过 200ms 告警
self.alert(f'P99 latency exceeds threshold: {p99}ms')

召回层设计:多路召回策略

召回层是推荐系统的第一道防线,需要在极短时间内从百万级候选集中找出数千个相关物品。单一召回策略往往无法覆盖所有场景,因此工业系统普遍采用多路召回策略。

协同过滤召回

协同过滤( Collaborative Filtering)是最经典的召回方法,包括用户协同过滤( UserCF)和物品协同过滤( ItemCF)。

用户协同过滤( UserCF)

UserCF 基于"相似用户喜欢相似物品"的假设:

$$sim(u_i, u_j) = \frac{|I(u_i) \cap I(u_j)|}{\sqrt{|I(u_i)| \cdot |I(u_j)|}}$$

其中 $I(u)$ 表示用户 $u$ 交互过的物品集合。

代码目的: 实现用户协同过滤( UserCF)召回算法,基于"相似用户喜欢相似物品"的假设,从百万级候选集中快速召回相关物品。 UserCF 是工业级推荐系统中最常用的召回策略之一。

整体思路: 1. 相似度计算:使用余弦相似度计算用户之间的相似度 2. 相似用户查找:找到与目标用户最相似的 top-k 个用户 3. 物品推荐:基于相似用户的历史行为,计算物品推荐分数 4. 去重排序:去除用户已交互的物品,按分数排序返回 top-n

class UserCFRecall:
"""用户协同过滤召回"""

def __init__(self, user_item_matrix, top_k=100):
self.user_item_matrix = user_item_matrix # 用户-物品矩阵
self.top_k = top_k
self.user_similarity = None

def compute_user_similarity(self):
"""计算用户相似度矩阵"""
# 使用余弦相似度
from sklearn.metrics.pairwise import cosine_similarity
self.user_similarity = cosine_similarity(
self.user_item_matrix
)
return self.user_similarity

def recall(self, user_id, top_n=500):
"""召回 top_n 个物品"""
if self.user_similarity is None:
self.compute_user_similarity()

# 找到最相似的 top_k 个用户
similar_users = np.argsort(
self.user_similarity[user_id]
)[-self.top_k-1:-1][::-1]

# 计算推荐分数
scores = {}
user_items = set(self.user_item_matrix[user_id].nonzero()[1])

for similar_user in similar_users:
similarity = self.user_similarity[user_id, similar_user]
similar_user_items = self.user_item_matrix[similar_user].nonzero()[1]

for item_id in similar_user_items:
if item_id not in user_items:
scores[item_id] = scores.get(item_id, 0) + similarity

# 返回 top_n
top_items = sorted(scores.items(), key=lambda x: x[1], reverse=True)
return [item_id for item_id, score in top_items[:top_n]]

物品协同过滤( ItemCF)

ItemCF 基于"喜欢物品 A 的用户也喜欢物品 B"的假设,通常效果更好且更稳定:

$$sim(i_j, i_k) = \frac{|U(i_j) \cap U(i_k)|}{\sqrt{|U(i_j)| \cdot |U(i_k)|}}$$

其中 $U(i)$ 表示喜欢物品 $i$ 的用户集合。

代码目的: 实现物品协同过滤( ItemCF)召回算法,基于"喜欢物品 A 的用户也喜欢物品 B"的假设进行召回。 ItemCF 通常比 UserCF 效果更好且更稳定,是工业级推荐系统的核心召回策略。

整体思路: 1. 物品相似度计算:转置用户-物品矩阵,计算物品之间的余弦相似度 2. 历史物品扩展:基于用户历史交互的物品,找到相似物品 3. 加权评分:考虑用户对历史物品的偏好强度,加权计算推荐分数 4. 排序返回:按分数排序,返回 top-n 个物品

class ItemCFRecall:
"""物品协同过滤召回"""

def __init__(self, user_item_matrix, top_k=100):
self.user_item_matrix = user_item_matrix
self.top_k = top_k
self.item_similarity = None

def compute_item_similarity(self):
"""计算物品相似度矩阵"""
# 转置矩阵,计算物品相似度
item_user_matrix = self.user_item_matrix.T
from sklearn.metrics.pairwise import cosine_similarity
self.item_similarity = cosine_similarity(item_user_matrix)
return self.item_similarity

    def recall(self, user_id, top_n=500):
        """基于用户历史行为召回"""
        # 与 UserCF 一致:相似度矩阵未计算时先延迟计算,否则下面的索引会报错
        if self.item_similarity is None:
            self.compute_item_similarity()

        # 用户历史交互物品
        user_items = self.user_item_matrix[user_id].nonzero()[1]

if len(user_items) == 0:
return []

# 计算每个候选物品的得分
scores = {}
for item_id in user_items:
# 找到与 item_id 最相似的 top_k 个物品
similar_items = np.argsort(
self.item_similarity[item_id]
)[-self.top_k-1:-1][::-1]

for similar_item in similar_items:
if similar_item not in user_items:
similarity = self.item_similarity[item_id, similar_item]
# 考虑用户对 item_id 的偏好强度
user_preference = self.user_item_matrix[user_id, item_id]
scores[similar_item] = scores.get(
similar_item, 0
) + similarity * user_preference

# 返回 top_n
top_items = sorted(scores.items(), key=lambda x: x[1], reverse=True)
return [item_id for item_id, score in top_items[:top_n]]

内容召回

内容召回基于物品的文本、图像、类别等特征,计算用户与物品的内容相似度。

class ContentBasedRecall:
"""基于内容的召回"""

def __init__(self, item_features, user_profile):
"""
item_features: dict{item_id: feature_vector}
user_profile: dict{user_id: feature_vector}
"""
self.item_features = item_features
self.user_profile = user_profile

def recall(self, user_id, top_n=500):
"""基于内容相似度召回"""
if user_id not in self.user_profile:
return []

user_vector = self.user_profile[user_id]
scores = {}

for item_id, item_vector in self.item_features.items():
# 计算余弦相似度
similarity = cosine_similarity(
user_vector.reshape(1, -1),
item_vector.reshape(1, -1)
)[0][0]
scores[item_id] = similarity

top_items = sorted(scores.items(), key=lambda x: x[1], reverse=True)
return [item_id for item_id, score in top_items[:top_n]]

热门召回

热门召回保证系统的覆盖度和新鲜度,避免过度个性化导致的"信息茧房"。

class PopularRecall:
"""热门物品召回"""

def __init__(self, time_window_hours=24):
self.time_window_hours = time_window_hours
self.popular_items = []

def update_popular_items(self, click_logs):
"""更新热门物品列表"""
import pandas as pd
from datetime import datetime, timedelta

cutoff_time = datetime.now() - timedelta(
hours=self.time_window_hours
)

# 统计时间窗口内的点击量
recent_clicks = click_logs[
click_logs['timestamp'] >= cutoff_time
]

item_counts = recent_clicks.groupby('item_id').size()

# 考虑时间衰减
item_scores = {}
for item_id, count in item_counts.items():
# 简单的时间衰减:越近的点击权重越高
latest_time = recent_clicks[
recent_clicks['item_id'] == item_id
]['timestamp'].max()

hours_ago = (datetime.now() - latest_time).total_seconds() / 3600
decay = np.exp(-hours_ago / self.time_window_hours)
item_scores[item_id] = count * decay

self.popular_items = sorted(
item_scores.items(), key=lambda x: x[1], reverse=True
)

def recall(self, user_id, top_n=200):
"""召回热门物品"""
return [item_id for item_id, score in self.popular_items[:top_n]]

实时召回

实时召回基于用户最近的交互行为,捕捉用户的即时兴趣变化。

class RealTimeRecall:
"""实时召回"""

def __init__(self, redis_client, time_window_minutes=30):
self.redis = redis_client
self.time_window_minutes = time_window_minutes

def record_user_action(self, user_id, item_id, action_type='click'):
"""记录用户行为"""
import time
key = f"realtime:user:{user_id}"
timestamp = time.time()

# 存储到 Redis,使用 sorted set 按时间排序
self.redis.zadd(
key,
{f"{item_id}:{action_type}": timestamp}
)

# 只保留最近 time_window_minutes 的行为
cutoff_time = timestamp - self.time_window_minutes * 60
self.redis.zremrangebyscore(key, 0, cutoff_time)

def recall(self, user_id, top_n=200):
"""基于实时行为召回"""
import time
key = f"realtime:user:{user_id}"

# 获取最近的行为
cutoff_time = time.time() - self.time_window_minutes * 60
recent_actions = self.redis.zrangebyscore(
key, cutoff_time, time.time(), withscores=True
)

if not recent_actions:
return []

# 基于实时行为找到相似物品(简化版)
# 实际中可以使用向量检索或图遍历
recent_items = set()
for action_data, timestamp in recent_actions:
item_id = action_data.split(':')[0]
recent_items.add(int(item_id))

# 这里简化处理,实际应该基于物品相似度扩展
return list(recent_items)[:top_n]

多路召回融合

多路召回的结果需要融合,常见策略包括:

  1. 加权融合:不同召回通道设置不同权重
  2. 去重合并:相同物品取最高分
  3. 多样性保证:确保各通道都有一定比例
class MultiChannelRecall:
"""多路召回融合"""

def __init__(self):
self.recall_channels = {
'itemcf': ItemCFRecall(...),
'content': ContentBasedRecall(...),
'popular': PopularRecall(...),
'realtime': RealTimeRecall(...)
}

# 各通道权重
self.weights = {
'itemcf': 0.4,
'content': 0.2,
'popular': 0.2,
'realtime': 0.2
}

# 各通道召回数量
self.recall_sizes = {
'itemcf': 2000,
'content': 1000,
'popular': 1000,
'realtime': 1000
}

def recall(self, user_id, top_k=5000):
"""多路召回并融合"""
all_items = {}

# 各通道召回
for channel_name, recaller in self.recall_channels.items():
items = recaller.recall(
user_id,
top_n=self.recall_sizes[channel_name]
)

# 归一化分数并加权
if items:
max_score = len(items)
for rank, item_id in enumerate(items):
normalized_score = (max_score - rank) / max_score
weighted_score = normalized_score * self.weights[channel_name]

if item_id not in all_items:
all_items[item_id] = 0
all_items[item_id] += weighted_score

# 去重并排序
sorted_items = sorted(
all_items.items(),
key=lambda x: x[1],
reverse=True
)

return [item_id for item_id, score in sorted_items[:top_k]]
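
上面的实现覆盖了"加权融合"和"去重合并"两种策略。第 3 点"多样性保证"可以在融合时先按通道配额各取一部分,再补足剩余名额,下面是一个示意性的草图(channel_quota 的具体比例为假设值,需按业务调参):

def merge_with_quota(channel_results, channel_quota, top_k=5000):
    """按通道配额合并召回结果(草图)

    channel_results: dict{channel_name: [item_id, ...]},每个通道内按分数降序
    channel_quota:   dict{channel_name: 比例},如 {'itemcf': 0.5, 'content': 0.2, ...}
    """
    merged, seen = [], set()

    # 1. 先按配额从每个通道取各自份额,保证每一路都有最低曝光
    for channel, items in channel_results.items():
        quota = int(top_k * channel_quota.get(channel, 0))
        for item_id in items[:quota]:
            if item_id not in seen:
                seen.add(item_id)
                merged.append(item_id)

    # 2. 配额未填满时,按原始顺序从各通道补足
    for channel, items in channel_results.items():
        for item_id in items:
            if len(merged) >= top_k:
                return merged
            if item_id not in seen:
                seen.add(item_id)
                merged.append(item_id)

    return merged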

粗排与精排

粗排层设计

粗排层需要在召回和精排之间做平衡:既要保证效果,又要控制计算成本。粗排通常使用轻量级模型,如双塔模型、浅层神经网络。

双塔模型粗排

双塔模型将用户和物品分别编码为向量,通过向量内积计算匹配分数:

$$s(u, i) = f_u(u)^\top f_i(i)$$

其中 $f_u$ 和 $f_i$ 分别是用户塔和物品塔的编码函数。

import torch
import torch.nn as nn

class CoarseRankerDualTower(nn.Module):
"""双塔模型粗排"""

def __init__(self, user_feature_dim, item_feature_dim, embedding_dim=128):
super().__init__()

# 用户塔
self.user_tower = nn.Sequential(
nn.Linear(user_feature_dim, 256),
nn.ReLU(),
nn.Linear(256, embedding_dim)
)

# 物品塔
self.item_tower = nn.Sequential(
nn.Linear(item_feature_dim, 256),
nn.ReLU(),
nn.Linear(256, embedding_dim)
)

def forward(self, user_features, item_features):
"""前向传播"""
user_emb = self.user_tower(user_features)
item_emb = self.item_tower(item_features)

# L2 归一化
user_emb = nn.functional.normalize(user_emb, p=2, dim=1)
item_emb = nn.functional.normalize(item_emb, p=2, dim=1)

# 内积计算相似度
score = torch.sum(user_emb * item_emb, dim=1)
return score

def rank(self, user_features, item_features_list, top_k=500):
"""批量排序"""
self.eval()
with torch.no_grad():
user_emb = self.user_tower(user_features)
user_emb = nn.functional.normalize(user_emb, p=2, dim=1)

scores = []
batch_size = 1000

for i in range(0, len(item_features_list), batch_size):
batch_items = item_features_list[i:i+batch_size]
item_batch = torch.stack(batch_items)

item_emb = self.item_tower(item_batch)
item_emb = nn.functional.normalize(item_emb, p=2, dim=1)

batch_scores = torch.matmul(user_emb, item_emb.T)
scores.extend(batch_scores.cpu().numpy().tolist()[0])

# 返回 top_k
top_indices = np.argsort(scores)[-top_k:][::-1]
return top_indices

粗排优化技巧

  1. 特征缓存:物品特征可以离线计算并缓存
  2. 批量计算:使用 GPU 批量计算提高吞吐
  3. 模型量化:使用 INT8 量化减少内存和计算
class OptimizedCoarseRanker:
"""优化的粗排器"""

def __init__(self, model, item_feature_cache):
self.model = model
self.item_feature_cache = item_feature_cache # 预计算的物品特征

def rank(self, user_id, item_ids, top_k=500):
"""优化的排序流程"""
# 1. 获取用户特征(实时)
user_features = self.get_user_features(user_id)

# 2. 批量获取物品特征(从缓存)
item_features = [
self.item_feature_cache[item_id]
for item_id in item_ids
]

# 3. 批量计算分数( GPU)
scores = self.model.batch_score(
user_features,
item_features
)

# 4. Top-K 选择
top_indices = np.argsort(scores)[-top_k:][::-1]
return [item_ids[i] for i in top_indices]

精排层设计

精排层使用复杂模型进行精确打分,常见模型包括 Wide&Deep 、 DeepFM 、 DCN 、 xDeepFM 等。

DeepFM 精排模型

DeepFM 结合了因子分解机( FM)和深度神经网络:

$$\hat{y} = w_0 + \sum_{i=1}^{n} w_i x_i + \sum_{i=1}^{n}\sum_{j=i+1}^{n} \langle v_i, v_j \rangle\, x_i x_j + \mathrm{DNN}(x)$$

class DeepFM(nn.Module):
"""DeepFM 模型"""

def __init__(self, field_dims, embed_dim=16, mlp_dims=[128, 64]):
super().__init__()
self.field_dims = field_dims
self.embed_dim = embed_dim

# 线性部分
self.linear = FeaturesLinear(field_dims)

# FM 部分
self.fm = FactorizationMachine(reduce_sum=True)

# 嵌入层
self.embedding = FeaturesEmbedding(field_dims, embed_dim)

        # Deep 部分(MLP 输出后再接一层线性映射到 1 维,与线性/FM 部分的维度对齐)
        input_dim = len(field_dims) * embed_dim
        self.mlp = MultiLayerPerceptron(input_dim, mlp_dims, dropout=0.2)
        self.deep_output = nn.Linear(mlp_dims[-1], 1)

def forward(self, x):
"""
x: (batch_size, num_fields)
"""
# 线性部分
linear_score = self.linear(x)

# FM 部分
x_emb = self.embedding(x)
fm_score = self.fm(x_emb)

        # Deep 部分:MLP 输出为 (B, mlp_dims[-1]),再映射为 (B, 1)
        deep_input = x_emb.view(x_emb.size(0), -1)
        deep_score = self.deep_output(self.mlp(deep_input))

# 组合
score = linear_score + fm_score + deep_score
return torch.sigmoid(score)


class FactorizationMachine(nn.Module):
"""因子分解机"""

def __init__(self, reduce_sum=True):
super().__init__()
self.reduce_sum = reduce_sum

def forward(self, x):
"""
x: (batch_size, num_fields, embed_dim)
"""
square_of_sum = torch.sum(x, dim=1) ** 2
sum_of_square = torch.sum(x ** 2, dim=1)
ix = square_of_sum - sum_of_square

if self.reduce_sum:
ix = torch.sum(ix, dim=1, keepdim=True)

return 0.5 * ix


class MultiLayerPerceptron(nn.Module):
"""多层感知机"""

def __init__(self, input_dim, hidden_dims, dropout=0.0):
super().__init__()
layers = []
dims = [input_dim] + hidden_dims

for i in range(len(dims) - 1):
layers.append(nn.Linear(dims[i], dims[i+1]))
layers.append(nn.ReLU())
if dropout > 0:
layers.append(nn.Dropout(dropout))

self.mlp = nn.Sequential(*layers)

def forward(self, x):
return self.mlp(x)
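
上面的 DeepFM 引用了 FeaturesLinear 和 FeaturesEmbedding 两个模块,但正文没有给出定义。下面按常见的 pytorch-fm 风格补一个最小实现草图(字段偏移 offsets 的处理方式为通用做法,仅作参考):

import numpy as np
import torch
import torch.nn as nn

class FeaturesEmbedding(nn.Module):
    """把多个离散字段的取值映射到同一张嵌入表(草图)"""

    def __init__(self, field_dims, embed_dim):
        super().__init__()
        self.embedding = nn.Embedding(sum(field_dims), embed_dim)
        # 每个字段在大嵌入表中的起始偏移
        self.offsets = np.array((0, *np.cumsum(field_dims)[:-1]), dtype=np.int64)
        nn.init.xavier_uniform_(self.embedding.weight.data)

    def forward(self, x):
        # x: (batch_size, num_fields),每列是该字段内部的下标
        x = x + x.new_tensor(self.offsets)
        return self.embedding(x)  # (batch_size, num_fields, embed_dim)


class FeaturesLinear(nn.Module):
    """一阶线性部分:每个特征取值对应一个标量权重(草图)"""

    def __init__(self, field_dims, output_dim=1):
        super().__init__()
        self.fc = nn.Embedding(sum(field_dims), output_dim)
        self.bias = nn.Parameter(torch.zeros((output_dim,)))
        self.offsets = np.array((0, *np.cumsum(field_dims)[:-1]), dtype=np.int64)

    def forward(self, x):
        x = x + x.new_tensor(self.offsets)
        return torch.sum(self.fc(x), dim=1) + self.bias  # (batch_size, output_dim)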

特征工程

精排模型的效果很大程度上依赖于特征质量:

class FeatureEngineering:
"""特征工程"""

def __init__(self):
self.feature_encoders = {}
self.statistics = {}

def create_user_features(self, user_id, user_profile, behavior_history):
"""构建用户特征"""
features = {}

# 基础特征
features['user_id'] = user_id
features['age'] = user_profile.get('age', 0)
features['gender'] = user_profile.get('gender', 0)

# 统计特征
features['click_count_7d'] = self.count_recent_actions(
behavior_history, days=7, action='click'
)
features['purchase_count_30d'] = self.count_recent_actions(
behavior_history, days=30, action='purchase'
)

# 序列特征
features['recent_item_ids'] = self.get_recent_items(
behavior_history, top_k=10
)

# 交叉特征
features['age_gender'] = f"{features['age']}_{features['gender']}"

return features

def create_item_features(self, item_id, item_info):
"""构建物品特征"""
features = {}

# 基础特征
features['item_id'] = item_id
features['category'] = item_info.get('category', '')
features['price'] = item_info.get('price', 0.0)

# 统计特征
features['click_count_24h'] = item_info.get('click_count_24h', 0)
features['ctr_7d'] = item_info.get('ctr_7d', 0.0)

return features

def create_context_features(self, context):
"""构建上下文特征"""
features = {}

import datetime
now = datetime.datetime.now()

features['hour'] = now.hour
features['day_of_week'] = now.weekday()
features['is_weekend'] = 1 if now.weekday() >= 5 else 0

# 设备特征
features['device_type'] = context.get('device_type', 'unknown')
features['platform'] = context.get('platform', 'unknown')

return features

def create_interaction_features(self, user_features, item_features):
"""构建交互特征"""
features = {}

# 用户-物品交叉
features['user_item_category_match'] = (
1 if user_features.get('preferred_category') ==
item_features.get('category') else 0
)

# 价格偏好匹配
user_price_range = user_features.get('price_range', [0, 1000])
item_price = item_features.get('price', 0)
features['price_match'] = (
1 if user_price_range[0] <= item_price <= user_price_range[1]
else 0
)

return features

重排序策略

重排序层在精排结果基础上,考虑多样性、新颖性、业务规则等因素进行最终调整。

多样性重排序

多样性保证推荐结果的丰富程度,避免结果过于相似:

class DiversityReRanker:
"""多样性重排序"""

def __init__(self, item_similarity_matrix, lambda_diversity=0.5):
self.item_similarity = item_similarity_matrix
self.lambda_diversity = lambda_diversity

def re_rank(self, ranked_items, scores, top_k=20):
"""MMR (Maximal Marginal Relevance) 重排序"""
selected = []
remaining = list(range(len(ranked_items)))

# 第一个选择最高分的
if remaining:
selected.append(remaining.pop(0))

# 迭代选择:平衡相关性和多样性
while len(selected) < top_k and remaining:
best_idx = None
best_score = -float('inf')

for idx in remaining:
# 相关性分数
relevance = scores[idx]

# 多样性:与已选物品的最大相似度
max_sim = 0
for sel_idx in selected:
sim = self.item_similarity[
ranked_items[idx],
ranked_items[sel_idx]
]
max_sim = max(max_sim, sim)

# MMR 分数
mmr_score = (
self.lambda_diversity * relevance -
(1 - self.lambda_diversity) * max_sim
)

if mmr_score > best_score:
best_score = mmr_score
best_idx = idx

if best_idx is not None:
selected.append(best_idx)
remaining.remove(best_idx)

return [ranked_items[i] for i in selected]

新颖性重排序

新颖性保证推荐结果的新鲜度,避免重复推荐用户已经看过的内容:

class NoveltyReRanker:
"""新颖性重排序"""

def __init__(self, user_history, novelty_weight=0.3):
        self.user_history = user_history  # dict:{item_id: 最近一次交互的时间戳}
self.novelty_weight = novelty_weight

def re_rank(self, ranked_items, scores, top_k=20):
"""基于新颖性重排序"""
novelty_scores = []

for item_id in ranked_items:
# 新颖性:用户是否交互过
is_novel = 1 if item_id not in self.user_history else 0

# 时间衰减:如果交互过,时间越久新颖性越高
if item_id in self.user_history:
last_interaction_time = self.user_history[item_id]
hours_ago = (
time.time() - last_interaction_time
) / 3600
novelty = np.exp(-hours_ago / 720) # 30 天半衰期
else:
novelty = 1.0

novelty_scores.append(novelty)

# 组合分数
combined_scores = [
(1 - self.novelty_weight) * score +
self.novelty_weight * novelty
for score, novelty in zip(scores, novelty_scores)
]

# 重新排序
sorted_indices = np.argsort(combined_scores)[::-1]
return [ranked_items[i] for i in sorted_indices[:top_k]]

业务规则重排序

业务规则重排序考虑运营需求,如新品扶持、品类平衡等:

class BusinessRuleReRanker:
"""业务规则重排序"""

def __init__(self, rules):
"""
rules: dict, 例如 {
'new_item_boost': 1.2, # 新品加权
'category_balance': True, # 品类平衡
'max_items_per_category': 5 # 每个品类最多推荐数
}
"""
self.rules = rules

def re_rank(self, ranked_items, item_metadata, top_k=20):
"""应用业务规则"""
result = []
category_count = {}

for item_id in ranked_items:
if len(result) >= top_k:
break

item_info = item_metadata[item_id]
category = item_info.get('category', 'other')

# 品类平衡检查
if self.rules.get('category_balance', False):
max_per_category = self.rules.get(
'max_items_per_category',
float('inf')
)
if category_count.get(category, 0) >= max_per_category:
continue

# 新品加权
if self.rules.get('new_item_boost', 1.0) > 1.0:
is_new = item_info.get('is_new', False)
if is_new and len(result) < top_k:
result.append(item_id)
category_count[category] = category_count.get(category, 0) + 1
continue

result.append(item_id)
category_count[category] = category_count.get(category, 0) + 1

return result

EasyRec 框架实践

EasyRec 是阿里巴巴开源的推荐算法框架,提供了完整的推荐系统解决方案。

EasyRec 架构

EasyRec 采用配置化的方式构建推荐模型,支持多种模型结构:

# EasyRec 配置示例
easyrec_config = {
"model_config": {
"model_class": "MultiTower",
"feature_groups": [
{
"group_name": "user",
"features": [
{"feature_name": "user_id", "feature_type": "id_feature"},
{"feature_name": "age", "feature_type": "raw_feature"},
{"feature_name": "gender", "feature_type": "id_feature"}
]
},
{
"group_name": "item",
"features": [
{"feature_name": "item_id", "feature_type": "id_feature"},
{"feature_name": "category", "feature_type": "id_feature"},
{"feature_name": "price", "feature_type": "raw_feature"}
]
},
{
"group_name": "context",
"features": [
{"feature_name": "hour", "feature_type": "id_feature"},
{"feature_name": "device", "feature_type": "id_feature"}
]
}
],
"wide_and_deep": {
"dnn": {
"hidden_units": [512, 256, 128],
"dropout_rate": 0.2
}
}
},
"data_config": {
"input_fields": ["user_id", "item_id", "label"],
"label_fields": ["label"]
}
}

EasyRec 使用示例

from easy_rec.python import easy_rec_predictor

class EasyRecRecommender:
"""基于 EasyRec 的推荐器"""

def __init__(self, model_path, config_path):
self.predictor = easy_rec_predictor.EasyRecPredictor(
model_path=model_path,
config_path=config_path
)

def predict(self, user_features, item_features, context_features):
"""预测用户对物品的偏好"""
# 构建输入
inputs = {
'user_id': user_features['user_id'],
'item_id': item_features['item_id'],
'age': user_features['age'],
'gender': user_features['gender'],
'category': item_features['category'],
'price': item_features['price'],
'hour': context_features['hour'],
'device': context_features['device']
}

# 预测
predictions = self.predictor.predict(inputs)
return predictions['probs']

def batch_predict(self, user_id, item_ids, context):
"""批量预测"""
user_features = self.get_user_features(user_id)
context_features = self.get_context_features(context)

scores = []
for item_id in item_ids:
item_features = self.get_item_features(item_id)
score = self.predict(user_features, item_features, context_features)
scores.append(score[0])

return scores

EasyRec 特征工程

EasyRec 提供了丰富的特征工程能力:

# EasyRec 特征配置
feature_config = {
"features": [
{
"feature_name": "user_id",
"feature_type": "id_feature",
"embedding_dim": 64,
"hash_bucket_size": 1000000
},
{
"feature_name": "item_id",
"feature_type": "id_feature",
"embedding_dim": 64,
"hash_bucket_size": 500000
},
{
"feature_name": "age",
"feature_type": "raw_feature",
"normalizer": "log"
},
{
"feature_name": "price",
"feature_type": "raw_feature",
"normalizer": "min_max",
"boundaries": [0, 1000]
},
{
"feature_name": "user_item_cross",
"feature_type": "sequence_feature",
"sequence_combiner": "mean"
}
]
}

LONGER 长序列建模

LONGER 是字节跳动提出的长序列推荐模型,能够处理用户的长历史行为序列(数千到数万条)。

长序列挑战

传统序列模型(如 GRU、LSTM)乃至标准自注意力模型都难以处理超长序列:

  1. 计算复杂度:自注意力机制的复杂度为 $O(n^2)$,序列长达数千时计算量难以接受
  2. 内存限制:无法将整个序列一次性加载到内存
  3. 噪声干扰:长序列中包含大量与当前兴趣无关的噪声行为

LONGER 架构

LONGER 采用分段检索 + 局部建模的策略:

class LONGERModel(nn.Module):
"""LONGER 长序列模型"""

    def __init__(self, num_items, item_embed_dim=64, hidden_dim=128, num_segments=10):
super().__init__()
self.item_embed_dim = item_embed_dim
self.hidden_dim = hidden_dim
self.num_segments = num_segments

# 物品嵌入层
self.item_embedding = nn.Embedding(
num_items, item_embed_dim
)

# 分段检索模块
self.segment_retriever = SegmentRetriever(
item_embed_dim, hidden_dim
)

# 局部序列编码器
self.local_encoder = LocalSequenceEncoder(
item_embed_dim, hidden_dim
)

        # 全局表示投影:将候选物品加权后的全局兴趣向量映射到 hidden_dim
        self.global_proj = nn.Linear(item_embed_dim, hidden_dim)

        # 全局融合层
        self.fusion_layer = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )

def forward(self, user_history, candidate_item):
"""
user_history: (batch_size, seq_len) 用户历史序列
candidate_item: (batch_size,) 候选物品
"""
batch_size = user_history.size(0)
seq_len = user_history.size(1)

# 1. 物品嵌入
history_emb = self.item_embedding(user_history) # (B, L, D)
candidate_emb = self.item_embedding(candidate_item) # (B, D)

# 2. 分段检索:找到最相关的片段
segment_size = seq_len // self.num_segments
segments = []

for i in range(self.num_segments):
start_idx = i * segment_size
end_idx = min((i + 1) * segment_size, seq_len)
segment = history_emb[:, start_idx:end_idx, :]
segments.append(segment)

# 计算每个片段与候选物品的相关性
segment_scores = []
for segment in segments:
# 片段平均表示
segment_repr = segment.mean(dim=1) # (B, D)
# 相关性分数
score = torch.sum(segment_repr * candidate_emb, dim=1)
segment_scores.append(score)

segment_scores = torch.stack(segment_scores, dim=1) # (B, num_segments)

# 选择 top-k 最相关的片段
top_k = min(3, self.num_segments)
top_k_scores, top_k_indices = torch.topk(
segment_scores, k=top_k, dim=1
)

# 3. 局部序列编码
selected_segments = []
for i in range(batch_size):
selected_indices = top_k_indices[i]
selected_segment_embs = []
for idx in selected_indices:
seg_idx = idx.item()
seg_emb = segments[seg_idx][i] # (segment_size, D)
selected_segment_embs.append(seg_emb)

# 拼接选中的片段
selected_segment = torch.cat(selected_segment_embs, dim=0)
selected_segments.append(selected_segment)

# 填充到相同长度
max_len = max(seg.size(0) for seg in selected_segments)
padded_segments = []
for seg in selected_segments:
pad_len = max_len - seg.size(0)
if pad_len > 0:
pad = torch.zeros(pad_len, self.item_embed_dim).to(seg.device)
seg = torch.cat([seg, pad], dim=0)
padded_segments.append(seg)

selected_history = torch.stack(padded_segments, dim=0) # (B, max_len, D)

# 局部编码
local_repr = self.local_encoder(selected_history) # (B, hidden_dim)

        # 4. 全局融合:用候选物品向量对全局平均兴趣做逐元素加权,再投影到 hidden_dim
        global_repr = history_emb.mean(dim=1)  # (B, D)
        global_repr = self.global_proj(global_repr * candidate_emb)  # (B, hidden_dim)

# 融合
fused_repr = torch.cat([local_repr, global_repr], dim=1)
score = self.fusion_layer(fused_repr)

return torch.sigmoid(score)


class LocalSequenceEncoder(nn.Module):
"""局部序列编码器"""

def __init__(self, input_dim, hidden_dim):
super().__init__()
self.transformer = nn.TransformerEncoder(
nn.TransformerEncoderLayer(
d_model=input_dim,
nhead=8,
dim_feedforward=hidden_dim,
dropout=0.1
),
num_layers=2
)
self.output_proj = nn.Linear(input_dim, hidden_dim)

def forward(self, x):
"""
x: (batch_size, seq_len, input_dim)
"""
# Transformer 编码
x = x.transpose(0, 1) # (seq_len, batch_size, input_dim)
encoded = self.transformer(x)
encoded = encoded.transpose(0, 1) # (batch_size, seq_len, input_dim)

# 取最后一个时间步
output = encoded[:, -1, :]
output = self.output_proj(output)

return output

LONGER 优化技巧

  1. 增量更新:只对新行为进行编码,避免重复计算
  2. 缓存机制:缓存片段表示,减少计算量
  3. 采样策略:对超长序列进行智能采样
class OptimizedLONGER:
"""优化的 LONGER 实现"""

def __init__(self, model, cache_size=10000):
self.model = model
self.segment_cache = {} # 缓存片段表示
self.cache_size = cache_size

def encode_with_cache(self, user_history):
"""带缓存的编码"""
# 生成缓存 key
cache_key = self._generate_cache_key(user_history)

if cache_key in self.segment_cache:
return self.segment_cache[cache_key]

# 计算并缓存
representation = self.model.encode(user_history)

# LRU 缓存
if len(self.segment_cache) >= self.cache_size:
# 删除最旧的
oldest_key = next(iter(self.segment_cache))
del self.segment_cache[oldest_key]

self.segment_cache[cache_key] = representation
return representation

def incremental_update(self, old_history, new_actions):
"""增量更新"""
# 只对新行为进行编码
new_segment = self.model.encode_segment(new_actions)

# 与旧表示融合
old_repr = self.encode_with_cache(old_history)
updated_repr = self._fuse_representations(old_repr, new_segment)

return updated_repr
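
上面的 OptimizedLONGER 覆盖了缓存和增量更新,第 3 点的采样策略可以在序列进入模型前做裁剪。一种常见思路是"近期行为全保留 + 远期行为按与候选物品的相关性抽样",下面是一个示意草图(recent_keep、max_len 均为假设值):

import numpy as np

def sample_long_sequence(item_ids, item_embs, target_emb,
                         recent_keep=200, max_len=1000):
    """超长行为序列采样(草图)

    item_ids:   用户历史物品 id 列表(按时间升序)
    item_embs:  对应的物品向量,shape (len(item_ids), dim)
    target_emb: 候选物品向量,shape (dim,)
    """
    if len(item_ids) <= max_len:
        return list(item_ids)

    # 1. 最近 recent_keep 个行为全部保留(即时兴趣最重要)
    recent_ids = list(item_ids[-recent_keep:])

    # 2. 更早的行为按与候选物品的内积相似度取 top-(max_len - recent_keep)
    old_ids = item_ids[:-recent_keep]
    old_embs = item_embs[:-recent_keep]
    scores = old_embs @ target_emb
    keep = max_len - recent_keep
    if keep <= 0:                      # 极端参数下直接截尾
        return recent_ids[-max_len:]
    top_idx = np.argsort(scores)[-keep:]
    sampled_old = [old_ids[i] for i in sorted(top_idx)]  # 保持时间顺序

    return sampled_old + recent_ids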

特征工程自动化

特征工程是推荐系统的关键环节,自动化特征工程可以大幅提升效率。

自动特征生成

class AutoFeatureGenerator:
"""自动特征生成器"""

def __init__(self):
self.feature_templates = []
self._init_templates()

def _init_templates(self):
"""初始化特征模板"""
# 统计特征模板
self.feature_templates.extend([
'count_{field}_{window}', # 计数特征
'avg_{field}_{window}', # 平均特征
'max_{field}_{window}', # 最大特征
'min_{field}_{window}', # 最小特征
])

# 交叉特征模板
self.feature_templates.extend([
'{field1}_{field2}_cross', # 交叉特征
'{field1}_div_{field2}', # 除法特征
'{field1}_mul_{field2}', # 乘法特征
])

# 序列特征模板
self.feature_templates.extend([
'last_{n}_{field}', # 最近 N 个
'first_{n}_{field}', # 最早 N 个
])

def generate_features(self, data, target_field='label'):
"""自动生成特征"""
generated_features = {}

# 1. 统计特征
for template in self.feature_templates:
if 'count' in template or 'avg' in template:
features = self._generate_stat_features(
data, template
)
generated_features.update(features)

# 2. 交叉特征
cross_features = self._generate_cross_features(data)
generated_features.update(cross_features)

# 3. 序列特征
seq_features = self._generate_sequence_features(data)
generated_features.update(seq_features)

return generated_features

def _generate_stat_features(self, data, template):
"""生成统计特征"""
features = {}

# 解析模板
if 'count' in template:
stat_type = 'count'
elif 'avg' in template:
stat_type = 'avg'
elif 'max' in template:
stat_type = 'max'
elif 'min' in template:
stat_type = 'min'

# 时间窗口
windows = ['1h', '24h', '7d', '30d']

for window in windows:
feature_name = template.format(
field='*', window=window
)
# 实际实现中需要根据具体字段和时间窗口计算
features[feature_name] = self._compute_stat(
data, stat_type, window
)

return features

def _generate_cross_features(self, data):
"""生成交叉特征"""
features = {}

# 选择重要字段进行交叉
important_fields = ['user_id', 'item_id', 'category', 'price']

for i, field1 in enumerate(important_fields):
for field2 in important_fields[i+1:]:
# 交叉特征
cross_name = f'{field1}_{field2}_cross'
features[cross_name] = self._compute_cross(
data, field1, field2
)

return features

def _generate_sequence_features(self, data):
"""生成序列特征"""
features = {}

# 最近 N 个物品
for n in [1, 3, 5, 10]:
feature_name = f'last_{n}_items'
features[feature_name] = self._get_last_n_items(data, n)

return features

特征选择

自动特征选择可以去除冗余特征,提升模型效果:

class AutoFeatureSelector:
"""自动特征选择"""

def __init__(self, method='mutual_info'):
self.method = method
self.selected_features = []

def select_features(self, X, y, top_k=100):
"""选择 top_k 个特征"""
if self.method == 'mutual_info':
scores = self._mutual_info_score(X, y)
elif self.method == 'chi2':
scores = self._chi2_score(X, y)
elif self.method == 'f_score':
scores = self._f_score(X, y)
else:
raise ValueError(f"Unknown method: {self.method}")

# 选择 top_k
top_indices = np.argsort(scores)[-top_k:][::-1]
self.selected_features = top_indices

return self.selected_features

def _mutual_info_score(self, X, y):
"""互信息分数"""
from sklearn.feature_selection import mutual_info_classif
scores = mutual_info_classif(X, y, random_state=42)
return scores

def _chi2_score(self, X, y):
"""卡方分数"""
from sklearn.feature_selection import chi2
scores, _ = chi2(X, y)
return scores

def _f_score(self, X, y):
"""F 分数"""
from sklearn.feature_selection import f_classif
scores, _ = f_classif(X, y)
return scores

def transform(self, X):
"""转换数据,只保留选中的特征"""
return X[:, self.selected_features]

A/B 测试方法论

A/B 测试是评估推荐系统改进效果的标准方法。通过对比实验组和对照组的真实指标差异,我们可以科学地验证新策略的效果。

A/B 测试框架

class ABTestFramework:
"""A/B 测试框架"""

def __init__(self, redis_client):
self.redis = redis_client
self.experiments = {}

def create_experiment(self, exp_name, variants, traffic_split):
"""
创建实验
variants: ['control', 'treatment']
traffic_split: {'control': 0.5, 'treatment': 0.5}
"""
self.experiments[exp_name] = {
'variants': variants,
'traffic_split': traffic_split,
'start_time': time.time(),
'metrics': {v: [] for v in variants}
}

def assign_variant(self, user_id, exp_name):
"""分配用户到实验组"""
if exp_name not in self.experiments:
return 'control'

exp = self.experiments[exp_name]

        # 使用用户 ID 的哈希值保证一致性
        # 内置 hash() 在不同进程间不稳定,这里用 md5 保证多实例部署时分流一致
        import hashlib
        hash_value = int(hashlib.md5(f"{user_id}_{exp_name}".encode()).hexdigest(), 16) % 100

cumulative = 0
for variant, split in exp['traffic_split'].items():
cumulative += split * 100
if hash_value < cumulative:
return variant

return 'control'

def record_event(self, user_id, exp_name, event_type, value=1):
"""记录事件"""
variant = self.assign_variant(user_id, exp_name)

key = f"abtest:{exp_name}:{variant}:{event_type}"
self.redis.incrby(key, value)

def get_metrics(self, exp_name):
"""获取实验指标"""
if exp_name not in self.experiments:
return None

exp = self.experiments[exp_name]
metrics = {}

for variant in exp['variants']:
variant_metrics = {}

# 从 Redis 获取指标
clicks = int(self.redis.get(
f"abtest:{exp_name}:{variant}:click"
) or 0)
impressions = int(self.redis.get(
f"abtest:{exp_name}:{variant}:impression"
) or 0)
conversions = int(self.redis.get(
f"abtest:{exp_name}:{variant}:conversion"
) or 0)

variant_metrics['ctr'] = clicks / impressions if impressions > 0 else 0
variant_metrics['cvr'] = conversions / clicks if clicks > 0 else 0
variant_metrics['clicks'] = clicks
variant_metrics['impressions'] = impressions

metrics[variant] = variant_metrics

return metrics

def statistical_significance_test(self, exp_name):
"""统计显著性检验"""
metrics = self.get_metrics(exp_name)

if not metrics:
return None

from scipy import stats

# 获取 control 和 treatment 的数据
control_clicks = metrics['control']['clicks']
control_impressions = metrics['control']['impressions']
treatment_clicks = metrics['treatment']['clicks']
treatment_impressions = metrics['treatment']['impressions']

# 双样本比例检验
control_ctr = control_clicks / control_impressions
treatment_ctr = treatment_clicks / treatment_impressions

# Z 检验
p1 = control_ctr
p2 = treatment_ctr
n1 = control_impressions
n2 = treatment_impressions

p_pooled = (control_clicks + treatment_clicks) / (n1 + n2)
se = np.sqrt(p_pooled * (1 - p_pooled) * (1/n1 + 1/n2))

z_score = (p2 - p1) / se
p_value = 2 * (1 - stats.norm.cdf(abs(z_score)))

return {
'control_ctr': control_ctr,
'treatment_ctr': treatment_ctr,
'lift': (treatment_ctr - control_ctr) / control_ctr,
'p_value': p_value,
'significant': p_value < 0.05
}

A/B 测试最佳实践

  1. 样本量计算:确保有足够的样本量检测到显著差异
  2. 分层实验:支持多个实验并行运行
  3. 灰度发布:逐步扩大流量,降低风险
class SampleSizeCalculator:
"""样本量计算器"""

@staticmethod
def calculate_sample_size(
baseline_ctr,
min_detectable_lift,
power=0.8,
alpha=0.05
):
"""
计算所需样本量
baseline_ctr: 基线 CTR
min_detectable_lift: 最小可检测的提升比例
"""
from scipy import stats

p1 = baseline_ctr
p2 = baseline_ctr * (1 + min_detectable_lift)

z_alpha = stats.norm.ppf(1 - alpha/2)
z_beta = stats.norm.ppf(power)

p_bar = (p1 + p2) / 2

n = (
(z_alpha * np.sqrt(2 * p_bar * (1 - p_bar)) +
z_beta * np.sqrt(p1 * (1 - p1) + p2 * (1 - p2))) ** 2
) / ((p2 - p1) ** 2)

return int(np.ceil(n))
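
第 2 点的分层实验通常用"层名 + 用户 ID"的正交哈希实现:同一个用户在不同实验层里独立分桶,各层实验互不干扰。下面是一个示意草图(层名、桶数均为假设值):

import hashlib

def assign_bucket(user_id, layer_name, num_buckets=100):
    """同一用户在不同层(layer)中独立哈希,保证各层实验正交(草图)"""
    key = f"{layer_name}:{user_id}".encode()
    return int(hashlib.md5(key).hexdigest(), 16) % num_buckets

def assign_variant_in_layer(user_id, layer_name, traffic_split):
    """traffic_split 示例:{'control': 0.5, 'treatment': 0.5}"""
    bucket = assign_bucket(user_id, layer_name)
    cumulative = 0.0
    for variant, ratio in traffic_split.items():
        cumulative += ratio * 100
        if bucket < cumulative:
            return variant
    return 'control'

# 同一用户可以同时参与召回层和精排层的实验,两层的分桶互不相关
recall_variant = assign_variant_in_layer('user_42', 'recall_layer',
                                         {'control': 0.5, 'treatment': 0.5})
rank_variant = assign_variant_in_layer('user_42', 'rank_layer',
                                       {'control': 0.9, 'treatment': 0.1})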

性能优化技巧

模型推理优化

  1. 模型量化: INT8 量化减少模型大小和推理时间
  2. 模型剪枝:去除冗余参数
  3. 批量推理:提高 GPU 利用率
class ModelOptimizer:
"""模型优化器"""

def quantize_model(self, model, calibration_data):
"""模型量化"""
import torch.quantization as quantization

# 设置量化配置
model.qconfig = quantization.get_default_qconfig('fbgemm')

# 准备量化
quantization.prepare(model, inplace=True)

# 校准
with torch.no_grad():
for batch in calibration_data:
model(batch)

# 转换
quantized_model = quantization.convert(model, inplace=False)

return quantized_model

def prune_model(self, model, pruning_rate=0.2):
"""模型剪枝"""
import torch.nn.utils.prune as prune

# 对线性层进行剪枝
for module in model.modules():
if isinstance(module, nn.Linear):
prune.l1_unstructured(module, name='weight', amount=pruning_rate)
prune.remove(module, 'weight')

return model

    def optimize_for_inference(self, model, example_inputs):
        """推理优化(example_inputs 为一个真实输入样例,供 trace 使用)"""
        # TorchScript 编译
        model.eval()
        traced_model = torch.jit.trace(model, example_inputs)
        optimized_model = torch.jit.optimize_for_inference(traced_model)

        return optimized_model

缓存策略

class CacheStrategy:
"""缓存策略"""

def __init__(self, redis_client, ttl=3600):
self.redis = redis_client
self.ttl = ttl

def get_user_embedding(self, user_id):
"""获取用户嵌入(带缓存)"""
cache_key = f"user_emb:{user_id}"

# 尝试从缓存获取
cached = self.redis.get(cache_key)
if cached:
return pickle.loads(cached)

# 计算并缓存
embedding = self.compute_user_embedding(user_id)
self.redis.setex(
cache_key,
self.ttl,
pickle.dumps(embedding)
)

return embedding

def get_item_embeddings_batch(self, item_ids):
"""批量获取物品嵌入"""
# 批量从缓存获取
cache_keys = [f"item_emb:{item_id}" for item_id in item_ids]
cached_values = self.redis.mget(cache_keys)

# 找出未缓存的
missing_indices = [
i for i, val in enumerate(cached_values) if val is None
]
missing_item_ids = [item_ids[i] for i in missing_indices]

# 批量计算缺失的
if missing_item_ids:
new_embeddings = self.compute_item_embeddings_batch(missing_item_ids)

# 批量写入缓存
pipe = self.redis.pipeline()
for item_id, emb in zip(missing_item_ids, new_embeddings):
cache_key = f"item_emb:{item_id}"
pipe.setex(cache_key, self.ttl, pickle.dumps(emb))
pipe.execute()

# 组装结果
embeddings = []
for i, item_id in enumerate(item_ids):
if cached_values[i]:
embeddings.append(pickle.loads(cached_values[i]))
else:
idx = missing_indices.index(i)
embeddings.append(new_embeddings[idx])

return embeddings

异步处理

import asyncio
from concurrent.futures import ThreadPoolExecutor

class AsyncRecommendationService:
"""异步推荐服务"""

def __init__(self, max_workers=10):
self.executor = ThreadPoolExecutor(max_workers=max_workers)

async def recommend_async(self, user_id, context):
"""异步推荐"""
loop = asyncio.get_event_loop()

# 并行执行多个召回通道
recall_tasks = [
loop.run_in_executor(
self.executor,
self.recall_channel,
channel,
user_id
)
for channel in ['itemcf', 'content', 'popular', 'realtime']
]

recall_results = await asyncio.gather(*recall_tasks)

# 合并结果
all_items = []
for result in recall_results:
all_items.extend(result)

# 去重
unique_items = list(set(all_items))

# 粗排和精排
coarse_items = await loop.run_in_executor(
self.executor,
self.coarse_rank,
unique_items,
user_id
)

fine_items = await loop.run_in_executor(
self.executor,
self.fine_rank,
coarse_items,
user_id,
context
)

return fine_items

模型部署与监控

模型服务化

from flask import Flask, request, jsonify
import torch

app = Flask(__name__)

class ModelServer:
"""模型服务"""

def __init__(self, model_path):
self.model = torch.load(model_path)
self.model.eval()
self.feature_service = FeatureService()

def predict(self, user_id, item_ids, context):
"""预测接口"""
# 获取特征
user_features = self.feature_service.get_user_features(user_id)
item_features_list = self.feature_service.get_item_features_batch(item_ids)
context_features = self.feature_service.get_context_features(context)

# 批量预测
scores = []
with torch.no_grad():
for item_features in item_features_list:
# 构建输入
input_features = self._combine_features(
user_features,
item_features,
context_features
)

# 预测
score = self.model(input_features)
scores.append(score.item())

return scores

model_server = ModelServer('model.pth')

@app.route('/recommend', methods=['POST'])
def recommend():
"""推荐接口"""
data = request.json
user_id = data['user_id']
item_ids = data.get('item_ids', [])
context = data.get('context', {})

if not item_ids:
# 如果没有提供候选物品,先召回
item_ids = recall(user_id, top_k=100)

# 预测分数
scores = model_server.predict(user_id, item_ids, context)

# 排序
ranked_items = [
{'item_id': item_id, 'score': score}
for item_id, score in zip(item_ids, scores)
]
ranked_items.sort(key=lambda x: x['score'], reverse=True)

return jsonify({
'user_id': user_id,
'recommendations': ranked_items[:20]
})

if __name__ == '__main__':
app.run(host='0.0.0.0', port=8080)

监控系统

class ModelMonitor:
"""模型监控"""

def __init__(self):
self.metrics = {
'latency': [],
'qps': [],
'error_rate': [],
'prediction_distribution': []
}
self.alert_thresholds = {
'latency_p99': 200, # ms
'error_rate': 0.01, # 1%
'qps_drop': 0.2 # 20% 下降
}

def record_prediction(self, latency_ms, score, error=None):
"""记录预测"""
self.metrics['latency'].append(latency_ms)
self.metrics['prediction_distribution'].append(score)

if error:
self.metrics['error_rate'].append(1)
else:
self.metrics['error_rate'].append(0)

# 检查告警
self._check_alerts()

def _check_alerts(self):
"""检查告警条件"""
if len(self.metrics['latency']) < 100:
return

# P99 延迟告警
p99_latency = np.percentile(self.metrics['latency'][-1000:], 99)
if p99_latency > self.alert_thresholds['latency_p99']:
self.send_alert(f'P99 latency exceeds threshold: {p99_latency}ms')

# 错误率告警
recent_error_rate = np.mean(self.metrics['error_rate'][-1000:])
if recent_error_rate > self.alert_thresholds['error_rate']:
self.send_alert(f'Error rate exceeds threshold: {recent_error_rate}')

# 预测分布异常检测
recent_scores = self.metrics['prediction_distribution'][-1000:]
mean_score = np.mean(recent_scores)
std_score = np.std(recent_scores)

# 如果预测分数分布发生显著变化,可能模型出现问题
if abs(mean_score - 0.5) > 0.3: # 假设正常预测应该在 0.5 附近
self.send_alert(
f'Prediction distribution abnormal: mean={mean_score}, std={std_score}'
)

def send_alert(self, message):
"""发送告警"""
print(f"ALERT: {message}")
# 实际实现中应该发送到监控系统(如 Prometheus 、 Grafana)

模型热更新

class HotModelUpdater:
"""模型热更新"""

def __init__(self, model_path, check_interval=300):
self.model_path = model_path
self.check_interval = check_interval
self.current_model = None
self.model_version = None
self.load_model()

def load_model(self):
"""加载模型"""
self.current_model = torch.load(self.model_path)
self.model_version = self._get_model_version()

def _get_model_version(self):
"""获取模型版本"""
# 从文件修改时间或版本文件读取
import os
mtime = os.path.getmtime(self.model_path)
return mtime

def check_and_update(self):
"""检查并更新模型"""
new_version = self._get_model_version()

if new_version != self.model_version:
print(f"New model detected: {new_version}")
self._update_model()

def _update_model(self):
"""更新模型"""
try:
# 加载新模型
new_model = torch.load(self.model_path)

# 验证模型
if self._validate_model(new_model):
# 原子性替换
self.current_model = new_model
self.model_version = self._get_model_version()
print(f"Model updated successfully to version {self.model_version}")
else:
print("New model validation failed, keeping current model")
except Exception as e:
print(f"Error updating model: {e}")

def _validate_model(self, model):
"""验证模型"""
# 简单的验证:检查模型结构
try:
# 尝试前向传播
dummy_input = torch.randn(1, 100) # 根据实际输入维度调整
with torch.no_grad():
_ = model(dummy_input)
return True
except:
return False

def start_background_update(self):
"""后台更新线程"""
import threading

def update_loop():
while True:
time.sleep(self.check_interval)
self.check_and_update()

thread = threading.Thread(target=update_loop, daemon=True)
thread.start()

完整项目实战

项目结构

recommendation_system/
├── config/
│ ├── model_config.yaml
│ └── feature_config.yaml
├── src/
│ ├── recall/
│ │ ├── itemcf.py
│ │ ├── content.py
│ │ └── multi_channel.py
│ ├── rank/
│ │ ├── coarse_ranker.py
│ │ ├── fine_ranker.py
│ │ └── re_ranker.py
│ ├── feature/
│ │ ├── feature_engineering.py
│ │ └── feature_service.py
│ ├── model/
│ │ ├── deepfm.py
│ │ └── longer.py
│ ├── service/
│ │ ├── recommendation_service.py
│ │ └── model_server.py
│ └── utils/
│ ├── cache.py
│ └── monitor.py
├── tests/
├── scripts/
│ ├── train.py
│ └── deploy.py
└── requirements.txt

完整推荐服务实现

class CompleteRecommendationService:
"""完整的推荐服务"""

def __init__(self, config):
self.config = config

# 初始化各个组件
self.recall_engine = MultiChannelRecall(config.recall_config)
self.coarse_ranker = CoarseRankerDualTower(config.coarse_ranker_config)
self.fine_ranker = DeepFM(config.fine_ranker_config)
self.re_ranker = DiversityReRanker(config.re_ranker_config)

# 特征服务
self.feature_service = FeatureService(config.feature_config)

# 缓存
self.cache = CacheStrategy(config.cache_config)

# 监控
self.monitor = ModelMonitor()

# A/B 测试
self.ab_test = ABTestFramework(config.ab_test_config)

def recommend(self, user_id, context=None, top_k=20):
"""完整推荐流程"""
start_time = time.time()

try:
# 1. A/B 测试分流
exp_name = self.config.get('experiment_name', 'default')
variant = self.ab_test.assign_variant(user_id, exp_name)

# 2. 召回
recall_start = time.time()
recall_items = self.recall_engine.recall(
user_id,
top_k=self.config.recall_top_k
)
recall_latency = (time.time() - recall_start) * 1000

# 3. 粗排
coarse_start = time.time()
coarse_items = self.coarse_ranker.rank(
recall_items,
user_id,
top_k=self.config.coarse_top_k
)
coarse_latency = (time.time() - coarse_start) * 1000

# 4. 精排
fine_start = time.time()
fine_items = self.fine_ranker.rank(
coarse_items,
user_id,
context,
top_k=self.config.fine_top_k
)
fine_latency = (time.time() - fine_start) * 1000

# 5. 重排序
re_rank_start = time.time()
final_items = self.re_ranker.re_rank(
fine_items,
user_id,
top_k=top_k
)
re_rank_latency = (time.time() - re_rank_start) * 1000

total_latency = (time.time() - start_time) * 1000

# 记录指标
self.monitor.record_prediction(total_latency, fine_items[0]['score'])

# A/B 测试记录
self.ab_test.record_event(user_id, exp_name, 'impression')

return {
'user_id': user_id,
'recommendations': final_items,
'metrics': {
'recall_latency_ms': recall_latency,
'coarse_latency_ms': coarse_latency,
'fine_latency_ms': fine_latency,
're_rank_latency_ms': re_rank_latency,
'total_latency_ms': total_latency
}
}

except Exception as e:
self.monitor.record_prediction(
(time.time() - start_time) * 1000,
0,
error=e
)
raise

常见问题 Q&A

Q1: 召回层应该召回多少物品?

A: 召回数量需要在效果和效率之间平衡。通常:

  • 召回数量:5000-10000 个物品
  • 考虑因素:
    • 候选集大小:百万级候选集需要更多召回
    • 精排模型复杂度:复杂模型可以处理更多候选
    • 延迟要求:召回更多会增加延迟

经验值:

  • 候选集 < 100 万:召回 3000-5000
  • 候选集 100 万-1000 万:召回 5000-10000
  • 候选集 > 1000 万:召回 10000+

Q2: 如何选择粗排模型?

A: 粗排模型选择原则:

  1. 轻量级:推理速度快, P99 延迟 < 20ms
  2. 效果保证:与精排的相关性高( Spearman 相关系数 > 0.7)
  3. 可扩展:支持批量推理, GPU 利用率高

推荐方案:

  • 双塔模型:最常用,效果好且速度快
  • 浅层神经网络:2-3 层,速度快但效果略差
  • 线性模型:最快但效果一般
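
其中第 2 条"与精排的相关性"可以离线验证:对同一批候选分别用粗排和精排打分,计算两组分数的 Spearman 相关系数。下面是一个小示例(0.7 的阈值沿用上文经验值,分数为示意数据):

from scipy.stats import spearmanr

def check_coarse_fine_consistency(coarse_scores, fine_scores, threshold=0.7):
    """粗排/精排一致性检验:同一批候选上两组分数的 Spearman 相关系数"""
    corr, p_value = spearmanr(coarse_scores, fine_scores)
    print(f"Spearman correlation: {corr:.3f} (p={p_value:.4f})")
    return corr >= threshold

# 示例:粗排分数与精排分数整体排序趋势一致
coarse = [0.91, 0.75, 0.60, 0.58, 0.30, 0.12]
fine   = [0.88, 0.80, 0.55, 0.61, 0.25, 0.10]
print(check_coarse_fine_consistency(coarse, fine))  # True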

Q3: 特征工程中哪些特征最重要?

A: 重要特征排序(经验值):

  1. 用户-物品交互特征:最重要
    • 用户历史点击/购买该物品的次数
    • 用户对该物品类别的偏好
  2. 用户统计特征
    • 用户活跃度( 7 天/30 天点击量)
    • 用户偏好类别分布
  3. 物品统计特征
    • 物品 CTR(点击率)
    • 物品热度( 24 小时点击量)
  4. 上下文特征
    • 时间特征(小时、星期)
    • 设备特征
  5. 交叉特征
    • 用户类别 × 物品类别
    • 用户价格偏好 × 物品价格

Q4: 如何处理冷启动问题?

A: 冷启动分为用户冷启动和物品冷启动:

用户冷启动:

  1. 利用注册信息:年龄、性别、地域等
  2. 热门推荐:推荐热门物品
  3. 内容推荐:基于用户填写的兴趣标签
  4. 探索策略:增加探索比例,快速收集用户行为

物品冷启动:

  1. 内容特征:文本、图像、类别等
  2. 相似物品:找到内容相似的热门物品
  3. 流量扶持:新品加权,增加曝光机会

class ColdStartHandler:
"""冷启动处理"""

def recommend_for_new_user(self, user_profile):
"""新用户推荐"""
# 1. 基于注册信息的内容推荐
if user_profile.get('interests'):
items = self.content_based_recommend(
user_profile['interests']
)
if items:
return items

# 2. 热门推荐
return self.popular_recommend(top_k=20)

def recommend_new_item(self, item_content_features):
"""新物品推荐"""
# 1. 找到内容相似的热门物品
similar_items = self.find_similar_items(
item_content_features,
top_k=100
)

# 2. 推荐给喜欢相似物品的用户
target_users = self.get_users_like_items(similar_items)

return target_users

Q5: 如何平衡准确性和多样性?

A: 多样性-准确性权衡:

  1. MMR 算法:在重排序层使用,可调节 $\lambda$ 参数
  2. 类别多样性:限制每个类别的推荐数量
  3. 时间多样性:避免重复推荐最近看过的内容

参数调优(对应上文 DiversityReRanker 中的 lambda_diversity,$\lambda$ 越大越看重相关性):

  • $\lambda$ 接近 1:偏向准确性
  • $\lambda$ 取 0.5 左右:平衡
  • $\lambda$ 接近 0:偏向多样性
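
结合上文的 DiversityReRanker,可以直观对比不同 $\lambda$ 的效果(相似度矩阵与分数均为示意数据):

import numpy as np

# 4 个候选物品:0 和 1 高度相似,2、3 与其他物品差异较大
item_similarity = np.array([
    [1.0, 0.9, 0.2, 0.1],
    [0.9, 1.0, 0.2, 0.1],
    [0.2, 0.2, 1.0, 0.3],
    [0.1, 0.1, 0.3, 1.0],
])
ranked_items = [0, 1, 2, 3]          # 精排输出顺序
scores = [0.95, 0.93, 0.80, 0.60]    # 精排分数

for lam in [0.9, 0.5, 0.2]:
    reranker = DiversityReRanker(item_similarity, lambda_diversity=lam)
    print(lam, reranker.re_rank(ranked_items, scores, top_k=3))
# λ 越小,越倾向于把与已选物品不相似的 2、3 提前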

Q6: 模型训练数据如何构建?

A: 训练数据构建要点:

  1. 负样本采样
    • 随机负采样:简单但可能引入噪声
    • 曝光未点击:更真实,推荐使用
    • 困难负样本:提升模型区分能力
  2. 时间窗口
    • 使用最近 30-90 天的数据
    • 避免使用太旧的数据(用户兴趣会变化)
  3. 样本平衡
    • 正负样本比例 1:1 到 1:4
    • 不同用户/物品的样本数量尽量平衡
class TrainingDataBuilder:
"""训练数据构建"""

def build_training_data(self, click_logs, impressions, negative_ratio=4):
"""构建训练数据"""
positive_samples = []
negative_samples = []

# 正样本:点击的
for _, row in click_logs.iterrows():
positive_samples.append({
'user_id': row['user_id'],
'item_id': row['item_id'],
'label': 1,
'timestamp': row['timestamp']
})

# 负样本:曝光未点击
for _, row in impressions.iterrows():
if row['clicked'] == 0:
negative_samples.append({
'user_id': row['user_id'],
'item_id': row['item_id'],
'label': 0,
'timestamp': row['timestamp']
})

# 负样本采样
if len(negative_samples) > len(positive_samples) * negative_ratio:
negative_samples = random.sample(
negative_samples,
len(positive_samples) * negative_ratio
)

# 合并
training_data = positive_samples + negative_samples
random.shuffle(training_data)

return training_data
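
上面的流程覆盖了"曝光未点击"负样本;"困难负样本"一般从召回分数较高但用户没有点击的物品中抽取,下面是一个示意草图(score_fn、采样比例均为假设):

import random

def sample_hard_negatives(candidate_items, clicked_items, score_fn,
                          hard_ratio=0.3, num_negatives=4):
    """困难负样本采样(草图)

    score_fn:   任意打分函数(例如召回模型的相似度),分数越高越"像正样本"
    hard_ratio: 困难负样本占全部负样本的比例
    """
    negatives = [i for i in candidate_items if i not in clicked_items]
    if not negatives:
        return []

    num_hard = int(num_negatives * hard_ratio)
    num_random = num_negatives - num_hard

    # 困难负样本:召回分数最高、但用户没有点击的物品
    hard = sorted(negatives, key=score_fn, reverse=True)[:num_hard]

    # 其余用随机负样本补齐,避免全是"边界样本"导致训练不稳定
    remaining = [i for i in negatives if i not in hard]
    rand = random.sample(remaining, min(num_random, len(remaining)))

    return hard + rand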

Q7: 如何评估推荐系统效果?

A: 推荐系统评估指标:

离线指标:

  1. 准确率指标:
    • Precision@K:前 K 个推荐中相关物品的比例
    • Recall@K:覆盖了多少相关物品
    • NDCG@K:考虑位置的排序质量
  2. 覆盖率:
    • 推荐物品的多样性
    • 长尾物品的覆盖度
  3. 新颖性:
    • 推荐物品的平均热度(越低越新颖)

在线指标:

  1. CTR(点击率):最重要的业务指标
  2. CVR(转化率):购买/下载等转化行为
  3. 停留时长:用户对推荐内容的参与度
  4. GMV(成交总额):电商场景的核心指标

class RecommendationEvaluator:
"""推荐系统评估器"""

def evaluate_offline(self, recommendations, ground_truth):
"""离线评估"""
metrics = {}

# Precision@K
for k in [5, 10, 20]:
precision = self.precision_at_k(
recommendations, ground_truth, k
)
metrics[f'precision@{k}'] = precision

# Recall@K
for k in [5, 10, 20]:
recall = self.recall_at_k(
recommendations, ground_truth, k
)
metrics[f'recall@{k}'] = recall

# NDCG@K
for k in [5, 10, 20]:
ndcg = self.ndcg_at_k(
recommendations, ground_truth, k
)
metrics[f'ndcg@{k}'] = ndcg

return metrics

def precision_at_k(self, recommendations, ground_truth, k):
"""Precision@K"""
top_k = recommendations[:k]
relevant = set(ground_truth)
hits = sum(1 for item in top_k if item in relevant)
return hits / k

def recall_at_k(self, recommendations, ground_truth, k):
"""Recall@K"""
top_k = recommendations[:k]
relevant = set(ground_truth)
hits = sum(1 for item in top_k if item in relevant)
return hits / len(relevant) if relevant else 0

def ndcg_at_k(self, recommendations, ground_truth, k):
"""NDCG@K"""
from sklearn.metrics import ndcg_score

# 构建 relevance 向量
y_true = [1 if item in ground_truth else 0
for item in recommendations[:k]]
y_score = list(range(k, 0, -1)) # 假设分数递减

if sum(y_true) == 0:
return 0

return ndcg_score([y_true], [y_score], k=k)

Q8: 如何处理数据稀疏性问题?

A: 数据稀疏性处理:

  1. 矩阵分解: SVD 、 NMF 等降维方法
  2. 深度学习: Embedding 层学习稠密表示
  3. 迁移学习:利用其他域的数据
  4. 内容特征:补充协同过滤的不足
class SparseDataHandler:
"""稀疏数据处理"""

def __init__(self, user_item_matrix):
self.user_item_matrix = user_item_matrix
self.sparsity = self._calculate_sparsity()

def _calculate_sparsity(self):
"""计算稀疏度"""
total = self.user_item_matrix.size
non_zero = self.user_item_matrix.nnz
return 1 - (non_zero / total)

def handle_sparsity(self, method='embedding'):
"""处理稀疏性"""
if method == 'embedding':
# 使用 Embedding 学习稠密表示
return self._learn_embeddings()
elif method == 'matrix_factorization':
# 矩阵分解
return self._matrix_factorization()
elif method == 'content_boost':
# 内容特征增强
return self._content_boost()

def _learn_embeddings(self):
"""学习 Embedding"""
from gensim.models import Word2Vec

# 将用户-物品交互序列化
sequences = []
for user_id in range(self.user_item_matrix.shape[0]):
items = self.user_item_matrix[user_id].nonzero()[1].tolist()
sequences.append([str(item) for item in items])

# 训练 Word2Vec
model = Word2Vec(sequences, vector_size=64, window=5, min_count=1)

return model
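
SparseDataHandler 中的 _matrix_factorization 可以用截断 SVD 实现,下面是一个基于 scikit-learn 的草图(隐向量维度 64 为假设值):

from sklearn.decomposition import TruncatedSVD

def matrix_factorization(user_item_matrix, n_components=64):
    """对稀疏的用户-物品矩阵做截断 SVD,得到稠密的用户/物品隐向量(草图)"""
    svd = TruncatedSVD(n_components=n_components, random_state=42)

    # 用户隐向量:(num_users, n_components)
    user_factors = svd.fit_transform(user_item_matrix)

    # 物品隐向量:(num_items, n_components)
    item_factors = svd.components_.T

    # 预测评分约等于两组隐向量的内积:user_factors @ item_factors.T
    return user_factors, item_factors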

Q9: 推荐系统如何应对流量峰值?

A: 流量峰值应对策略:

  1. 水平扩容
    • 无状态服务,支持多实例部署
    • 使用负载均衡分发请求
  2. 缓存策略
    • 热门用户/物品的推荐结果缓存
    • 特征缓存减少计算
  3. 降级策略
    • 流量过大时简化模型(如只用召回+粗排)
    • 返回缓存结果
  4. 限流
    • 对非核心用户限流
    • 保护核心服务
class TrafficPeakHandler:
"""流量峰值处理"""

def __init__(self, max_qps=10000):
self.max_qps = max_qps
self.current_qps = 0
self.cache = {}

def recommend_with_fallback(self, user_id, context):
"""带降级的推荐"""
# 检查 QPS
if self.current_qps > self.max_qps:
# 降级:返回缓存或简化推荐
if user_id in self.cache:
return self.cache[user_id]
else:
return self.simple_recommend(user_id)

# 正常流程
try:
result = self.full_recommend(user_id, context)
# 缓存结果
self.cache[user_id] = result
return result
except Exception as e:
# 异常时降级
return self.simple_recommend(user_id)

def simple_recommend(self, user_id):
"""简化推荐(只用召回)"""
return self.recall_engine.recall(user_id, top_k=20)
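
第 4 点的限流可以用令牌桶实现,下面是一个单机版草图(工业上通常放在网关层,或用 Redis 实现分布式令牌桶):

import time
import threading

class TokenBucketLimiter:
    """令牌桶限流器(单机草图)"""

    def __init__(self, rate, capacity):
        self.rate = rate            # 每秒补充的令牌数,即允许的平均 QPS
        self.capacity = capacity    # 桶容量,决定可容忍的瞬时突发
        self.tokens = capacity
        self.last_refill = time.time()
        self.lock = threading.Lock()

    def allow(self):
        """请求到来时尝试取一个令牌,取不到则应走降级或拒绝逻辑"""
        with self.lock:
            now = time.time()
            # 按流逝时间补充令牌
            self.tokens = min(self.capacity,
                              self.tokens + (now - self.last_refill) * self.rate)
            self.last_refill = now

            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False

# 用法:limiter = TokenBucketLimiter(rate=10000, capacity=20000)
# if not limiter.allow(): 走降级逻辑(如上面的 simple_recommend)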

Q10: 如何实现实时推荐?

A: 实时推荐实现:

  1. 实时特征
    • 使用 Kafka/Flink 处理实时数据流
    • Redis 存储实时特征
  2. 实时召回
    • 基于用户最近行为快速召回
    • 向量检索(如 Faiss)加速
  3. 模型更新
    • 增量学习:只训练新数据
    • 在线学习:流式更新模型参数
class RealTimeRecommendation:
"""实时推荐"""

def __init__(self, kafka_consumer, redis_client):
self.kafka_consumer = kafka_consumer
self.redis = redis_client
self.user_recent_actions = {}

def process_real_time_events(self):
"""处理实时事件流"""
for message in self.kafka_consumer:
event = json.loads(message.value)
user_id = event['user_id']
item_id = event['item_id']
action_type = event['action_type']
timestamp = event['timestamp']

# 更新用户实时行为
self.update_user_actions(user_id, item_id, action_type, timestamp)

# 触发实时推荐更新
if action_type in ['click', 'purchase']:
self.update_recommendations(user_id)

def update_user_actions(self, user_id, item_id, action_type, timestamp):
"""更新用户行为"""
key = f"realtime:user:{user_id}"

# 存储到 Redis sorted set
self.redis.zadd(key, {f"{item_id}:{action_type}": timestamp})

# 只保留最近 1 小时
cutoff = timestamp - 3600
self.redis.zremrangebyscore(key, 0, cutoff)

def update_recommendations(self, user_id):
"""更新推荐结果"""
# 获取实时行为
recent_items = self.get_recent_items(user_id, minutes=30)

# 基于实时行为召回
realtime_items = self.realtime_recall(user_id, recent_items)

# 更新推荐缓存
cache_key = f"recommendations:{user_id}"
self.redis.setex(cache_key, 300, json.dumps(realtime_items))
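
上面第 2 点提到的向量检索可以用 Faiss 实现:离线把物品向量建好索引,线上用用户实时兴趣向量做近邻搜索。下面是一个最小草图(向量维度与数据均为示意):

import numpy as np
import faiss

dim = 64
num_items = 100000

# 离线:物品向量做 L2 归一化后建索引,此时内积等价于余弦相似度
item_vectors = np.random.rand(num_items, dim).astype('float32')
faiss.normalize_L2(item_vectors)
index = faiss.IndexFlatIP(dim)
index.add(item_vectors)

# 在线:用用户最近行为向量的平均作为查询,召回 top 200
user_vector = np.random.rand(1, dim).astype('float32')
faiss.normalize_L2(user_vector)
scores, item_ids = index.search(user_vector, 200)
print(item_ids[0][:10], scores[0][:10])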

总结

工业级推荐系统是一个复杂的系统工程,涉及召回、排序、重排序多个层次,需要综合考虑效果、性能、可维护性等多个方面。本文从架构设计、模型选择、特征工程、 A/B 测试、性能优化、部署监控等角度全面介绍了工业推荐系统的最佳实践。

关键要点:

  1. 分层架构:召回-粗排-精排-重排序的漏斗设计,平衡效果和效率
  2. 多路召回:协同过滤、内容、热门、实时等多通道融合
  3. 特征工程:自动化特征生成和选择,提升模型效果
  4. A/B 测试:科学评估模型改进,确保线上效果
  5. 性能优化:模型量化、缓存、异步处理等技巧
  6. 监控告警:实时监控系统健康,及时发现问题

推荐系统的优化是一个持续迭代的过程,需要不断根据业务反馈调整策略和模型。希望本文能为构建工业级推荐系统提供有价值的参考。

  • 本文标题:推荐系统(十六)—— 工业级架构与最佳实践
  • 本文作者:Chen Kai
  • 创建时间:2024-07-16 14:00:00
  • 本文链接:https://www.chenk.top/%E6%8E%A8%E8%8D%90%E7%B3%BB%E7%BB%9F%EF%BC%88%E5%8D%81%E5%85%AD%EF%BC%89%E2%80%94%E2%80%94-%E5%B7%A5%E4%B8%9A%E7%BA%A7%E6%9E%B6%E6%9E%84%E4%B8%8E%E6%9C%80%E4%BD%B3%E5%AE%9E%E8%B7%B5/
  • 版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!