时间序列模型(二)—— LSTM
Chen Kai BOSS

RNN 在长序列上“记不住”的问题,本质是信息与梯度在时间维度上不断衰减或爆炸。 LSTM 的设计很像给网络加了一个可控的“记账本”:信息要不要写进去、要不要擦掉、要不要读出来,都由门控来决定,从而把长期依赖变成可学习、可控的路径。本文会把 LSTM 的三个门和记忆单元逐个拆开讲清楚:每个公式对应的直觉是什么、它是怎么缓解梯度问题的,以及在时间序列预测里该如何组织输入/输出、怎么看训练稳定性与效果。

LSTM 的基本结构

记忆单元与门控机制

LSTM 的核心是其独特的记忆单元和三个门(输入门、遗忘门、输出门),这些门通过不同的方式控制信息在记忆单元中的流动和存储。可以把 LSTM 比作一个智能记事本。这个记事本不仅能记录信息,还能智能地决定哪些信息应该记住,哪些信息应该忘记,以及哪些信息应该输出。

  1. 记忆单元( Memory Cell):存储长期信息的单元。
  2. 输入门( Input Gate):控制新信息如何流入记忆单元。
  3. 遗忘门( Forget Gate):决定记忆单元中哪些信息需要被遗忘。
  4. 输出门( Output Gate):控制记忆单元的输出。

数学公式

为当前时间步, 为输入向量, 为隐藏状态, 为记忆单元状态, 为权重矩阵, 为偏置向量。具体的计算步骤如下:

  1. 遗忘门:决定哪些信息需要遗忘。遗忘门通过一个 sigmoid 函数 来控制遗忘的比例,输出一个 0 到 1 之间的数值。这个数值越接近 1,表示越不需要遗忘;越接近 0,表示越需要遗忘。 $$

f_t = (W_f + b_f) $$

  1. 输入门:决定哪些新信息需要加入记忆单元。输入门同样通过一个 sigmoid 函数来控制新信息的加入比例,输入门的输出是一个 0 到 1 之间的数值,表示新信息加入的程度。然后,通过一个 tanh 函数生成新的候选记忆 ,这个候选记忆可以加入到记忆单元中。 $$

i_t = (W_i + b_i) _t = (W_C + b_C) $$

  1. 更新记忆单元:结合遗忘门和输入门的作用更新记忆单元状态。记忆单元的状态 由遗忘门的输出和之前的记忆状态 以及输入门的输出和新的候选记忆 共同决定。 表示逐元素乘法。 $$

C_t = f_t C_{t-1} + i_t _t $$

  1. 输出门:决定记忆单元的输出。输出门通过一个 sigmoid 函数控制记忆单元的输出比例,最终的隐藏状态 由输出门的输出和当前记忆单元的状态 经过 tanh 函数处理后得到。 $$

o_t = (W_o + b_o)

h_t = o_t (C_t) $$

LSTM 的 Python 实现:从基础结构到时间序列预测

问题背景:传统 RNN 在处理长序列时面临梯度消失问题,无法学习长期依赖。 LSTM 通过引入门控机制和独立的细胞状态( Cell State),解决了信息在时间维度上的衰减问题。核心挑战在于如何正确初始化和管理隐藏状态、细胞状态,以及如何组织输入输出以适应时间序列预测任务。

解决思路:采用"门控+记忆"的双路径设计。遗忘门控制历史信息的保留,输入门控制新信息的注入,输出门控制信息的输出。细胞状态作为"高速公路"直接传递信息,避免梯度衰减。隐藏状态作为"过滤后的输出"供后续层使用。

设计考虑: 1. 状态初始化:隐藏状态和细胞状态通常初始化为零,但遗忘门偏置应初始化为 1(鼓励初始时保留信息) 2. 批处理组织:使用batch_first=True使输入形状更直观( batch, seq_len, features) 3. 多层堆叠:多层 LSTM 可以学习层次化的特征,但需要层间 dropout 防止过拟合 4. 输出选择:时间序列预测通常使用最后一个时间步的输出,或所有时间步的输出进行注意力加权

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import torch
import torch.nn as nn

class LSTM(nn.Module):
"""
LSTM 模型:用于时间序列预测的基础实现

核心组件:
- 遗忘门:决定丢弃多少历史信息
- 输入门:决定存储多少新信息
- 输出门:决定输出多少信息
- 细胞状态:长期记忆的"高速公路"
- 隐藏状态:当前时刻的输出表示

Parameters:
-----------
input_size : int
输入特征的维度。例如:
- 单变量时间序列: input_size=1
- 多变量时间序列: input_size=特征数量(如温度、湿度、气压 → 3)
hidden_size : int
隐藏状态的维度,也是细胞状态的维度
- 控制模型的表达能力:越大表达能力越强,但参数越多
- 典型值: 32-256,根据数据量和任务复杂度选择
num_layers : int
LSTM 的层数(堆叠的 LSTM 层数)
- 单层:适合简单任务,训练快
- 2-3 层:大多数任务的最佳选择
- 4 层以上:容易梯度消失,需要残差连接等技巧

Attributes:
-----------
hidden_size : int
隐藏状态维度
num_layers : int
LSTM 层数
lstm : nn.LSTM
PyTorch 的 LSTM 层,自动实现所有门控机制

Notes:
------
- batch_first=True:输入输出形状为 (batch, seq_len, features)
- 默认不使用 dropout,多层时建议添加 dropout 参数
- 初始状态通常为零,但可以通过自定义初始化改进
"""
def __init__(self, input_size, hidden_size, num_layers):
super(LSTM, self).__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers

# 定义 LSTM 层
# batch_first=True:输入形状为 (batch_size, seq_len, input_size)
# 而不是默认的 (seq_len, batch_size, input_size)
self.lstm = nn.LSTM(
input_size, # 输入特征维度
hidden_size, # 隐藏状态维度
num_layers, # LSTM 层数
batch_first=True # 批次维度在前,更符合直觉
)

def forward(self, x):
"""
前向传播:处理输入序列并返回 LSTM 输出

工作流程:
1. 初始化隐藏状态和细胞状态(通常为零)
2. 将输入序列逐时间步输入 LSTM
3. LSTM 内部计算:遗忘门→输入门→更新细胞状态→输出门→更新隐藏状态
4. 返回所有时间步的输出

Parameters:
-----------
x : torch.Tensor
输入张量,形状为 (batch_size, sequence_length, input_size)
例如:(32, 50, 10) 表示 32 个样本,每个样本 50 个时间步,每步 10 个特征

Returns:
--------
out : torch.Tensor
输出张量,形状为 (batch_size, sequence_length, hidden_size)
包含每个时间步的隐藏状态
例如:(32, 50, 20) 表示 32 个样本, 50 个时间步,每步 20 维隐藏状态

Notes:
------
- h0 和 c0 初始化为零是常见做法,但可以改进(如遗忘门偏置初始化为 1)
- 返回的 out 包含所有时间步的输出,通常只使用最后一个时间步 out[:, -1, :]
- 如果需要多层间的隐藏状态,可以返回 (h_n, c_n)
"""
batch_size = x.size(0)

# 初始化隐藏状态 h0:形状 (num_layers, batch_size, hidden_size)
# 第一维是层数:多层 LSTM 时,每层有独立的隐藏状态
h0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(x.device)

# 初始化细胞状态 c0:形状 (num_layers, batch_size, hidden_size)
# 细胞状态是 LSTM 的"长期记忆",独立于隐藏状态
c0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(x.device)

# LSTM 前向传播
# 输入: x (batch, seq_len, input_size) 和初始状态 (h0, c0)
# 输出: out (batch, seq_len, hidden_size) 和最终状态 (h_n, c_n)
# out 包含所有时间步的隐藏状态, h_n 是最后一层的最终隐藏状态
out, (h_n, c_n) = self.lstm(x, (h0, c0))

# 返回所有时间步的输出(用于后续处理,如注意力机制)
# 如果只需要最后一个时间步,可以使用 out[:, -1, :]
return out

# 使用示例:时间序列预测
input_size = 10 # 输入特征数(如 10 个传感器)
hidden_size = 20 # 隐藏状态维度
num_layers = 2 # 2 层 LSTM 堆叠

# 创建 LSTM 模型
lstm = LSTM(input_size, hidden_size, num_layers)

# 准备输入数据
# batch_size=32: 32 个样本
# sequence_length=50:每个样本 50 个时间步
# input_size=10:每个时间步 10 个特征
x = torch.randn(32, 50, 10)

# 前向传播
output = lstm(x)

# 输出形状:(32, 50, 20)
# - 32 个样本
# - 50 个时间步
# - 每步 20 维隐藏状态
print(f"输入形状: {x.shape}")
print(f"输出形状: {output.shape}")

# 时间序列预测:通常使用最后一个时间步的输出
last_output = output[:, -1, :] # (32, 20)
print(f"最后时间步输出形状: {last_output.shape}")

