自然语言处理(九)—— 大语言模型架构深度解析
Chen Kai BOSS

ChatGPT 的出现让大语言模型( LLM)成为 AI 领域的焦点,但理解其工作原理并不容易。为什么 GPT 能生成流畅文本,而 BERT 更适合理解任务?为什么有些模型能处理数万 token 的长文本,而另一些在 2048 token 后就性能下降?这些差异背后是架构设计的根本性选择。

架构选择决定了模型的能力边界: Encoder-only 架构通过双向注意力理解上下文,但无法自回归生成; Decoder-only 架构擅长生成,但只能看到单向信息; Encoder-Decoder 架构兼顾两者,但计算成本更高。长文本处理技术( ALiBi 、 RoPE 、 Flash Attention)通过不同的位置编码和注意力优化,让模型突破序列长度限制。 MoE 架构通过稀疏激活实现万亿参数规模,而量化、 KV Cache 等技术让大模型能在消费级硬件上运行。

本文深入解析这些核心技术:从架构选择的权衡到长文本处理的实现细节,从 MoE 的路由机制到量化的误差控制,从 KV Cache 的内存优化到推理服务的工程实践。每个技术点都配有可运行的代码示例和性能分析,帮助读者不仅理解原理,更能动手实践。

LLM 架构选择: Encoder-only vs Decoder-only vs Encoder-Decoder

大语言模型的架构选择是决定其能力和应用场景的关键因素。目前主流的三种架构各有优劣,理解它们的区别对于选择合适的模型至关重要。

Encoder-only 架构

Encoder-only 架构只使用 Transformer 的编码器部分,典型代表是 BERT 。这种架构在预训练时通常使用掩码语言模型( Masked Language Modeling, MLM)任务。

特点: - 双向上下文理解:能够同时看到输入序列的前后文信息 - 适合理解任务:文本分类、命名实体识别、情感分析等 - 不适合生成任务:无法进行自回归生成

数学表示

对于输入序列 , Encoder-only 模型计算:

其中 是每个位置的上下文表示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from transformers import AutoModel, AutoTokenizer
import torch

# Encoder-only 模型示例( BERT)
model = AutoModel.from_pretrained("bert-base-uncased")
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

text = "The cat sat on the mat"
inputs = tokenizer(text, return_tensors="pt")

# 前向传播:所有位置同时计算
outputs = model(**inputs)
# outputs.last_hidden_state shape: [batch_size, seq_len, hidden_size]
# 每个位置的表示都包含了双向上下文信息

应用场景: - 文本分类 - 命名实体识别( NER) - 情感分析 - 文本相似度计算 - 问答系统(需要理解上下文)

Decoder-only 架构

Decoder-only 架构只使用 Transformer 的解码器部分,典型代表是 GPT 系列。这种架构使用因果掩码( Causal Masking)确保每个位置只能看到之前的信息。

特点: - 自回归生成:逐 token 生成,每个 token 依赖于之前的所有 token - 单向上下文:只能看到当前位置之前的信息 - 适合生成任务:文本生成、对话系统、代码生成等

数学表示

对于输入序列, Decoder-only 模型在生成时:

其中 是位置 的隐藏状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

# Decoder-only 模型示例( GPT-2)
model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")

# 生成文本
prompt = "The future of AI is"
inputs = tokenizer(prompt, return_tensors="pt")

# 自回归生成
outputs = model.generate(
**inputs,
max_length=50,
num_return_sequences=1,
temperature=0.7
)

generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
print(generated_text)

应用场景: - 文本生成 - 对话系统 - 代码生成 - 文本补全 - 创意写作

Encoder-Decoder 架构

Encoder-Decoder 架构同时使用编码器和解码器,典型代表是 T5 、 BART 。编码器处理输入,解码器生成输出。

特点: - 双向理解 + 自回归生成:编码器双向理解输入,解码器单向生成输出 - 适合序列到序列任务:翻译、摘要、问答等 - 计算成本较高:需要同时维护编码器和解码器

数学表示

对于输入序列 和目标序列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer

# Encoder-Decoder 模型示例( T5)
model = AutoModelForSeq2SeqLM.from_pretrained("t5-small")
tokenizer = AutoTokenizer.from_pretrained("t5-small")

# 文本摘要任务
text = "The quick brown fox jumps over the lazy dog. " * 3
inputs = tokenizer("summarize: " + text, return_tensors="pt", max_length=512, truncation=True)

outputs = model.generate(
**inputs,
max_length=50,
num_beams=4,
early_stopping=True
)

summary = tokenizer.decode(outputs[0], skip_special_tokens=True)
print(summary)

应用场景: - 机器翻译 - 文本摘要 - 问答系统 - 对话系统(需要理解上下文) - 文本改写

架构选择指南

