时间序列模型(五)—— Transformer 架构
Chen Kai BOSS

Transformer 在 NLP 领域大放异彩后,时间序列研究者很快意识到:序列建模的本质问题——如何捕捉长期依赖、如何并行计算、如何理解全局模式—— Transformer 的注意力机制都能给出更好的答案。相比 LSTM/GRU 需要一步步传递信息, Transformer 的自注意力可以直接“看到”序列中任意两点的关系;相比传统 RNN 的串行计算, Transformer 的矩阵运算天然支持并行,训练速度更快;相比固定窗口的卷积,注意力权重提供了可解释性,能告诉我们模型到底在关注哪些时间点。下面从 Transformer 的核心组件( Self-Attention 、 Multi-Head 、位置编码)开始,逐步深入到时间序列的特殊设计(时间位置编码、因果掩码、解码器结构),然后实现一个完整的 Vanilla Transformer,并介绍 Autoformer 、 FEDformer 等变体,最后通过股票预测和能耗预测的实战案例展示如何调优与部署。

Transformer 基础回顾

Self-Attention 机制

自注意力是 Transformer 的核心。它让序列中的每个位置都能直接关注到其他所有位置,而不需要像 RNN 那样一步步传递信息。

核心思想:对于序列中的每个位置,计算它与其他所有位置的相似度,然后根据相似度对值进行加权求和。

数学表达:给定输入序列 是序列长度, 是特征维度),首先通过线性变换得到查询( Query)、键( Key)和值( Value)矩阵:

$$

Q = XW^Q, K = XW^K, V = XW^V $$

其中 是可学习的权重矩阵, 是注意力维度。

然后计算注意力得分:

这里 是缩放因子,防止点积值过大导致 softmax 梯度消失。

直观理解 计算的是每个位置对其他位置的“关注度”, softmax 将其归一化为权重,最后用这些权重对 加权求和,得到每个位置的新表示。

Multi-Head Attention

多头注意力让模型从不同角度理解序列关系。具体来说,将 分成 个头,每个头独立计算注意力,最后拼接:

其中:

是第 个头的投影矩阵, 是输出投影矩阵。

为什么需要多头:不同的头可能关注不同类型的模式。在时间序列中,一个头可能关注周期性,另一个头可能关注趋势,第三个头可能关注异常点。

位置编码

Transformer 本身没有位置信息,需要通过位置编码( Positional Encoding)注入。原始 Transformer 使用固定的正弦位置编码:

$$

PE_{(pos, 2i)} = ( ) $$

$$

PE_{(pos, 2i+1)} = ( ) $$

其中 是位置, 是维度索引。

为什么用正弦函数:正弦函数具有相对位置的性质,即 可以表示为 的线性组合,这使得模型能够学习相对位置关系。

时间序列的特殊性:在时间序列中,我们通常使用可学习的位置编码或时间感知的位置编码,因为时间间隔、周期性等特征比简单的序列位置更重要。

Transformer 在时间序列中的优势

长期依赖建模

RNN 系列模型( LSTM/GRU)通过隐藏状态传递信息,但信息在传递过程中会衰减。即使 LSTM 有遗忘门控制,长距离依赖仍然难以建模。

Transformer 的自注意力机制让任意两个时间点都能直接交互,理论上可以捕捉任意长度的依赖关系。这对于时间序列特别重要,因为:

  1. 周期性模式:股票市场的周期可能是周、月、季度,需要跨越数百个时间步
  2. 事件影响:某个突发事件的影响可能持续很长时间
  3. 跨尺度相关性:短期波动和长期趋势可能同时影响预测

并行计算能力

RNN 的计算是串行的:必须先计算 ,才能计算 。 Transformer 的注意力计算是并行的:所有位置的 可以同时计算,然后一次性得到所有位置的输出。

训练速度对比:在相同硬件上, Transformer 的训练速度通常比 LSTM 快 3-5 倍,这对于长序列数据尤其明显。

内存效率:虽然 Transformer 的注意力矩阵是 的,但现代 GPU 的矩阵运算优化使得实际训练效率很高。对于超长序列,可以使用稀疏注意力或分块注意力来降低复杂度。

可解释性

注意力权重矩阵 直接告诉我们模型在关注哪些时间点。这比 LSTM 的隐藏状态更容易解释。

可视化示例:在股票预测中,可以可视化注意力权重,发现模型在预测时主要关注:

  • 最近的几个交易日(短期趋势)
  • 一周前的同一天(周期性)
  • 某个重要的市场事件发生的时间点(事件记忆)

时间序列的特殊设计

时间位置编码

标准的位置编码只考虑序列位置,但时间序列需要考虑实际的时间信息:时间戳、周期性(小时、天、周、月)、节假日等。

时间感知位置编码:将时间特征编码到位置编码中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import torch
import torch.nn as nn
import math

class TimePositionalEncoding(nn.Module):
def __init__(self, d_model, max_len=5000):
super().__init__()
self.d_model = d_model