# 添加全连接层进行预测
fc = nn.Linear(hidden_size, 1) # 预测 1 个值(如未来 1 天的销量)
prediction = fc(last_output) # (32, 1)
print(f"预测形状: {prediction.shape}")

关键点解读

  1. 隐藏状态 vs 细胞状态:隐藏状态 是"过滤后的输出",经过输出门处理,供后续层使用;细胞状态 是"原始记忆",直接传递信息,避免梯度衰减。两者维度相同但作用不同。

  2. 多层 LSTM 的堆叠:多层 LSTM 中,第 层的输出作为第 层的输入。底层学习局部模式(如相邻时间步的关系),高层学习全局模式(如长期趋势)。但层数过多会导致梯度消失,通常 2-3 层足够。

  3. 批处理的组织batch_first=True使输入形状为 而不是,更符合直觉。但需要注意:初始状态 的形状始终是,其中 是层数。

常见问题

  1. Q: 为什么初始状态要设置为零?
    • A: 零初始化是常见做法,但可以改进。更好的做法是将遗忘门偏置初始化为 1(鼓励初始时保留信息),这有助于梯度流动和长期记忆。
  2. Q: 如何选择 hidden_size 和 num_layers?
    • A: hidden_size 通常 64-128,根据数据量调整; num_layers 通常 2-3 层,超过 4 层收益递减。可以通过交叉验证选择最优组合。
  3. Q: 输出 out 和 h_n 有什么区别?
    • A: out包含所有时间步的隐藏状态h_n只包含最后一层的最终隐藏状态。如果只需要最后一个时间步,用out[:, -1, :];如果需要多层最终状态,用h_n[-1](最后一层)。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# 完整的时间序列预测示例
class LSTMForecaster(nn.Module):
"""LSTM 时间序列预测模型"""
def __init__(self, input_size, hidden_size, num_layers, output_size=1, dropout=0.2):
super().__init__()
self.lstm = nn.LSTM(
input_size, hidden_size, num_layers,
batch_first=True,
dropout=dropout if num_layers > 1 else 0 # 层间 dropout
)
self.fc = nn.Linear(hidden_size, output_size)
self.dropout = nn.Dropout(dropout)

def forward(self, x):
# LSTM 处理序列
out, _ = self.lstm(x) # (batch, seq_len, hidden_size)

# 使用最后一个时间步的输出
last_output = out[:, -1, :] # (batch, hidden_size)

# Dropout 防止过拟合
last_output = self.dropout(last_output)

# 全连接层输出预测
prediction = self.fc(last_output) # (batch, output_size)
return prediction

# 创建模型
model = LSTMForecaster(input_size=10, hidden_size=64, num_layers=2, output_size=1)

# 训练示例
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# 模拟训练数据
x_train = torch.randn(100, 50, 10) # 100 个样本, 50 个时间步, 10 个特征
y_train = torch.randn(100, 1) # 100 个样本,预测 1 个值

# 前向传播
pred = model(x_train)
loss = criterion(pred, y_train)

# 反向传播
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0) # 梯度裁剪
optimizer.step()

print(f"训练损失: {loss.item():.4f}")

LSTM 的高级应用

注意力机制与 LSTM 的结合

注意力机制最早在机器翻译任务中引入,其思想是让模型在进行预测时,不是简单地依赖于最后一个隐藏状态,而是通过一种加权的方式,利用整个输入序列的所有隐藏状态。这个加权的过程通过注意力得分来实现,这些得分表示了每个时间步的重要性。注意力机制( Attention Mechanism)通过赋予输入序列中不同部分不同的重要性权重,进一步提升 LSTM 的性能。常见的注意力机制有 Bahdanau Attention 和 Luong Attention 。

Bahdanau Attention:让 LSTM 关注重要时间步

问题背景:标准 LSTM 只使用最后一个隐藏状态进行预测,但时间序列中关键信息可能分布在不同时间步(如周期峰值、异常点、趋势转折点)。如果关键信息在序列中间,最后一个隐藏状态可能已经"遗忘"了这些信息。

解决思路: Bahdanau 注意力机制(也称为加性注意力)允许模型动态关注所有历史时间步。通过计算当前查询( query)与所有键( key)的相似度,得到注意力权重,然后对所有值( value)进行加权求和,生成上下文向量。这样模型可以直接访问任何历史时间步的信息,而不依赖隐藏状态的传递。

设计考虑: 1. 加性注意力:使用 MLP(线性层+tanh)计算相似度,比点积注意力更灵活但计算量更大 2. 对齐模型self.attn学习查询和键之间的对齐关系,self.v将对齐分数映射为标量 3. 上下文向量:加权求和后的上下文向量融合了所有时间步的信息,权重由模型自动学习 4. 时间序列应用:在时间序列预测中,查询通常是当前时刻或预测时刻的表示,键值对是历史所有时刻的 LSTM 输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
import torch.nn.functional as F

class BahdanauAttention(nn.Module):
"""
Bahdanau 注意力机制(加性注意力)实现

核心思想:通过学习的对齐模型计算查询与所有键的相似度
数学表达: score(q, k) = v^T · tanh(W · [q; k])
注意力权重:α_i = softmax(score(q, k_i))
上下文向量: c = Σ(α_i · v_i)

Parameters:
-----------
hidden_size : int
隐藏状态的维度,必须与 LSTM 的 hidden_size 一致

Attributes:
-----------
attn : nn.Linear
对齐模型:将[查询; 键]映射到 hidden_size 维
输入维度: hidden_size * 2(查询和键拼接)
输出维度: hidden_size
v : nn.Parameter
可学习的参数向量,用于将对齐分数映射为标量
形状:(hidden_size,)

Notes:
------
- 这是加性注意力( Additive Attention),计算复杂度 O(n · d ²)
- 相比点积注意力,更灵活但计算更慢
- 适用于查询和键维度不同的场景
"""
def __init__(self, hidden_size):
super(BahdanauAttention, self).__init__()
# 对齐模型:学习查询和键之间的对齐关系
# 输入:拼接的[查询, 键],维度 hidden_size * 2
# 输出:对齐分数,维度 hidden_size
self.attn = nn.Linear(hidden_size * 2, hidden_size)

# 可学习的参数向量:将对齐分数映射为标量注意力得分
# 通过矩阵乘法 v^T · energy 得到标量分数
self.v = nn.Parameter(torch.rand(hidden_size))

def forward(self, hidden, encoder_outputs):
"""
计算注意力权重

工作流程:
1. 将查询( hidden)扩展到所有时间步
2. 计算查询与每个键( encoder_outputs)的对齐分数
3. 通过 softmax 归一化得到注意力权重

Parameters:
-----------
hidden : torch.Tensor
查询向量,形状 (batch_size, hidden_size)
通常是解码器的当前隐藏状态或预测时刻的表示
encoder_outputs : torch.Tensor
编码器的所有输出,形状 (batch_size, seq_len, hidden_size)
包含所有时间步的隐藏状态(作为键和值)

Returns:
--------
attention_weights : torch.Tensor
注意力权重,形状 (batch_size, 1, seq_len)
每行的权重和为 1(经过 softmax 归一化)
"""
seq_len = encoder_outputs.size(1)

# 将查询扩展到所有时间步,以便与每个键计算相似度
# hidden: (batch, hidden_size) → (batch, seq_len, hidden_size)
hidden = hidden.repeat(seq_len, 1, 1).transpose(0, 1)

# 计算注意力得分(能量)
attn_energies = self.score(hidden, encoder_outputs)

# Softmax 归一化:将得分转换为概率分布
# dim=1:在 seq_len 维度上归一化,使得每个查询对所有键的权重和为 1
attention_weights = F.softmax(attn_energies, dim=1)

# 添加维度以便后续矩阵乘法:(batch, seq_len) → (batch, 1, seq_len)
return attention_weights.unsqueeze(1)

def score(self, hidden, encoder_outputs):
"""
计算注意力得分(能量)

核心计算:
1. 拼接查询和键:[hidden; encoder_outputs]
2. 通过对齐模型: W · [hidden; encoder_outputs]
3. Tanh 激活: tanh(...)
4. 与参数向量 v 相乘: v^T · tanh(...)

Parameters:
-----------
hidden : torch.Tensor
扩展后的查询,形状 (batch_size, seq_len, hidden_size)
encoder_outputs : torch.Tensor
编码器输出(键),形状 (batch_size, seq_len, hidden_size)

Returns:
--------
energy : torch.Tensor
注意力得分,形状 (batch_size, seq_len)
每个值表示对应时间步的重要性
"""
# 步骤 1:拼接查询和键
# hidden: (batch, seq_len, hidden_size)
# encoder_outputs: (batch, seq_len, hidden_size)
# cat 结果: (batch, seq_len, hidden_size * 2)
concat = torch.cat((hidden, encoder_outputs), 2)