架构类型 优势 劣势 典型应用
Encoder-only 双向理解,理解能力强 无法生成,需要额外任务头 分类、 NER 、相似度
Decoder-only 生成能力强,架构简单 只能单向理解 文本生成、对话
Encoder-Decoder 理解+生成,灵活 计算成本高,参数量大 翻译、摘要、问答

长文本处理技术

传统 Transformer 的注意力机制复杂度为,其中 是序列长度。当序列长度增加时,计算和内存成本急剧增长。为了解决这个问题,研究者们提出了多种长文本处理技术。

ALiBi( Attention with Linear Biases)

传统位置编码(如正弦位置编码)在训练时固定最大长度,超出这个长度时性能会急剧下降。 ALiBi 通过一个巧妙的思路解决了这个问题:不在嵌入层添加位置信息,而是在注意力计算时直接惩罚远距离的注意力连接。

核心思想

ALiBi 认为,位置信息应该体现在"距离越远,注意力应该越小"这个直觉上。它通过在注意力分数中添加与相对距离成比例的负偏置来实现这一点:

其中 是每个注意力头的斜率( slope), 是基于相对位置的线性偏置矩阵。对于位置,偏置值为,这意味着距离越远,注意力分数被减得越多。

斜率设计

ALiBi 为每个注意力头分配不同的斜率,通常使用几何序列:第 个头的斜率为。这样设计的原因是不同头关注不同尺度的依赖关系:小斜率头关注局部模式,大斜率头关注全局模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import torch
import torch.nn as nn
import math

class ALiBiAttention(nn.Module):
def __init__(self, d_model, n_heads, max_len=512):
super().__init__()
self.d_model = d_model
self.n_heads = n_heads
self.head_dim = d_model // n_heads

# ALiBi 斜率:每个头有不同的斜率
# 通常使用 2^{-8/n_heads * i},其中 i 是头的索引
slopes = []
for i in range(n_heads):
slope = 2 ** (-8 / n_heads * (i + 1))
slopes.append(slope)
self.register_buffer('slopes', torch.tensor(slopes))

self.q_proj = nn.Linear(d_model, d_model)
self.k_proj = nn.Linear(d_model, d_model)
self.v_proj = nn.Linear(d_model, d_model)
self.out_proj = nn.Linear(d_model, d_model)

def forward(self, x, mask=None):
batch_size, seq_len, _ = x.shape

Q = self.q_proj(x).view(batch_size, seq_len, self.n_heads, self.head_dim).transpose(1, 2)
K = self.k_proj(x).view(batch_size, seq_len, self.n_heads, self.head_dim).transpose(1, 2)
V = self.v_proj(x).view(batch_size, seq_len, self.n_heads, self.head_dim).transpose(1, 2)

# 计算注意力分数
scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.head_dim)

# 添加 ALiBi 偏置
# bias shape: [n_heads, seq_len, seq_len]
bias = self._get_alibi_bias(seq_len, x.device)
scores = scores + bias.unsqueeze(0)

if mask is not None:
scores = scores.masked_fill(mask == 0, float('-inf'))

attn_weights = torch.softmax(scores, dim=-1)
attn_output = torch.matmul(attn_weights, V)

attn_output = attn_output.transpose(1, 2).contiguous().view(
batch_size, seq_len, self.d_model
)

return self.out_proj(attn_output)

def _get_alibi_bias(self, seq_len, device):
# 创建相对位置偏置矩阵
# 对于位置 i 和 j,偏置为 -m * |i - j|
positions = torch.arange(seq_len, device=device).float()
relative_positions = positions.unsqueeze(0) - positions.unsqueeze(1)

# 每个头应用不同的斜率
bias = -self.slopes.unsqueeze(1).unsqueeze(2) * relative_positions.abs()
return bias

优势: - 无需位置编码,简化模型架构 - 能够外推到更长的序列 - 训练和推理效率高

应用: - BLOOM 模型使用了 ALiBi - 适合需要处理长文本的场景

RoPE( Rotary Position Embedding)

RoPE 是 LLaMA 等主流模型采用的位置编码方法,其核心思想是将位置信息编码为旋转操作。相比绝对位置编码, RoPE 具有更好的外推能力,因为它编码的是相对位置关系。

数学原理

RoPE 将位置编码为复数域的旋转。对于维度为 的向量,将其分成 对,每对 视为一个复数。然后对这个复数乘以旋转因子,其中 是位置, 是频率。

对于位置 的查询向量 和位置 的键向量,旋转后的向量为:

其中 是旋转矩阵,Extra close brace or missing open brace\Theta = \{\theta_i = 10000^{-2i/d}, i \in [0, d/2)}。旋转矩阵的构造使得注意力分数 只依赖于相对位置,这赋予了模型相对位置理解能力。

为什么有效

