Recommendation Systems (16): Industrial Architecture and Best Practices
Chen Kai

permalink: "en/recommendation-systems-16-industrial-practice/"
date: 2024-07-16 14:00:00
tags:
  - Recommendation Systems
  - Industrial Practice
  - System Architecture
categories: Recommendation Systems
mathjax: true
---

Building production-grade recommendation systems requires navigating a complex landscape of architectural decisions, performance constraints, and business requirements. This article explores the industrial practice of recommendation systems, covering everything from multi-channel recall strategies to deployment pipelines and monitoring infrastructure.

Introduction

Industrial recommendation systems differ fundamentally from academic prototypes. While research papers focus on novel algorithms and metrics, production systems must handle millions of requests per second, maintain sub-100ms latency, and continuously adapt to changing user behavior. The architecture must balance accuracy, scalability, and operational complexity.

This article synthesizes best practices from leading tech companies, including Alibaba's EasyRec framework and ByteDance's LONGER system. We'll examine the complete pipeline: recall, ranking, reranking, feature engineering, A/B testing, and production deployment.

Industrial Recommendation System Landscape

Architecture Overview

Modern industrial recommendation systems follow a multi-stage pipeline architecture:

User Request → Recall (Multi-Channel) → Coarse Ranking → Fine Ranking → Reranking → Response

Each stage serves a specific purpose:

  • Recall: Reduces the candidate space from millions to thousands
  • Coarse Ranking: Quick filtering using lightweight models
  • Fine Ranking: Detailed scoring with complex models
  • Reranking: Business rules, diversity, and freshness adjustments
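The stages above can be sketched as a single orchestration function. This is a minimal illustration, not any particular company's API; the stage callables (`recall`, `coarse_rank`, `fine_rank`, `rerank`) and the cutoff sizes are hypothetical stand-ins for real services:

```python
def recommend(user_id, context, recall, coarse_rank, fine_rank, rerank,
              coarse_k=200, final_k=20):
    """Illustrative multi-stage pipeline: each stage shrinks the candidate set."""
    candidates = recall(user_id, context)                    # millions -> thousands
    candidates = coarse_rank(user_id, candidates)[:coarse_k] # thousands -> hundreds
    scored = fine_rank(user_id, candidates)                  # precise (item, score) pairs
    return rerank(scored)[:final_k]                          # business rules last
```

Each stage only sees the survivors of the previous one, which is what keeps per-request cost bounded as the catalog grows.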

Key Design Principles

1. Scalability First

Production systems must handle traffic spikes. Horizontal scaling and stateless services are essential:

```python
# Example: Stateless ranking service
class RankingService:
    def __init__(self, model_path):
        self.model = load_model(model_path)
        self.feature_extractor = FeatureExtractor()

    def rank(self, user_id, candidates, context):
        """Stateless ranking - can be scaled horizontally"""
        features = self.feature_extractor.extract(
            user_id, candidates, context
        )
        scores = self.model.predict(features)
        return sorted(zip(candidates, scores), key=lambda x: x[1], reverse=True)
```

2. Latency Budget Allocation

Typical latency budgets:

  • Recall: 20-30ms
  • Coarse ranking: 10-20ms
  • Fine ranking: 30-50ms
  • Reranking: 10-20ms
  • Total: <100ms (p95)
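One way to enforce such a budget is to carry a shared deadline through the pipeline and let each stage check what is left before starting. A minimal sketch (the class name and API are illustrative, not from any framework):

```python
import time

class LatencyBudget:
    """Tracks the remaining time against a single request deadline."""
    def __init__(self, total_ms=100):
        self.deadline = time.monotonic() + total_ms / 1000.0

    def remaining_ms(self):
        """Milliseconds left before the deadline (never negative)."""
        return max(0.0, (self.deadline - time.monotonic()) * 1000.0)

    def exhausted(self):
        return self.remaining_ms() <= 0.0
```

A stage that finds the budget exhausted can skip itself and return its input unchanged, which degrades quality instead of breaching the SLA.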

3. Fault Tolerance

Every component must degrade gracefully:

```python
class FaultTolerantRecall:
    def __init__(self, recall_channels):
        self.channels = recall_channels
        self.fallback = PopularItemsRecall()

    def recall(self, user_id, context):
        results = []
        for channel in self.channels:
            try:
                channel_results = channel.recall(user_id, context, timeout=20)
                results.extend(channel_results)
            except Exception as e:
                logger.warning(f"Channel {channel.name} failed: {e}")
                # Continue with other channels

        if not results:
            return self.fallback.recall(user_id, context)

        return deduplicate(results)
```

Multi-Channel Recall Design

Recall is the most critical stage — it determines the upper bound of recommendation quality. Industrial systems employ multiple recall channels in parallel.

Channel Types

1. Collaborative Filtering Recall

Matrix factorization and item-based collaborative filtering remain effective:

```python
import numpy as np

class MatrixFactorizationRecall:
    def __init__(self, user_embeddings, item_embeddings):
        self.user_emb = user_embeddings  # [num_users, dim]
        self.item_emb = item_embeddings  # [num_items, dim]

    def recall(self, user_id, top_k=1000):
        """Recall items similar to user preferences"""
        user_vec = self.user_emb[user_id]
        # Exact brute-force scoring; swap in approximate nearest neighbor
        # search (e.g. Faiss, HNSW) once the item pool is large
        scores = np.dot(self.item_emb, user_vec)
        top_indices = np.argsort(scores)[-top_k:][::-1]
        return top_indices.tolist()
```

2. Deep Learning Recall

Neural collaborative filtering and two-tower models:

```python
import torch
import torch.nn as nn

class TwoTowerRecall(nn.Module):
    def __init__(self, user_dim, item_dim, hidden_dims=[256, 128]):
        super().__init__()
        self.user_tower = self._build_tower(user_dim, hidden_dims)
        self.item_tower = self._build_tower(item_dim, hidden_dims)

    def _build_tower(self, input_dim, hidden_dims):
        layers = []
        prev_dim = input_dim
        for hidden_dim in hidden_dims:
            layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.ReLU(),
                nn.BatchNorm1d(hidden_dim)
            ])
            prev_dim = hidden_dim
        return nn.Sequential(*layers)

    def forward(self, user_features, item_features):
        user_emb = self.user_tower(user_features)
        item_emb = self.item_tower(item_features)
        return torch.mm(user_emb, item_emb.t())

    def recall(self, user_features, item_pool, top_k=1000):
        """Exhaustive scoring over the pool; in production the item tower is
        precomputed and the matmul is replaced by ANN search"""
        self.eval()
        with torch.no_grad():
            user_emb = self.user_tower(user_features)
            item_embs = self.item_tower(item_pool)
            scores = torch.mm(user_emb, item_embs.t())
            _, top_indices = torch.topk(scores, top_k)
        return top_indices.cpu().numpy()
```

3. Graph-Based Recall

Leveraging user-item interaction graphs:

```python
import networkx as nx
from collections import defaultdict

class GraphRecall:
    def __init__(self, interaction_graph):
        self.graph = interaction_graph
        self.item_similarity = self._compute_item_similarity()

    def _compute_item_similarity(self):
        """Compute item-item similarity using graph structure"""
        similarity = defaultdict(dict)
        items = [n for n in self.graph.nodes() if self.graph.nodes[n]['type'] == 'item']

        for item1 in items:
            neighbors1 = set(self.graph.neighbors(item1))
            for item2 in items:
                if item1 != item2:
                    neighbors2 = set(self.graph.neighbors(item2))
                    intersection = len(neighbors1 & neighbors2)
                    union = len(neighbors1 | neighbors2)
                    if union > 0:
                        similarity[item1][item2] = intersection / union

        return similarity

    def recall(self, user_id, top_k=1000):
        """Recall items connected to user's interacted items"""
        user_items = [
            n for n in self.graph.neighbors(user_id)
            if self.graph.nodes[n]['type'] == 'item'
        ]

        candidate_scores = defaultdict(float)
        for item in user_items:
            for similar_item, sim_score in self.item_similarity.get(item, {}).items():
                candidate_scores[similar_item] += sim_score

        top_items = sorted(
            candidate_scores.items(),
            key=lambda x: x[1],
            reverse=True
        )[:top_k]

        return [item_id for item_id, _ in top_items]
```

4. Real-Time Behavior Recall

Capturing recent user actions:

```python
import numpy as np
from collections import defaultdict, deque
from datetime import datetime, timedelta

class RealTimeBehaviorRecall:
    def __init__(self, time_window_minutes=30):
        self.time_window = timedelta(minutes=time_window_minutes)
        self.user_behaviors = defaultdict(deque)

    def add_behavior(self, user_id, item_id, action_type, timestamp):
        """Record user behavior"""
        self.user_behaviors[user_id].append({
            'item_id': item_id,
            'action_type': action_type,
            'timestamp': timestamp
        })
        # Remove old behaviors
        cutoff = timestamp - self.time_window
        while (self.user_behaviors[user_id] and
               self.user_behaviors[user_id][0]['timestamp'] < cutoff):
            self.user_behaviors[user_id].popleft()

    def recall(self, user_id, top_k=500):
        """Recall items based on recent behaviors"""
        behaviors = self.user_behaviors.get(user_id, deque())

        # Weight by recency and action type
        action_weights = {'view': 1.0, 'click': 2.0, 'purchase': 5.0}
        candidate_scores = defaultdict(float)
        current_time = datetime.now()

        for behavior in behaviors:
            age_minutes = (current_time - behavior['timestamp']).total_seconds() / 60
            recency_weight = np.exp(-age_minutes / 10)  # Exponential decay
            action_weight = action_weights.get(behavior['action_type'], 1.0)

            score = recency_weight * action_weight
            candidate_scores[behavior['item_id']] += score

        top_items = sorted(
            candidate_scores.items(),
            key=lambda x: x[1],
            reverse=True
        )[:top_k]

        return [item_id for item_id, _ in top_items]
```

5. Content-Based Recall

Using item features and user preferences:

```python
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

class ContentBasedRecall:
    def __init__(self, item_features):
        self.item_features = item_features
        self.vectorizer = TfidfVectorizer(max_features=1000)
        self.item_vectors = self._vectorize_items()

    def _vectorize_items(self):
        """Convert item features to vectors"""
        texts = [
            ' '.join(str(v) for v in features.values())
            for features in self.item_features.values()
        ]
        return self.vectorizer.fit_transform(texts)

    def recall(self, user_profile, top_k=1000):
        """Recall items matching user profile"""
        user_vector = self.vectorizer.transform(
            [' '.join(str(v) for v in user_profile.values())]
        )
        similarities = cosine_similarity(user_vector, self.item_vectors)[0]
        top_indices = np.argsort(similarities)[-top_k:][::-1]
        return top_indices.tolist()
```

Multi-Channel Fusion

Combining multiple recall channels requires careful orchestration:

```python
import logging
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed

logger = logging.getLogger(__name__)

class MultiChannelRecall:
    def __init__(self, channels, channel_weights=None):
        self.channels = channels
        self.channel_weights = channel_weights or {ch.name: 1.0 for ch in channels}

    def recall(self, user_id, context, target_size=2000):
        """Fuse results from multiple channels"""
        channel_results = {}

        # Parallel execution
        with ThreadPoolExecutor(max_workers=len(self.channels)) as executor:
            futures = {
                executor.submit(ch.recall, user_id, context): ch.name
                for ch in self.channels
            }

            for future in as_completed(futures):
                channel_name = futures[future]
                try:
                    # Future.result takes seconds, so 25ms is 0.025
                    results = future.result(timeout=0.025)
                    channel_results[channel_name] = results
                except Exception as e:
                    logger.error(f"Channel {channel_name} failed: {e}")

        # Weighted fusion
        candidate_scores = defaultdict(float)
        for channel_name, items in channel_results.items():
            weight = self.channel_weights.get(channel_name, 1.0)
            for rank, item_id in enumerate(items):
                # Rank-based scoring with channel weight
                score = weight * (1.0 / (rank + 1))
                candidate_scores[item_id] += score

        # Select top candidates
        top_candidates = sorted(
            candidate_scores.items(),
            key=lambda x: x[1],
            reverse=True
        )[:target_size]

        return [item_id for item_id, _ in top_candidates]
```

Q&A: Multi-Channel Recall

Q1: How many recall channels should we use?

A: Typically 5-10 channels. Too few limits diversity; too many increases latency and complexity. Start with 3-5 core channels (CF, deep learning, real-time behavior) and add specialized channels based on business needs.

Q2: How to handle channel failures?

A: Implement circuit breakers and fallbacks. Each channel should have a timeout (20-30ms). If a channel fails, continue with others. Always maintain a fallback channel (e.g., popular items) that never fails.
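A count-based circuit breaker is enough to stop hammering a channel that keeps failing. This is a minimal sketch of the idea (class and method names are illustrative; production breakers usually add a half-open state with a cooldown):

```python
class CircuitBreaker:
    """Skips a recall channel after repeated consecutive failures."""
    def __init__(self, failure_threshold=3):
        self.failure_threshold = failure_threshold
        self.failures = 0

    def allow(self):
        # Open circuit: stop calling the channel once the threshold is hit
        return self.failures < self.failure_threshold

    def record_success(self):
        self.failures = 0  # any success closes the circuit again

    def record_failure(self):
        self.failures += 1
```

The caller wraps each channel invocation: check `allow()`, call the channel, then record success or failure.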

Q3: Should we deduplicate across channels?

A: Yes, but after fusion. Deduplication before fusion loses information about item importance across channels. Fuse first, then deduplicate based on final scores.
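A minimal sketch of fuse-then-dedupe, assuming each channel returns scored `(item, score)` pairs (the function name is hypothetical): an item surfaced by several channels keeps all of its evidence because the scores are summed before deduplication.

```python
def fuse_then_dedupe(channel_results):
    """Merge scored (item, score) lists from all channels; duplicates add evidence."""
    fused = {}
    for items in channel_results:
        for item, score in items:
            fused[item] = fused.get(item, 0.0) + score
    # One entry per item, sorted by fused score
    return sorted(fused.items(), key=lambda x: x[1], reverse=True)
```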

Coarse Ranking

Coarse ranking filters recall results using lightweight models, reducing candidates from thousands to hundreds.

Lightweight Model Design

1. Tree-Based Models

Fast to evaluate and robust on tabular features:

```python
import xgboost as xgb

class CoarseRankingModel:
    def __init__(self):
        self.model = xgb.XGBRanker(
            objective='rank:pairwise',
            tree_method='hist',
            max_depth=4,
            n_estimators=50,
            learning_rate=0.1
        )

    def train(self, X_train, y_train, group_train):
        """Train with group-based ranking"""
        self.model.fit(
            X_train, y_train,
            group=group_train,
            eval_set=[(X_train, y_train)],
            eval_group=[group_train],  # required alongside eval_set for ranking
            verbose=False
        )

    def predict(self, features):
        """Fast prediction"""
        return self.model.predict(features)
```

2. Shallow Neural Networks

More capacity than linear models:

```python
class ShallowRankingNet(nn.Module):
    def __init__(self, input_dim, hidden_dims=[128, 64]):
        super().__init__()
        layers = []
        prev_dim = input_dim
        for hidden_dim in hidden_dims:
            layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.1)
            ])
            prev_dim = hidden_dim
        layers.append(nn.Linear(prev_dim, 1))
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        return self.network(x).squeeze(-1)
```

3. Feature Engineering for Coarse Ranking

Focus on high-signal, low-computation features:

```python
import numpy as np

class CoarseRankingFeatures:
    def extract(self, user_id, item_id, context):
        """Extract lightweight features"""
        features = {}

        # User features
        features['user_click_count_7d'] = self.get_user_stat(user_id, 'click_count', days=7)
        features['user_purchase_count_30d'] = self.get_user_stat(user_id, 'purchase_count', days=30)
        features['user_avg_price'] = self.get_user_stat(user_id, 'avg_price')

        # Item features
        features['item_popularity'] = self.get_item_stat(item_id, 'popularity')
        features['item_ctr_7d'] = self.get_item_stat(item_id, 'ctr', days=7)
        features['item_price'] = self.get_item_stat(item_id, 'price')

        # Interaction features
        features['user_item_click_count'] = self.get_interaction_count(user_id, item_id, 'click')
        features['user_category_click_count'] = self.get_category_interaction(user_id, item_id, 'click')

        # Context features
        features['hour'] = context.get('hour', 0)
        features['day_of_week'] = context.get('day_of_week', 0)
        features['device_type'] = self.encode_device(context.get('device', 'unknown'))

        return np.array([features[k] for k in sorted(features.keys())])
```

Q&A: Coarse Ranking

Q4: What's the ideal candidate reduction ratio?

A: Typically 10:1 (e.g., 2000 → 200). Too aggressive loses good candidates; too conservative wastes fine ranking resources. Monitor recall@K metrics to find the sweet spot.
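Recall@K here measures how many of the items the fine ranker would ultimately want survive the coarse cut. A minimal sketch of the metric (names are illustrative):

```python
def recall_at_k(coarse_ranked, relevant_items, k):
    """Fraction of relevant items that survive the top-k coarse-ranking cut."""
    if not relevant_items:
        return 0.0
    kept = set(coarse_ranked[:k])
    return len(kept & set(relevant_items)) / len(relevant_items)
```

Tracking this offline for several values of `k` makes the reduction-ratio trade-off explicit: pick the smallest `k` where the curve flattens.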

Q5: Should coarse ranking use the same features as fine ranking?

A: No. Coarse ranking prioritizes speed, so use fewer, simpler features. Fine ranking can use complex, expensive features. Overlap is fine, but coarse ranking should avoid heavy computations.

Fine Ranking

Fine ranking uses complex models to score the remaining candidates precisely.

Deep Learning Models

1. Wide & Deep Architecture

Combines memorization and generalization:

```python
class WideDeepRanking(nn.Module):
    def __init__(self, wide_dim, deep_dims, embedding_dims):
        super().__init__()
        # Wide component (linear)
        self.wide = nn.Linear(wide_dim, 1)

        # Deep component
        self.embeddings = nn.ModuleDict({
            name: nn.Embedding(num_embeddings, dim)
            for name, (num_embeddings, dim) in embedding_dims.items()
        })

        # deep_dims[0] is the dense-feature dimension; deep_dims[1:] are hidden sizes
        deep_input_dim = sum(dim for _, dim in embedding_dims.values())
        deep_input_dim += deep_dims[0]

        layers = []
        prev_dim = deep_input_dim
        for hidden_dim in deep_dims[1:]:
            layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.ReLU(),
                nn.BatchNorm1d(hidden_dim),
                nn.Dropout(0.2)
            ])
            prev_dim = hidden_dim
        layers.append(nn.Linear(prev_dim, 1))
        self.deep = nn.Sequential(*layers)

    def forward(self, wide_features, sparse_ids, dense_features):
        # Wide part
        wide_out = self.wide(wide_features)

        # Deep part
        embeddings = [
            self.embeddings[name](ids)
            for name, ids in sparse_ids.items()
        ]
        deep_input = torch.cat(embeddings + [dense_features], dim=1)
        deep_out = self.deep(deep_input)

        # Combine
        return wide_out + deep_out
```

2. DeepFM Architecture

Factorization machines with deep networks:

```python
class DeepFM(nn.Module):
    def __init__(self, field_dims, embed_dim=10, mlp_dims=[128, 64]):
        super().__init__()
        self.field_dims = field_dims
        self.embed_dim = embed_dim

        # FM component: second-order embeddings and first-order weights
        self.embeddings = nn.ModuleList([
            nn.Embedding(field_dim, embed_dim)
            for field_dim in field_dims
        ])
        self.fm_linear = nn.ModuleList([
            nn.Embedding(field_dim, 1)  # scalar weight per id (first-order term)
            for field_dim in field_dims
        ])

        # Deep component
        mlp_input_dim = len(field_dims) * embed_dim
        layers = []
        prev_dim = mlp_input_dim
        for hidden_dim in mlp_dims:
            layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.2)
            ])
            prev_dim = hidden_dim
        layers.append(nn.Linear(prev_dim, 1))
        self.mlp = nn.Sequential(*layers)

    def forward(self, x):
        # x: [batch_size, num_fields] of integer ids
        embeddings = [emb(x[:, i]) for i, emb in enumerate(self.embeddings)]

        # FM first-order part: embedding lookup, not a Linear over raw ids
        fm_linear = sum(linear(x[:, i]) for i, linear in enumerate(self.fm_linear))

        # FM second-order part: 0.5 * ((sum e)^2 - sum e^2)
        sum_square = sum(embeddings) ** 2
        square_sum = sum(emb ** 2 for emb in embeddings)
        fm_interaction = 0.5 * (sum_square - square_sum).sum(dim=1, keepdim=True)

        # Deep part
        deep_input = torch.cat(embeddings, dim=1)
        deep_out = self.mlp(deep_input)

        return fm_linear + fm_interaction + deep_out
```

3. DIN (Deep Interest Network)

Attends over the user's behavior history, conditioned on the candidate item:

```python
class DIN(nn.Module):
    def __init__(self, item_embed_dim, user_embed_dim, hidden_dims=[128, 64]):
        super().__init__()
        self.item_embed_dim = item_embed_dim

        # Attention mechanism: scores each behavior against the candidate
        self.attention_net = nn.Sequential(
            nn.Linear(item_embed_dim * 4, 36),
            nn.ReLU(),
            nn.Linear(36, 1)
        )

        # Final MLP over [candidate, weighted behaviors, user profile]
        mlp_input_dim = item_embed_dim + item_embed_dim + user_embed_dim
        layers = []
        prev_dim = mlp_input_dim
        for hidden_dim in hidden_dims:
            layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.2)
            ])
            prev_dim = hidden_dim
        layers.append(nn.Linear(prev_dim, 1))
        self.mlp = nn.Sequential(*layers)

    def forward(self, candidate_item, user_behaviors, user_profile):
        """
        candidate_item: [batch_size, item_embed_dim]
        user_behaviors: [batch_size, seq_len, item_embed_dim]
        user_profile: [batch_size, user_embed_dim]
        """
        # Attention over behaviors
        candidate_expanded = candidate_item.unsqueeze(1).expand_as(user_behaviors)
        attention_input = torch.cat([
            user_behaviors,
            candidate_expanded,
            user_behaviors - candidate_expanded,
            user_behaviors * candidate_expanded
        ], dim=2)

        attention_weights = self.attention_net(attention_input).squeeze(-1)
        attention_weights = torch.softmax(attention_weights, dim=1)

        # Weighted behavior representation
        weighted_behaviors = (user_behaviors * attention_weights.unsqueeze(-1)).sum(dim=1)

        # Final prediction
        mlp_input = torch.cat([
            candidate_item,
            weighted_behaviors,
            user_profile
        ], dim=1)

        return self.mlp(mlp_input)
```

Feature Engineering for Fine Ranking

Complex, high-signal features:

```python
class FineRankingFeatures:
    def extract(self, user_id, item_id, context):
        """Extract comprehensive features"""
        features = {}

        # User sequence features
        user_history = self.get_user_history(user_id, max_len=50)
        features['user_history_emb'] = self.encode_sequence(user_history)
        features['user_interest_diversity'] = self.compute_diversity(user_history)
        features['user_preference_shift'] = self.compute_preference_shift(user_history)

        # Item features
        item_features = self.get_item_features(item_id)
        features.update(item_features)
        features['item_quality_score'] = self.compute_quality_score(item_id)

        # Cross features
        features['user_item_similarity'] = self.compute_similarity(
            user_history, item_features
        )
        features['user_category_affinity'] = self.get_category_affinity(
            user_id, item_features['category']
        )

        # Temporal features
        features['time_since_last_interaction'] = self.get_time_since_interaction(
            user_id, item_id
        )
        features['hour_category_match'] = self.check_hour_category_match(
            context['hour'], item_features['category']
        )

        # Contextual features
        features['device_item_match'] = self.check_device_match(
            context['device'], item_features
        )
        features['location_category_match'] = self.check_location_match(
            context['location'], item_features['category']
        )

        return features
```

Q&A: Fine Ranking

Q6: How complex should fine ranking models be?

A: Balance accuracy and latency. Start with Wide & Deep or DeepFM. Add complexity (e.g., DIN, DIEN) only if it improves metrics significantly. Monitor inference time — complex models may require model compression.

Q7: How to handle feature engineering complexity?

A: Use feature stores and automated feature engineering. Store precomputed features in Redis/feature store. Use tools like Feast or Tecton for feature management. Consider automated feature selection to reduce dimensionality.
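A common pattern behind those tools is a read-through cache over precomputed features. The sketch below assumes a dict-like store (a real deployment would back this with Redis or a managed feature store); the class and parameter names are illustrative:

```python
class FeatureCache:
    """Read-through cache: serve precomputed features, compute on miss."""
    def __init__(self, store, compute_fn):
        self.store = store            # dict-like store (stand-in for Redis etc.)
        self.compute_fn = compute_fn  # fallback when a key is missing

    def get(self, key):
        value = self.store.get(key)
        if value is None:
            value = self.compute_fn(key)  # cache miss: compute and backfill
            self.store[key] = value
        return value
```

Batch jobs keep the store warm; the online path only pays the compute cost for keys the jobs have not covered yet.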

Reranking

Reranking applies business rules, diversity constraints, and freshness adjustments to the final ranking.

Diversity Reranking

Ensuring result diversity:

```python
import numpy as np

class DiversityReranker:
    def __init__(self, diversity_weight=0.3):
        self.diversity_weight = diversity_weight

    def rerank(self, items, scores, item_features, top_k=20):
        """Rerank with diversity constraint"""
        selected = []
        remaining = list(zip(items, scores, item_features))

        # Greedy selection with diversity
        while len(selected) < top_k and remaining:
            best_idx = None
            best_score = float('-inf')

            for idx, (item, score, features) in enumerate(remaining):
                # Relevance score
                relevance = score

                # Diversity score (distance from selected items)
                if selected:
                    diversity = min(
                        self.compute_distance(features, sel_features)
                        for _, _, sel_features in selected
                    )
                else:
                    diversity = 1.0

                # Combined score
                combined = (1 - self.diversity_weight) * relevance + \
                           self.diversity_weight * diversity

                if combined > best_score:
                    best_score = combined
                    best_idx = idx

            if best_idx is not None:
                selected.append(remaining.pop(best_idx))

        return [item for item, _, _ in selected]

    def compute_distance(self, features1, features2):
        """Cosine distance over the union of feature keys, so the two
        vectors always align even when the dicts differ"""
        keys = sorted(set(features1) | set(features2))
        vec1 = np.array([features1.get(k, 0) for k in keys])
        vec2 = np.array([features2.get(k, 0) for k in keys])
        return 1 - np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2) + 1e-8)
```

Business Rules Reranking

Applying business constraints:

```python
class RuleResult:
    def __init__(self, passed, score_adjustment=0.0):
        self.passed = passed
        self.score_adjustment = score_adjustment

    def adjust_score(self, score):
        return score + self.score_adjustment

class BusinessRulesReranker:
    def __init__(self, rules):
        self.rules = rules

    def rerank(self, items, scores, item_metadata):
        """Apply business rules"""
        results = []

        for item, score in zip(items, scores):
            # Check rules
            passed = True
            adjusted_score = score

            for rule in self.rules:
                rule_result = rule.apply(item, item_metadata.get(item, {}))
                if not rule_result.passed:
                    passed = False
                    break
                adjusted_score = rule_result.adjust_score(adjusted_score)

            if passed:
                results.append((item, adjusted_score))

        # Sort by adjusted score
        results.sort(key=lambda x: x[1], reverse=True)
        return [item for item, _ in results]

class BusinessRule:
    def __init__(self, name, condition, score_adjustment=0.0, hard=False):
        self.name = name
        self.condition = condition
        self.score_adjustment = score_adjustment
        self.hard = hard  # hard rules filter items; soft rules only adjust scores

    def apply(self, item, metadata):
        matched = self.condition(item, metadata)
        if self.hard:
            # Hard constraint: the item passes only if the condition holds
            return RuleResult(passed=matched)
        # Soft rule: always passes, adjusts the score when the condition holds
        return RuleResult(
            passed=True,
            score_adjustment=self.score_adjustment if matched else 0.0
        )

# Example rules
rules = [
    BusinessRule(
        'promoted_items',
        lambda item, meta: meta.get('is_promoted', False),
        score_adjustment=0.5
    ),
    BusinessRule(
        'new_items_boost',
        lambda item, meta: meta.get('days_since_launch', 365) < 7,
        score_adjustment=0.2
    ),
    BusinessRule(
        'exclude_out_of_stock',
        lambda item, meta: meta.get('stock', 0) > 0,
        hard=True
    )
]
```

Freshness Reranking

Promoting recent content:

```python
import numpy as np
from datetime import datetime

class FreshnessReranker:
    def __init__(self, freshness_decay_hours=24):
        self.freshness_decay_hours = freshness_decay_hours

    def rerank(self, items, scores, item_timestamps):
        """Boost fresh items"""
        current_time = datetime.now()

        adjusted_scores = []
        for item, score in zip(items, scores):
            item_time = item_timestamps.get(item)
            if item_time:
                age_hours = (current_time - item_time).total_seconds() / 3600
                freshness_boost = np.exp(-age_hours / self.freshness_decay_hours)
                adjusted_score = score * (1 + 0.3 * freshness_boost)  # Up to 30% boost
            else:
                adjusted_score = score

            adjusted_scores.append((item, adjusted_score))

        adjusted_scores.sort(key=lambda x: x[1], reverse=True)
        return [item for item, _ in adjusted_scores]
```

Q&A: Reranking

Q8: How to balance relevance and diversity?

A: Use MMR (Maximal Marginal Relevance) or similar algorithms. Start with diversity_weight=0.2-0.3. A/B test different weights and monitor both CTR and diversity metrics (e.g., category diversity, price diversity).
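A minimal sketch of one such diversity metric, the share of distinct categories in a result list (function name and the `item_category` lookup are illustrative):

```python
def category_diversity(items, item_category):
    """Distinct categories / list length: 1.0 means every item differs."""
    if not items:
        return 0.0
    categories = [item_category[i] for i in items]
    return len(set(categories)) / len(categories)
```

Logging this per request alongside CTR makes the relevance-diversity trade-off visible when sweeping the diversity weight.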

Q9: Should reranking be model-based or rule-based?

A: Hybrid approach works best. Use rules for hard constraints (e.g., exclude out-of-stock items) and model-based reranking for soft optimization (e.g., diversity, freshness). Consider learning-to-rank models for reranking if you have sufficient data.

Alibaba EasyRec Framework

EasyRec is Alibaba's open-source recommendation framework, providing end-to-end tools for building production systems.

Architecture Overview

EasyRec provides:

  1. Feature Engineering: Automated feature extraction and transformation
  2. Model Training: Pre-built models (Wide&Deep, DeepFM, DIN, etc.)
  3. Serving: High-performance inference engine
  4. Evaluation: Comprehensive metrics and A/B testing tools

Key Components

1. Feature Configuration

```
# EasyRec feature config
features {
  user_features {
    features {
      user_id: {
        feature_type: IdFeature
        embedding_dim: 64
      }
      user_age: {
        feature_type: RawFeature
        default_value: 25
      }
    }
  }

  item_features {
    features {
      item_id: {
        feature_type: IdFeature
        embedding_dim: 64
      }
      item_category: {
        feature_type: IdFeature
        embedding_dim: 32
      }
      item_price: {
        feature_type: RawFeature
      }
    }
  }

  context_features {
    features {
      hour: {
        feature_type: IdFeature
        embedding_dim: 16
      }
      device: {
        feature_type: IdFeature
        embedding_dim: 16
      }
    }
  }
}
```

2. Model Configuration

```
# EasyRec model config
model_config {
  model_class: "DeepFM"

  feature_groups {
    group_name: "wide"
    feature_names: ["user_id", "item_id", "item_category"]
  }

  feature_groups {
    group_name: "deep"
    feature_names: ["user_id", "item_id", "item_category", "item_price", "hour"]
  }

  deepfm {
    wide_output_dim: 1
    deep_hidden_dims: [128, 64, 32]
    dropout_rate: 0.2
  }

  embedding_regularization: 1e-6
  l2_regularization: 1e-5
}
```

3. Training Pipeline

```python
# EasyRec training example
import easy_rec

# Initialize
config = easy_rec.Config('model_config.yaml')
trainer = easy_rec.Trainer(config)

# Train
trainer.train(
    train_data_path='s3://bucket/train/',
    eval_data_path='s3://bucket/eval/',
    output_path='s3://bucket/models/',
    num_epochs=10
)

# Export model
trainer.export_model(
    checkpoint_path='s3://bucket/models/checkpoint-10000',
    export_path='s3://bucket/models/exported/'
)
```

4. Serving

```python
# EasyRec serving
import easy_rec

# Load model
predictor = easy_rec.Predictor('s3://bucket/models/exported/')

# Predict
features = {
    'user_id': [12345],
    'item_id': [67890],
    'item_category': [5],
    'item_price': [99.99],
    'hour': [14]
}

scores = predictor.predict(features)
```

EasyRec Best Practices

  1. Feature Engineering: Use EasyRec's feature transformations (normalization, bucketization, etc.)
  2. Model Selection: Start with DeepFM, upgrade to DIN/DIEN if needed
  3. Distributed Training: Use EasyRec's distributed training for large datasets
  4. Model Versioning: Leverage EasyRec's model versioning for A/B testing

Q&A: EasyRec

Q10: How does EasyRec compare to TensorFlow Recommenders?

A: EasyRec is more production-oriented with built-in serving, feature engineering, and A/B testing. TensorFlow Recommenders is more flexible but requires more custom code. EasyRec is better for rapid deployment; TF Recommenders for research.

ByteDance LONGER System

LONGER (Learning to Optimize Recommendation with Graph Enhanced Ranking) is ByteDance's graph-enhanced ranking system.

Architecture

LONGER combines:

  1. Graph Neural Networks: Modeling user-item relationships
  2. Multi-Task Learning: Optimizing multiple objectives simultaneously
  3. Real-Time Updates: Incremental graph updates

Key Innovations

1. Graph Construction

```python
from collections import defaultdict

class LONGERGraph:
    def __init__(self):
        self.user_nodes = {}
        self.item_nodes = {}
        self.edges = defaultdict(list)

    def add_interaction(self, user_id, item_id, interaction_type, timestamp):
        """Add interaction to graph"""
        edge = {
            'user_id': user_id,
            'item_id': item_id,
            'type': interaction_type,
            'timestamp': timestamp,
            'weight': self._compute_weight(interaction_type)
        }
        self.edges[user_id].append(edge)
        self.edges[item_id].append(edge)

    def _compute_weight(self, interaction_type):
        """Compute edge weight"""
        weights = {'view': 1.0, 'click': 2.0, 'share': 3.0, 'purchase': 5.0}
        return weights.get(interaction_type, 1.0)
```

2. Graph Neural Network

```python
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GATConv

class LONGERGNN(nn.Module):
    def __init__(self, input_dim, hidden_dims=[128, 64], num_heads=4):
        super().__init__()
        self.convs = nn.ModuleList()
        prev_dim = input_dim

        # GAT layers for attention
        for hidden_dim in hidden_dims:
            self.convs.append(
                GATConv(prev_dim, hidden_dim, heads=num_heads, concat=True)
            )
            prev_dim = hidden_dim * num_heads

        # Final projection
        self.final_proj = nn.Linear(prev_dim, hidden_dims[-1])

    def forward(self, x, edge_index):
        """Forward pass through graph; GATConv learns edge importance via
        attention rather than taking explicit edge weights"""
        for conv in self.convs:
            x = conv(x, edge_index)
            x = F.relu(x)
            x = F.dropout(x, training=self.training)

        return self.final_proj(x)
```

3. Multi-Task Learning

```python
class LONGERMultiTask(nn.Module):
    def __init__(self, gnn, task_dims):
        super().__init__()
        self.gnn = gnn
        self.task_heads = nn.ModuleDict({
            task: nn.Linear(gnn.final_proj.out_features, dim)
            for task, dim in task_dims.items()
        })

    def forward(self, user_emb, item_emb, edge_index, task='ctr'):
        """Forward pass for specific task"""
        # Combine user and item embeddings
        node_emb = torch.cat([user_emb, item_emb], dim=0)

        # GNN encoding
        encoded = self.gnn(node_emb, edge_index)

        # Task-specific head
        user_encoded = encoded[:user_emb.size(0)]
        item_encoded = encoded[user_emb.size(0):]

        # Interaction
        interaction = user_encoded * item_encoded
        output = self.task_heads[task](interaction)

        return output
```

LONGER Advantages

  1. Graph Structure: Captures complex user-item relationships
  2. Cold Start: Better handling of new users/items through graph propagation
  3. Multi-Objective: Optimizes CTR, engagement, and revenue simultaneously
  4. Real-Time: Incremental updates enable real-time personalization

Q&A: LONGER

Q11: When should we use graph-based approaches like LONGER?

A: Use graph methods when you have rich interaction data and need to model complex relationships. They're especially effective for cold start problems and multi-hop reasoning. However, they require more computational resources than traditional methods.

Feature Engineering Automation

Manual feature engineering is time-consuming and error-prone. Automation is essential for scale.

Automated Feature Generation

1. Feature Crosses

from itertools import combinations

import pandas as pd
from sklearn.feature_selection import mutual_info_regression

class AutoFeatureCross:
    def __init__(self, max_cross_degree=2):
        self.max_cross_degree = max_cross_degree
        self.feature_crosses = []

    def generate_crosses(self, feature_names):
        """Enumerate all feature combinations up to max_cross_degree"""
        crosses = []
        for degree in range(2, self.max_cross_degree + 1):
            crosses.extend(combinations(feature_names, degree))
        return crosses

    def evaluate_cross(self, cross, data, target):
        """Score a feature cross by mutual information with the target"""
        # Build the cross feature as a concatenated string
        cross_values = data[list(cross)].apply(
            lambda x: '_'.join(map(str, x)), axis=1
        )

        # Label-encode the strings so mutual information can be computed
        codes, _ = pd.factorize(cross_values)
        mi = mutual_info_regression(
            codes.reshape(-1, 1),
            target,
            discrete_features=True
        )[0]

        return mi

2. Temporal Features

from datetime import datetime

import numpy as np

class TemporalFeatureGenerator:
    def generate(self, user_id, item_id, timestamp):
        """Generate temporal features"""
        features = {}

        # Time-based features
        dt = datetime.fromtimestamp(timestamp)
        features['hour'] = dt.hour
        features['day_of_week'] = dt.weekday()
        features['is_weekend'] = dt.weekday() >= 5
        features['month'] = dt.month
        features['day_of_month'] = dt.day

        # Cyclical encoding (hour 23 and hour 0 end up close together)
        features['hour_sin'] = np.sin(2 * np.pi * dt.hour / 24)
        features['hour_cos'] = np.cos(2 * np.pi * dt.hour / 24)
        features['day_sin'] = np.sin(2 * np.pi * dt.weekday() / 7)
        features['day_cos'] = np.cos(2 * np.pi * dt.weekday() / 7)

        # Time since events
        features['time_since_last_click'] = self.get_time_since(
            user_id, 'click', timestamp
        )
        features['time_since_last_purchase'] = self.get_time_since(
            user_id, 'purchase', timestamp
        )

        return features

3. Statistical Features

class StatisticalFeatureGenerator:
    def generate(self, user_id, item_id, window_days=[1, 7, 30]):
        """Generate windowed statistical features"""
        features = {}

        for window in window_days:
            # User statistics
            user_clicks = self.get_user_stat(user_id, 'click', window)
            user_purchases = self.get_user_stat(user_id, 'purchase', window)
            features[f'user_clicks_{window}d'] = user_clicks
            features[f'user_purchases_{window}d'] = user_purchases
            # Purchases per click is a conversion rate (not a CTR,
            # which would need impression counts)
            features[f'user_cvr_{window}d'] = (
                user_purchases / (user_clicks + 1e-8)
            )

            # Item statistics
            item_clicks = self.get_item_stat(item_id, 'click', window)
            item_purchases = self.get_item_stat(item_id, 'purchase', window)
            features[f'item_clicks_{window}d'] = item_clicks
            features[f'item_purchases_{window}d'] = item_purchases
            features[f'item_cvr_{window}d'] = (
                item_purchases / (item_clicks + 1e-8)
            )

            # Interaction statistics
            interaction_count = self.get_interaction_stat(
                user_id, item_id, window
            )
            features[f'interaction_count_{window}d'] = interaction_count

        return features

Feature Selection

Automatically selecting important features:

from sklearn.feature_selection import SelectKBest, f_regression, mutual_info_regression

class AutoFeatureSelection:
    def __init__(self, method='mutual_info', k=100):
        self.method = method
        self.k = k
        self.selector = None

    def fit(self, X, y):
        """Fit feature selector"""
        if self.method == 'mutual_info':
            self.selector = SelectKBest(
                mutual_info_regression, k=self.k
            )
        elif self.method == 'f_test':
            self.selector = SelectKBest(f_regression, k=self.k)

        self.selector.fit(X, y)
        return self

    def transform(self, X):
        """Select features"""
        return self.selector.transform(X)

    def get_selected_features(self, feature_names):
        """Get names of selected features"""
        selected_mask = self.selector.get_support()
        return [
            name for name, selected in zip(feature_names, selected_mask)
            if selected
        ]

Feature Store

Centralized feature management:

import json

class FeatureStore:
    def __init__(self, redis_client, feature_ttl=3600):
        self.redis = redis_client
        self.ttl = feature_ttl

    def get_feature(self, entity_type, entity_id, feature_name):
        """Get feature value"""
        key = f"{entity_type}:{entity_id}:{feature_name}"
        value = self.redis.get(key)
        if value:
            return json.loads(value)
        return None

    def set_feature(self, entity_type, entity_id, feature_name, value):
        """Set feature value"""
        key = f"{entity_type}:{entity_id}:{feature_name}"
        self.redis.setex(
            key,
            self.ttl,
            json.dumps(value)
        )

    def batch_get_features(self, entity_type, entity_ids, feature_names):
        """Batch get features in a single round trip"""
        keys = [
            f"{entity_type}:{eid}:{fname}"
            for eid in entity_ids
            for fname in feature_names
        ]
        values = self.redis.mget(keys)

        # Reshape to [num_entities, num_features]
        num_features = len(feature_names)
        return [
            [json.loads(v) if v else None for v in values[i:i + num_features]]
            for i in range(0, len(values), num_features)
        ]

Q&A: Feature Engineering

Q12: How to balance feature engineering automation and manual curation?

A: Automate low-level features (statistical, temporal) and use automation to discover crosses. Manually curate high-level business features (e.g., user segments, item categories). Use feature importance to guide manual efforts.

Q13: How often should we refresh features?

A: Real-time features (e.g., recent clicks) update continuously. Statistical features refresh hourly or daily. Embedding features may refresh weekly. Monitor feature drift and refresh when distributions change significantly.
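One way to decide when a refresh is due is to compare a recent sample of a feature against a stored baseline. Below is a minimal sketch using the Population Stability Index (PSI); the 0.1 / 0.25 thresholds are conventional rules of thumb, not part of any framework discussed above:

```python
import numpy as np

def population_stability_index(baseline, recent, bins=10):
    """PSI between a baseline and a recent sample of one numeric feature."""
    # Bin edges from baseline quantiles; open-ended outer bins
    edges = np.quantile(baseline, np.linspace(0, 1, bins + 1))
    edges[0], edges[-1] = -np.inf, np.inf

    base_pct = np.histogram(baseline, edges)[0] / len(baseline)
    recent_pct = np.histogram(recent, edges)[0] / len(recent)

    # Clip to avoid division by zero and log(0)
    base_pct = np.clip(base_pct, 1e-6, None)
    recent_pct = np.clip(recent_pct, 1e-6, None)

    return float(np.sum((recent_pct - base_pct) * np.log(recent_pct / base_pct)))

rng = np.random.default_rng(0)
baseline = rng.normal(0, 1, 10_000)
stable = rng.normal(0, 1, 10_000)   # same distribution
shifted = rng.normal(1, 1, 10_000)  # mean drifted by one std

assert population_stability_index(baseline, stable) < 0.1    # no action needed
assert population_stability_index(baseline, shifted) > 0.25  # refresh features
```

PSI below 0.1 is usually treated as stable, above 0.25 as a significant shift worth a refresh or retrain.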

A/B Testing Framework

A/B testing is crucial for validating improvements and making data-driven decisions.

Experiment Design

import hashlib
import json
from collections import defaultdict
from datetime import datetime

import numpy as np
from scipy.stats import norm

class ABTestFramework:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.experiments = {}

    def create_experiment(self, experiment_id, variants, traffic_split):
        """Create A/B test experiment"""
        experiment = {
            'id': experiment_id,
            'variants': variants,
            'traffic_split': traffic_split,
            'start_time': datetime.now().isoformat(),
            'status': 'running'
        }
        self.experiments[experiment_id] = experiment
        self.redis.set(
            f"experiment:{experiment_id}",
            json.dumps(experiment)
        )

    def assign_variant(self, user_id, experiment_id):
        """Assign user to variant"""
        # Stable assignment: Python's built-in hash() is salted per process,
        # so use a cryptographic hash for consistent bucketing
        digest = hashlib.md5(f"{user_id}:{experiment_id}".encode()).hexdigest()
        hash_value = int(digest, 16) % 100
        experiment = self.experiments.get(experiment_id)

        if not experiment:
            return 'control'

        cumulative = 0
        for variant, split in experiment['traffic_split'].items():
            cumulative += split
            if hash_value < cumulative:
                return variant

        return 'control'

    def log_event(self, user_id, experiment_id, variant, event_type, value=None):
        """Log experiment event"""
        event = {
            'user_id': user_id,
            'experiment_id': experiment_id,
            'variant': variant,
            'event_type': event_type,
            'value': value,
            'timestamp': datetime.now().isoformat()
        }
        self.redis.lpush(
            f"experiment:{experiment_id}:events",
            json.dumps(event)
        )

    def analyze_results(self, experiment_id, metric='ctr'):
        """Analyze experiment results"""
        events = self._get_events(experiment_id)

        # Group by variant
        variant_stats = defaultdict(lambda: {'impressions': 0, 'clicks': 0})

        for event in events:
            variant = event['variant']
            if event['event_type'] == 'impression':
                variant_stats[variant]['impressions'] += 1
            elif event['event_type'] == 'click':
                variant_stats[variant]['clicks'] += 1

        # Compute metrics
        results = {}
        for variant, stats in variant_stats.items():
            ctr = stats['clicks'] / (stats['impressions'] + 1e-8)
            results[variant] = {
                'ctr': ctr,
                'impressions': stats['impressions'],
                'clicks': stats['clicks']
            }

        # Statistical significance
        if 'control' in results and len(results) > 1:
            control_ctr = results['control']['ctr']
            control_n = results['control']['impressions']

            for variant, stats in results.items():
                if variant != 'control':
                    variant_ctr = stats['ctr']
                    variant_n = stats['impressions']

                    # Z-test for proportions
                    p_pooled = (results['control']['clicks'] + stats['clicks']) / \
                               (control_n + variant_n)
                    se = np.sqrt(p_pooled * (1 - p_pooled) * (1 / control_n + 1 / variant_n))
                    z_score = (variant_ctr - control_ctr) / (se + 1e-8)

                    # Two-tailed test
                    p_value = 2 * (1 - norm.cdf(abs(z_score)))

                    stats['lift'] = (variant_ctr - control_ctr) / (control_ctr + 1e-8)
                    stats['p_value'] = p_value
                    stats['significant'] = p_value < 0.05

        return results

Q&A: A/B Testing

Q14: How long should A/B tests run?

A: Run until statistical significance is reached or minimum sample size (typically 2-4 weeks). Use power analysis to determine required sample size before starting. Don't stop early due to early positive results — wait for full duration.
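The required sample size can be computed up front with a standard two-proportion power analysis. A sketch (the baseline CTR and minimum detectable lift are illustrative inputs; α = 0.05 and 80% power are the usual defaults):

```python
from scipy.stats import norm

def required_sample_size(p_baseline, min_lift, alpha=0.05, power=0.8):
    """Per-variant sample size for a two-proportion z-test."""
    p1 = p_baseline
    p2 = p_baseline * (1 + min_lift)  # relative lift
    p_bar = (p1 + p2) / 2

    z_alpha = norm.ppf(1 - alpha / 2)  # two-tailed
    z_beta = norm.ppf(power)

    numerator = (z_alpha * (2 * p_bar * (1 - p_bar)) ** 0.5
                 + z_beta * (p1 * (1 - p1) + p2 * (1 - p2)) ** 0.5) ** 2
    return int(numerator / (p2 - p1) ** 2) + 1

# Detecting a 5% relative lift on a 2% baseline CTR needs hundreds of
# thousands of impressions per variant -- small effects are expensive
n = required_sample_size(p_baseline=0.02, min_lift=0.05)
```

Dividing the required sample size by your daily traffic per variant gives a principled minimum test duration.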

Q15: How to handle multiple simultaneous experiments?

A: Use experiment layering and orthogonal assignment. Ensure experiments don't interfere by using consistent hashing with experiment IDs. Monitor for interactions between experiments.
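Orthogonal assignment means each layer hashes the user together with its own layer ID, so bucket membership in one layer is statistically independent of membership in another. A minimal sketch (the layer names are illustrative):

```python
import hashlib

def bucket(user_id, layer_id, num_buckets=100):
    """Stable, layer-specific bucket: same user and layer always map the same way."""
    key = f"{layer_id}:{user_id}".encode()
    return int(hashlib.md5(key).hexdigest(), 16) % num_buckets

def assign(user_id, layer_id, traffic_split):
    """traffic_split: e.g. {'control': 50, 'treatment': 50} in percent."""
    b = bucket(user_id, layer_id)
    cumulative = 0
    for variant, split in traffic_split.items():
        cumulative += split
        if b < cumulative:
            return variant
    return 'control'

# Assignments within a layer are stable across calls, and different layers
# slice the same population independently
split = {'control': 50, 'treatment': 50}
assert assign(42, 'recall_layer', split) == assign(42, 'recall_layer', split)
```

Because the layer ID is part of the hash key, a user in treatment for a recall experiment is equally likely to be in control or treatment for a ranking experiment, which is what keeps layered experiments from confounding each other.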

Performance Optimization

Production systems require careful optimization to meet latency and throughput requirements.

Model Optimization

1. Model Quantization

import torch
import torch.quantization as quantization

class ModelQuantizer:
    def quantize_model(self, model, calibration_data):
        """Quantize model to INT8"""
        model.eval()

        # Prepare for quantization
        model.qconfig = quantization.get_default_qconfig('fbgemm')
        quantization.prepare(model, inplace=True)

        # Calibrate on representative data to collect activation statistics
        with torch.no_grad():
            for batch in calibration_data:
                model(batch)

        # Convert to quantized model
        quantized_model = quantization.convert(model, inplace=False)
        return quantized_model

2. Model Pruning

import torch.nn as nn
import torch.nn.utils.prune as prune

class ModelPruner:
    def prune_model(self, model, pruning_ratio=0.3):
        """Prune model weights by L1 magnitude"""
        for name, module in model.named_modules():
            if isinstance(module, nn.Linear):
                prune.l1_unstructured(module, name='weight', amount=pruning_ratio)
                prune.remove(module, 'weight')  # make the pruning permanent

        return model

3. Knowledge Distillation

class KnowledgeDistillation:
    def __init__(self, teacher_model, student_model, temperature=3.0):
        self.teacher = teacher_model
        self.student = student_model
        self.temperature = temperature

    def distill_loss(self, student_logits, teacher_logits, labels, alpha=0.7):
        """Compute distillation loss"""
        # Soft targets from teacher
        teacher_soft = F.softmax(teacher_logits / self.temperature, dim=1)
        student_soft = F.log_softmax(student_logits / self.temperature, dim=1)

        # Distillation loss
        distillation_loss = F.kl_div(
            student_soft, teacher_soft, reduction='batchmean'
        ) * (self.temperature ** 2)

        # Hard targets
        hard_loss = F.cross_entropy(student_logits, labels)

        # Combined
        return alpha * distillation_loss + (1 - alpha) * hard_loss

Serving Optimization

1. Batch Inference

import asyncio
from queue import Queue

import numpy as np

class BatchInference:
    def __init__(self, model, batch_size=32, max_wait_ms=10):
        self.model = model
        self.batch_size = batch_size
        self.max_wait_ms = max_wait_ms
        self.queue = Queue()
        self.results = {}

    async def predict(self, request_id, features):
        """Enqueue a request and serve it as part of a batch"""
        event = asyncio.Event()
        self.queue.put((request_id, features, event))

        # Give other concurrent requests a chance to join the batch
        await asyncio.sleep(self.max_wait_ms / 1000)

        # Drain up to batch_size pending requests into one model call
        batch = []
        request_events = []
        while len(batch) < self.batch_size and not self.queue.empty():
            req_id, feat, ev = self.queue.get()
            batch.append(feat)
            request_events.append((req_id, ev))

        if batch:
            predictions = self.model.predict(np.array(batch))
            for (req_id, ev), pred in zip(request_events, predictions):
                self.results[req_id] = pred
                ev.set()

        # Our request may have been served by another coroutine's batch,
        # so wait on our own event before collecting the result
        await event.wait()
        return self.results.pop(request_id)

2. Caching

import hashlib
import json

class PredictionCache:
    def __init__(self, redis_client, ttl=300):
        self.redis = redis_client
        self.ttl = ttl

    def get_cache_key(self, user_id, item_ids):
        """Generate a cache key that is stable across processes"""
        # Built-in hash() is salted per process, so use md5 for shared Redis keys
        item_str = ','.join(sorted(map(str, item_ids)))
        digest = hashlib.md5(item_str.encode()).hexdigest()
        return f"pred:{user_id}:{digest}"

    def get(self, user_id, item_ids):
        """Get cached predictions"""
        key = self.get_cache_key(user_id, item_ids)
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)
        return None

    def set(self, user_id, item_ids, predictions):
        """Cache predictions"""
        key = self.get_cache_key(user_id, item_ids)
        self.redis.setex(key, self.ttl, json.dumps(predictions))

3. Feature Precomputation

class FeaturePrecomputation:
    def __init__(self, feature_store):
        self.feature_store = feature_store

    def precompute_user_features(self, user_id):
        """Precompute user features"""
        features = {
            'stats_7d': self.compute_user_stats(user_id, days=7),
            'stats_30d': self.compute_user_stats(user_id, days=30),
            'embeddings': self.get_user_embeddings(user_id),
            'preferences': self.get_user_preferences(user_id)
        }
        self.feature_store.set_feature('user', user_id, 'precomputed', features)

    def precompute_item_features(self, item_id):
        """Precompute item features"""
        features = {
            'popularity': self.compute_popularity(item_id),
            'quality_score': self.compute_quality_score(item_id),
            'embeddings': self.get_item_embeddings(item_id),
            'metadata': self.get_item_metadata(item_id)
        }
        self.feature_store.set_feature('item', item_id, 'precomputed', features)

Q&A: Performance Optimization

Q16: How to choose between quantization, pruning, and distillation?

A: Quantization for fastest inference (2-4x speedup). Pruning for model size reduction. Distillation for accuracy preservation with smaller models. Often combine: distill → prune → quantize for maximum optimization.

Q17: What's the trade-off between batch size and latency?

A: Larger batches improve throughput but increase latency (waiting for batch to fill). Find the sweet spot: typically batch_size=16-32 with max_wait=5-10ms works well. Monitor p95 latency, not just average.
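The trade-off can be made concrete with a back-of-the-envelope model: worst-case latency adds the full batching window on top of one inference call, while the throughput ceiling scales with batch size. The numbers below are illustrative, not measured:

```python
def batching_tradeoff(batch_size, max_wait_ms, infer_ms_per_batch):
    """Rough worst-case latency and throughput ceiling for a batching server."""
    # A request may wait the full window before its batch is dispatched,
    # then pays one batch inference
    worst_case_latency_ms = max_wait_ms + infer_ms_per_batch
    # Each inference call serves up to batch_size requests
    max_throughput_qps = batch_size / (infer_ms_per_batch / 1000)
    return worst_case_latency_ms, max_throughput_qps

# batch_size=32, 10ms wait, 8ms per batch: 18ms worst case, 4000 QPS ceiling
latency, throughput = batching_tradeoff(batch_size=32, max_wait_ms=10,
                                        infer_ms_per_batch=8)
```

Sweeping batch size and wait window through this kind of model (and then validating against measured p95 latency) is a quick way to pick a starting configuration.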

Deployment and Monitoring

Production deployment requires robust infrastructure and comprehensive monitoring.

Deployment Pipeline

import logging

logger = logging.getLogger(__name__)

class DeploymentPipeline:
    def __init__(self, model_registry, serving_cluster):
        self.model_registry = model_registry
        self.serving_cluster = serving_cluster

    def deploy(self, model_version, traffic_percentage=10):
        """Gradual rollout deployment"""
        # 1. Validate model
        if not self.validate_model(model_version):
            raise ValueError("Model validation failed")

        # 2. Deploy to staging
        staging_endpoint = self.serving_cluster.deploy(
            model_version, environment='staging'
        )

        # 3. Smoke tests
        if not self.run_smoke_tests(staging_endpoint):
            raise ValueError("Smoke tests failed")

        # 4. Canary deployment
        canary_endpoint = self.serving_cluster.deploy(
            model_version, environment='production',
            traffic_percentage=traffic_percentage
        )

        # 5. Monitor metrics
        metrics = self.monitor_metrics(canary_endpoint, duration_minutes=60)

        # 6. Full rollout or rollback
        if self.should_rollout(metrics):
            self.serving_cluster.update_traffic(canary_endpoint, 100)
        else:
            self.serving_cluster.rollback(canary_endpoint)
            raise ValueError("Canary deployment failed metrics check")

    def validate_model(self, model_version):
        """Validate model before deployment"""
        model = self.model_registry.load_model(model_version)

        # Check model structure
        # Check prediction format
        # Check performance on holdout set

        return True

    def run_smoke_tests(self, endpoint):
        """Run basic smoke tests"""
        test_cases = [
            {'user_id': 1, 'item_ids': [100, 200, 300]},
            {'user_id': 2, 'item_ids': [150, 250, 350]}
        ]

        for test_case in test_cases:
            try:
                response = self.serving_cluster.predict(endpoint, test_case)
                if not response:
                    return False
            except Exception as e:
                logger.error(f"Smoke test failed: {e}")
                return False

        return True

    def should_rollout(self, metrics):
        """Decide if rollout should continue"""
        # Check latency
        if metrics['p95_latency'] > 100:  # ms
            return False

        # Check error rate
        if metrics['error_rate'] > 0.01:  # 1%
            return False

        # Check prediction quality (if available)
        if 'prediction_quality' in metrics:
            if metrics['prediction_quality'] < 0.95:
                return False

        return True

Monitoring

from datetime import datetime

class RecommendationMonitor:
    def __init__(self, metrics_client):
        self.metrics = metrics_client

    def log_prediction(self, user_id, item_ids, scores, latency_ms):
        """Log prediction metrics"""
        self.metrics.histogram('prediction.latency', latency_ms)
        self.metrics.gauge('prediction.batch_size', len(item_ids))

        # Score distribution
        for score in scores:
            self.metrics.histogram('prediction.scores', score)

    def log_serving_error(self, error_type, error_message):
        """Log serving errors"""
        self.metrics.increment(f'serving.errors.{error_type}')
        self.metrics.log_event('serving_error', {
            'type': error_type,
            'message': error_message,
            'timestamp': datetime.now().isoformat()
        })

    def log_feature_stats(self, feature_name, value):
        """Log feature statistics"""
        self.metrics.histogram(f'features.{feature_name}', value)

    def check_anomalies(self):
        """Check for anomalies"""
        # Check latency spike
        recent_latency = self.metrics.get_recent('prediction.latency', minutes=5)
        if recent_latency and recent_latency['p95'] > 150:
            self.alert('high_latency', recent_latency)

        # Check error rate spike
        error_rate = self.metrics.get_error_rate(minutes=5)
        if error_rate > 0.05:  # 5%
            self.alert('high_error_rate', error_rate)

        # Check prediction distribution shift
        recent_scores = self.metrics.get_recent('prediction.scores', minutes=30)
        baseline_scores = self.metrics.get_baseline('prediction.scores')
        if self.detect_distribution_shift(recent_scores, baseline_scores):
            self.alert('distribution_shift', {
                'recent': recent_scores,
                'baseline': baseline_scores
            })

    def detect_distribution_shift(self, recent, baseline):
        """Detect distribution shift (simplified mean-shift check;
        a production system would use KL divergence or PSI on binned scores)"""
        if abs(recent['mean'] - baseline['mean']) > 2 * baseline['std']:
            return True
        return False

Q&A: Deployment and Monitoring

Q18: How to handle model versioning?

A: Use semantic versioning (major.minor.patch). Store models in a model registry (MLflow, DVC, or custom). Tag models with metadata (training data version, hyperparameters, metrics). Maintain version compatibility for gradual rollouts.
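A registry entry can be as simple as a version tag plus the metadata needed to reproduce and compare models. Below is a minimal in-memory sketch (a real setup would back this with MLflow or a database; the class and field names are illustrative):

```python
from dataclasses import dataclass

@dataclass
class ModelRecord:
    version: str          # semantic version, e.g. "2.1.0"
    data_version: str     # training data snapshot
    hyperparameters: dict
    metrics: dict         # offline evaluation metrics
    artifact_path: str

class ModelRegistry:
    def __init__(self):
        self._records = {}

    def register(self, record):
        self._records[record.version] = record

    def latest(self):
        """Highest semantic version, compared numerically per component."""
        def key(v):
            return tuple(int(x) for x in v.split('.'))
        return self._records[max(self._records, key=key)]

registry = ModelRegistry()
registry.register(ModelRecord("1.9.0", "d-2024-06", {"lr": 1e-3},
                              {"auc": 0.78}, "models/v1.9.0.pt"))
registry.register(ModelRecord("1.10.0", "d-2024-07", {"lr": 5e-4},
                              {"auc": 0.79}, "models/v1.10.0.pt"))

# Plain string comparison would wrongly rank "1.9.0" above "1.10.0";
# the numeric tuple comparison gets it right
assert registry.latest().version == "1.10.0"
```

Keeping the data version and hyperparameters alongside the artifact is what makes rollbacks and offline comparisons reproducible.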

Q19: What metrics should we monitor?

A: System metrics (latency p50/p95/p99, throughput, error rate), prediction metrics (score distribution, prediction quality), business metrics (CTR, conversion rate, revenue). Set up alerts for anomalies in all metrics.

Complete Project Example

Let's pull the pieces together and build a complete recommendation system from scratch.

Project Structure

recommendation-system/
├── config/
│   ├── model_config.yaml
│   └── feature_config.yaml
├── src/
│   ├── recall/
│   │   ├── __init__.py
│   │   ├── collaborative_filtering.py
│   │   ├── deep_learning.py
│   │   └── multi_channel.py
│   ├── ranking/
│   │   ├── __init__.py
│   │   ├── coarse_ranking.py
│   │   ├── fine_ranking.py
│   │   └── reranking.py
│   ├── features/
│   │   ├── __init__.py
│   │   ├── extractor.py
│   │   └── store.py
│   ├── serving/
│   │   ├── __init__.py
│   │   ├── api.py
│   │   └── predictor.py
│   └── utils/
│       ├── __init__.py
│       └── monitoring.py
├── training/
│   ├── train_coarse.py
│   ├── train_fine.py
│   └── evaluate.py
├── tests/
│   ├── test_recall.py
│   ├── test_ranking.py
│   └── test_serving.py
└── requirements.txt

Main API Server

# src/serving/api.py
from flask import Flask, request, jsonify
from src.serving.predictor import RecommendationPredictor
from src.utils.monitoring import RecommendationMonitor
import time

app = Flask(__name__)
predictor = RecommendationPredictor()
monitor = RecommendationMonitor()

@app.route('/health', methods=['GET'])
def health():
    return jsonify({'status': 'healthy'})

@app.route('/recommend', methods=['POST'])
def recommend():
    start_time = time.time()

    try:
        data = request.json
        user_id = data['user_id']
        context = data.get('context', {})
        top_k = data.get('top_k', 20)

        # Get recommendations
        recommendations = predictor.predict(user_id, context, top_k)

        latency_ms = (time.time() - start_time) * 1000

        # Log metrics
        monitor.log_prediction(
            user_id,
            [r['item_id'] for r in recommendations],
            [r['score'] for r in recommendations],
            latency_ms
        )

        return jsonify({
            'recommendations': recommendations,
            'latency_ms': latency_ms
        })

    except Exception as e:
        monitor.log_serving_error('prediction_error', str(e))
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)

Complete Predictor

# src/serving/predictor.py
from src.recall.multi_channel import MultiChannelRecall
from src.ranking.coarse_ranking import CoarseRankingModel
from src.ranking.fine_ranking import FineRankingModel
from src.ranking.reranking import DiversityReranker, BusinessRulesReranker
from src.features.extractor import FeatureExtractor

class RecommendationPredictor:
    def __init__(self):
        # Initialize components
        self.recall = MultiChannelRecall(...)
        self.coarse_ranking = CoarseRankingModel(...)
        self.fine_ranking = FineRankingModel(...)
        self.reranker = DiversityReranker(...)
        self.business_rules = BusinessRulesReranker(...)
        self.feature_extractor = FeatureExtractor(...)

    def predict(self, user_id, context, top_k=20):
        """Complete recommendation pipeline"""
        # 1. Recall
        candidates = self.recall.recall(user_id, context, target_size=2000)

        # 2. Coarse ranking
        coarse_features = [
            self.feature_extractor.extract_coarse(user_id, item_id, context)
            for item_id in candidates
        ]
        coarse_scores = self.coarse_ranking.predict(coarse_features)
        coarse_ranked = sorted(
            zip(candidates, coarse_scores),
            key=lambda x: x[1],
            reverse=True
        )[:200]  # Reduce to 200

        # 3. Fine ranking
        fine_candidates = [item_id for item_id, _ in coarse_ranked]
        fine_features = [
            self.feature_extractor.extract_fine(user_id, item_id, context)
            for item_id in fine_candidates
        ]
        fine_scores = self.fine_ranking.predict(fine_features)
        fine_ranked = sorted(
            zip(fine_candidates, fine_scores),
            key=lambda x: x[1],
            reverse=True
        )
        score_map = dict(fine_ranked)

        # 4. Reranking
        rerank_candidates = [item_id for item_id, _ in fine_ranked[:50]]
        item_metadata = self.feature_extractor.get_item_metadata(rerank_candidates)

        # Diversity reranking
        diversity_ranked = self.reranker.rerank(
            rerank_candidates,
            [score_map[item_id] for item_id in rerank_candidates],
            item_metadata,
            top_k=top_k
        )

        # Business rules (scores aligned with the reranked item order)
        final_ranked = self.business_rules.rerank(
            diversity_ranked,
            [score_map[item_id] for item_id in diversity_ranked],
            item_metadata
        )

        # Format results
        return [
            {
                'item_id': item_id,
                'score': score_map[item_id],
                'rank': idx + 1
            }
            for idx, item_id in enumerate(final_ranked)
        ]

Training Script

# training/train_fine.py
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from src.ranking.fine_ranking import FineRankingModel
from src.features.extractor import FeatureExtractor

class RankingDataset(Dataset):
    def __init__(self, data_path, feature_extractor):
        self.data = pd.read_parquet(data_path)
        self.feature_extractor = feature_extractor

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        features = self.feature_extractor.extract_fine(
            row['user_id'],
            row['item_id'],
            row['context']
        )
        label = row['label']  # 0 or 1
        return features, label

def train():
    # Load data
    train_dataset = RankingDataset('data/train.parquet', FeatureExtractor())
    val_dataset = RankingDataset('data/val.parquet', FeatureExtractor())

    train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=256, shuffle=False)

    # Initialize model
    model = FineRankingModel(input_dim=500)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.BCEWithLogitsLoss()

    # Training loop
    for epoch in range(10):
        model.train()
        train_loss = 0
        for features, labels in train_loader:
            optimizer.zero_grad()
            predictions = model(features)
            loss = criterion(predictions, labels.float())
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

        # Validation
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for features, labels in val_loader:
                predictions = model(features)
                loss = criterion(predictions, labels.float())
                val_loss += loss.item()

        print(f"Epoch {epoch}: Train Loss={train_loss/len(train_loader):.4f}, "
              f"Val Loss={val_loss/len(val_loader):.4f}")

    # Save model
    torch.save(model.state_dict(), 'models/fine_ranking_model.pt')

if __name__ == '__main__':
    train()

Q&A: Complete System

Q20: How to handle cold start for new users?

A: Use content-based features and popular items as fallback. For new items, use content features and category-based similarity. Consider using graph methods (like LONGER) that can propagate information through the graph.
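The fallback chain described above can be made explicit in the recall stage. A minimal sketch, where the history store, personalized recall function, and popularity tables are illustrative stand-ins:

```python
def recall_with_fallback(user_id, history, personalized, popular_by_category,
                         global_popular, user_category=None, k=100):
    """Personalized recall when history exists, otherwise graded fallbacks."""
    if history.get(user_id):
        # Enough behavior: use the normal personalized channels
        return personalized(user_id)[:k]
    if user_category and user_category in popular_by_category:
        # Cold user with a known interest (e.g. from signup): category popularity
        return popular_by_category[user_category][:k]
    # Fully cold: global popular items
    return global_popular[:k]

history = {1: [101, 102]}
personalized = lambda uid: [201, 202, 203]
popular_by_category = {'sports': [301, 302]}
global_popular = [401, 402, 403]

# Warm user, cold user with a category, fully cold user
assert recall_with_fallback(1, history, personalized,
                            popular_by_category, global_popular) == [201, 202, 203]
assert recall_with_fallback(2, history, personalized, popular_by_category,
                            global_popular, user_category='sports') == [301, 302]
assert recall_with_fallback(3, history, personalized,
                            popular_by_category, global_popular) == [401, 402, 403]
```

In practice the chain usually also interleaves a small exploration slice so cold users generate the interaction data that graduates them to the personalized path.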

Q21: How to ensure system reliability?

A: Implement circuit breakers, timeouts, and fallbacks at every stage. Use health checks and graceful degradation. Monitor error rates and latency. Have rollback procedures ready. Test failure scenarios regularly.
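A circuit breaker around a ranking stage can be sketched as follows: after a run of failures the breaker opens and the caller takes the fallback (e.g. coarse scores or popular items) until a cooldown elapses. The thresholds are illustrative:

```python
import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout_s=30):
        self.failure_threshold = failure_threshold
        self.reset_timeout_s = reset_timeout_s
        self.failures = 0
        self.opened_at = None

    def call(self, fn, fallback, *args, **kwargs):
        # Open state: skip the primary call until the cooldown elapses
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_timeout_s:
                return fallback(*args, **kwargs)
            self.opened_at = None  # half-open: let one attempt through
            self.failures = 0
        try:
            result = fn(*args, **kwargs)
            self.failures = 0  # success resets the failure count
            return result
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()  # trip the breaker
            return fallback(*args, **kwargs)

breaker = CircuitBreaker(failure_threshold=2)
flaky = lambda: 1 / 0                      # stand-in for a failing ranking call
fallback = lambda: 'popular_items'

assert breaker.call(flaky, fallback) == 'popular_items'  # failure 1
assert breaker.call(flaky, fallback) == 'popular_items'  # failure 2: breaker opens
assert breaker.opened_at is not None                     # now serving fallback only
```

The key property is that an open breaker stops hammering a struggling downstream service, which is what turns a component failure into graceful degradation instead of cascading timeouts.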

Conclusion

Building industrial recommendation systems requires careful attention to architecture, performance, and operations. The multi-stage pipeline (recall → coarse ranking → fine ranking → reranking) provides a scalable framework for handling millions of users and items.

Key takeaways:

  1. Multi-channel recall is essential for diversity and coverage
  2. Staged ranking balances accuracy and latency
  3. Feature engineering automation enables rapid iteration
  4. A/B testing ensures data-driven improvements
  5. Performance optimization is critical for scale
  6. Monitoring and deployment practices ensure reliability

The frameworks and practices discussed — from EasyRec to LONGER — represent years of production experience. Adapt them to your specific use case, and always measure the impact of changes through rigorous experimentation.

As recommendation systems continue to evolve, new techniques like graph neural networks, transformer-based models, and reinforcement learning are pushing the boundaries. However, the fundamental principles of scalable architecture, careful feature engineering, and rigorous evaluation remain constant.

References

  1. Alibaba EasyRec: https://github.com/alibaba/EasyRec
  2. Zhou, G., et al. "Deep Interest Network for Click-Through Rate Prediction." KDD 2018. arXiv:1706.06978
  3. Guo, H., et al. "DeepFM: A Factorization-Machine based Neural Network for CTR Prediction." IJCAI 2017. arXiv:1703.04247
  4. Cheng, H., et al. "Wide & Deep Learning for Recommender Systems." DLRS 2016. arXiv:1606.07792

This article is part of a series on recommendation systems. For more articles, see the Recommendation Systems category.

  • Post title: Recommendation Systems (16): Industrial Architecture and Best Practices
  • Post author: Chen Kai
  • Create time: 2026-02-03 23:11:11
  • Post link: https://www.chenk.top/recommendation-systems-16-industrial-practice/
  • Copyright Notice: All articles in this blog are licensed under BY-NC-SA unless stated otherwise.