# 步骤 2:通过对齐模型计算对齐分数
# self.attn: Linear(hidden_size*2 → hidden_size)
# energy: (batch, seq_len, hidden_size)
energy = torch.tanh(self.attn(concat))

# 步骤 3:转置以便与参数向量 v 相乘
# energy: (batch, seq_len, hidden_size) → (batch, hidden_size, seq_len)
energy = energy.transpose(2, 1)

# 步骤 4:扩展参数向量 v 以匹配批次大小
# self.v: (hidden_size,) → (batch, 1, hidden_size)
v = self.v.repeat(encoder_outputs.size(0), 1).unsqueeze(1)

# 步骤 5:矩阵乘法计算最终得分
# v: (batch, 1, hidden_size)
# energy: (batch, hidden_size, seq_len)
# bmm 结果: (batch, 1, seq_len)
energy = torch.bmm(v, energy)

# 移除中间维度:(batch, 1, seq_len) → (batch, seq_len)
return energy.squeeze(1)

# 使用示例: LSTM + Attention 时间序列预测
class AttentionLSTM(nn.Module):
"""带注意力机制的 LSTM 时间序列预测模型"""
def __init__(self, input_size, hidden_size, num_layers, output_size=1):
super().__init__()
# LSTM 编码器
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
# 注意力机制
self.attention = BahdanauAttention(hidden_size)
# 输出层:输入是上下文向量+最后隐藏状态
self.fc = nn.Linear(hidden_size * 2, output_size)

def forward(self, x):
"""
前向传播: LSTM 编码 + 注意力加权 + 预测

工作流程:
1. LSTM 处理输入序列,得到所有时间步的隐藏状态
2. 使用最后一个隐藏状态作为查询
3. 计算注意力权重,生成上下文向量
4. 拼接上下文向量和最后隐藏状态,输出预测
"""
# LSTM 编码
lstm_out, (h_n, c_n) = self.lstm(x) # lstm_out: (batch, seq_len, hidden_size)

# 使用最后一层的最终隐藏状态作为查询
query = h_n[-1] # (batch, hidden_size)

# 计算注意力权重
attn_weights = self.attention(query, lstm_out) # (batch, 1, seq_len)

# 生成上下文向量:加权求和所有时间步的隐藏状态
# attn_weights: (batch, 1, seq_len)
# lstm_out: (batch, seq_len, hidden_size)
# context: (batch, 1, hidden_size) → (batch, hidden_size)
context = torch.bmm(attn_weights, lstm_out).squeeze(1)

# 拼接上下文向量和查询(最后隐藏状态)
combined = torch.cat([context, query], dim=1) # (batch, hidden_size * 2)

# 输出预测
output = self.fc(combined) # (batch, output_size)
return output, attn_weights.squeeze(1)

# 使用示例
model = AttentionLSTM(input_size=10, hidden_size=64, num_layers=2, output_size=1)
x = torch.randn(32, 50, 10)
pred, attn_weights = model(x)

print(f"预测形状: {pred.shape}") # (32, 1)
print(f"注意力权重形状: {attn_weights.shape}") # (32, 50)
print(f"注意力权重示例(第一个样本): {attn_weights[0][:5]}") # 前 5 个时间步的权重

关键点解读

  1. 加性注意力的计算流程: Bahdanau 注意力使用 MLP(线性层+tanh)计算相似度,公式为$ e_i = v^T (W[h; k_i]) e_i = q^T k_i$ 更灵活,可以学习复杂的对齐关系,但计算复杂度更高。

  2. 上下文向量的作用:上下文向量Double subscripts: use braces to clarify c = _i _i h_i 融合了所有历史时间步的信息,权重 由模型自动学习。如果某个时间步很重要(如周期峰值),其权重会较大;如果不重要(如噪声),权重会较小。

  3. 与 LSTM 的互补: LSTM 通过隐藏状态传递信息(可能衰减),注意力直接访问所有时间步(无衰减)。两者结合: LSTM 捕捉局部模式和顺序依赖,注意力捕捉全局模式和重要时间步。

常见问题

  1. Q: Bahdanau 注意力和点积注意力有什么区别?
    • A: Bahdanau 使用 MLP 计算相似度(加性),点积注意力直接计算(乘性)。 Bahdanau 更灵活但计算慢,点积更快但要求查询和键维度相同。
  2. Q: 注意力权重如何解释?
    • A: 权重 表示"预测时对第 个时间步的关注程度"。可视化权重可以发现模型关注哪些时间步(如周期峰值、异常点、趋势转折点)。
  3. Q: 注意力机制会增加多少计算量?
    • A: 时间复杂度从( LSTM)增加到( LSTM+Attention),其中 是序列长度, 是隐藏维度。对于长序列(>1000 步),考虑使用稀疏注意力。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 可视化注意力权重
import matplotlib.pyplot as plt
import seaborn as sns

def visualize_attention(attention_weights, timestamps=None):
"""可视化注意力权重分布"""
attn = attention_weights[0].detach().cpu().numpy() # 第一个样本

plt.figure(figsize=(12, 6))
if timestamps is not None:
plt.plot(timestamps, attn, 'o-', linewidth=2, markersize=8)
plt.xlabel('时间')
else:
plt.plot(attn, 'o-', linewidth=2, markersize=8)
plt.xlabel('时间步')

plt.ylabel('注意力权重')
plt.title('LSTM 注意力权重分布(哪些时间步最重要)')
plt.grid(True, alpha=0.3)

# 标注权重最大的时间步
max_idx = np.argmax(attn)
plt.axvline(x=max_idx, color='r', linestyle='--', alpha=0.5)
plt.text(max_idx, attn[max_idx], f'最大权重: {attn[max_idx]:.3f}',
ha='center', va='bottom')
plt.show()

# 使用模型进行预测并可视化注意力
model.eval()
with torch.no_grad():
pred, attn_weights = model(x_test)
visualize_attention(attn_weights)

LSTM 编码器-解码器:序列到序列预测

问题背景:时间序列预测中,有时需要预测未来多个时间步(多步预测),而不是只预测下一步。简单的方法是递归预测(用预测值作为下一步输入),但误差会累积。编码器-解码器结构将"编码历史信息"和"生成未来序列"分离,解码器可以更好地利用编码器的完整上下文。

解决思路:编码器将整个输入序列压缩为固定大小的上下文向量(通过最终隐藏状态),解码器基于这个上下文逐步生成未来序列。解码器每一步的输入可以是: 1)前一步的预测值(自回归), 2)编码器的上下文向量, 3)注意力加权的编码器输出。

设计考虑: 1. 上下文传递:编码器的最终状态 作为解码器的初始状态,传递整个序列的信息 2. 自回归生成:解码器逐步生成,每一步的输入是前一步的输出(训练时可以用真实值,测试时用预测值) 3. 注意力增强:可以在编码器和解码器之间加入注意力机制,让解码器动态关注编码器的不同部分 4. 多步预测:解码器生成多个时间步,每个时间步的输出作为下一步的输入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
class EncoderLSTM(nn.Module):
"""
LSTM 编码器:将输入序列编码为固定大小的上下文表示

核心功能:处理整个输入序列,生成包含序列信息的隐藏状态
输出:所有时间步的隐藏状态 + 最终状态(作为解码器的初始状态)

Parameters:
-----------
input_size : int
输入特征维度
hidden_size : int
隐藏状态维度(也是上下文向量的维度)
num_layers : int
LSTM 层数

Returns:
--------
out : torch.Tensor
所有时间步的隐藏状态,形状 (batch, seq_len, hidden_size)
(hn, cn) : tuple
最终隐藏状态和细胞状态,形状 (num_layers, batch, hidden_size)
作为解码器的初始状态
"""
def __init__(self, input_size, hidden_size, num_layers):
super(EncoderLSTM, self).__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)

def forward(self, x):
"""
编码输入序列

Parameters:
-----------
x : torch.Tensor
输入序列,形状 (batch_size, input_seq_len, input_size)

Returns:
--------
out : torch.Tensor
所有时间步的编码输出,形状 (batch, input_seq_len, hidden_size)
(hn, cn) : tuple
最终状态,用于初始化解码器
"""
# 初始化状态
h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)

# LSTM 编码
out, (hn, cn) = self.lstm(x, (h0, c0))

# 返回所有时间步的输出和最终状态
return out, (hn, cn)

class DecoderLSTM(nn.Module):
"""
LSTM 解码器:基于编码器的上下文生成未来序列

核心功能:接收编码器的上下文,逐步生成未来时间步的预测
工作方式:自回归生成,每一步的输入是前一步的输出

Parameters:
-----------
hidden_size : int
隐藏状态维度(必须与编码器相同)
output_size : int
输出维度(预测值的维度,通常等于 input_size)
num_layers : int
LSTM 层数(通常与编码器相同)

Returns:
--------
output : torch.Tensor
当前时间步的预测,形状 (batch, output_size)
(hn, cn) : tuple
更新后的隐藏状态和细胞状态,用于下一步预测
"""
def __init__(self, hidden_size, output_size, num_layers):
super(DecoderLSTM, self).__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers

# 解码器 LSTM:输入维度=hidden_size(接收编码器的上下文)
self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True)

# 输出层:将隐藏状态映射到预测值
self.fc = nn.Linear(hidden_size, output_size)

def forward(self, x, hidden):
"""
解码一步:生成一个时间步的预测

Parameters:
-----------
x : torch.Tensor
当前输入,形状 (batch_size, 1, hidden_size)
通常是前一步的输出(经过 embedding 或线性变换)
hidden : tuple
隐藏状态和细胞状态 (h_n, c_n)
初始时来自编码器,后续来自前一步的解码器输出

Returns:
--------
output : torch.Tensor
当前时间步的预测,形状 (batch_size, output_size)
(hn, cn) : tuple
更新后的状态,用于下一步预测
"""
# LSTM 解码
out, (hn, cn) = self.lstm(x, hidden)

# 使用最后一个时间步的输出(实际上 x 只有 1 个时间步)
# 映射到预测值
output = self.fc(out[:, -1, :])

return output, (hn, cn)

# 完整示例: Encoder-Decoder 多步预测
class Seq2SeqLSTM(nn.Module):
"""序列到序列 LSTM 模型:用于多步时间序列预测"""
def __init__(self, input_size, hidden_size, num_layers, output_seq_len):
super().__init__()
self.output_seq_len = output_seq_len

# 编码器
self.encoder = EncoderLSTM(input_size, hidden_size, num_layers)

# 解码器
self.decoder = DecoderLSTM(hidden_size, input_size, num_layers)

# 输入投影:将预测值映射回 hidden_size(作为下一步解码器的输入)
self.input_proj = nn.Linear(input_size, hidden_size)

def forward(self, x, teacher_forcing_ratio=0.5):
"""
前向传播:编码输入序列,解码生成未来序列

Parameters:
-----------
x : torch.Tensor
输入序列,形状 (batch, input_seq_len, input_size)
teacher_forcing_ratio : float
Teacher Forcing 比例,训练时使用真实值作为下一步输入的概率
- 1.0:总是使用真实值(训练快但可能过拟合)
- 0.0:总是使用预测值(更接近测试场景但训练慢)
- 0.5:混合使用(平衡训练速度和泛化能力)

Returns:
--------
outputs : torch.Tensor
预测的未来序列,形状 (batch, output_seq_len, input_size)
"""
batch_size = x.size(0)

# 步骤 1:编码输入序列
encoder_out, (h_n, c_n) = self.encoder(x)

# 步骤 2:初始化解码器
# 使用编码器的最终状态作为解码器的初始状态
decoder_hidden = (h_n, c_n)

# 初始输入:使用输入序列的最后一个时间步
decoder_input = x[:, -1:, :] # (batch, 1, input_size)
decoder_input = self.input_proj(decoder_input) # (batch, 1, hidden_size)

# 步骤 3:逐步解码生成未来序列
outputs = []
for t in range(self.output_seq_len):
# 解码一步
output, decoder_hidden = self.decoder(decoder_input, decoder_hidden)
outputs.append(output)

# 准备下一步的输入
# Teacher Forcing:训练时有一定概率使用真实值
if self.training and torch.rand(1).item() < teacher_forcing_ratio:
# 使用真实值(需要提供 target,这里简化处理)
decoder_input = self.input_proj(output.unsqueeze(1))
else:
# 使用预测值(自回归)
decoder_input = self.input_proj(output.unsqueeze(1))

# 堆叠所有时间步的输出
return torch.stack(outputs, dim=1) # (batch, output_seq_len, input_size)

# 使用示例
model = Seq2SeqLSTM(input_size=10, hidden_size=64, num_layers=2, output_seq_len=5)
x = torch.randn(32, 50, 10) # 输入:过去 50 个时间步
pred = model(x) # 预测:未来 5 个时间步
print(f"输入形状: {x.shape}") # (32, 50, 10)
print(f"预测形状: {pred.shape}") # (32, 5, 10)

关键点解读

  1. 上下文传递机制:编码器的最终状态 包含整个输入序列的信息,作为解码器的初始状态。这相当于将"整个历史"压缩到固定大小的向量中,解码器基于这个向量生成未来。

  2. 自回归生成的挑战:解码器逐步生成,每一步依赖前一步的输出。如果前一步预测错误,误差会累积传播。 Teacher Forcing 通过训练时使用真实值缓解这个问题,但可能导致训练-测试不一致。

  3. 注意力机制的增强:标准 Encoder-Decoder 依赖固定大小的上下文向量,可能信息瓶颈。加入注意力后,解码器每一步可以动态关注编码器的不同部分,信息容量更大。

常见问题

  1. Q: Teacher Forcing 的作用是什么?
    • A: 训练时使用真实值作为下一步输入,避免误差累积,加速训练。但可能导致模型在测试时(只能用预测值)表现差。解决方案:逐步降低 Teacher Forcing 比例,或使用 Scheduled Sampling 。
  2. Q: 如何选择 output_seq_len(预测步数)?
    • A: 根据业务需求选择。通常短期预测( 1-5 步)更准确,长期预测(>20 步)误差累积严重。可以通过实验选择最优步数,平衡准确性和实用性。
  3. Q: 编码器和解码器必须使用相同的 hidden_size 吗?
    • A: 不一定,但通常相同更简单。如果不同,需要在编码器和解码器之间加入投影层,将编码器的 hidden_size 映射到解码器的 hidden_size 。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 训练 Encoder-Decoder 模型
model = Seq2SeqLSTM(input_size=10, hidden_size=64, num_layers=2, output_seq_len=10)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# 训练数据:输入序列和对应的目标序列
x_train = torch.randn(100, 50, 10) # 输入:过去 50 步
y_train = torch.randn(100, 10, 10) # 目标:未来 10 步

# 训练循环
for epoch in range(100):
model.train()
optimizer.zero_grad()

# 前向传播(使用 Teacher Forcing)
pred = model(x_train, teacher_forcing_ratio=0.5)

# 计算损失
loss = criterion(pred, y_train)

# 反向传播
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()

if (epoch + 1) % 10 == 0:
print(f'Epoch {epoch+1}, Loss: {loss.item():.4f}')

# 测试时:不使用 Teacher Forcing
model.eval()
with torch.no_grad():
x_test = torch.randn(10, 50, 10)
pred_test = model(x_test, teacher_forcing_ratio=0.0) # 完全自回归
print(f"测试预测形状: {pred_test.shape}")

❓ Q&A: LSTM 常见疑问

Q1: LSTM 在处理长序列时仍然会面临哪些挑战?

虽然 LSTM 缓解了梯度消失问题,但在处理超长序列(如 >1000 步)时仍面临以下挑战:

计算复杂度问题

  • 时间复杂度,其中 是序列长度, 是隐藏状态维度
  • 内存占用:需要存储所有时间步的隐藏状态(用于反向传播)
  • 训练时间:随序列长度线性增长

并行化困难

  • LSTM 依赖顺序计算 依赖 ,无法像 Transformer 那样并行
  • GPU 利用率低:批处理时仍需逐步计算

长期依赖仍有限

  • 虽然比 RNN 强,但对于极长距离(如 500+ 步)的依赖,信息仍会衰减
  • 解决方案: Attention 机制(直接跨距离连接)

实践建议

1
2
3
4
5
6
7
8
9
10
11
# 1. 使用截断反向传播( Truncated BPTT)
max_seq_len = 100 # 限制梯度回传长度

# 2. 分段处理长序列
def process_long_sequence(data, chunk_size=200):
for i in range(0, len(data), chunk_size):
chunk = data[i:i+chunk_size]
output = lstm(chunk)

# 3. 使用注意力机制替代纯 LSTM
# 或使用 Transformer 处理超长序列


Q2:如何提升 LSTM 在处理不平衡数据集时的性能?

采样技术

方法 原理 适用场景
上采样( Over-sampling) 复制少数类样本 少数类样本 < 1000
下采样( Under-sampling) 随机删除多数类样本 多数类样本 > 100,000
SMOTE 合成少数类样本 连续特征,少数类 < 10%
1
2
3
4
5
6
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler

# SMOTE 示例
smote = SMOTE(sampling_strategy=0.5) # 使少数类达到多数类的 50%
X_resampled, y_resampled = smote.fit_resample(X_train, y_train)

代价敏感学习( Cost-Sensitive Learning)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import torch.nn as nn

# 方法 1:加权损失函数
class_weights = torch.tensor([1.0, 10.0]) # 少数类权重更高
criterion = nn.CrossEntropyLoss(weight=class_weights)