RoPE 的优势在于它保持了相对位置的平移不变性:如果两个 token 的相对位置相同,无论它们的绝对位置在哪里,它们的注意力分数都相同。这使得模型能够自然地处理超出训练长度的序列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import torch
import torch.nn as nn
import math

class RoPEMultiHeadAttention(nn.Module):
def __init__(self, d_model, n_heads):
super().__init__()
self.d_model = d_model
self.n_heads = n_heads
self.head_dim = d_model // n_heads

# 初始化旋转角度
inv_freq = 1.0 / (10000 ** (torch.arange(0, self.head_dim, 2).float() / self.head_dim))
self.register_buffer('inv_freq', inv_freq)

self.q_proj = nn.Linear(d_model, d_model)
self.k_proj = nn.Linear(d_model, d_model)
self.v_proj = nn.Linear(d_model, d_model)
self.out_proj = nn.Linear(d_model, d_model)

def apply_rotary_pos_emb(self, x, freqs):
"""应用旋转位置编码"""
# x shape: [batch, n_heads, seq_len, head_dim]
# freqs shape: [seq_len, head_dim // 2]

x1, x2 = x[..., 0::2], x[..., 1::2]

# 计算旋转后的值
cos = torch.cos(freqs).unsqueeze(0).unsqueeze(0) # [1, 1, seq_len, head_dim//2]
sin = torch.sin(freqs).unsqueeze(0).unsqueeze(0)

x1_rot = x1 * cos - x2 * sin
x2_rot = x1 * sin + x2 * cos

# 重新组合
x_rot = torch.zeros_like(x)
x_rot[..., 0::2] = x1_rot
x_rot[..., 1::2] = x2_rot

return x_rot

def forward(self, x):
batch_size, seq_len, _ = x.shape

# 生成位置频率
t = torch.arange(seq_len, device=x.device).float()
freqs = torch.outer(t, self.inv_freq) # [seq_len, head_dim // 2]

Q = self.q_proj(x).view(batch_size, seq_len, self.n_heads, self.head_dim).transpose(1, 2)
K = self.k_proj(x).view(batch_size, seq_len, self.n_heads, self.head_dim).transpose(1, 2)
V = self.v_proj(x).view(batch_size, seq_len, self.n_heads, self.head_dim).transpose(1, 2)

# 应用 RoPE
Q = self.apply_rotary_pos_emb(Q, freqs)
K = self.apply_rotary_pos_emb(K, freqs)

# 计算注意力
scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.head_dim)
attn_weights = torch.softmax(scores, dim=-1)
attn_output = torch.matmul(attn_weights, V)

attn_output = attn_output.transpose(1, 2).contiguous().view(
batch_size, seq_len, self.d_model
)

return self.out_proj(attn_output)

优势: - 相对位置编码,泛化能力强 - 能够处理任意长度的序列 - 数学上优雅,计算高效

应用: - LLaMA 系列模型使用 RoPE - 广泛用于现代 LLM

Flash Attention

Flash Attention 通过分块计算和在线 softmax 重计算,在保持数值稳定性的同时大幅降低内存使用。

原理

传统注意力机制需要存储完整的注意力矩阵,内存复杂度为

Flash Attention 将计算分块进行:

  1. 分成多个块
  2. 对每个块计算注意力
  3. 使用在线 softmax 更新结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import torch
import torch.nn.functional as F

def flash_attention(q, k, v, block_size=64):
"""
Flash Attention 简化实现
q, k, v: [batch, n_heads, seq_len, head_dim]
"""
batch, n_heads, seq_len, head_dim = q.shape
device = q.device

# 初始化输出和统计量
o = torch.zeros_like(q)
m = torch.full((batch, n_heads, seq_len), float('-inf'), device=device)
l = torch.zeros((batch, n_heads, seq_len), device=device)

# 分块处理
for i in range(0, seq_len, block_size):
q_block = q[:, :, i:i+block_size]

for j in range(0, seq_len, block_size):
k_block = k[:, :, j:j+block_size]
v_block = v[:, :, j:j+block_size]

# 计算注意力分数
s_ij = torch.matmul(q_block, k_block.transpose(-2, -1)) / math.sqrt(head_dim)

# 在线 softmax 更新
m_ij = torch.maximum(m[:, :, i:i+block_size], s_ij.max(dim=-1, keepdim=True)[0])
p_ij = torch.exp(s_ij - m_ij)
l_ij = torch.exp(m[:, :, i:i+block_size] - m_ij) * l[:, :, i:i+block_size] + p_ij.sum(dim=-1, keepdim=True)

# 更新输出
o[:, :, i:i+block_size] = (
torch.exp(m[:, :, i:i+block_size] - m_ij) * o[:, :, i:i+block_size] +
torch.matmul(p_ij, v_block)
) / l_ij

# 更新统计量
m[:, :, i:i+block_size] = m_ij.squeeze(-1)
l[:, :, i:i+block_size] = l_ij.squeeze(-1)