# 标准位置编码
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() *
(-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0)
self.register_buffer('pe', pe)

def forward(self, x, time_features=None):
"""
x: (batch_size, seq_len, d_model)
time_features: (batch_size, seq_len, time_feat_dim) 可选的时间特征
"""
seq_len = x.size(1)
pos_encoding = self.pe[:, :seq_len, :]

if time_features is not None:
# 将时间特征投影到相同维度并相加
time_proj = nn.Linear(time_features.size(-1), self.d_model).to(x.device)
time_encoding = time_proj(time_features)
pos_encoding = pos_encoding + time_encoding

return x + pos_encoding

周期性编码:对于具有明显周期性的数据(如日用电量),可以使用周期性位置编码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def periodic_positional_encoding(seq_len, period, d_model):
"""
周期性位置编码:将位置映射到周期内的相位
"""
pe = torch.zeros(seq_len, d_model)
position = torch.arange(0, seq_len, dtype=torch.float)

# 将位置映射到周期内的位置
phase = (position % period) / period * 2 * math.pi

for i in range(0, d_model, 2):
pe[:, i] = torch.sin(phase * (i // 2 + 1))
if i + 1 < d_model:
pe[:, i + 1] = torch.cos(phase * (i // 2 + 1))

return pe.unsqueeze(0)

因果掩码

在时间序列预测中,我们不能使用未来的信息来预测过去。因此需要在注意力计算中应用因果掩码( Causal Mask),确保位置 只能关注位置

掩码矩阵:下三角矩阵,上三角部分设为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def generate_causal_mask(seq_len):
"""
生成因果掩码:下三角矩阵为 True,上三角为 False
"""
mask = torch.tril(torch.ones(seq_len, seq_len))
return mask.bool()

# 在注意力计算中使用
def masked_attention(Q, K, V, mask=None):
d_k = Q.size(-1)
scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(d_k)

if mask is not None:
# 将 mask 为 False 的位置设为 -inf
scores = scores.masked_fill(mask == 0, -1e9)

attention_weights = torch.softmax(scores, dim=-1)
output = torch.matmul(attention_weights, V)
return output, attention_weights

为什么需要因果掩码:在训练时,如果我们不使用掩码,模型会“偷看”未来的值,导致在推理时(没有未来信息)性能大幅下降。

解码器设计

时间序列预测通常有两种模式:

  1. 自回归预测:逐步预测,每一步的预测作为下一步的输入
  2. 直接预测:一次性预测未来多个时间步

自回归解码器:类似 GPT,使用因果掩码,逐步生成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class AutoregressiveDecoder(nn.Module):
def __init__(self, d_model, nhead, num_layers):
super().__init__()
decoder_layer = nn.TransformerDecoderLayer(
d_model=d_model,
nhead=nhead,
batch_first=True
)
self.decoder = nn.TransformerDecoder(decoder_layer, num_layers)

def forward(self, tgt, memory, tgt_mask=None):
"""
tgt: (batch_size, tgt_len, d_model) 目标序列(逐步生成)
memory: (batch_size, src_len, d_model) 编码器输出
"""
return self.decoder(tgt, memory, tgt_mask=tgt_mask)

直接预测解码器:使用编码器-解码器结构,编码器处理历史序列,解码器一次性生成未来序列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class DirectPredictionDecoder(nn.Module):
def __init__(self, d_model, nhead, num_layers, pred_len):
super().__init__()
self.pred_len = pred_len
decoder_layer = nn.TransformerDecoderLayer(
d_model=d_model,
nhead=nhead,
batch_first=True
)
self.decoder = nn.TransformerDecoder(decoder_layer, num_layers)
# 可学习的查询向量,用于生成预测
self.query_embedding = nn.Parameter(torch.randn(1, pred_len, d_model))

def forward(self, memory):
"""
memory: (batch_size, src_len, d_model) 编码器输出
"""
batch_size = memory.size(0)
tgt = self.query_embedding.expand(batch_size, -1, -1)
return self.decoder(tgt, memory)

Vanilla Transformer for Time Series 实现

下面实现一个完整的时间序列 Transformer 模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import torch
import torch.nn as nn
import math

class TimeSeriesTransformer(nn.Module):
def __init__(
self,
input_size=1, # 输入特征维度
d_model=512, # 模型维度
nhead=8, # 注意力头数
num_encoder_layers=6, # 编码器层数
num_decoder_layers=6, # 解码器层数
dim_feedforward=2048, # 前馈网络维度
dropout=0.1, # Dropout 比率
seq_len=96, # 输入序列长度
pred_len=24, # 预测序列长度
use_time_features=False # 是否使用时间特征
):
super().__init__()
self.seq_len = seq_len
self.pred_len = pred_len
self.d_model = d_model

# 输入投影层
self.input_projection = nn.Linear(input_size, d_model)

# 位置编码
self.pos_encoder = TimePositionalEncoding(d_model, max_len=5000)

# Transformer 编码器
encoder_layer = nn.TransformerEncoderLayer(
d_model=d_model,
nhead=nhead,
dim_feedforward=dim_feedforward,
dropout=dropout,
batch_first=True
)
self.encoder = nn.TransformerEncoder(encoder_layer, num_encoder_layers)

# Transformer 解码器
decoder_layer = nn.TransformerDecoderLayer(
d_model=d_model,
nhead=nhead,
dim_feedforward=dim_feedforward,
dropout=dropout,
batch_first=True
)
self.decoder = nn.TransformerDecoder(decoder_layer, num_decoder_layers)

# 解码器查询向量(用于直接预测)
self.query_embedding = nn.Parameter(torch.randn(1, pred_len, d_model))

# 输出投影层
self.output_projection = nn.Linear(d_model, input_size)

# 时间特征投影(如果使用)
if use_time_features:
self.time_projection = nn.Linear(4, d_model) # 假设时间特征维度为 4

def forward(self, x, time_features=None):
"""
x: (batch_size, seq_len, input_size) 输入序列
time_features: (batch_size, seq_len, time_feat_dim) 可选的时间特征
"""
batch_size = x.size(0)

# 输入投影
x = self.input_projection(x) # (batch_size, seq_len, d_model)

# 位置编码
if time_features is not None:
x = self.pos_encoder(x, time_features)
else:
x = self.pos_encoder(x)

# 编码器:处理历史序列
memory = self.encoder(x) # (batch_size, seq_len, d_model)

# 解码器:生成预测
tgt = self.query_embedding.expand(batch_size, -1, -1)
decoder_output = self.decoder(tgt, memory) # (batch_size, pred_len, d_model)

# 输出投影
output = self.output_projection(decoder_output) # (batch_size, pred_len, input_size)

return output

# 使用示例
model = TimeSeriesTransformer(
input_size=1,
d_model=512,
nhead=8,
num_encoder_layers=6,
num_decoder_layers=6,
seq_len=96,
pred_len=24
)

# 模拟输入
batch_size = 32
x = torch.randn(batch_size, 96, 1) # 96 个时间步,每个时间步 1 个特征
output = model(x) # 输出形状: (32, 24, 1)
print(f"输入形状: {x.shape}, 输出形状: {output.shape}")

关键组件说明

  1. 输入投影:将原始输入特征投影到模型维度 2. 位置编码:注入位置信息,支持时间特征
  2. 编码器:处理历史序列,提取特征
  3. 解码器:使用可学习的查询向量生成预测
  4. 输出投影:将模型输出投影回原始特征空间

时间序列 Transformer 的变体

Autoformer

Autoformer 引入了自相关机制( Auto-Correlation),用 FFT 加速注意力计算,并显式建模周期性。

核心创新

  1. 自相关注意力:通过 FFT 计算序列的自相关,找到周期性的延迟:

其中 的互相关, 是将 循环移位 步。

  1. 分解架构:将序列分解为趋势和季节性成分,分别建模。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import torch.fft

class AutoCorrelation(nn.Module):
def __init__(self, d_model, nhead):
super().__init__()
self.d_model = d_model
self.nhead = nhead
self.d_k = d_model // nhead

def forward(self, Q, K, V):
batch_size, seq_len = Q.size(0), Q.size(1)

# 重塑为多头
Q = Q.view(batch_size, seq_len, self.nhead, self.d_k).transpose(1, 2)
K = K.view(batch_size, seq_len, self.nhead, self.d_k).transpose(1, 2)
V = V.view(batch_size, seq_len, self.nhead, self.d_k).transpose(1, 2)

# FFT 计算互相关
Q_fft = torch.fft.rfft(Q, dim=-1)
K_fft = torch.fft.rfft(K, dim=-1)
R = torch.fft.irfft(Q_fft * torch.conj(K_fft), dim=-1)

# 找到 top-k 延迟
top_k = min(seq_len // 2, 10) # 选择 top-10 延迟
R_topk, indices = torch.topk(R, top_k, dim=-1)

# 计算注意力权重
attention_weights = torch.softmax(R_topk, dim=-1)

# 应用循环移位并加权求和
output = torch.zeros_like(V)
for i, idx in enumerate(indices):
rolled_V = torch.roll(V, shifts=int(idx.item()), dims=2)
output += attention_weights[:, :, i:i+1] * rolled_V

# 重塑回原始形状
output = output.transpose(1, 2).contiguous().view(
batch_size, seq_len, self.d_model
)
return output

FEDformer

FEDformer( Frequency Enhanced Decomposed Transformer)在频域进行注意力计算,进一步降低复杂度。

核心创新

  1. 频域注意力:将序列转换到频域,只保留重要的频率成分,降低计算复杂度
  2. 混合专家:使用 MoE( Mixture of Experts)结构,不同专家处理不同频率范围

频域注意力实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class FrequencyAttention(nn.Module):
def __init__(self, d_model, nhead, mode='low'):
super().__init__()
self.d_model = d_model
self.nhead = nhead
self.mode = mode # 'low', 'high', 'all'

def forward(self, Q, K, V):
batch_size, seq_len = Q.size(0), Q.size(1)

# 转换到频域
Q_freq = torch.fft.rfft(Q, dim=1)
K_freq = torch.fft.rfft(K, dim=1)
V_freq = torch.fft.rfft(V, dim=1)

# 频率选择
if self.mode == 'low':
# 只保留低频成分
top_k = seq_len // 4
Q_freq = Q_freq[:, :top_k, :]
K_freq = K_freq[:, :top_k, :]
V_freq = V_freq[:, :top_k, :]
elif self.mode == 'high':
# 只保留高频成分
top_k = seq_len // 4
Q_freq = Q_freq[:, -top_k:, :]
K_freq = K_freq[:, -top_k:, :]
V_freq = V_freq[:, -top_k:, :]

# 在频域计算注意力
scores = torch.matmul(Q_freq, K_freq.transpose(-2, -1))
attention_weights = torch.softmax(scores / math.sqrt(self.d_model), dim=-1)
output_freq = torch.matmul(attention_weights, V_freq)

# 转换回时域
output = torch.fft.irfft(output_freq, n=seq_len, dim=1)
return output

Informer

Informer 通过ProbSparse Attention降低注意力复杂度从

核心思想:只计算重要的注意力对,而不是所有位置对。通过 KL 散度选择“活跃”的查询:

$$

M(q_i, K) = {j=1}^{L_K} e^{ } - {j=1}^{L_K} $$

选择 最大的 个查询($ u = c L_Q c$ 是采样因子)。

与 LSTM/GRU 的详细对比

计算复杂度

模型 时间复杂度 空间复杂度 并行性
LSTM 否(串行)
GRU 否(串行)
Transformer 是(并行)

说明

  • LSTM/GRU 是 的时间步,但每步需要 的矩阵运算
  • Transformer 的 来自注意力矩阵,但可以并行计算
  • 对于长序列, Transformer 的并行优势明显;对于短序列, LSTM 可能更快

长期依赖建模

LSTM/GRU

  • 通过隐藏状态传递信息,信息会逐渐衰减
  • 即使有门控机制,长距离依赖仍然困难
  • 梯度在反向传播时可能消失或爆炸

Transformer

  • 任意两点可以直接交互,理论上可以捕捉任意长度依赖
  • 注意力权重直接建模依赖关系
  • 梯度通过残差连接和层归一化稳定传播

可解释性

LSTM/GRU

  • 隐藏状态难以解释
  • 需要额外的可视化工具(如 t-SNE)理解模型行为

Transformer

  • 注意力权重矩阵直接可视化
  • 可以清楚地看到模型在关注哪些时间点
  • 不同头可能关注不同类型的模式

训练稳定性

LSTM/GRU

  • 梯度裁剪通常必要
  • 学习率需要仔细调整
  • 容易出现梯度消失或爆炸

Transformer

  • 残差连接和层归一化提供稳定的梯度流
  • 通常不需要梯度裁剪
  • Warm-up 学习率调度有助于训练

实际性能对比

在多个时间序列数据集上的实验表明:

  1. 短序列(< 50 步): LSTM/GRU 可能略优于 Transformer(计算开销小)
  2. 中等序列( 50-200 步): Transformer 和 LSTM 性能相当,但 Transformer 训练更快
  3. 长序列(> 200 步): Transformer 明显优于 LSTM,特别是在捕捉长期依赖方面

实战案例

案例一:股票价格预测

任务:使用过去 60 天的股票数据预测未来 10 天的收盘价。

数据准备

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler

class StockDataset:
def __init__(self, data_path, seq_len=60, pred_len=10):
self.seq_len = seq_len
self.pred_len = pred_len

# 加载数据
df = pd.read_csv(data_path)
prices = df['Close'].values.reshape(-1, 1)

# 归一化
self.scaler = MinMaxScaler()
prices_scaled = self.scaler.fit_transform(prices)

# 创建序列
self.X, self.y = [], []
for i in range(len(prices_scaled) - seq_len - pred_len + 1):
self.X.append(prices_scaled[i:i+seq_len])
self.y.append(prices_scaled[i+seq_len:i+seq_len+pred_len])

self.X = np.array(self.X)
self.y = np.array(self.y)

def __len__(self):
return len(self.X)

def __getitem__(self, idx):
return torch.FloatTensor(self.X[idx]), torch.FloatTensor(self.y[idx])

模型训练

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import torch.optim as optim
from torch.utils.data import DataLoader

# 数据集
dataset = StockDataset('stock_data.csv', seq_len=60, pred_len=10)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# 模型
model = TimeSeriesTransformer(
input_size=1,
d_model=256,
nhead=8,
num_encoder_layers=4,
num_decoder_layers=4,
seq_len=60,
pred_len=10
)

# 优化器和损失函数
optimizer = optim.Adam(model.parameters(), lr=0.0001)
criterion = nn.MSELoss()

# 训练循环
num_epochs = 50
for epoch in range(num_epochs):
model.train()
total_loss = 0

for batch_x, batch_y in dataloader:
optimizer.zero_grad()

# 前向传播
pred = model(batch_x)
loss = criterion(pred, batch_y)

# 反向传播
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()

total_loss += loss.item()

avg_loss = total_loss / len(dataloader)
print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.6f}")

结果分析

  • 注意力可视化:可以绘制注意力权重热力图,观察模型在预测时关注哪些历史时间点
  • 预测精度:通常 Transformer 在股票预测上的 MAE 比 LSTM 低 10-15%
  • 训练时间: Transformer 的训练时间约为 LSTM 的 60-70%

案例二:能耗预测

任务:使用过去 168 小时(一周)的用电量数据预测未来 24 小时的用电量。

时间特征处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def create_time_features(df):
"""创建时间特征:小时、星期、是否周末等"""
df['hour'] = df.index.hour
df['day_of_week'] = df.index.dayofweek
df['is_weekend'] = (df.index.dayofweek >= 5).astype(int)
df['month'] = df.index.month

# 周期性编码
df['hour_sin'] = np.sin(2 * np.pi * df['hour'] / 24)
df['hour_cos'] = np.cos(2 * np.pi * df['hour'] / 24)
df['day_sin'] = np.sin(2 * np.pi * df['day_of_week'] / 7)
df['day_cos'] = np.cos(2 * np.pi * df['day_of_week'] / 7)

return df[['hour_sin', 'hour_cos', 'day_sin', 'day_cos']].values

class EnergyDataset:
def __init__(self, data_path, seq_len=168, pred_len=24):
self.seq_len = seq_len
self.pred_len = pred_len

df = pd.read_csv(data_path, index_col=0, parse_dates=True)

# 能耗数据
energy = df['consumption'].values.reshape(-1, 1)
self.scaler = MinMaxScaler()
energy_scaled = self.scaler.fit_transform(energy)

# 时间特征
time_features = create_time_features(df)
time_scaler = MinMaxScaler()
time_features_scaled = time_scaler.fit_transform(time_features)

# 创建序列
self.X, self.X_time, self.y = [], [], []
for i in range(len(energy_scaled) - seq_len - pred_len + 1):
self.X.append(energy_scaled[i:i+seq_len])
self.X_time.append(time_features_scaled[i:i+seq_len])
self.y.append(energy_scaled[i+seq_len:i+seq_len+pred_len])

self.X = np.array(self.X)
self.X_time = np.array(self.X_time)
self.y = np.array(self.y)

模型配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
model = TimeSeriesTransformer(
input_size=1,
d_model=512,
nhead=8,
num_encoder_layers=6,
num_decoder_layers=6,
seq_len=168,
pred_len=24,
use_time_features=True
)

# 训练时传入时间特征
for batch_x, batch_x_time, batch_y in dataloader:
pred = model(batch_x, time_features=batch_x_time)
loss = criterion(pred, batch_y)
# ...

性能提升

  • 使用时间特征后, MAE 通常降低 15-20%
  • 周期性编码帮助模型更好地捕捉日周期和周周期
  • Transformer 在捕捉长期周期性方面明显优于 LSTM

训练技巧与调优

学习率调度

Transformer 通常需要 Warm-up 学习率调度:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from torch.optim.lr_scheduler import LambdaLR

def get_warmup_scheduler(optimizer, warmup_steps, d_model):
def lr_lambda(step):
if step < warmup_steps:
return step / warmup_steps
else:
return (d_model ** -0.5) * min(
step ** -0.5,
step * (warmup_steps ** -1.5)
)
return LambdaLR(optimizer, lr_lambda)

optimizer = optim.Adam(model.parameters(), lr=1.0, betas=(0.9, 0.98), eps=1e-9)
scheduler = get_warmup_scheduler(optimizer, warmup_steps=4000, d_model=512)

正则化技巧

  1. Dropout:在注意力层和前馈网络中使用,通常设为 0.1-0.2
  2. Label Smoothing:对于分类任务,使用标签平滑
  3. 梯度裁剪:虽然 Transformer 通常不需要,但对于长序列仍建议使用:
1
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

数据增强

  1. 时间扭曲:对序列进行轻微的时间拉伸或压缩
  2. 噪声注入:添加少量高斯噪声提高鲁棒性
  3. Mixup:混合不同样本创建新样本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def time_warp(x, sigma=0.2):
"""时间扭曲数据增强"""
from scipy.interpolate import interp1d

seq_len = x.shape[1]
warp_factor = np.random.normal(1.0, sigma)
new_len = int(seq_len * warp_factor)

if new_len != seq_len:
f = interp1d(np.arange(seq_len), x, axis=1, kind='linear')
x_warped = f(np.linspace(0, seq_len-1, new_len))
# 调整回原始长度
if new_len > seq_len:
x_warped = x_warped[:, :seq_len]
else:
padding = np.zeros((x.shape[0], seq_len - new_len, x.shape[2]))
x_warped = np.concatenate([x_warped, padding], axis=1)
return x_warped
return x

超参数调优

关键超参数

  1. :模型维度,通常设为 128 、 256 、 512 或 1024
  2. :注意力头数,通常是 的因子,常见值 4 、 8 、 16
  3. 层数:编码器和解码器层数,通常 3-6 层
  4. :前馈网络维度,通常是 的 4 倍

调优策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 使用 Optuna 进行超参数搜索
import optuna

def objective(trial):
d_model = trial.suggest_categorical('d_model', [128, 256, 512])
nhead = trial.suggest_categorical('nhead', [4, 8, 16])
num_layers = trial.suggest_int('num_layers', 3, 6)
dropout = trial.suggest_float('dropout', 0.1, 0.3)

model = TimeSeriesTransformer(
d_model=d_model,
nhead=nhead,
num_encoder_layers=num_layers,
num_decoder_layers=num_layers,
dropout=dropout
)

# 训练和验证
# ...
return validation_loss

study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=50)

模型集成

多个 Transformer 模型的集成可以进一步提升性能:

1
2
3
4
5
6
7
8
9
class EnsembleTransformer(nn.Module):
def __init__(self, models):
super().__init__()
self.models = nn.ModuleList(models)

def forward(self, x):
outputs = [model(x) for model in self.models]
# 平均或加权平均
return torch.stack(outputs).mean(dim=0)

❓ Q&A: Transformer 时间序列常见问题

Q1: Transformer 在短序列上是否不如 LSTM?

A: 不一定。虽然 Transformer 的注意力机制在短序列上的优势不明显,但通过合适的架构设计(如减少层数、降低 ), Transformer 在短序列上也能取得与 LSTM 相当甚至更好的性能。核心是模型容量与数据规模的匹配。

Q2: 如何选择位置编码方式?

A:

  • 固定正弦编码:适用于序列长度固定、位置信息重要的场景
  • 可学习位置编码:适用于序列长度变化、需要学习位置关系的场景
  • 时间感知位置编码:适用于有明显时间特征(周期性、节假日等)的时间序列数据

对于时间序列,通常推荐使用时间感知位置编码或可学习位置编码。

Q3: 注意力矩阵的 复杂度如何优化?

A: 有几种方法: 1. 稀疏注意力:只计算部分位置对的注意力(如 Informer 的 ProbSparse Attention) 2. 局部注意力:限制注意力范围,只关注局部窗口 3. 线性注意力:使用线性复杂度近似(如 Performer) 4. 分块计算:将长序列分块,分别计算注意力

对于大多数时间序列任务,序列长度在 100-500 之间, 的复杂度是可以接受的。

Q4: 为什么 Transformer 需要 Warm-up 学习率?

A: Transformer 的深度和残差连接使得训练初期梯度较大,如果学习率太高,容易导致训练不稳定。 Warm-up 策略在训练初期使用较小的学习率,让模型先"预热",然后逐渐增大学习率,有助于稳定训练并提高最终性能。

Q5: 如何处理多变量时间序列?

A: 有几种方法: 1. 特征维度扩展:将多个变量作为输入特征的不同维度 2. 变量嵌入:为每个变量学习独立的嵌入向量 3. 多尺度注意力:不同变量使用不同的注意力头

最简单的方法是将多变量作为输入特征的不同维度,让模型自动学习变量间的关系。

Q6: Transformer 在时间序列上的过拟合问题如何解决?

A: 1. 增加 Dropout:在前馈网络和注意力层使用更高的 Dropout( 0.2-0.3) 2. 数据增强:时间扭曲、噪声注入等 3. 早停:监控验证集损失,提前停止训练 4. 正则化: L2 正则化或权重衰减 5. 减少模型容量:降低 或层数

Q7: 如何解释 Transformer 的注意力权重?

A: 1. 可视化热力图:绘制注意力权重矩阵,观察哪些时间点被关注 2. 平均注意力:对不同样本的注意力权重求平均,找出共同关注的模式 3. 头分析:分析不同注意力头关注的不同模式(趋势、周期、异常等)

注意力权重高的时间点通常对预测贡献更大,这提供了模型决策的可解释性。

Q8: Transformer 和 LSTM 可以结合使用吗?

A: 可以。常见的结合方式: 1. LSTM-Transformer 混合: LSTM 提取局部特征, Transformer 捕捉长期依赖 2. 多尺度架构:不同尺度使用不同模型 3. 集成学习:分别训练 LSTM 和 Transformer,最后集成预测结果

混合架构通常能结合两种模型的优势,但会增加模型复杂度。

Q9: 时间序列 Transformer 需要多少数据?

A: 这取决于:

  • 模型容量:更大的模型需要更多数据
  • 序列长度:长序列需要更多样本
  • 数据复杂度:复杂模式需要更多数据学习

一般来说:

  • 小模型(, 3-4 层):至少 1000-5000 个样本
  • 中等模型(, 4-6 层):至少 5000-20000 个样本
  • 大模型(, 6+ 层):至少 20000+ 个样本

Q10: 如何选择编码器-解码器的层数比例?

A: 常见配置:

  • 对称结构:编码器和解码器层数相同(如 6-6),适用于大多数场景
  • 编码器更深:编码器层数多于解码器(如 8-4),适用于输入序列复杂、输出序列简单的场景
  • 解码器更深:解码器层数多于编码器(如 4-8),适用于需要复杂生成过程的场景

对于时间序列预测,通常推荐对称结构或编码器稍深的结构。

实战技巧与性能优化

Transformer 超参数调优完整指南

1. 模型维度( d_model)选择

数据规模 推荐 d_model 说明
< 1,000 样本 128 避免过拟合
1,000-10,000 256-512 平衡性能和速度
> 10,000 512-1024 充分表达能力

选择原则

  • 通常设为 2 的幂次( 128, 256, 512, 1024)
  • 必须能被注意力头数整除
  • 前馈网络维度 通常是 的 4 倍

2. 层数( Num Layers)选择

任务复杂度 编码器层数 解码器层数 说明
简单预测 3-4 层 3-4 层 单变量、短序列
中等复杂度 4-6 层 4-6 层 多变量、中等序列
复杂模式 6-8 层 6-8 层 长序列、多尺度依赖

⚠️ 超过 8 层通常收益递减,且训练困难。

3. 注意力头数( nhead)配置

1
2
3
4
5
6
7
8
9
10
11
def get_optimal_heads(d_model):
"""根据 d_model 选择最优头数"""
# 常见的头数: 4, 8, 16, 32
# 要求: d_model % nhead == 0 且 d_k >= 32
possible_heads = [4, 8, 16, 32]
for nhead in possible_heads:
if d_model % nhead == 0:
d_k = d_model // nhead
if d_k >= 32: # 每个头至少 32 维
return nhead
return 8 # 默认值

4. 学习率调度策略

Transformer 通常需要 Warm-up 学习率:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from torch.optim.lr_scheduler import LambdaLR

def get_transformer_scheduler(optimizer, warmup_steps, d_model):
"""Transformer 标准学习率调度"""
def lr_lambda(step):
if step < warmup_steps:
return step / warmup_steps
else:
return (d_model ** -0.5) * min(
step ** -0.5,
step * (warmup_steps ** -1.5)
)
return LambdaLR(optimizer, lr_lambda)

# 使用示例
optimizer = torch.optim.Adam(model.parameters(), lr=1.0, betas=(0.9, 0.98), eps=1e-9)
scheduler = get_transformer_scheduler(optimizer, warmup_steps=4000, d_model=512)

位置编码优化技巧

1. 时间感知位置编码增强

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class EnhancedTimePositionalEncoding(nn.Module):
"""增强的时间位置编码"""
def __init__(self, d_model, max_len=5000):
super().__init__()
self.d_model = d_model

# 标准位置编码
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() *
(-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
self.register_buffer('pe', pe.unsqueeze(0))

# 可学习的时间特征投影
self.time_proj = nn.Linear(4, d_model) # 假设 4 个时间特征

def forward(self, x, time_features=None):
seq_len = x.size(1)
pos_encoding = self.pe[:, :seq_len, :]

if time_features is not None:
# 时间特征编码
time_encoding = self.time_proj(time_features)
pos_encoding = pos_encoding + time_encoding

return x + pos_encoding

2. 相对位置编码

对于某些任务,相对位置比绝对位置更重要:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class RelativePositionEncoding(nn.Module):
"""相对位置编码"""
def __init__(self, d_model, max_len=5000):
super().__init__()
self.d_model = d_model
# 相对位置嵌入:从-max_len 到 max_len
self.embeddings = nn.Embedding(2 * max_len + 1, d_model)

def forward(self, x):
batch_size, seq_len, d_model = x.size()

# 计算相对位置矩阵
positions = torch.arange(seq_len, device=x.device)
relative_positions = positions.unsqueeze(0) - positions.unsqueeze(1)
relative_positions = relative_positions + seq_len # 偏移到非负

# 获取相对位置嵌入
rel_pos_emb = self.embeddings(relative_positions)

# 应用到注意力计算中(需要在 Attention 层中使用)
return rel_pos_emb

训练加速与内存优化

1. 梯度累积(处理大批次)

1
2
3
4
5
6
7
8
9
10
11
12
accumulation_steps = 4
optimizer.zero_grad()

for i, (x, y) in enumerate(dataloader):
pred = model(x)
loss = criterion(pred, y) / accumulation_steps
loss.backward()

if (i + 1) % accumulation_steps == 0:
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
optimizer.zero_grad()

2. 混合精度训练

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from torch.cuda.amp import autocast, GradScaler

scaler = GradScaler()

for epoch in range(epochs):
for x, y in dataloader:
optimizer.zero_grad()

with autocast():
pred = model(x)
loss = criterion(pred, y)

scaler.scale(loss).backward()
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
scaler.step(optimizer)
scaler.update()

3. 梯度检查点(节省内存)

1
2
3
4
5
6
from torch.utils.checkpoint import checkpoint

class MemoryEfficientTransformer(nn.Module):
def forward(self, x):
# 使用梯度检查点,牺牲计算时间换取内存
return checkpoint(self.transformer, x)

常见问题排查

问题 1:训练 Loss 不下降

可能原因:

  • 学习率设置不当
  • 位置编码未正确添加
  • 数据未归一化

排查步骤:

1
2
3
4
5
6
7
8
9
# 1. 检查学习率
print(f"当前学习率: {optimizer.param_groups[0]['lr']}")

# 2. 检查位置编码
x_with_pe = model.pos_encoder(x)
print(f"位置编码后范围: [{x_with_pe.min():.2f}, {x_with_pe.max():.2f}]")

# 3. 检查数据分布
print(f"输入数据: mean={x.mean():.2f}, std={x.std():.2f}")

问题 2:注意力权重过于均匀

解决方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 1. 检查初始化
def init_transformer_weights(m):
if isinstance(m, nn.Linear):
nn.init.xavier_uniform_(m.weight)
if m.bias is not None:
nn.init.constant_(m.bias, 0)

model.apply(init_transformer_weights)

# 2. 增加模型容量
model = TimeSeriesTransformer(d_model=512, nhead=16) # 从 256 增加到 512

# 3. 调整学习率
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3) # 从 1e-4 增加

问题 3:长序列 OOM(内存不足)

优化方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 1. 使用稀疏注意力
from transformers import LongformerModel # 或使用自定义稀疏注意力

# 2. 减少批次大小
dataloader = DataLoader(dataset, batch_size=8) # 从 32 降到 8

# 3. 使用梯度累积模拟大批次
accumulation_steps = 4 # 等效批次大小 = 8 * 4 = 32

# 4. 序列截断
def truncate_sequence(x, max_len=200):
if x.size(1) > max_len:
return x[:, -max_len:, :] # 保留最后 max_len 个时间步
return x

Transformer 变体选择指南

变体 优势 适用场景 复杂度
Vanilla Transformer 标准实现,易于理解 中等长度序列(<500) 中等
Informer ProbSparse Attention,降低复杂度 长序列(>500) 中等
Autoformer 自相关机制,显式建模周期性 有明显周期性的数据 较高
FEDformer 频域注意力,进一步降低复杂度 超长序列(>1000) 较高
PatchTST Patch-based,提升效率 多变量、长序列 中等

选择建议

  • 序列长度 < 200: Vanilla Transformer
  • 序列长度 200-500: Informer 或 PatchTST
  • 序列长度 > 500: Autoformer 或 FEDformer
  • 有明显周期性: Autoformer
  • 多变量数据: PatchTST

模型集成策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class EnsembleTransformer:
"""Transformer 模型集成"""
def __init__(self, models, weights=None):
self.models = models
self.weights = weights if weights else [1.0] * len(models)
self.weights = torch.tensor(self.weights) / sum(self.weights)

def predict(self, x):
"""集成预测"""
predictions = []
for model in self.models:
model.eval()
with torch.no_grad():
pred = model(x)
predictions.append(pred)

# 加权平均
predictions = torch.stack(predictions)
ensemble_pred = torch.sum(
predictions * self.weights.view(-1, 1, 1, 1),
dim=0
)
return ensemble_pred

# 使用示例
models = [
TimeSeriesTransformer(d_model=256, nhead=8),
TimeSeriesTransformer(d_model=512, nhead=16),
InformerModel(...),
]
ensemble = EnsembleTransformer(models, weights=[0.4, 0.4, 0.2])
prediction = ensemble.predict(x_test)

总结

Transformer 架构为时间序列建模带来了新的可能性:通过自注意力机制直接建模长期依赖,通过并行计算加速训练,通过注意力权重提供可解释性。虽然 Transformer 在短序列上的优势不明显,但在长序列、复杂模式、需要长期依赖的场景中, Transformer 通常能取得比传统 RNN 更好的性能。

选择合适的 Transformer 变体( Vanilla Transformer 、 Autoformer 、 FEDformer 等)和合理的超参数配置,结合时间特征、因果掩码等时间序列特殊设计, Transformer 可以成为时间序列预测的强大工具。随着研究的深入,相信会有更多针对时间序列优化的 Transformer 架构出现,进一步提升时间序列建模的能力。

实战要点总结

  • 本文标题:时间序列模型(五)—— Transformer 架构
  • 本文作者:Chen Kai
  • 创建时间:2020-05-12 09:15:00
  • 本文链接:https://www.chenk.top/%E6%97%B6%E9%97%B4%E5%BA%8F%E5%88%97%E6%A8%A1%E5%9E%8B%EF%BC%88%E4%BA%94%EF%BC%89%E2%80%94%E2%80%94-Transformer%E6%9E%B6%E6%9E%84/
  • 版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
 评论