# 方法 2: Focal Loss(关注难分类样本)
class FocalLoss(nn.Module):
def __init__(self, alpha=0.25, gamma=2):
super().__init__()
self.alpha = alpha
self.gamma = gamma

def forward(self, inputs, targets):
ce_loss = F.cross_entropy(inputs, targets, reduction='none')
pt = torch.exp(-ce_loss)
focal_loss = self.alpha * (1-pt)**self.gamma * ce_loss
return focal_loss.mean()

集成方法

  • Bagging + LSTM:训练多个 LSTM,每个在不同的平衡子集上
  • Boosting:使用 AdaBoost 逐步关注误分类样本

Q3: LSTM 与 GRU 的主要区别是什么?

结构对比

维度 LSTM GRU
门的数量 3 个(输入门、遗忘门、输出门) 2 个(更新门、重置门)
记忆单元 独立的 ( Cell State) 直接更新 (无独立 Cell)
参数量 更多( 4 组权重矩阵) 更少( 3 组权重矩阵)
计算速度 较慢 快 10-15%
梯度流 通过 Cell State 保持长期记忆 通过更新门控制记忆保留

核心公式对比

LSTM

GRU

何时选择哪个

选择 LSTM

  • ✅ 数据量大(> 10,000 样本)
  • ✅ 需要复杂的长期记忆(如机器翻译)
  • ✅ 有足够的计算资源

选择 GRU

  • ✅ 数据量小(< 5,000 样本)
  • ✅ 训练时间敏感
  • ✅ 快速原型验证
  • ✅ 嵌入式设备部署

实验建议:两者都试试!在很多任务上性能相当。


Q4: LSTM 的梯度消失和梯度爆炸问题是如何解决的?

传统 RNN 的梯度问题

在反向传播时,梯度需要沿着时间步回传:

如果 ,梯度会指数衰减(梯度消失) 如果 ,梯度会指数增长(梯度爆炸)

LSTM 的解决方案

1. Cell State 的"高速公路"

$$

C_t = f_t C_{t-1} + i_t _t $$

关键:如果 (遗忘门全开),梯度可以直接 流回 ,不经过非线性变换!

时,→ 梯度稳定传播

2. 门控机制控制梯度流

  • 遗忘门:控制历史信息的保留($ f_t f_t $→ 遗忘)
  • 输入门:控制新信息的注入($ i_t i_t $→ 忽略)

3. 梯度裁剪( Gradient Clipping)

即使有门控,仍可能出现梯度爆炸,需要手动裁剪:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import torch
import torch.nn as nn

# 方法 1:全局梯度裁剪
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

# 方法 2:逐参数裁剪
torch.nn.utils.clip_grad_value_(model.parameters(), clip_value=0.5)

# 在训练循环中使用
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()

实验验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 对比传统 RNN 和 LSTM 的梯度范数
import torch
import torch.nn as nn

def check_gradients(model, x, y):
"""检查模型梯度范数"""
output = model(x)
loss = nn.MSELoss()(output, y)
loss.backward()

total_norm = 0
for p in model.parameters():
if p.grad is not None:
param_norm = p.grad.data.norm(2)
total_norm += param_norm.item() ** 2
total_norm = total_norm ** (1. / 2)

return total_norm

# 传统 RNN(容易梯度爆炸)
rnn = nn.RNN(input_size=10, hidden_size=20, num_layers=2)
x = torch.randn(100, 32, 10) # 长序列
y = torch.randn(32, 20)
rnn_grad = check_gradients(rnn, x, y)
print(f'RNN 梯度范数: {rnn_grad:.2f}') # 可能 > 100

# LSTM(梯度稳定)
lstm = nn.LSTM(input_size=10, hidden_size=20, num_layers=2)
lstm_grad = check_gradients(lstm, x, y)
print(f'LSTM 梯度范数: {lstm_grad:.2f}') # 通常 < 10

LSTM 仍可能梯度消失的情况

  1. 序列极长(>1000 步):即使有 Cell State,信息仍会衰减
  2. 遗忘门太小→ Cell State 无法传递信息
  3. 多层 LSTM:深层网络梯度仍可能消失

解决方案

  • 使用 Residual Connections(残差连接)
  • 使用 Attention 机制(直接连接远距离依赖)
  • 使用 Transformer(完全并行,无梯度问题)

Q5:在模型训练过程中,如何避免 LSTM 的过拟合问题?

正则化技术

1. Dropout

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class LSTMWithDropout(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, dropout=0.5):
super().__init__()
self.lstm = nn.LSTM(
input_size,
hidden_size,
num_layers,
dropout=dropout, # 层间 Dropout
batch_first=True
)
self.dropout = nn.Dropout(dropout) # 输出 Dropout
self.fc = nn.Linear(hidden_size, 1)

def forward(self, x):
out, _ = self.lstm(x)
out = self.dropout(out[:, -1, :]) # 只对最后时间步 Dropout
return self.fc(out)

注意nn.LSTMdropout 参数只作用于层间,不作用于时间步之间。

2. L2 正则化( Weight Decay)

1
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)

3. 时间步 Dropout( Recurrent Dropout)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 使用 Keras 风格的 recurrent_dropout
# PyTorch 需要手动实现
class RecurrentDropoutLSTM(nn.Module):
def __init__(self, input_size, hidden_size, recurrent_dropout=0.2):
super().__init__()
self.hidden_size = hidden_size
self.recurrent_dropout = recurrent_dropout
self.lstm_cell = nn.LSTMCell(input_size, hidden_size)

def forward(self, x):
batch_size, seq_len, _ = x.size()
h = torch.zeros(batch_size, self.hidden_size).to(x.device)
c = torch.zeros(batch_size, self.hidden_size).to(x.device)

# 生成固定的 dropout mask(在整个序列上复用)
dropout_mask = torch.bernoulli(
torch.ones(batch_size, self.hidden_size) * (1 - self.recurrent_dropout)
).to(x.device) / (1 - self.recurrent_dropout)

outputs = []
for t in range(seq_len):
h, c = self.lstm_cell(x[:, t, :], (h, c))
h = h * dropout_mask # 应用 dropout
outputs.append(h)

return torch.stack(outputs, dim=1)

数据增强

滑动窗口技术

1
2
3
4
5
6
7
8
9
def create_sequences(data, seq_len=50, stride=1):
"""生成重叠的时间窗口"""
sequences = []
for i in range(0, len(data) - seq_len, stride):
sequences.append(data[i:i+seq_len])
return sequences

# stride=1 → 大量重叠窗口(数据增强)
# stride=seq_len → 无重叠(节省内存)

添加噪声

1
2
3
# 给输入添加高斯噪声
noise_level = 0.01
x_train_noisy = x_train + torch.randn_like(x_train) * noise_level

早停法( Early Stopping)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from torch.utils.tensorboard import SummaryWriter

class EarlyStopping:
def __init__(self, patience=7, delta=0):
self.patience = patience
self.counter = 0
self.best_loss = None
self.delta = delta

def __call__(self, val_loss):
if self.best_loss is None:
self.best_loss = val_loss
elif val_loss > self.best_loss - self.delta:
self.counter += 1
if self.counter >= self.patience:
return True # 触发早停
else:
self.best_loss = val_loss
self.counter = 0
return False

# 使用示例
early_stopping = EarlyStopping(patience=10)
for epoch in range(100):
train_loss = train(model, train_loader)
val_loss = validate(model, val_loader)

if early_stopping(val_loss):
print(f'早停触发于 epoch {epoch}')
break

交叉验证

1
2
3
4
5
6
7
8
9
from sklearn.model_selection import TimeSeriesSplit

tscv = TimeSeriesSplit(n_splits=5)
for train_idx, val_idx in tscv.split(X):
X_train, X_val = X[train_idx], X[val_idx]
y_train, y_val = y[train_idx], y[val_idx]

model.fit(X_train, y_train)
val_score = model.evaluate(X_val, y_val)

注意:时间序列不能随机划分,必须按时间顺序!


Q6: LSTM 如何处理多变量时间序列预测?

多变量输入的组织方式

方式 1:多特征输入( Multi-feature Input)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import torch
import torch.nn as nn

class MultiFeatureLSTM(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, output_size):
super().__init__()
# input_size = 特征数量(如温度、湿度、气压 = 3)
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
self.fc = nn.Linear(hidden_size, output_size)

def forward(self, x):
# x: (batch, seq_len, features)
# 例如:(32, 50, 3) → 32 个样本, 50 个时间步, 3 个特征
out, _ = self.lstm(x)
out = self.fc(out[:, -1, :]) # 只用最后时间步
return out

# 示例:预测未来 1 天的温度(使用过去 7 天的温度、湿度、气压)
model = MultiFeatureLSTM(input_size=3, hidden_size=64, num_layers=2, output_size=1)
x = torch.randn(32, 7, 3) # (batch, 7 天, 3 个特征)
pred = model(x) # (32, 1) → 预测未来 1 天温度