return o

优势: - 内存复杂度从降低到 - 计算速度更快(更好的内存访问模式) - 数值稳定

应用: - 现代 LLM 训练和推理的标准配置 - 支持处理超长序列

Sparse Attention

Sparse Attention 通过只计算部分注意力连接来减少计算量。常见的稀疏模式包括局部注意力、滑动窗口、全局注意力等。

常见稀疏模式

  1. 局部注意力:每个位置只关注附近的 个位置
  2. 滑动窗口:固定大小的窗口在序列上滑动
  3. 全局注意力:少数位置关注所有位置
  4. 随机注意力:随机选择部分位置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class SparseAttention(nn.Module):
def __init__(self, d_model, n_heads, window_size=128, num_global_tokens=8):
super().__init__()
self.d_model = d_model
self.n_heads = n_heads
self.head_dim = d_model // n_heads
self.window_size = window_size
self.num_global_tokens = num_global_tokens

self.q_proj = nn.Linear(d_model, d_model)
self.k_proj = nn.Linear(d_model, d_model)
self.v_proj = nn.Linear(d_model, d_model)
self.out_proj = nn.Linear(d_model, d_model)

def forward(self, x):
batch_size, seq_len, _ = x.shape

Q = self.q_proj(x).view(batch_size, seq_len, self.n_heads, self.head_dim).transpose(1, 2)
K = self.k_proj(x).view(batch_size, seq_len, self.n_heads, self.head_dim).transpose(1, 2)
V = self.v_proj(x).view(batch_size, seq_len, self.n_heads, self.head_dim).transpose(1, 2)

# 创建稀疏注意力掩码
mask = self._create_sparse_mask(seq_len, x.device)

# 计算注意力(只计算掩码为 True 的位置)
scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.head_dim)
scores = scores.masked_fill(~mask, float('-inf'))

attn_weights = torch.softmax(scores, dim=-1)
attn_output = torch.matmul(attn_weights, V)

attn_output = attn_output.transpose(1, 2).contiguous().view(
batch_size, seq_len, self.d_model
)

return self.out_proj(attn_output)

def _create_sparse_mask(self, seq_len, device):
"""创建滑动窗口 + 全局注意力的稀疏掩码"""
mask = torch.zeros((seq_len, seq_len), dtype=torch.bool, device=device)