方式 2:多变量输出( Multi-output)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class MultiOutputLSTM(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, output_size):
super().__init__()
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
# 输出多个变量
self.fc = nn.Linear(hidden_size, output_size) # output_size = 3(温度、湿度、气压)

def forward(self, x):
out, _ = self.lstm(x)
out = self.fc(out[:, -1, :])
return out # (batch, 3) → 同时预测 3 个变量

# 示例:同时预测未来 1 天的温度、湿度、气压
model = MultiOutputLSTM(input_size=1, hidden_size=64, num_layers=2, output_size=3)
x = torch.randn(32, 7, 1) # 只用历史温度
pred = model(x) # (32, 3) → 预测温度、湿度、气压

方式 3: Encoder-Decoder(序列到序列)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Seq2SeqLSTM(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, output_seq_len):
super().__init__()
self.encoder = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
self.decoder = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
self.fc = nn.Linear(hidden_size, input_size) # 输出维度 = 输入维度
self.output_seq_len = output_seq_len

def forward(self, x):
# x: (batch, input_seq_len, features)
# 编码
encoder_out, (h, c) = self.encoder(x)

# 解码(自回归生成)
decoder_input = x[:, -1:, :] # 用最后一个时间步作为初始输入
outputs = []

for _ in range(self.output_seq_len):
decoder_out, (h, c) = self.decoder(decoder_input, (h, c))
output = self.fc(decoder_out)
outputs.append(output)
decoder_input = output # 用预测值作为下一步输入

return torch.cat(outputs, dim=1) # (batch, output_seq_len, features)

# 示例:用过去 7 天预测未来 3 天(多变量)
model = Seq2SeqLSTM(input_size=3, hidden_size=64, num_layers=2, output_seq_len=3)
x = torch.randn(32, 7, 3) # 过去 7 天(温度、湿度、气压)
pred = model(x) # (32, 3, 3) → 未来 3 天(温度、湿度、气压)

特征工程技巧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import pandas as pd
import numpy as np

def create_multivariate_features(df):
"""创建多变量特征"""
features = pd.DataFrame()

# 1. 原始特征
features['温度'] = df['temperature']
features['湿度'] = df['humidity']
features['气压'] = df['pressure']

# 2. 滞后特征( Lag Features)
for lag in [1, 2, 3]:
features[f'温度_lag{lag}'] = df['temperature'].shift(lag)
features[f'湿度_lag{lag}'] = df['humidity'].shift(lag)

# 3. 滚动统计特征
features['温度_rolling_mean_7'] = df['temperature'].rolling(7).mean()
features['温度_rolling_std_7'] = df['temperature'].rolling(7).std()

# 4. 交互特征
features['温度_湿度_交互'] = df['temperature'] * df['humidity']

# 5. 时间特征
features['小时'] = pd.to_datetime(df['timestamp']).dt.hour
features['星期'] = pd.to_datetime(df['timestamp']).dt.dayofweek
features['月份'] = pd.to_datetime(df['timestamp']).dt.month

return features.dropna()

# 使用
df_features = create_multivariate_features(df)

多变量预测的挑战

  1. 特征选择:不是所有特征都有用,需要特征重要性分析
  2. 特征缩放:不同特征的量纲不同,需要归一化
  3. 缺失值处理:多变量更容易出现缺失值
  4. 计算复杂度:特征越多,模型越复杂

实战建议

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 1. 特征重要性分析
from sklearn.ensemble import RandomForestRegressor

rf = RandomForestRegressor()
rf.fit(X_train, y_train)
feature_importance = pd.DataFrame({
'feature': feature_names,
'importance': rf.feature_importances_
}).sort_values('importance', ascending=False)

# 2. 特征归一化
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# 3. 处理缺失值
from sklearn.impute import SimpleImputer

imputer = SimpleImputer(strategy='mean')
X_train_imputed = imputer.fit_transform(X_train)
X_test_imputed = imputer.transform(X_test)

Q7:如何选择 LSTM 的超参数(隐藏层大小、层数、学习率)?

隐藏层大小( Hidden Size)

数据规模 推荐 Hidden Size 说明
< 1,000 样本 32-64 避免过拟合
1,000-10,000 64-128 平衡性能和速度
> 10,000 128-512 充分表达能力

经验公式

层数( Num Layers)

任务复杂度 推荐层数 说明
简单(单变量预测) 1-2 层 足够
中等(多变量、短期) 2-3 层 平衡
复杂(长期依赖) 3-4 层 深层网络

⚠️ 超过 4 层通常收益递减,且容易梯度消失。

学习率( Learning Rate)

推荐策略: 1. 初始学习率2. 学习率调度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 方法 1: ReduceLROnPlateau(根据验证 loss 调整)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, mode='min', factor=0.5, patience=5
)

# 方法 2: CosineAnnealingLR(余弦退火)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
optimizer, T_max=50, eta_min=1e-6
)

# 训练循环
for epoch in range(epochs):
train_loss = train(model)
val_loss = validate(model)
scheduler.step(val_loss) # ReduceLROnPlateau
# 或
scheduler.step() # CosineAnnealingLR

Warm-up 策略(大模型推荐):

1
2
3
4
5
def get_lr(epoch, warmup_epochs=5, initial_lr=1e-3):
if epoch < warmup_epochs:
return initial_lr * (epoch + 1) / warmup_epochs
else:
return initial_lr

批量大小( Batch Size)

任务 推荐 Batch Size 说明
小数据集 16-32 避免梯度噪声过大
大数据集 64-128 加速训练
GPU 内存受限 8-16 根据显存调整

超参数搜索工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 使用 Optuna 自动调参
import optuna

def objective(trial):
hidden_size = trial.suggest_int('hidden_size', 32, 256)
num_layers = trial.suggest_int('num_layers', 1, 4)
lr = trial.suggest_loguniform('lr', 1e-5, 1e-2)
dropout = trial.suggest_uniform('dropout', 0.1, 0.5)

model = LSTMModel(hidden_size, num_layers, dropout)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

val_loss = train_and_evaluate(model, optimizer)
return val_loss

study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=50)
print(f'最佳参数:{study.best_params}')


Q8: LSTM 在时间序列预测中,如何设计输入输出窗口?

滑动窗口( Sliding Window)设计

单步预测( One-step-ahead)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def create_sequences_one_step(data, seq_len=50):
"""
用过去 seq_len 个时间步预测未来 1 个时间步
"""
X, y = [], []
for i in range(len(data) - seq_len):
X.append(data[i:i+seq_len]) # 输入:过去 seq_len 步
y.append(data[i+seq_len]) # 输出:未来 1 步
return np.array(X), np.array(y)

# 示例
data = np.random.randn(1000)
X, y = create_sequences_one_step(data, seq_len=50)
# X: (950, 50, 1), y: (950, 1)

多步预测( Multi-step-ahead)

1
2
3
4
5
6
7
8
9
10
11
12
13
def create_sequences_multi_step(data, input_len=50, output_len=10):
"""
用过去 input_len 个时间步预测未来 output_len 个时间步
"""
X, y = [], []
for i in range(len(data) - input_len - output_len + 1):
X.append(data[i:i+input_len])
y.append(data[i+input_len:i+input_len+output_len])
return np.array(X), np.array(y)

# 示例:用过去 7 天预测未来 3 天
X, y = create_sequences_multi_step(data, input_len=7, output_len=3)
# X: (991, 7, 1), y: (991, 3, 1)

序列到序列( Seq2Seq)预测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class Seq2SeqLSTM(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, output_len):
super().__init__()
self.encoder = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
self.decoder = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
self.fc = nn.Linear(hidden_size, input_size)
self.output_len = output_len

def forward(self, x):
# 编码
_, (h, c) = self.encoder(x)

# 解码(自回归)
decoder_input = x[:, -1:, :] # 最后一个时间步
outputs = []

for _ in range(self.output_len):
decoder_out, (h, c) = self.decoder(decoder_input, (h, c))
output = self.fc(decoder_out)
outputs.append(output)
decoder_input = output # 用预测值作为下一步输入

return torch.cat(outputs, dim=1)

窗口大小选择指南

数据特点 推荐输入窗口 原因
高频数据(分钟级) 60-1440 步 捕捉日内和日间模式
日度数据 7-30 天 捕捉周度和月度模式
月度数据 12-24 月 捕捉年度季节性
有明确周期 2-3 个周期 至少包含完整周期

实战代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader

class TimeSeriesDataset(Dataset):
def __init__(self, data, input_len, output_len, stride=1):
self.data = data
self.input_len = input_len
self.output_len = output_len
self.stride = stride

# 生成所有可能的窗口
self.sequences = []
for i in range(0, len(data) - input_len - output_len + 1, stride):
X = data[i:i+input_len]
y = data[i+input_len:i+input_len+output_len]
self.sequences.append((X, y))

def __len__(self):
return len(self.sequences)

def __getitem__(self, idx):
X, y = self.sequences[idx]
return torch.FloatTensor(X), torch.FloatTensor(y)

# 使用示例
data = np.random.randn(1000)
dataset = TimeSeriesDataset(data, input_len=50, output_len=10, stride=1)
dataloader = DataLoader(dataset, batch_size=32, shuffle=False) # 时间序列不 shuffle!

for X, y in dataloader:
# X: (batch, 50, features), y: (batch, 10, features)
pred = model(X)
loss = criterion(pred, y)

窗口设计的常见误区

  1. 窗口太小:无法捕捉长期依赖
  2. 窗口太大:包含过多噪声,训练慢
  3. 随机 shuffle:破坏时间顺序
  4. 重叠窗口过多:数据冗余,容易过拟合

最佳实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 1. 根据数据特点选择窗口
if has_seasonality(data, period=12):
input_len = 2 * period # 至少包含 2 个周期
else:
input_len = 30 # 默认 30 步

# 2. 使用交叉验证选择最优窗口
from sklearn.model_selection import TimeSeriesSplit

best_window = None
best_score = float('inf')

for window in [7, 14, 30, 60]:
scores = []
tscv = TimeSeriesSplit(n_splits=5)

for train_idx, val_idx in tscv.split(X):
X_train, X_val = X[train_idx], X[val_idx]
# 用 window 创建序列
# 训练和评估模型
score = evaluate_model(...)
scores.append(score)

avg_score = np.mean(scores)
if avg_score < best_score:
best_score = avg_score
best_window = window

print(f'最优窗口大小: {best_window}')

Q9: LSTM 模型训练时出现 NaN 或 Loss 不下降怎么办?

问题 1: Loss 为 NaN

原因 1:梯度爆炸

1
2
3
4
5
6
7
8
9
# 解决方案:梯度裁剪
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

# 检查梯度
for name, param in model.named_parameters():
if param.grad is not None:
grad_norm = param.grad.norm().item()
if grad_norm > 100:
print(f'警告:{name} 梯度范数过大: {grad_norm:.2f}')

原因 2:学习率过大

1
2
3
4
5
6
7
# 解决方案:降低学习率
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4) # 从 1e-3 降到 1e-4

# 或使用学习率调度器
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, mode='min', factor=0.5, patience=5
)

原因 3:输入数据包含 NaN 或 Inf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 检查数据
def check_data(data):
if torch.isnan(data).any():
print('警告:输入数据包含 NaN')
if torch.isinf(data).any():
print('警告:输入数据包含 Inf')

# 处理 NaN
data = torch.nan_to_num(data, nan=0.0, posinf=1e6, neginf=-1e6)
return data

# 在数据加载时使用
X = check_data(X)
y = check_data(y)

原因 4:权重初始化不当

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 解决方案:使用 Xavier 或 He 初始化
def init_weights(m):
if isinstance(m, nn.LSTM):
for name, param in m.named_parameters():
if 'weight_ih' in name:
nn.init.xavier_uniform_(param.data)
elif 'weight_hh' in name:
nn.init.orthogonal_(param.data)
elif 'bias' in name:
param.data.fill_(0)
# 遗忘门偏置设为 1(帮助记忆)
n = param.size(0)
start, end = n // 4, n // 2
param.data[start:end].fill_(1)

model.apply(init_weights)

问题 2: Loss 不下降

原因 1:学习率太小

1
2
3
4
5
6
7
8
# 解决方案:学习率搜索
learning_rates = [1e-5, 1e-4, 1e-3, 1e-2]

for lr in learning_rates:
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
# 训练几个 epoch
train_loss = train_one_epoch(model, dataloader, optimizer)
print(f'LR={lr}: Loss={train_loss:.4f}')

原因 2:数据未归一化

1
2
3
4
5
6
7
8
# 解决方案:归一化
from sklearn.preprocessing import MinMaxScaler, StandardScaler

scaler = MinMaxScaler(feature_range=(-1, 1))
# 或
scaler = StandardScaler()

data_scaled = scaler.fit_transform(data)

原因 3:模型容量不足

1
2
3
4
5
6
7
# 解决方案:增加模型容量
model = nn.LSTM(
input_size=10,
hidden_size=128, # 从 64 增加到 128
num_layers=3, # 从 2 增加到 3
batch_first=True
)

原因 4:标签错误或数据泄露

1
2
3
4
5
6
7
# 检查:预测值和真实值是否在同一分布
print(f'预测值范围: [{pred.min():.2f}, {pred.max():.2f}]')
print(f'真实值范围: [{y.min():.2f}, {y.max():.2f}]')

# 检查:是否存在数据泄露(未来信息泄露到过去)
# 确保输入窗口和输出窗口不重叠
assert input_end_idx < output_start_idx

调试技巧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 1. 打印每层的输出
def debug_forward(model, x):
with torch.no_grad():
out, (h, c) = model.lstm(x)
print(f'LSTM 输出范围: [{out.min():.2f}, {out.max():.2f}]')
print(f'隐藏状态范围: [{h.min():.2f}, {h.max():.2f}]')

pred = model.fc(out[:, -1, :])
print(f'最终预测范围: [{pred.min():.2f}, {pred.max():.2f}]')
return pred

# 2. 监控训练过程
import matplotlib.pyplot as plt

train_losses = []
val_losses = []

for epoch in range(epochs):
train_loss = train_one_epoch(...)
val_loss = validate(...)

train_losses.append(train_loss)
val_losses.append(val_loss)

# 绘制损失曲线
if epoch % 10 == 0:
plt.plot(train_losses, label='Train')
plt.plot(val_losses, label='Val')
plt.legend()
plt.show()

# 3. 检查模型是否真的在学习
# 在训练集上的 loss 应该持续下降
if train_losses[-1] > train_losses[0]:
print('警告:模型可能没有学习')

Q10:如何解释和理解 LSTM 模型的预测结果?

1. 可视化注意力权重(如果使用 Attention)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class AttentionLSTM(nn.Module):
def __init__(self, input_size, hidden_size):
super().__init__()
self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
self.attention = nn.Linear(hidden_size, 1)

def forward(self, x):
lstm_out, _ = self.lstm(x) # (batch, seq_len, hidden_size)

# 计算注意力权重
attention_weights = torch.softmax(
self.attention(lstm_out).squeeze(-1), dim=1
) # (batch, seq_len)

# 加权求和
context = torch.bmm(
attention_weights.unsqueeze(1), lstm_out
).squeeze(1) # (batch, hidden_size)

return context, attention_weights

# 可视化
model = AttentionLSTM(input_size=10, hidden_size=64)
pred, attn_weights = model(x_test)

# 绘制注意力权重热力图
import matplotlib.pyplot as plt
import seaborn as sns

plt.figure(figsize=(12, 6))
sns.heatmap(attn_weights.detach().numpy(), cmap='YlOrRd', annot=True)
plt.title('注意力权重热力图')
plt.xlabel('时间步')
plt.ylabel('样本')
plt.show()

2. 分析 Cell State 的变化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 提取 Cell State
def extract_cell_states(model, x):
cell_states = []

def hook_fn(module, input, output):
# output[1] 是 (h, c) 元组
cell_states.append(output[1][1].detach()) # c 是 Cell State

handle = model.lstm.register_forward_hook(hook_fn)
_ = model(x)
handle.remove()

return torch.stack(cell_states, dim=0) # (seq_len, batch, hidden_size)

# 可视化 Cell State
cell_states = extract_cell_states(model, x_test[0:1]) # 单个样本
cell_states = cell_states.squeeze(1) # (seq_len, hidden_size)

plt.figure(figsize=(15, 6))
plt.plot(cell_states[:, :5].numpy()) # 只显示前 5 个维度
plt.title('Cell State 随时间变化')
plt.xlabel('时间步')
plt.ylabel('Cell State 值')
plt.legend([f'维度 {i}' for i in range(5)])
plt.show()

3. 特征重要性分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 方法 1: Permutation Importance
def permutation_importance(model, X, y, n_repeats=10):
baseline_score = evaluate_model(model, X, y)
importances = []

for feature_idx in range(X.shape[2]):
scores = []
for _ in range(n_repeats):
X_permuted = X.clone()
# 随机打乱某个特征
perm_idx = torch.randperm(X.shape[0])
X_permuted[:, :, feature_idx] = X_permuted[perm_idx, :, feature_idx]

score = evaluate_model(model, X_permuted, y)
scores.append(score)

importance = baseline_score - np.mean(scores)
importances.append(importance)

return importances

# 方法 2: SHAP 值(需要安装 shap)
import shap

explainer = shap.DeepExplainer(model, X_train[:100])
shap_values = explainer.shap_values(X_test[:10])
shap.summary_plot(shap_values, X_test[:10])