# 滑动窗口注意力
for i in range(seq_len):
start = max(0, i - self.window_size // 2)
end = min(seq_len, i + self.window_size // 2 + 1)
mask[i, start:end] = True

# 全局注意力(前 num_global_tokens 个位置关注所有位置)
global_indices = torch.arange(self.num_global_tokens, device=device)
mask[global_indices, :] = True
mask[:, global_indices] = True

return mask.unsqueeze(0).unsqueeze(0) # [1, 1, seq_len, seq_len]

MoE( Mixture of Experts)架构详解

传统 Transformer 的 FFN 层是密集激活的:每个 token 都会经过同一个前馈网络。 MoE 架构打破了这一限制,通过引入多个专家网络( Expert)和智能路由机制,实现了"稀疏激活":每个 token 只激活部分专家,从而在保持计算成本可控的同时大幅增加模型容量。

MoE 基本原理

MoE 的核心思想借鉴了集成学习:不同专家擅长处理不同类型的输入。门控网络( Gating Network)负责决定每个输入应该路由到哪些专家,而每个专家专注于处理特定模式的数据。

数学表示

对于输入, MoE 层的输出为:

其中: - 是专家数量(通常为 8 、 64 或更多) - 是第 个专家的输出 - 是门控网络的输出,通常使用 Top-k 路由:只选择分数最高的 个专家(通常),然后在这 个专家上做 softmax 归一化

为什么有效

MoE 的优势在于计算效率:如果使用 8 个专家但每次只激活 2 个,那么实际计算量只相当于原来的 2.5 倍(考虑门控网络开销),但模型容量却增加了 4 倍。这使得训练万亿参数模型成为可能,如 GPT-MoE 、 Switch Transformer 等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import torch
import torch.nn as nn
import torch.nn.functional as F

class Expert(nn.Module):
"""单个专家网络"""
def __init__(self, d_model, d_ff):
super().__init__()
self.linear1 = nn.Linear(d_model, d_ff)
self.linear2 = nn.Linear(d_ff, d_model)
self.activation = nn.GELU()

def forward(self, x):
return self.linear2(self.activation(self.linear1(x)))

class MoELayer(nn.Module):
"""MoE 层"""
def __init__(self, d_model, d_ff, num_experts=8, top_k=2):
super().__init__()
self.num_experts = num_experts
self.top_k = top_k

# 创建多个专家
self.experts = nn.ModuleList([
Expert(d_model, d_ff) for _ in range(num_experts)
])

# 门控网络
self.gate = nn.Linear(d_model, num_experts)

def forward(self, x):
"""
x: [batch_size, seq_len, d_model]
"""
batch_size, seq_len, d_model = x.shape

# 计算门控分数
gate_scores = self.gate(x) # [batch_size, seq_len, num_experts]

# Top-k 路由:选择 top_k 个专家
top_k_scores, top_k_indices = torch.topk(
gate_scores, self.top_k, dim=-1
) # [batch_size, seq_len, top_k]

# Softmax 归一化(只在 top_k 上)
gate_probs = F.softmax(top_k_scores, dim=-1)

# 初始化输出
output = torch.zeros_like(x)

# 对每个专家计算输出
for expert_idx in range(self.num_experts):
# 找到使用当前专家的位置
expert_mask = (top_k_indices == expert_idx)

if expert_mask.any():
# 获取对应的输入和权重
expert_input = x[expert_mask.any(dim=-1)]
expert_weights = gate_probs[expert_mask]

# 计算专家输出
expert_output = self.experts[expert_idx](expert_input)

# 加权累加
output[expert_mask.any(dim=-1)] += expert_weights.unsqueeze(-1) * expert_output

return output

负载均衡( Load Balancing)

MoE 的一个关键挑战是确保专家之间的负载均衡。如果某些专家总是被选中而其他专家很少被使用,会导致训练不稳定。

负载均衡损失

其中: - 是第 个专家被选中的频率 - 是门控网络对第 个专家的平均概率 - 是平衡系数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def compute_load_balancing_loss(gate_probs, top_k_indices, num_experts):
"""
计算负载均衡损失
gate_probs: [batch_size, seq_len, top_k]
top_k_indices: [batch_size, seq_len, top_k]
"""
# 计算每个专家被选中的频率
expert_counts = torch.zeros(num_experts, device=gate_probs.device)
for i in range(num_experts):
expert_counts[i] = (top_k_indices == i).sum().float()

expert_freq = expert_counts / expert_counts.sum()

# 计算每个专家的平均概率
expert_probs = torch.zeros(num_experts, device=gate_probs.device)
for i in range(num_experts):
mask = (top_k_indices == i)
if mask.any():
expert_probs[i] = gate_probs[mask].mean()

# 负载均衡损失
balance_loss = (expert_freq * expert_probs).sum()

return balance_loss

MoE 的优势与挑战

优势: - 模型容量大幅增加(可扩展到万亿参数) - 计算成本只与激活的专家数量成正比 - 支持模型并行和专家并行

挑战: - 需要精心设计路由策略 - 负载均衡是关键问题 - 通信开销(在分布式训练中)

应用: - GPT-MoE 、 Switch Transformer 、 GLaM 等模型 - 大规模语言模型训练

模型压缩与量化

大模型部署面临的核心挑战是内存和计算资源。一个 7B 参数的模型在 FP32 精度下需要约 28GB 内存,这对于大多数硬件来说是不可接受的。量化通过降低数值精度来减少内存占用和加速计算,是实际部署中不可或缺的技术。

INT8 量化

INT8 量化将 32 位浮点数映射到 8 位整数,模型大小减少 4 倍,同时推理速度通常能提升 2-4 倍(取决于硬件支持)。

量化公式

量化过程分为两步:量化和反量化。量化将浮点数映射到整数范围:

反量化将整数映射回浮点数:

其中: - 是缩放因子( scale),决定了量化的精度 - 是零点( zero point),用于处理有符号/无符号整数的映射

对称 vs 非对称量化

对称量化中,量化范围是,实现简单但可能浪费精度。非对称量化允许,能更好地利用量化范围,但计算稍复杂。实际应用中,权重通常用对称量化,激活值用非对称量化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import torch
import torch.nn as nn

def quantize_tensor(x, num_bits=8):
"""对称量化"""
# 计算缩放因子
scale = x.abs().max() / (2 ** (num_bits - 1) - 1)

# 量化
q = torch.round(x / scale).clamp(-2**(num_bits-1), 2**(num_bits-1)-1)

# 反量化
x_dequant = q * scale

return q, scale, x_dequant

# 示例:量化线性层
class QuantizedLinear(nn.Module):
def __init__(self, linear_layer):
super().__init__()
self.weight = linear_layer.weight.data
self.bias = linear_layer.bias.data if linear_layer.bias is not None else None

# 量化权重
self.weight_q, self.weight_scale, _ = quantize_tensor(self.weight)

def forward(self, x):
# 反量化权重
weight_dequant = self.weight_q * self.weight_scale

# 计算输出
output = F.linear(x, weight_dequant, self.bias)
return output

INT4 量化

INT4 量化进一步将精度降低到 4 位,模型大小减少 8 倍,但可能带来更大的精度损失。

GPTQ( GPT Quantization)

GPTQ 是一种后训练量化方法,通过逐层优化来最小化量化误差。

原理

GPTQ 对每一层独立进行量化,使用 Hessian 矩阵来指导量化过程:

  1. 计算权重的 Hessian 矩阵2. 按重要性顺序量化权重
  2. 更新未量化的权重以补偿量化误差
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def gptq_quantize_layer(weight, num_bits=4):
"""
GPTQ 量化(简化版)
weight: [out_features, in_features]
"""
out_features, in_features = weight.shape

# 计算 Hessian 矩阵(简化:使用单位矩阵)
H = torch.eye(in_features, device=weight.device)

# 初始化量化权重
weight_q = weight.clone()
quantized = torch.zeros(in_features, dtype=torch.bool, device=weight.device)

# 逐列量化
for col_idx in range(in_features):
if quantized[col_idx]:
continue

# 计算量化误差
w_col = weight_q[:, col_idx]
scale = w_col.abs().max() / (2 ** (num_bits - 1) - 1)
w_col_q = torch.round(w_col / scale).clamp(-2**(num_bits-1), 2**(num_bits-1)-1)
w_col_dequant = w_col_q * scale

# 更新权重以补偿误差
error = w_col - w_col_dequant

# 更新未量化的列
for j in range(in_features):
if not quantized[j] and j != col_idx:
weight_q[:, j] -= error * H[col_idx, j] / H[col_idx, col_idx]

# 保存量化结果
weight_q[:, col_idx] = w_col_dequant
quantized[col_idx] = True

return weight_q

AWQ( Activation-aware Weight Quantization)

AWQ 是一种感知激活的量化方法,通过保护重要的权重通道来保持模型性能。

原理

AWQ 认为不同通道的重要性不同,应该对重要通道使用更高精度:

  1. 分析激活值的重要性
  2. 识别重要通道(通常占 1%)
  3. 对重要通道保持 FP16,其他通道量化到 INT4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def awq_quantize(weight, activation, num_bits=4, preserve_ratio=0.01):
"""
AWQ 量化
weight: [out_features, in_features]
activation: [batch_size, in_features] 用于分析重要性
"""
# 计算每个通道的重要性(使用激活的 L2 范数)
channel_importance = activation.abs().mean(dim=0) # [in_features]

# 选择重要通道
num_preserve = int(in_features * preserve_ratio)
_, important_indices = torch.topk(channel_importance, num_preserve)

# 初始化量化权重
weight_q = weight.clone()

# 量化非重要通道
for col_idx in range(weight.shape[1]):
if col_idx not in important_indices:
w_col = weight[:, col_idx]
scale = w_col.abs().max() / (2 ** (num_bits - 1) - 1)
w_col_q = torch.round(w_col / scale).clamp(-2**(num_bits-1), 2**(num_bits-1)-1)
weight_q[:, col_idx] = w_col_q * scale

return weight_q, important_indices

KV Cache 优化

在自回归生成中,每次生成新 token 时都需要重新计算之前所有 token 的 Key 和 Value 。 KV Cache 通过缓存这些中间结果来避免重复计算。

KV Cache 原理

无 Cache 的计算

生成第 个 token 时,需要计算:

有 Cache 的计算

只需要计算新 token 的,然后拼接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class KVCache:
"""KV Cache 实现"""
def __init__(self, batch_size, n_heads, head_dim, max_len=2048):
self.batch_size = batch_size
self.n_heads = n_heads
self.head_dim = head_dim
self.max_len = max_len

# 初始化 Cache
self.k_cache = torch.zeros(batch_size, n_heads, max_len, head_dim)
self.v_cache = torch.zeros(batch_size, n_heads, max_len, head_dim)
self.cache_len = 0

def update(self, k, v, start_pos=0):
"""
更新 Cache
k, v: [batch_size, n_heads, seq_len, head_dim]
"""
seq_len = k.shape[2]

if start_pos == 0:
# 首次填充
self.k_cache[:, :, :seq_len] = k
self.v_cache[:, :, :seq_len] = v
self.cache_len = seq_len
else:
# 追加新 token
end_pos = start_pos + seq_len
self.k_cache[:, :, start_pos:end_pos] = k
self.v_cache[:, :, start_pos:end_pos] = v
self.cache_len = max(self.cache_len, end_pos)

def get(self, start_pos=0, end_pos=None):
"""获取 Cache"""
if end_pos is None:
end_pos = self.cache_len
return (
self.k_cache[:, :, start_pos:end_pos],
self.v_cache[:, :, start_pos:end_pos]
)

# 使用 KV Cache 的生成示例
def generate_with_kv_cache(model, tokenizer, prompt, max_new_tokens=50):
inputs = tokenizer(prompt, return_tensors="pt")
input_ids = inputs["input_ids"]

# 初始化 KV Cache
kv_cache = KVCache(
batch_size=1,
n_heads=model.config.n_head,
head_dim=model.config.n_embd // model.config.n_head
)

# 处理初始 prompt
with torch.no_grad():
outputs = model(input_ids, use_cache=True)
kv_cache.update(
outputs.past_key_values[0][0], # k
outputs.past_key_values[0][1], # v
start_pos=0
)

# 自回归生成
generated_ids = input_ids.clone()
for _ in range(max_new_tokens):
# 使用 Cache 生成下一个 token
next_token_logits = model(
generated_ids[:, -1:],
past_key_values=kv_cache.get(),
use_cache=True
).logits

next_token = next_token_logits[:, -1, :].argmax(dim=-1, keepdim=True)
generated_ids = torch.cat([generated_ids, next_token], dim=-1)

# 更新 Cache
kv_cache.update(
outputs.past_key_values[0][0],
outputs.past_key_values[0][1],
start_pos=kv_cache.cache_len
)

return tokenizer.decode(generated_ids[0], skip_special_tokens=True)

KV Cache 优化策略

  1. 分块存储:将 Cache 分块存储,支持动态扩展
  2. 压缩:对历史 KV 进行压缩(如使用低精度)
  3. 滑动窗口:只保留最近的 个 token 的 KV

推理优化技术

批处理( Batching)

批处理将多个请求合并处理,提高 GPU 利用率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def batch_generate(model, tokenizer, prompts, batch_size=8):
"""批处理生成"""
results = []

for i in range(0, len(prompts), batch_size):
batch_prompts = prompts[i:i+batch_size]

# Tokenize
inputs = tokenizer(
batch_prompts,
return_tensors="pt",
padding=True,
truncation=True
)

# 生成
outputs = model.generate(**inputs, max_length=100)

# Decode
batch_results = tokenizer.batch_decode(outputs, skip_special_tokens=True)
results.extend(batch_results)

return results

连续批处理( Continuous Batching)

连续批处理允许动态添加和移除请求,提高吞吐量。

量化推理

使用量化模型进行推理,减少内存和计算需求。

1
2
3
4
5
6
7
8
9
10
11
12
from transformers import AutoModelForCausalLM, BitsAndBytesConfig

# 8-bit 量化
quantization_config = BitsAndBytesConfig(
load_in_8bit=True,
llm_int8_threshold=6.0
)

model = AutoModelForCausalLM.from_pretrained(
"meta-llama/Llama-2-7b-hf",
quantization_config=quantization_config
)

模型并行

将模型分布到多个 GPU 上,支持更大的模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
import torch.nn as nn

# 模型并行示例
class ParallelModel(nn.Module):
def __init__(self):
super().__init__()
self.layer1 = nn.Linear(1024, 2048).to('cuda:0')
self.layer2 = nn.Linear(2048, 1024).to('cuda:1')

def forward(self, x):
x = self.layer1(x.to('cuda:0'))
x = self.layer2(x.to('cuda:1'))
return x

实战:部署优化 LLM

使用 vLLM 部署

vLLM 是一个高性能的 LLM 推理和服务框架。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 安装: pip install vllm

from vllm import LLM, SamplingParams

# 加载模型
llm = LLM(model="meta-llama/Llama-2-7b-hf")

# 设置采样参数
sampling_params = SamplingParams(
temperature=0.7,
top_p=0.9,
max_tokens=100
)

# 生成
prompts = [
"The future of AI is",
"Machine learning is"
]

outputs = llm.generate(prompts, sampling_params)

for output in outputs:
print(f"Prompt: {output.prompt}")
print(f"Generated: {output.outputs[0].text}")

使用 TensorRT-LLM 优化

TensorRT-LLM 是 NVIDIA 的 LLM 推理优化框架。

1
2
3
4
5
6
7
8
9
# TensorRT-LLM 优化流程
# 1. 转换模型
# 2. 构建 TensorRT 引擎
# 3. 部署推理

# 示例命令(需要 TensorRT-LLM 环境)
# trtllm-build --checkpoint_dir ./checkpoints \
# --output_dir ./engines \
# --gemm_plugin float16

性能监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import time
import torch

def benchmark_model(model, tokenizer, prompt, num_runs=10):
"""性能基准测试"""
inputs = tokenizer(prompt, return_tensors="pt")

# Warmup
for _ in range(3):
_ = model.generate(**inputs, max_length=50)

# 同步 GPU
if torch.cuda.is_available():
torch.cuda.synchronize()

# 测试
start_time = time.time()
for _ in range(num_runs):
outputs = model.generate(**inputs, max_length=50)
if torch.cuda.is_available():
torch.cuda.synchronize()
end_time = time.time()

avg_time = (end_time - start_time) / num_runs
tokens_per_second = 50 / avg_time

print(f"Average generation time: {avg_time:.3f}s")
print(f"Tokens per second: {tokens_per_second:.2f}")

return avg_time, tokens_per_second

❓ Q&A: LLM 架构常见问题

Q1: Encoder-only 、 Decoder-only 和 Encoder-Decoder 架构如何选择?

A: 选择取决于任务类型: - Encoder-only:适合理解任务(分类、 NER 、相似度),需要双向上下文 - Decoder-only:适合生成任务(文本生成、对话),架构简单,训练效率高 - Encoder-Decoder:适合序列到序列任务(翻译、摘要),需要理解输入并生成输出

Q2: RoPE 和 ALiBi 哪个更好?

A: 各有优势: - RoPE:相对位置编码,泛化能力强,被 LLaMA 等主流模型采用 - ALiBi:无需位置编码,外推能力强, BLOOM 使用 - 选择取决于具体需求:如果需要处理超长序列, ALiBi 可能更好;如果需要更好的位置理解, RoPE 可能更合适

Q3: Flash Attention 能带来多少性能提升?

A: Flash Attention 主要优势在内存和长序列: - 内存:从降低到,可以处理 4-8 倍更长的序列 - 速度:在长序列(>2048 tokens)上通常有 2-4 倍加速 - 短序列:提升不明显,甚至可能略慢(由于分块开销)

Q4: MoE 架构如何实现负载均衡?

A: 负载均衡是关键挑战: 1. 路由策略:使用 Top-k 路由,确保每个输入激活固定数量的专家 2. 负载均衡损失:在损失函数中加入负载均衡项,鼓励均匀分布 3. 辅助损失:监控专家使用频率,对不平衡进行惩罚 4. 动态路由:根据负载动态调整路由策略

Q5: INT4 量化会损失多少精度?

A: 精度损失取决于: - 模型大小:大模型(>7B)通常损失较小(<2%) - 量化方法: GPTQ/AWQ 等先进方法损失更小 - 任务类型:生成任务通常比理解任务更敏感 - 激活量化:只量化权重损失较小,同时量化激活损失更大

Q6: KV Cache 能节省多少计算?

A: KV Cache 在自回归生成中至关重要: - 计算节省:避免重复计算,理论上可以节省 的计算( 是序列长度) - 实际效果:在生成 100 tokens 时,可以节省约 99% 的注意力计算 - 内存开销:需要额外存储 KV,内存增加约 2 倍(对于每个 token)

Q7: 如何选择量化方法( GPTQ vs AWQ)?

A: - GPTQ:后训练量化,适合通用场景,量化速度快 - AWQ:感知激活,通常精度更高,但需要校准数据 - 建议:如果追求最高精度,选择 AWQ;如果需要快速量化,选择 GPTQ

Q8: MoE 模型在推理时如何选择专家?

A: 推理时的专家选择: 1. Top-k 路由:选择门控分数最高的 个专家(通常) 2. 确定性路由:使用 argmax 选择单个专家(更快但可能精度略低) 3. 负载感知路由:考虑专家负载,避免某些专家过载

Q9: 长文本处理技术的适用场景?

A: - ALiBi:适合需要外推到超长序列的场景(如长文档处理) - RoPE:适合需要精确位置理解的场景(如代码生成) - Flash Attention:所有需要处理长序列的场景都应该使用 - Sparse Attention:适合对精度要求不高但需要处理超长序列的场景

Q10: 如何优化 LLM 推理延迟?

A: 多管齐下: 1. 量化:使用 INT8/INT4 量化减少计算 2. KV Cache:必须使用,避免重复计算 3. 批处理:合并请求提高 GPU 利用率 4. 模型并行:将大模型分布到多个 GPU 5. 编译优化:使用 TensorRT 、 ONNX Runtime 等 6. 硬件加速:使用专用 AI 芯片(如 H100)


本文深入探讨了大语言模型架构的各个方面,从基础的架构选择到高级的优化技术。理解这些技术对于构建高效、可扩展的 LLM 应用至关重要。在实际应用中,需要根据具体需求选择合适的架构和技术组合,在性能和成本之间找到平衡。

  • 本文标题:自然语言处理(九)—— 大语言模型架构深度解析
  • 本文作者:Chen Kai
  • 创建时间:2024-03-21 09:15:00
  • 本文链接:https://www.chenk.top/%E8%87%AA%E7%84%B6%E8%AF%AD%E8%A8%80%E5%A4%84%E7%90%86%EF%BC%88%E4%B9%9D%EF%BC%89%E2%80%94%E2%80%94-%E5%A4%A7%E8%AF%AD%E8%A8%80%E6%A8%A1%E5%9E%8B%E6%9E%B6%E6%9E%84%E6%B7%B1%E5%BA%A6%E8%A7%A3%E6%9E%90/
  • 版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
 评论