4. 预测区间估计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 使用 Dropout 进行不确定性估计( MC Dropout)
def predict_with_uncertainty(model, x, n_samples=100):
model.train() # 保持 Dropout 开启
predictions = []

with torch.no_grad():
for _ in range(n_samples):
pred = model(x)
predictions.append(pred)

predictions = torch.stack(predictions) # (n_samples, batch, output_dim)

mean_pred = predictions.mean(dim=0)
std_pred = predictions.std(dim=0)

# 95% 置信区间
lower_bound = mean_pred - 1.96 * std_pred
upper_bound = mean_pred + 1.96 * std_pred

return mean_pred, lower_bound, upper_bound

# 可视化预测区间
mean_pred, lower, upper = predict_with_uncertainty(model, x_test[:10])

plt.figure(figsize=(15, 6))
plt.plot(y_test[:10].numpy(), 'o-', label='真实值')
plt.plot(mean_pred.numpy(), 's-', label='预测值')
plt.fill_between(range(10), lower.numpy(), upper.numpy(), alpha=0.3, label='95% 置信区间')
plt.legend()
plt.title('预测结果与不确定性')
plt.show()

5. 错误分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 分析预测误差的模式
def error_analysis(model, X_test, y_test):
predictions = model(X_test)
errors = y_test - predictions

# 1. 误差分布
plt.figure(figsize=(12, 4))
plt.subplot(1, 3, 1)
plt.hist(errors.numpy(), bins=50)
plt.title('误差分布')
plt.xlabel('误差')

# 2. 误差 vs 真实值(检查是否存在系统性偏差)
plt.subplot(1, 3, 2)
plt.scatter(y_test.numpy(), errors.numpy(), alpha=0.5)
plt.xlabel('真实值')
plt.ylabel('误差')
plt.title('误差 vs 真实值')
plt.axhline(0, color='r', linestyle='--')

# 3. 误差 vs 时间(检查是否存在时间相关的模式)
plt.subplot(1, 3, 3)
plt.plot(errors.numpy())
plt.title('误差时间序列')
plt.xlabel('时间步')
plt.ylabel('误差')

plt.tight_layout()
plt.show()

# 4. 找出最大误差的样本
max_error_idx = errors.abs().argmax()
print(f'最大误差样本索引: {max_error_idx}')
print(f'真实值: {y_test[max_error_idx]:.2f}')
print(f'预测值: {predictions[max_error_idx]:.2f}')
print(f'误差: {errors[max_error_idx]:.2f}')

error_analysis(model, X_test, y_test)

6. 模型决策路径可视化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 可视化 LSTM 在每个时间步的决策
def visualize_decision_path(model, x_single):
"""可视化单个样本的决策过程"""
x_single = x_single.unsqueeze(0) # (1, seq_len, features)

# 提取每层的输出
outputs = []
h, c = None, None

for t in range(x_single.shape[1]):
if h is None:
out, (h, c) = model.lstm(x_single[:, :t+1, :])
else:
out, (h, c) = model.lstm(
x_single[:, t:t+1, :], (h, c)
)
outputs.append(h.squeeze(0).detach().numpy())

outputs = np.array(outputs) # (seq_len, hidden_size)

# 可视化隐藏状态的变化
plt.figure(figsize=(15, 8))
plt.imshow(outputs.T, aspect='auto', cmap='viridis')
plt.colorbar(label='隐藏状态值')
plt.xlabel('时间步')
plt.ylabel('隐藏状态维度')
plt.title('LSTM 隐藏状态随时间变化')
plt.show()

visualize_decision_path(model, x_test[0])

实战技巧与性能优化

模型初始化技巧

LSTM 的初始化对训练稳定性至关重要。遗忘门偏置的初始化尤其重要:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def init_lstm_weights(m):
"""LSTM 权重初始化最佳实践"""
if isinstance(m, nn.LSTM):
for name, param in m.named_parameters():
if 'weight_ih' in name:
# 输入到隐藏的权重: Xavier 初始化
nn.init.xavier_uniform_(param.data)
elif 'weight_hh' in name:
# 隐藏到隐藏的权重:正交初始化(保持梯度稳定)
nn.init.orthogonal_(param.data)
elif 'bias' in name:
# 偏置初始化
param.data.fill_(0)
# 关键:遗忘门偏置设为 1(帮助记忆)
n = param.size(0)
start, end = n // 4, n // 2 # 遗忘门的位置
param.data[start:end].fill_(1)

model.apply(init_lstm_weights)

为什么遗忘门偏置设为 1?

  • 初始时,我们希望模型倾向于保留历史信息
  • ,当 时,
  • 这意味着初始时遗忘门倾向于"记住"而非"遗忘"

序列长度选择策略

不同任务需要不同的输入序列长度:

任务类型 推荐序列长度 原因
股票价格预测 20-60 天 捕捉短期趋势和周期性
电力负荷预测 48-168 小时 覆盖日周期和周周期
销售预测 7-30 天 捕捉周度和月度模式
文本生成 50-200 词 平衡上下文和计算成本
语音识别 100-500 帧 覆盖音素和词级模式

选择原则: 1. 至少包含 2-3 个完整周期(如果有周期性) 2. 不超过 GPU 内存限制:序列长度 × 批次大小 × 隐藏层大小 3. 通过交叉验证选择:尝试不同长度,选择验证集表现最好的

批处理与内存优化

对于长序列,内存可能成为瓶颈:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class MemoryEfficientLSTM(nn.Module):
"""内存高效的 LSTM 实现"""
def __init__(self, input_size, hidden_size, num_layers):
super().__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
# 使用 LSTMCell 手动实现,可以控制内存
self.lstm_cells = nn.ModuleList([
nn.LSTMCell(input_size if i == 0 else hidden_size, hidden_size)
for i in range(num_layers)
])

def forward(self, x):
batch_size, seq_len, _ = x.size()
h = [torch.zeros(batch_size, self.hidden_size).to(x.device)
for _ in range(self.num_layers)]
c = [torch.zeros(batch_size, self.hidden_size).to(x.device)
for _ in range(self.num_layers)]

outputs = []
for t in range(seq_len):
x_t = x[:, t, :]
for layer_idx, lstm_cell in enumerate(self.lstm_cells):
h[layer_idx], c[layer_idx] = lstm_cell(
x_t if layer_idx == 0 else h[layer_idx-1],
(h[layer_idx], c[layer_idx])
)
x_t = h[layer_idx]
outputs.append(h[-1])

return torch.stack(outputs, dim=1)

内存优化技巧

  • 使用梯度检查点( Gradient Checkpointing):牺牲计算时间换取内存
  • 减小批次大小:虽然训练慢,但可以处理更长序列
  • 使用混合精度训练: FP16 可以减少 50%内存占用

常见问题排查指南

问题 1:训练 Loss 震荡

可能原因:

  • 学习率过大
  • 批次大小太小
  • 数据未归一化

解决方案:

1
2
3
4
5
6
7
8
9
10
# 1. 降低学习率
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4) # 从 1e-3 降到 1e-4

# 2. 增加批次大小
dataloader = DataLoader(dataset, batch_size=64) # 从 32 增加到 64

# 3. 数据归一化
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)

问题 2:验证 Loss 不下降但训练 Loss 下降

这是典型的过拟合,解决方案:

1
2
3
4
5
6
7
8
9
# 1. 增加 Dropout
model = LSTMWithDropout(input_size, hidden_size, num_layers, dropout=0.5)

# 2. 减少模型容量
model = LSTM(input_size, hidden_size=64, num_layers=2) # 从 128 降到 64

# 3. 数据增强
def add_noise(x, noise_level=0.01):
return x + torch.randn_like(x) * noise_level

问题 3:预测值总是接近均值

可能原因:

  • 模型容量不足
  • 学习率太小
  • 损失函数选择不当

解决方案:

1
2
3
4
5
6
7
8
9
10
11
# 1. 增加模型容量
model = LSTM(input_size, hidden_size=256, num_layers=3)

# 2. 使用学习率调度
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, mode='min', factor=0.5, patience=5
)

# 3. 尝试不同的损失函数
# 对于回归任务,可以尝试 Huber Loss(对异常值更鲁棒)
criterion = nn.HuberLoss(delta=1.0)

🎓 总结: LSTM 实战要点

核心记忆公式

实战 Checklist

记忆口诀: > 遗忘门决定忘多少,输入门决定记多少,输出门决定露多少, Cell State 记忆传千里!

  • 本文标题:时间序列模型(二)—— LSTM
  • 本文作者:Chen Kai
  • 创建时间:2020-04-14 14:45:00
  • 本文链接:https://www.chenk.top/%E6%97%B6%E9%97%B4%E5%BA%8F%E5%88%97%E6%A8%A1%E5%9E%8B%EF%BC%88%E4%BA%8C%EF%BC%89%E2%80%94%E2%80%94-LSTM/
  • 版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
 评论