在做时间序列预测时,很多问题并不需要一上来就堆深度模型:趋势/季节性/自相关/波动聚集这些结构,本来就可以被传统模型干净地表达出来,而且可解释、可诊断、也更容易做基线。本文把
ARIMA/SARIMA(平稳化与季节性)、 VAR(多变量联动)、
GARCH(波动率建模)、指数平滑与
Prophet(趋势分解)、以及卡尔曼滤波(状态空间视角)放在同一张图景里:每个模型解决什么结构、核心假设是什么、参数怎么理解,以及在真实数据里什么时候更稳、更值得优先尝试。
ARIMA(
AutoRegressive Integrated Moving Average)
ARIMA模型由三部分组成:
- AR
(Autoregressive):自回归部分表示序列与其过去值之间的线性关系。
- I
(Integrated):差分部分用于处理非平稳性数据,通过进行差分操作使其成为平稳序列。
- MA (Moving
Average):移动平均部分通过过去的残差(白噪声)来预测当前值。
ARIMA 模型用三个参数
来表示:
- :自回归项的阶数,表示回溯多少个时间步长。
- :差分的阶数,表示进行多少次差分以获得平稳序列。
- :移动平均项的阶数,表示回溯多少个过去的白噪声。
模型的核心公式为:
$$
Y_t = c + _{i=1}^{p} i Y{t-i} + t + {j=1}^{q} j
{t-j} $$
其中:
- 是当前时间 的值;
- 是常数项;
- 是自回归系数;
-
是误差项(白噪声);
- 是移动平均系数。
ARIMA
实现:从差分到预测的完整流程
问题背景:实际时间序列往往包含趋势和季节性,直接建模会导致非平稳性。
ARIMA 通过差分操作消除趋势,再对平稳化后的序列建立 ARMA
模型。核心挑战在于如何正确组合差分、自回归和移动平均三个组件。
解决思路:采用"先差分、后建模"的两阶段策略。首先通过
阶差分消除趋势,得到平稳序列;然后在该序列上建立 AR() 和 MA() 的组合模型。预测时, AR
部分利用历史值的线性组合, MA 部分利用历史预测误差的信息。
设计考虑:
- 差分策略:一阶差分通常足够消除线性趋势,高阶差分可能过度差分导致信息损失
- 参数初始化: AR 和 MA
系数需要满足平稳性和可逆性条件,实际应用中通过最大似然估计获得
- 误差累积: MA
项依赖历史误差,需要顺序计算并维护误差序列
- 边界处理:初始时间步缺乏足够历史数据,采用零预测或均值填充
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
| import numpy as np
def difference(series, order): """ 执行 d 阶差分操作,消除序列中的趋势成分 问题:原始序列可能包含趋势(如线性增长),导致非平稳性 解决:通过差分将非平稳序列转换为平稳序列 设计: order=1 消除线性趋势, order=2 消除二次趋势 Parameters: ----------- series : array-like 原始时间序列,形状为 (n,) order : int 差分阶数,通常取 1 或 2 。 order=1 表示一阶差分: y'_t = y_t - y_{t-1} Returns: -------- diff_series : list 差分后的序列,长度为 len(series) - order Notes: ------ - 差分会减少序列长度,损失前 order 个数据点 - 过度差分( order 过大)会导致序列方差增大,信息损失 """ diff_series = [] for i in range(order, len(series)): value = series[i] - series[i - order] diff_series.append(value) return diff_series
def ar_model(series, ar_order, ar_coeffs): """ 计算自回归( AR)部分:用历史值的加权和预测当前值 核心思想:当前值可以表示为过去 p 个值的线性组合 数学表达: AR_part = φ₁· y_{t-1} + φ₂· y_{t-2} + ... + φₚ· y_{t-p} Parameters: ----------- series : array-like 输入序列(通常是差分后的平稳序列) ar_order : int 自回归阶数 p,表示使用过去多少个时间步 ar_coeffs : array-like 自回归系数 [φ₁, φ₂, ..., φₚ],长度必须等于 ar_order Returns: -------- ar_part : float AR 部分的预测值 Example: -------- >>> series = [1.0, 2.0, 3.0, 4.0] >>> ar_model(series, ar_order=2, ar_coeffs=[0.5, 0.3]) # 计算: 0.5*4.0 + 0.3*3.0 = 2.0 + 0.9 = 2.9 """ ar_part = 0.0 for i in range(ar_order): ar_part += ar_coeffs[i] * series[-(i + 1)] return ar_part
def ma_model(errors, ma_order, ma_coeffs): """ 计算移动平均( MA)部分:用历史预测误差的加权和修正预测 核心思想:过去的预测误差包含有用信息,可以用来改进当前预测 数学表达: MA_part = θ₁·ε_{t-1} + θ₂·ε_{t-2} + ... + θₚ·ε_{t-q} Parameters: ----------- errors : list 历史预测误差序列 [ε₁, ε₂, ..., εₜ] ma_order : int 移动平均阶数 q,表示使用过去多少个误差项 ma_coeffs : array-like 移动平均系数 [θ₁, θ₂, ..., θₚ],长度必须等于 ma_order Returns: -------- ma_part : float MA 部分的修正值 Notes: ------ - MA 项帮助模型捕捉"误差的模式",如连续高估或低估的趋势 - 初始阶段 errors 为空或长度不足时, MA 部分为 0 """ ma_part = 0.0 for i in range(ma_order): ma_part += ma_coeffs[i] * errors[-(i + 1)] return ma_part
def arima(series, ar_order, ma_order, diff_order, ar_coeffs, ma_coeffs): """ 手动实现 ARIMA(p, d, q)模型的完整预测流程 工作流程: 1. 差分:对原始序列进行 d 阶差分,得到平稳序列 2. 建模:在平稳序列上建立 ARMA(p, q)模型 3. 预测:结合 AR 和 MA 两部分进行预测 Parameters: ----------- series : array-like 原始时间序列,形状为 (n,) ar_order : int AR 部分阶数 p ma_order : int MA 部分阶数 q diff_order : int 差分阶数 d,通常为 1(一阶差分) ar_coeffs : array-like AR 系数 [φ₁, φ₂, ..., φₚ] ma_coeffs : array-like MA 系数 [θ₁, θ₂, ..., θₚ] Returns: -------- predictions : list 对差分后序列的预测值,长度为 len(series_diff) Notes: ------ - 这是简化实现,实际 ARIMA 需要最大似然估计参数 - 初始阶段( t < max(p, q))无法计算完整 ARMA,返回 0 - 要得到原始序列的预测,需要对差分预测进行逆差分操作 """ series_diff = difference(series, diff_order) errors = [] predictions = [] for t in range(len(series_diff)): if t >= max(ar_order, ma_order): ar_part = ar_model(series_diff[:t], ar_order, ar_coeffs) ma_part = ma_model(errors[:t], ma_order, ma_coeffs) predicted_value = ar_part + ma_part else: predicted_value = 0.0 actual_value = series_diff[t] error = actual_value - predicted_value errors.append(error) predictions.append(predicted_value) return predictions
data = np.random.randn(100).cumsum()
ar_order, ma_order, diff_order = 2, 2, 1 ar_coeffs = [0.5, -0.25] ma_coeffs = [0.4, 0.3]
predictions = arima(data, ar_order, ma_order, diff_order, ar_coeffs, ma_coeffs) print(f"预测值数量: {len(predictions)}") print(f"前 5 个预测值: {predictions[:5]}")
|
关键点解读:
差分的作用:差分操作 $ y't = y_t -
y{t-1} y_t = at + b +
_t y'_t = a + t -
{t-1} at$。
AR 与 MA 的配合: AR
部分捕捉"历史值对当前值的影响", MA
部分捕捉"历史预测误差的模式"。两者结合能够同时利用序列的自相关性和误差的自相关性。
误差的递归性质: MA
项依赖历史误差,而误差又依赖历史预测,形成递归关系。这要求必须按时间顺序逐步计算,不能并行化。
常见问题:
- Q: 为什么初始阶段预测为 0?
- A: 当 时,没有足够的历史数据计算 AR 或 MA
项。实际应用中可以用序列均值或前几个观测值填充。
- Q: 如何选择
参数?
- A: 通常通过 ACF/PACF 图判断: PACF 在 lag 处截尾 → AR();
ACF 在 lag 处截尾 → MA()。差分阶数 通过 ADF
检验确定,直到序列平稳。
- Q: 预测值为什么是差分序列的?如何还原?
- A: 模型在差分序列上工作,预测值需要逆差分还原:,其中 是差分序列, 是原始序列。
使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| from statsmodels.tsa.arima.model import ARIMA import pandas as pd
ts_data = pd.Series(data, index=pd.date_range('2020-01-01', periods=100, freq='D'))
model = ARIMA(ts_data, order=(2, 1, 2)) fitted_model = model.fit()
print(fitted_model.summary())
forecast = fitted_model.forecast(steps=5) print(f"未来 5 天预测: {forecast}")
forecast_ci = fitted_model.get_forecast(steps=5).conf_int() print(f"95%置信区间:\n{forecast_ci}")
|
SARIMA( Seasonal ARIMA)
SARIMA是在 ARIMA
模型的基础上扩展出来的,能够处理具有季节性周期的时间序列。它的基本形式为SARIMA(p,
d, q)(P, D, Q, m),其中前面的 表示非季节性部分,后面的 表示季节性部分。
- :季节性自回归阶数。
- :季节性差分次数。
- :季节性移动平均阶数。
- :季节性周期的长度。
SARIMA 的公式如下:
其中:
- 是滞后算子;
- 和
分别是季节性自回归和移动平均系数。
SARIMA
实现:处理季节性模式的 ARIMA 扩展
问题背景:许多时间序列不仅包含趋势,还包含明显的季节性模式(如月度数据的年度周期、日度数据的周度周期)。普通
ARIMA
只能处理趋势,无法捕捉"去年同期的值对当前值的影响"这类季节性依赖。
解决思路: SARIMA 在 ARIMA
基础上增加季节性组件。通过季节性差分($ y_t - y_{t-m} m$
为周期长度)消除季节性,然后同时建立非季节性 ARMA 和季节性 ARMA
模型。非季节性部分捕捉短期依赖,季节性部分捕捉跨周期的长期依赖。
设计考虑: 1.
双重差分:先进行普通差分消除趋势,再进行季节性差分消除季节性,顺序不能颠倒
2. 滞后选择:季节性滞后
必须等于数据的周期长度(如月度数据$ m=12 m=7$) 3.
参数分离:非季节性和季节性参数独立估计,避免相互干扰 4.
数据要求:至少需要 个观测值才能进行季节性差分和建模
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
| import numpy as np
def seasonal_difference(series, season_lag): """ 执行季节性差分:消除序列中的季节性成分 问题:序列可能包含周期性模式(如每年 12 月销量都高),导致非平稳 解决:通过季节性差分 y'_t = y_t - y_{t-m} 消除季节性 设计: season_lag=m 表示周期长度,如月度数据 m=12,周度数据 m=7 Parameters: ----------- series : array-like 输入序列(通常是已经过普通差分的序列) season_lag : int 季节性周期长度 m 。例如: - 月度数据: m=12(一年 12 个月) - 周度数据: m=52(一年 52 周) - 日度数据: m=7(一周 7 天) Returns: -------- diff_series : list 季节性差分后的序列,长度为 len(series) - season_lag Example: -------- >>> series = [10, 12, 15, 18, 20, 22, 25, 28, 30, 32, 35, 38] >>> seasonal_difference(series, season_lag=12) # 如果这是月度数据,计算当前月与去年同月的差值 """ diff_series = [] for i in range(season_lag, len(series)): value = series[i] - series[i - season_lag] diff_series.append(value) return diff_series
def sarima(series, ar_order, ma_order, diff_order, seasonal_order, seasonal_lag, ar_coeffs, ma_coeffs, seasonal_ar_coeffs, seasonal_ma_coeffs): """ 手动实现 SARIMA(p, d, q)(P, D, Q, m)模型 模型结构: SARIMA = 非季节性 ARIMA + 季节性 ARIMA - 非季节性部分:(p, d, q) 处理趋势和短期依赖 - 季节性部分:(P, D, Q, m) 处理周期性模式和跨周期依赖 工作流程: 1. 普通差分:消除趋势( d 阶) 2. 季节性差分:消除季节性( D 阶,这里 D=1) 3. 建模:在双重差分后的序列上建立 ARMA(p,q)和季节性 ARMA(P,Q) Parameters: ----------- series : array-like 原始时间序列 ar_order : int 非季节性 AR 阶数 p ma_order : int 非季节性 MA 阶数 q diff_order : int 非季节性差分阶数 d seasonal_order : int 季节性 AR/MA 阶数 P=Q(简化实现,实际可以不同) seasonal_lag : int 季节性周期长度 m ar_coeffs : array-like 非季节性 AR 系数 [φ₁, φ₂, ..., φₚ] ma_coeffs : array-like 非季节性 MA 系数 [θ₁, θ₂, ..., θₚ] seasonal_ar_coeffs : array-like 季节性 AR 系数 [Φ₁, Φ₂, ..., Φₚ] seasonal_ma_coeffs : array-like 季节性 MA 系数 [Θ₁, Θ₂, ..., Θₚ] Returns: -------- predictions : list 对双重差分后序列的预测值 Notes: ------ - 这是简化实现,实际 SARIMA 需要复杂的参数估计 - 季节性 AR/MA 项使用滞后 m 的倍数(如 t-m, t-2m 等) - 需要至少 2m 个数据点才能进行季节性差分 """ series_diff = difference(series, diff_order) series_season_diff = seasonal_difference(series_diff, seasonal_lag) errors = [] predictions = [] for t in range(len(series_season_diff)): if t >= max(ar_order, ma_order, seasonal_order * seasonal_lag): ar_part = ar_model(series_season_diff[:t], ar_order, ar_coeffs) ma_part = ma_model(errors[:t], ma_order, ma_coeffs) seasonal_ar_part = ar_model(series_season_diff[:t], seasonal_order, seasonal_ar_coeffs) seasonal_ma_part = ma_model(errors[:t], seasonal_order, seasonal_ma_coeffs) predicted_value = ar_part + ma_part + seasonal_ar_part + seasonal_ma_part else: predicted_value = 0.0 actual_value = series_season_diff[t] error = actual_value - predicted_value errors.append(error) predictions.append(predicted_value) return predictions
data = np.random.randn(100).cumsum()
ar_order, ma_order, diff_order = 2, 2, 1 seasonal_order, seasonal_lag = 2, 12
ar_coeffs = [0.5, -0.25] ma_coeffs = [0.4, 0.3]
seasonal_ar_coeffs = [0.3, -0.2] seasonal_ma_coeffs = [0.2, 0.1]
predictions = sarima(data, ar_order, ma_order, diff_order, seasonal_order, seasonal_lag, ar_coeffs, ma_coeffs, seasonal_ar_coeffs, seasonal_ma_coeffs) print(f"SARIMA 预测值数量: {len(predictions)}") print(f"前 5 个预测值: {predictions[:5]}")
|
关键点解读:
双重差分的必要性:普通差分消除趋势,季节性差分消除周期性。如果只做季节性差分,趋势仍然存在;如果只做普通差分,季节性模式仍然存在。两者结合才能得到真正平稳的序列。
季节性滞后的选择:
必须等于数据的实际周期长度。错误选择
会导致季节性差分无效,甚至引入虚假模式。可以通过 ACF 图在滞后
处出现峰值来识别。
非季节性与季节性的交互:两个组件独立建模但共同作用。非季节性部分捕捉"昨天对今天的影响",季节性部分捕捉"去年同一天对今天的影响",两者叠加形成完整预测。
常见问题:
**Q: 如何确定季节性周期长度$ m m, 2m, 3m$ 处出现峰值来验证。
Q: SARIMA 需要多少数据?
- A: 至少需要
个观测值。例如月度数据()的 SARIMA(1,1,1)(1,1,1,12)至少需要 个月的数据。
Q: 季节性差分后序列长度大幅减少怎么办?
- A: 这是正常现象。如果数据量不足,考虑使用其他方法(如
Prophet)或降低季节性阶数。
使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| from statsmodels.tsa.statespace.sarimax import SARIMAX
ts_data = pd.Series(data, index=pd.date_range('2020-01-01', periods=36, freq='M'))
model = SARIMAX(ts_data, order=(2, 1, 2), seasonal_order=(2, 1, 2, 12)) fitted_model = model.fit(disp=False)
print(fitted_model.summary())
forecast = fitted_model.forecast(steps=12) print(f"未来 12 个月预测:\n{forecast}")
import matplotlib.pyplot as plt plt.figure(figsize=(12, 6)) plt.plot(ts_data, label='实际值') plt.plot(fitted_model.fittedvalues, label='拟合值', linestyle='--') plt.plot(pd.date_range(ts_data.index[-1], periods=13, freq='M')[1:], forecast, label='预测值', marker='o') plt.legend() plt.title('SARIMA 模型拟合与预测') plt.show()
|
VAR( Vector AutoRegressive
Model)
VAR模型用于多变量时间序列的建模。它假设每个时间序列不仅与自身的过去值有关,还与其他序列的过去值有关。
VAR 模型的数学表达式为: $$
Y_t = A_1 Y_{t-1} + A_2 Y_{t-2} + + A_p Y_{t-p} + _t $$
其中:
-
是包含多个时间序列的向量;
- 是滞后系数矩阵;
- 是误差向量。
每个
向量表示多个时间序列的当前值,
是描述各序列之间相互关系的系数矩阵。
VAR
实现:多变量时间序列的联动建模
问题背景:实际应用中,多个时间序列往往相互影响(如
GDP 与失业率、股价与成交量、温度与湿度)。单变量 ARIMA
模型无法捕捉这种跨序列的依赖关系。 VAR
模型通过将每个变量的当前值表示为所有变量(包括自身)历史值的线性组合,能够同时建模多个序列及其相互影响。
解决思路: VAR
模型的核心是"向量自回归"——每个变量的当前值不仅依赖自身的历史值,还依赖其他变量的历史值。对于 个变量的 VAR($ p
k k p i j l$
步的影响"。模型通过最小二乘法或最大似然估计求解系数矩阵。
设计考虑: 1.
变量选择:所有变量必须平稳(或协整),否则需要先差分 2.
滞后阶数:通过信息准则( AIC/BIC)选择最优$ p p=1-4A_{i,j}^{(l)} j l i$ 的边际影响 4. Granger
因果检验:检验一个变量是否"Granger
导致"另一个变量(即历史值是否有助于预测)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
| import numpy as np
def var_model(data, lag): """ 手动实现 VAR(p)模型:多变量时间序列的向量自回归 核心思想:每个变量的当前值 = 所有变量(包括自身)历史值的线性组合 数学表达: Y_t = A_1 · Y_{t-1} + A_2 · Y_{t-2} + ... + A_p · Y_{t-p} + ε_t 其中 Y_t 是 k 维向量( k 个变量), A_i 是 k × k 系数矩阵 Parameters: ----------- data : array-like, shape (n, k) 多变量时间序列数据 - n:时间步数 - k:变量数量(这里 k=2,处理 2 个时间序列) 例如:[[y1_t0, y2_t0], [y1_t1, y2_t1], ..., [y1_tn, y2_tn]] lag : int 滞后阶数 p,表示使用过去多少个时间步 - p=1:只使用前 1 步( VAR(1)) - p=2:使用前 2 步( VAR(2)) - 典型值: 1-4,通过 AIC/BIC 选择 Returns: -------- predictions : list 预测值列表,每个元素是(k,)形状的元组 长度为 n - lag(前 lag 个时间步无法预测) Notes: ------ - 这是简化实现,实际 VAR 需要估计系数矩阵 A_i(通过 OLS 或 MLE) - 这里假设系数已知(实际应用中需要先估计) - 所有变量必须平稳(或协整),否则需要先差分 - VAR 模型可以捕捉变量间的相互影响和动态关系 Example: -------- >>> data = np.column_stack((np.random.randn(100).cumsum(), ... np.random.randn(100).cumsum())) >>> predictions = var_model(data, lag=2) >>> # 每个预测是(y1_pred, y2_pred)的元组 """ n = len(data) k = data.shape[1] if len(data.shape) > 1 else 1 coefficients = np.zeros((k, lag)) errors = [] predictions = [] for t in range(lag, n): y1 = data[t][0] y2 = data[t][1] pred_y1 = sum([coefficients[0][i] * data[t - i - 1][0] for i in range(lag)]) pred_y2 = sum([coefficients[1][i] * data[t - i - 1][1] for i in range(lag)]) error_y1 = y1 - pred_y1 error_y2 = y2 - pred_y2 errors.append((error_y1, error_y2)) predictions.append((pred_y1, pred_y2)) return predictions
data = np.column_stack((np.random.randn(100).cumsum(), np.random.randn(100).cumsum())) lag = 2
predictions = var_model(data, lag) print(f"预测值数量: {len(predictions)}") print(f"前 5 个预测值: {predictions[:5]}")
|
关键点解读:
向量自回归的含义: VAR
模型的核心是"向量"——所有变量同时建模,每个变量的方程都包含所有变量的滞后项。这与单变量
ARIMA 不同: ARIMA 只考虑自身历史, VAR 考虑所有变量的历史。例如, GDP
的方程不仅包含 GDP
的历史值,还包含失业率、通胀率等变量的历史值。
系数矩阵的经济含义:系数矩阵 的元素 表示"变量 在
步前对变量 的边际影响"。如果,说明失业率上升 1 个单位,下一期 GDP 下降
个单位。这种解释使得 VAR 模型在经济学中广泛应用。
Granger 因果关系: VAR 模型的一个重要应用是检验
Granger 因果关系。如果变量
的历史值有助于预测变量(在控制 自身历史值后),则称"Granger 导致"。这不等同于真正的因果关系,但能揭示预测关系。
常见问题:
| 问题 |
解答 |
| Q: VAR 模型需要多少数据? |
A: 至少需要 个时间步,其中 是变量数,
是滞后阶数。例如 2 变量 VAR(2)至少需要 54 个时间步。 |
| **Q: 如何选择滞后阶数$ p p=1 p
k$ 增加时,参数数量
快速增长。通常,超过 10 个变量时考虑降维或因子模型。 |
|
使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| from statsmodels.tsa.vector_ar.var_model import VAR import pandas as pd
data_dict = { 'GDP': gdp_data, '失业率': unemployment_data, '通胀率': inflation_data } data_df = pd.DataFrame(data_dict)
model = VAR(data_df)
lag_order = model.select_order(maxlags=10) print(f"最优滞后阶数: {lag_order.selected_orders}")
results = model.fit(maxlags=lag_order.selected_orders['aic']) print(results.summary())
forecast = results.forecast(data_df.values[-lag_order.selected_orders['aic']:], steps=5) print(f"未来 5 步预测:\n{forecast}")
from statsmodels.tsa.stattools import grangercausalitytests print("\n 检验'失业率'是否 Granger 导致'GDP':") grangercausalitytests(data_df[['GDP', '失业率']], maxlag=4)
|
GARCH(
Generalized Autoregressive Conditional Heteroskedasticity)
GARCH模型用于建模时间序列中的条件异方差,特别适用于金融数据中的波动性预测。
GARCH 模型将过去的残差和过去的方差用于预测未来的方差。
GARCH(1, 1)模型的核心公式为:
其中:
- 是时间
的条件方差;
-
是模型的参数;
- 是残差序列。
该模型通过递归计算的方式来预测序列的波动性。
GARCH
实现:波动率建模与风险预测
问题背景:金融时间序列(如股价、汇率)的一个重要特征是"波动聚集"(
Volatility
Clustering)——大波动后往往继续大波动,小波动后往往继续小波动。传统 ARIMA
模型假设方差恒定(同方差),无法捕捉这种时变的波动性。 GARCH
模型通过将条件方差建模为历史残差平方和条件方差的函数,能够预测"明天的波动会有多大",这对风险管理至关重要。
解决思路: GARCH
模型的核心思想是"条件异方差"——方差不是常数,而是随时间变化的。
GARCH(1,1)模型用三个成分预测条件方差: 1)常数项(长期平均方差), 2) ARCH
项(捕捉"昨天的冲击对今天波动的影响"), 3)
GARCH 项(捕捉"昨天的波动对今天波动的影响")。参数需满足(平稳性条件)。
设计考虑: 1. 均值模型: GARCH
通常与 ARMA 结合,先建模均值(如 ARMA-GARCH),再建模残差的方差 2.
参数约束:,,(保证方差为正且平稳) 3.
初始方差:通常用样本方差或无条件方差 初始化 4.
应用场景:主要用于金融数据(股价、汇率、收益率),对非金融数据效果有限
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
| import numpy as np
def garch_model(returns, alpha0, alpha1, beta1): """ 手动实现 GARCH(1,1)模型:条件异方差建模 核心思想:条件方差不是常数,而是历史残差平方和条件方差的函数 数学表达:σ²_t = α₀ + α₁·ε²_{t-1} + β₁·σ²_{t-1} 其中: - σ²_t:时间 t 的条件方差(要预测的波动率) - ε_{t-1}:时间 t-1 的残差(实际值 - 预测值) - σ²_{t-1}:时间 t-1 的条件方差 参数含义: - α₀:长期平均方差(基准波动率) - α₁: ARCH 系数("昨天的冲击"对今天波动的影响) - β₁: GARCH 系数("昨天的波动"对今天波动的影响) Parameters: ----------- returns : array-like, shape (n,) 收益率序列(通常是价格的对数差分或百分比变化) 例如:股票日收益率、汇率变化率 注意:必须是平稳序列(收益率通常已平稳) alpha0 : float 常数项,必须 > 0 - 表示长期平均方差水平 - 典型值: 0.01-0.1(取决于收益率尺度) alpha1 : float ARCH 系数,范围 [0, 1) - 控制"昨天的冲击"对今天波动的影响 - α₁越大:对冲击反应越快,但波动更不稳定 - 典型值: 0.1-0.3 beta1 : float GARCH 系数,范围 [0, 1) - 控制"昨天的波动"对今天波动的影响(波动持续性) - β₁越大:波动更持久(波动聚集效应更强) - 典型值: 0.6-0.9 - 通常 β₁ > α₁(波动持续性 > 冲击敏感性) Returns: -------- variances : numpy.ndarray, shape (n,) 每个时间步的条件方差序列 - variances[0]:初始方差(通常为 0 或样本方差) - variances[t]:时间 t 的条件方差(基于 t-1 及之前的信息) Notes: ------ - 参数必须满足:α₀ > 0,α₁ ≥ 0,β₁ ≥ 0,α₁ + β₁ < 1(平稳性条件) - 如果 α₁ + β₁ ≈ 1:接近 IGARCH(积分 GARCH),波动具有长记忆 - 初始方差通常设为样本方差或无条件方差:α₀/(1-α₁-β₁) - GARCH 模型主要用于金融数据,对非金融数据(如温度、销量)效果有限 Example: -------- >>> returns = np.random.randn(100) * 0.02 # 模拟 2%标准差的收益率 >>> variances = garch_model(returns, alpha0=0.0001, alpha1=0.1, beta1=0.85) >>> # variances 表示每个时间步的预测波动率 >>> volatility = np.sqrt(variances) # 转换为标准差 """ n = len(returns) variances = np.zeros(n) errors = np.zeros(n) if alpha1 + beta1 < 1: initial_variance = alpha0 / (1 - alpha1 - beta1) else: initial_variance = np.var(returns) variances[0] = initial_variance errors[0] = returns[0] - np.mean(returns) for t in range(1, n): variances[t] = alpha0 + alpha1 * errors[t-1]**2 + beta1 * variances[t-1] errors[t] = returns[t] - np.mean(returns) return variances
np.random.seed(42) n = 500 returns = np.random.randn(n) * 0.02
alpha0 = 0.0001 alpha1 = 0.1 beta1 = 0.85
variances = garch_model(returns, alpha0, alpha1, beta1)
volatility = np.sqrt(variances)
print(f"平均波动率: {volatility.mean():.4f}") print(f"最大波动率: {volatility.max():.4f}") print(f"最小波动率: {volatility.min():.4f}")
|
关键点解读:
波动聚集的数学表达: GARCH 模型通过项捕捉波动聚集。如果 接近 1(如 0.9),昨天的波动 会强烈影响今天的波动,导致"大波动后继续大波动"的现象。这是金融数据的典型特征:市场恐慌时波动持续放大,市场平静时波动持续缩小。
ARCH vs GARCH: ARCH 模型只使用残差平方预测方差, GARCH
模型同时使用残差平方和条件方差。 GARCH 的优势是参数更少(
GARCH(1,1)等价于无限阶 ARCH),且能更好地捕捉波动的持续性。实际应用中,
GARCH(1,1)通常足够,很少需要 GARCH(p,q)其中 或。
参数的经济含义:
衡量"波动冲击的衰减速度"。如果,意味着波动冲击以 5%的速度衰减(半衰期约 14
天)。如果 接近
1,波动具有长记忆(接近 IGARCH),适合建模金融危机等极端事件。
常见问题:
| 问题 |
解答 |
| Q: GARCH 模型预测的是什么? |
A:
预测的是条件方差(波动率的平方),而非收益率本身。要预测收益率,需要结合均值模型(如
ARMA-GARCH)。 |
| Q: 如何选择 GARCH 的阶数(p,q)? |
A: 通常 GARCH(1,1)足够。如果残差检验显示需要更高阶,可以尝试
GARCH(2,1)或 GARCH(1,2),但参数数量快速增长。 |
| Q: GARCH 适用于非金融数据吗? |
A: 通常不适用。 GARCH
假设"波动聚集",这在金融数据中常见,但在销量、温度等数据中较少见。非金融数据通常方差相对稳定。 |
| Q: 如何估计 GARCH 参数? |
A: 使用最大似然估计( MLE)。需要假设残差分布(通常为正态分布或 t
分布)。实际应用中可以使用arch库或statsmodels。 |
| Q: GARCH 模型能预测多远的未来? |
A: 可以预测任意步数,但长期预测会收敛到无条件方差。通常用于
1-5 步的短期波动预测。 |
使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| from arch import arch_model import pandas as pd
returns_data = pd.Series(returns, index=pd.date_range('2020-01-01', periods=len(returns), freq='D'))
model = arch_model(returns_data, mean='Zero', vol='GARCH', p=1, q=1) fitted_model = model.fit(disp='off')
print(fitted_model.summary())
conditional_variance = fitted_model.conditional_volatility ** 2
forecast = fitted_model.forecast(horizon=5) future_variance = forecast.variance.values[-1, :] future_volatility = np.sqrt(future_variance)
print(f"\n 未来 5 步波动率预测:") for i, vol in enumerate(future_volatility): print(f" 第{i+1}步: {vol:.4f}")
import matplotlib.pyplot as plt fig, axes = plt.subplots(2, 1, figsize=(12, 8)) axes[0].plot(returns_data.index, returns_data.values, label='收益率', alpha=0.7) axes[0].set_ylabel('收益率') axes[0].legend() axes[0].grid(True, alpha=0.3)
axes[1].plot(returns_data.index, fitted_model.conditional_volatility, label='条件波动率', color='orange', linewidth=2) axes[1].set_ylabel('波动率(标准差)') axes[1].set_xlabel('时间') axes[1].legend() axes[1].grid(True, alpha=0.3) plt.tight_layout() plt.show()
|
Exponential
Smoothing(指数平滑)
指数平滑是一种用于时间序列数据的平滑技术,它对较新的数据点赋予更高的权重,常用于平稳序列的预测。指数平滑模型有多种形式,包括单指数平滑、双指数平滑和三指数平滑(
Holt-Winters 方法)。
单指数平滑
单指数平滑公式为: $$
S_t = Y_t + (1 - ) S_{t-1} $$
其中:
单指数平滑实现:加权移动平均的递归形式
问题背景:简单移动平均对所有历史数据赋予相同权重,但实际中"越近的数据越重要"。单指数平滑通过指数衰减权重实现这一思想:当前观测权重为, 步前的观测权重为,形成指数衰减。
解决思路:采用递归更新策略,每次只保留一个平滑值,新值通过当前观测和平滑值的加权平均得到。权重 控制对新信息的敏感度: 接近 1 时快速响应变化, 接近 0 时更平滑但响应慢。
设计考虑: 1.
初始值选择:通常用第一个观测值,或用前几个观测值的均值 2.
的选择:通过交叉验证或网格搜索选择,范围通常在 0.1-0.3 之间 3.
预测公式:未来 步预测为(常数预测) 4.
适用场景:仅适用于无趋势、无季节性的平稳序列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| def single_exponential_smoothing(data, alpha): """ 手动实现单指数平滑( Simple Exponential Smoothing) 核心思想:对历史数据赋予指数衰减权重,越近的数据权重越大 数学表达: S_t = α· Y_t + (1-α)· S_{t-1} 权重分布:当前观测权重α, k 步前观测权重α(1-α)^k(指数衰减) Parameters: ----------- data : array-like 输入时间序列,形状为 (n,) alpha : float 平滑系数,范围 [0, 1] - α接近 1:快速响应新数据,但波动大(适合变化快的序列) - α接近 0:平滑效果好,但响应慢(适合稳定序列) - 典型值: 0.1-0.3 Returns: -------- smoothed_data : list 平滑后的序列,长度与输入相同 Notes: ------ - 初始值使用第一个观测值: S_0 = Y_0 - 预测公式:未来 h 步预测 = S_t(常数预测) - 仅适用于无趋势、无季节性的平稳序列 Example: -------- >>> data = [10, 12, 11, 13, 12, 14, 13] >>> smoothed = single_exponential_smoothing(data, alpha=0.3) >>> # S_1 = 0.3*12 + 0.7*10 = 10.6 >>> # S_2 = 0.3*11 + 0.7*10.6 = 10.72 """ smoothed_data = [data[0]] for t in range(1, len(data)): smoothed_value = alpha * data[t] + (1 - alpha) * smoothed_data[-1] smoothed_data.append(smoothed_value) return smoothed_data
data = np.random.randn(100).cumsum() alpha = 0.2 smoothed_data = single_exponential_smoothing(data, alpha)
import matplotlib.pyplot as plt plt.figure(figsize=(12, 6)) plt.plot(data, label='原始数据', alpha=0.7) plt.plot(smoothed_data, label=f'平滑数据 (α={alpha})', linewidth=2) plt.legend() plt.title('单指数平滑效果对比') plt.xlabel('时间步') plt.ylabel('值') plt.show()
print(f"原始数据标准差: {np.std(data):.4f}") print(f"平滑数据标准差: {np.std(smoothed_data):.4f}")
|
关键点解读:
指数衰减的数学本质:展开递归式可得 ,权重 呈指数衰减。例如 时,当前权重 0.2, 1 步前
0.16, 2 步前 0.128,权重快速衰减。
的权衡:
大时模型快速适应变化但噪声敏感,
小时平滑效果好但可能滞后。实际选择需要平衡"跟踪能力"和"平滑程度"。
预测的局限性:单指数平滑只能做常数预测(未来值=当前平滑值),无法捕捉趋势或季节性,这是其最大局限。
常见问题:
- Q: 如何选择最优的?
- A: 使用时间序列交叉验证,选择使验证集 MSE 最小的。也可以使用
statsmodels的自动优化功能。
- Q: 单指数平滑适用于什么场景?
- A:
仅适用于无趋势、无季节性的平稳序列。如果序列有明显趋势,应使用双指数平滑;如果有季节性,应使用
Holt-Winters 。
- Q: 初始值的选择重要吗?
- A: 对于长序列影响不大,因为
会快速衰减。对于短序列,可以用前几个观测值的均值初始化。
使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| from statsmodels.tsa.holtwinters import SimpleExpSmoothing
ts_data = pd.Series(data, index=pd.date_range('2020-01-01', periods=100, freq='D'))
model = SimpleExpSmoothing(ts_data) fitted_model = model.fit(optimized=True)
print(f"最优平滑系数α: {fitted_model.params['smoothing_level']:.4f}")
forecast = fitted_model.forecast(steps=10) print(f"未来 10 天预测: {forecast}")
forecast_ci = fitted_model.get_prediction(start=len(ts_data), end=len(ts_data)+9).conf_int() print(f"95%置信区间:\n{forecast_ci}")
|
双指数平滑(用于趋势)
双指数平滑通过引入趋势项来预测具有线性趋势的序列。公式为: $$
S_t = Y_t + (1 - )(S_{t-1} + T_{t-1}) $
$
$$
T_t = (S_t - S_{t-1}) + (1 - ) T_{t-1} $$
其中:
双指数平滑实现:捕捉线性趋势的
Holt 方法
问题背景:单指数平滑假设序列无趋势,但实际中许多序列呈现明显的上升或下降趋势(如销量逐年增长、股价长期上涨)。如果对带趋势的序列使用单指数平滑,预测会系统性滞后。
解决思路: Holt 双指数平滑引入独立的趋势项,分别平滑"水平"和"趋势"两个成分。水平 表示序列的当前值,趋势
表示序列的变化速度(斜率)。预测时,未来 步的预测为,即当前水平加上趋势的线性外推。
设计考虑: 1. 水平更新:新水平 =
·当前观测 + ·(上一水平+上一趋势),体现"趋势延续"的假设
2. 趋势更新:新趋势 = ·水平变化 + ·上一趋势,平滑趋势的变化 3.
初始值:水平用第一个观测值,趋势用前两个观测值的差值 4.
参数选择:
控制水平更新速度,
控制趋势更新速度,通常都较小( 0.1-0.3)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
| def double_exponential_smoothing(data, alpha, beta): """ 手动实现双指数平滑( Holt 方法, Double Exponential Smoothing) 核心思想:将序列分解为"水平"和"趋势"两个成分,分别平滑 水平更新: S_t = α· Y_t + (1-α)·(S_{t-1} + T_{t-1}) 趋势更新: T_t = β·(S_t - S_{t-1}) + (1-β)· T_{t-1} 预测公式:Ŷ_{t+h} = S_t + h · T_t(线性外推) Parameters: ----------- data : array-like 输入时间序列,形状为 (n,),应包含明显的线性趋势 alpha : float 水平平滑系数,范围 [0, 1] - 控制对当前观测的响应速度 - 典型值: 0.1-0.3 beta : float 趋势平滑系数,范围 [0, 1] - 控制对趋势变化的响应速度 - 通常略小于 alpha,典型值: 0.05-0.2 Returns: -------- smoothed_data : list 平滑后的序列,长度与输入相同 Notes: ------ - 初始水平: S_0 = Y_0(第一个观测值) - 初始趋势: T_0 = Y_1 - Y_0(前两个观测值的差值) - 适用于有线性趋势但无季节性的序列 - 预测未来 h 步:Ŷ_{t+h} = S_t + h · T_t Example: -------- >>> data = [10, 12, 14, 16, 18, 20] # 线性增长趋势 >>> smoothed = double_exponential_smoothing(data, alpha=0.3, beta=0.1) >>> # S_1 = 0.3*12 + 0.7*(10+2) = 12.0 >>> # T_1 = 0.1*(12-10) + 0.9*2 = 2.0 """ level = data[0] trend = data[1] - data[0] smoothed_data = [level] for t in range(1, len(data)): new_level = alpha * data[t] + (1 - alpha) * (level + trend) new_trend = beta * (new_level - level) + (1 - beta) * trend level, trend = new_level, new_trend smoothed_data.append(level + trend) return smoothed_data
trend_data = np.linspace(10, 50, 100) + np.random.randn(100) * 2 alpha, beta = 0.2, 0.1 smoothed_data = double_exponential_smoothing(trend_data, alpha, beta)
plt.figure(figsize=(12, 6)) plt.plot(trend_data, label='原始数据(带趋势)', alpha=0.7) plt.plot(smoothed_data, label=f'双指数平滑 (α={alpha}, β={beta})', linewidth=2) plt.legend() plt.title('双指数平滑效果对比(捕捉趋势)') plt.xlabel('时间步') plt.ylabel('值') plt.show()
last_level = smoothed_data[-1] - (smoothed_data[-1] - smoothed_data[-2]) last_trend = smoothed_data[-1] - smoothed_data[-2] future_forecast = [last_level + h * last_trend for h in range(1, 6)] print(f"未来 5 步预测: {future_forecast}")
|
关键点解读:
水平更新的直觉:公式 中的 表示"如果趋势延续,当前应该的值"。如果实际观测
与这个值接近,说明趋势稳定;如果差异大,说明趋势可能改变。
趋势更新的平滑性:趋势本身也会变化(加速或减速),但变化是平滑的。 控制趋势变化的响应速度: 大时快速适应趋势变化, 小时趋势更稳定但可能滞后。
线性外推的假设:预测公式
假设趋势在未来保持不变。这对于短期预测(
小)通常合理,但长期预测可能不准确。
常见问题:
- Q: 如何选择
和?
- A: 通过时间序列交叉验证选择使验证集 MSE 最小的组合。通常 略小于,因为趋势变化通常比水平变化更平滑。
- Q: 双指数平滑能处理非线性趋势吗?
- A:
不能。双指数平滑假设线性趋势。对于非线性趋势(如指数增长),需要对数变换后再应用,或使用其他方法(如
Prophet)。
- Q: 预测的置信区间如何计算?
- A:
双指数平滑的预测误差方差会随时间增长。可以使用状态空间模型框架计算理论置信区间,或使用
Bootstrap 方法。
使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| from statsmodels.tsa.holtwinters import ExponentialSmoothing
ts_data = pd.Series(trend_data, index=pd.date_range('2020-01-01', periods=100, freq='D'))
model = ExponentialSmoothing(ts_data, trend='add', seasonal=None) fitted_model = model.fit(optimized=True)
print(f"最优水平系数α: {fitted_model.params['smoothing_level']:.4f}") print(f"最优趋势系数β: {fitted_model.params['smoothing_slope']:.4f}")
forecast = fitted_model.forecast(steps=10) print(f"未来 10 天预测:\n{forecast}")
plt.figure(figsize=(14, 6)) plt.plot(ts_data, label='原始数据') plt.plot(fitted_model.fittedvalues, label='拟合值', linestyle='--') plt.plot(pd.date_range(ts_data.index[-1], periods=11, freq='D')[1:], forecast, label='预测值', marker='o') plt.legend() plt.title('双指数平滑:拟合与预测') plt.show()
|
三指数平滑( Holt-Winters
方法)
三指数平滑( Holt-Winters
方法)是一种扩展的指数平滑技术,专门用于处理带有趋势和季节性波动的时间序列数据。它在预测时不仅考虑序列的当前水平和趋势,还能够捕捉到周期性的季节变化。该方法通常用于金融、零售和能源等领域,帮助预测具有重复模式的时间序列数据。
模型组成部分
Holt-Winters 方法由以下三部分组成:
- 水平( Level):表示当前的估计水平值。
- 趋势( Trend):表示时间序列的线性趋势。
- 季节性(
Seasonality):表示时间序列中的季节性周期波动。
每个部分的更新基于前一时间步的估计,类似于单指数和平滑和双指数平滑。三指数平滑的目标是捕捉到这些成分的动态变化,并使用它们来进行未来的预测。
公式
在 Holt-Winters
方法中,我们有以下三个核心公式来分别更新水平、趋势和季节性成分。
水平更新公式( Level equation):
$$
L_{t}=( {S_{t-m}} )+(1-)(L_{t-1}+T_{t-1})$$
其中:
- 是当前时刻
的水平估计;
-
是当前时刻的实际观察值;
- 是
个周期前的季节性因子( 为季节周期长度);
- 是水平平滑系数,。
趋势更新公式( Trend equation):
$$
T_{t}=(L_{t}-L_{t-1})+(1-) T_{t-1}$$
其中:
季节性更新公式( Seasonal equation):
$$
S_{t}=( {L_{t}} )+(1-) S_{t-m}$$
其中:
预测公式
一旦我们通过以上公式得到了当前时刻
的水平、趋势和季节性成分,可以通过以下公式预测未来的值。
对于未来 时刻的预测:
其中:
- 是
时刻的预测值;
- 是当前时刻
的水平估计;
- 是当前时刻
的趋势估计;
- 是与
时刻相对应的季节性因子。
Holt-Winters
三指数平滑实现:同时处理趋势和季节性
问题背景:实际时间序列往往同时包含趋势和季节性(如零售销量既有长期增长趋势,又有年度周期性波动)。双指数平滑只能处理趋势,无法捕捉"每年
12 月销量都高"这类季节性模式。 Holt-Winters 通过引入季节性因子,将序列分解为水平、趋势、季节性三个独立成分。
解决思路:采用"去季节化→平滑→加回季节性"的策略。水平更新时先除以季节性因子去除季节性影响,得到"去季节化"的值;然后按双指数平滑的方式更新水平和趋势;最后单独更新季节性因子。预测时,将水平+趋势的线性外推乘以对应的季节性因子。
设计考虑: 1.
季节性因子的周期性:需要维护 个季节性因子(),对应一个完整周期内的每个位置 2.
去季节化操作:水平更新公式中的
表示"去除季节性后的值",这样才能正确估计水平 3.
初始值估计:需要至少
个观测值才能初始化水平和趋势,季节性因子用第一个周期的数据估计 4.
加法 vs
乘法模型:这里实现的是乘法模型(季节性以比例形式出现),适用于季节性幅度随水平增长的场景
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
| import numpy as np
def holt_winters(data, alpha, beta, gamma, season_length, n_preds): """ Holt-Winters 三指数平滑算法实现(乘法模型) 模型结构:序列 = 水平 × 趋势 × 季节性 水平更新: L_t = α·(Y_t/S_{t-m}) + (1-α)·(L_{t-1}+T_{t-1}) 趋势更新: T_t = β·(L_t-L_{t-1}) + (1-β)· T_{t-1} 季节性更新: S_t = γ·(Y_t/L_t) + (1-γ)· S_{t-m} 预测公式:Ŷ_{t+h} = (L_t + h · T_t) × S_{t+h-m} Parameters: ----------- data : array-like 输入时间序列,形状为 (n,),应包含趋势和季节性 至少需要 2*season_length 个观测值 alpha : float 水平平滑系数,范围 [0, 1] - 控制对当前观测的响应速度 - 典型值: 0.1-0.3 beta : float 趋势平滑系数,范围 [0, 1] - 控制对趋势变化的响应速度 - 典型值: 0.05-0.2 gamma : float 季节性平滑系数,范围 [0, 1] - 控制对季节性变化的响应速度 - 典型值: 0.1-0.4 season_length : int 季节性周期长度 m - 月度数据: m=12(一年 12 个月) - 周度数据: m=52(一年 52 周) - 日度数据: m=7(一周 7 天)或 m=365(一年 365 天) n_preds : int 要预测的未来时间步数 Returns: -------- predictions : list 未来 n_preds 步的预测值 Notes: ------ - 这是乘法模型(季节性以比例形式出现) - 适用于季节性幅度随水平增长的场景(如销量:水平越高,季节性波动越大) - 如果季节性幅度恒定,应使用加法模型: Y_t = L_t + T_t + S_t Example: -------- >>> data = [100, 120, 140, 130, 110, 130, 150, ...] # 月度数据,有年度周期 >>> predictions = holt_winters(data, alpha=0.2, beta=0.1, gamma=0.3, ... season_length=12, n_preds=12) >>> # 预测未来 12 个月(一个完整年度周期) """ L = [np.mean(data[:season_length])] T = [(np.mean(data[season_length:2*season_length]) - np.mean(data[:season_length])) / season_length] S = [data[i] / L[0] for i in range(season_length)] for t in range(len(data)): if t >= season_length: L_t = alpha * (data[t] / S[t - season_length]) + (1 - alpha) * (L[-1] + T[-1]) T_t = beta * (L_t - L[-1]) + (1 - beta) * T[-1] S_t = gamma * (data[t] / L_t) + (1 - gamma) * S[t - season_length] L.append(L_t) T.append(T_t) S.append(S_t) else: L.append(L[-1]) T.append(T[-1]) S.append(S[t]) predictions = [] for h in range(1, n_preds + 1): seasonal_idx = (-season_length + h) % season_length if seasonal_idx < 0: seasonal_idx += season_length pred_value = (L[-1] + h * T[-1]) * S[seasonal_idx] predictions.append(pred_value) return predictions
data = np.array([112, 118, 132, 129, 121, 135, 148, 148, 136, 119, 104, 118, 115, 126, 141, 135, 125, 149, 170, 170, 158, 133, 114, 140])
alpha = 0.2 beta = 0.1 gamma = 0.3 season_length = 12 n_preds = 12
predictions = holt_winters(data, alpha, beta, gamma, season_length, n_preds) print(f"Holt-Winters 预测(未来 12 个月):") for i, pred in enumerate(predictions): print(f" 第{i+1}个月: {pred:.2f}")
plt.figure(figsize=(14, 6)) months = range(1, len(data) + 1) future_months = range(len(data) + 1, len(data) + n_preds + 1) plt.plot(months, data, 'o-', label='历史数据', linewidth=2) plt.plot(future_months, predictions, 's-', label='Holt-Winters 预测', linewidth=2, markersize=8) plt.axvline(x=len(data), color='r', linestyle='--', label='预测起点') plt.legend() plt.title('Holt-Winters 三指数平滑:历史拟合与未来预测') plt.xlabel('月份') plt.ylabel('销量') plt.grid(True, alpha=0.3) plt.show()
|
关键点解读:
去季节化的必要性:水平更新公式中的
是关键。如果不除以季节性因子,季节性波动会被误认为是水平变化,导致水平估计偏差。例如,如果
12 月销量总是高(季节性),不除以季节性因子会高估 12 月的水平。
季节性因子的周期性存储:需要维护
个季节性因子,对应周期内的每个位置。预测时,通过模运算
找到对应的季节性因子。例如预测第 13 个月时,使用第 1 个月()的季节性因子。
乘法 vs 加法模型:这里实现的是乘法模型(),适用于季节性幅度随水平增长的场景。如果季节性幅度恒定(如温度:夏天总是比冬天高
20 度),应使用加法模型()。
常见问题:
- Q: 如何判断使用乘法模型还是加法模型?
- A:
如果季节性波动幅度随序列水平增长而增大(如销量:高销量时波动更大),用乘法模型;如果季节性波动幅度恒定(如温度),用加法模型。可以通过可视化判断。
- Q: 初始值的选择对结果影响大吗?
- A: 对于长序列(>3
个周期)影响不大,因为指数平滑会快速"忘记"初始值。对于短序列,初始值选择很重要,建议使用更稳健的初始化方法。
- Q: 如何选择?
- A: 使用时间序列交叉验证,选择使验证集 MSE
最小的组合。也可以使用
statsmodels的自动优化功能。通常,因为趋势变化最慢,季节性变化最快。
使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| from statsmodels.tsa.holtwinters import ExponentialSmoothing
ts_data = pd.Series(data, index=pd.date_range('2020-01-01', periods=24, freq='M'))
model = ExponentialSmoothing( ts_data, trend='add', seasonal='mul', seasonal_periods=12 ) fitted_model = model.fit(optimized=True)
print(f"最优水平系数α: {fitted_model.params['smoothing_level']:.4f}") print(f"最优趋势系数β: {fitted_model.params['smoothing_slope']:.4f}") print(f"最优季节性系数γ: {fitted_model.params['smoothing_seasonal']:.4f}")
forecast = fitted_model.forecast(steps=12) print(f"\n 未来 12 个月预测:\n{forecast}")
plt.figure(figsize=(16, 10)) fig, axes = plt.subplots(4, 1, figsize=(16, 12))
axes[0].plot(ts_data, 'o-', label='原始数据', linewidth=2) axes[0].set_title('原始时间序列') axes[0].legend() axes[0].grid(True, alpha=0.3)
axes[1].plot(fitted_model.level, label='水平 L_t', linewidth=2) axes[1].set_title('水平成分') axes[1].legend() axes[1].grid(True, alpha=0.3)
axes[2].plot(fitted_model.trend, label='趋势 T_t', linewidth=2) axes[2].set_title('趋势成分') axes[2].legend() axes[2].grid(True, alpha=0.3)
axes[3].plot(fitted_model.seasonal[:12], 'o-', label='季节性因子 S_t', linewidth=2) axes[3].set_title('季节性成分(一个周期)') axes[3].legend() axes[3].grid(True, alpha=0.3) axes[3].set_xlabel('周期内位置(月份)')
plt.tight_layout() plt.show()
|
Prophet
Prophet是 Facebook
开发的一种时间序列预测模型,能够处理具有季节性、趋势和假日效应的复杂时间序列数据。
Prophet 模型可以分为三部分:趋势、季节性和假日效应。模型的形式为:
$$
y(t) = g(t) + s(t) + h(t) + _t $$
其中:
- 是趋势部分,用于捕捉长期变化;
- 是季节性部分,表示周期性变化;
- 是假日效应;
- 是误差项。
Prophet
实现:自动化时间序列分解与预测
问题背景:传统时间序列模型(如 ARIMA 、
Holt-Winters)需要大量手动调参和领域知识:选择差分阶数、识别季节性周期、处理异常值等。
Prophet
的设计目标是"开箱即用"——用户只需提供时间序列数据,模型自动处理趋势变化点、多个季节性周期(日/周/年)、假日效应和异常值,特别适合业务场景中的快速原型和自动化预测。
解决思路: Prophet
采用"可加性分解"的思想,将时间序列分解为三个可解释的组件: 1)趋势$
g(t) s(t)
h(t)$(用户提供假日列表,模型学习其影响)。预测时,三个组件相加得到最终预测。模型使用贝叶斯方法估计参数,并提供不确定性区间。
设计考虑: 1. 趋势变化点检测:
Prophet
自动检测趋势变化点(如产品发布、政策变化),允许趋势在不同段有不同的增长率
2.
多季节性支持:可以同时建模多个季节性周期(如日度数据的日周期+周周期+年周期),无需手动指定
3. 鲁棒性:对异常值和缺失值鲁棒,使用 M-estimator
而非最小二乘 4.
可解释性:分解结果可视化,用户可以理解趋势、季节性、假日的贡献
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| from prophet import Prophet import pandas as pd import numpy as np
data = pd.DataFrame({ 'ds': pd.date_range(start='2020-01-01', periods=100, freq='D'), 'y': np.random.randn(100).cumsum() })
model = Prophet( yearly_seasonality=True, weekly_seasonality=True, daily_seasonality=False, seasonality_mode='additive' )
model.fit(data)
future = model.make_future_dataframe(periods=10, freq='D')
forecast = model.predict(future)
print("未来 10 天的预测:") print(forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail(10))
fig = model.plot(forecast)
fig2 = model.plot_components(forecast)
|
关键点解读:
自动变化点检测: Prophet
的核心优势是自动检测趋势变化点。如果序列在某个时间点趋势突然改变(如产品发布导致销量增长加速),
Prophet
会自动识别并在该点前后使用不同的增长率。用户可以指定先验变化点或让模型自动检测。这比
ARIMA 更灵活, ARIMA 假设趋势是固定的(通过差分消除)。
傅里叶级数建模季节性: Prophet 使用傅里叶级数 建模季节性,其中 是周期长度, 是项数(控制平滑程度)。这比
Holt-Winters
更灵活:可以建模多个周期(日/周/年),且不需要完整周期的数据。例如,即使只有
3 个月的数据, Prophet 也能估计年度季节性(虽然不准确)。
不确定性量化: Prophet
不仅提供点预测,还提供不确定性区间。模型假设趋势变化率和季节性参数来自先验分布,通过
MCMC
采样得到后验分布,从而量化预测不确定性。这对于业务决策很重要:可以评估"预测值±不确定性"的风险范围。
常见问题:
| 问题 |
解答 |
| Q: Prophet 需要多少数据? |
A: 至少需要 2 个完整周期(如月度数据至少 24 个月)。但 Prophet
对数据量要求不高,即使只有几十个数据点也能工作(虽然准确性有限)。 |
| Q: Prophet 能处理缺失值吗? |
A: 可以。 Prophet 会自动处理缺失值,但建议在
fit()前手动处理(如插值),以获得更好的结果。 |
| Q: 如何添加自定义季节性? |
A:
使用add_seasonality()方法。例如:model.add_seasonality(name='monthly', period=30.5, fourier_order=5)添加月度季节性。 |
| Q: Prophet vs ARIMA 哪个更好? |
A: Prophet 优势:自动化、处理异常值、多季节性、假日效应。 ARIMA
优势:理论基础扎实、置信区间更准确、可解释性强。选择:快速原型用
Prophet,学术研究用 ARIMA 。 |
| Q: 如何处理乘法季节性? |
A:
设置seasonality_mode='multiplicative'。适用于季节性幅度随水平增长的场景(如销量:水平越高,季节性波动越大)。 |
使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| from prophet import Prophet import pandas as pd import matplotlib.pyplot as plt
dates = pd.date_range('2022-01-01', '2023-12-31', freq='D')
trend = np.linspace(100, 200, len(dates)) weekly_seasonal = 10 * np.sin(2 * np.pi * np.arange(len(dates)) / 7) yearly_seasonal = 20 * np.sin(2 * np.pi * np.arange(len(dates)) / 365.25) noise = np.random.randn(len(dates)) * 5 sales = trend + weekly_seasonal + yearly_seasonal + noise
data = pd.DataFrame({ 'ds': dates, 'y': sales })
holidays = pd.DataFrame({ 'holiday': '春节', 'ds': pd.to_datetime(['2022-02-01', '2023-01-22']), 'lower_window': -2, 'upper_window': 2 })
model = Prophet( yearly_seasonality=True, weekly_seasonality=True, daily_seasonality=False, holidays=holidays, seasonality_mode='additive', changepoint_prior_scale=0.05 )
model.fit(data)
future = model.make_future_dataframe(periods=90, freq='D') forecast = model.predict(future)
fig1 = model.plot(forecast) plt.title('Prophet 预测结果:销量趋势与未来 90 天预测') plt.show()
fig2 = model.plot_components(forecast) plt.show()
print("\n 模型摘要:") print(f"检测到的变化点数量: {len(model.changepoints)}") print(f"年度季节性强度: {forecast['yearly'].std():.2f}") print(f"周度季节性强度: {forecast['weekly'].std():.2f}")
from prophet.diagnostics import cross_validation, performance_metrics df_cv = cross_validation(model, initial='365 days', period='30 days', horizon='60 days') df_perf = performance_metrics(df_cv) print("\n 交叉验证性能指标:") print(df_perf[['horizon', 'mape', 'rmse']].head())
|
Kalman Filter(卡尔曼滤波)
卡尔曼滤波是一种用于对时变系统进行递归估计的算法,广泛应用于噪声较大的系统中,用于平滑和预测。卡尔曼滤波的关键是通过对状态变量的估计和观测值之间的差异来更新估计。
卡尔曼滤波器的状态更新公式为: $$
x_{t|t-1} = F x_{t-1|t-1} + B u_t $
$
$$
P_{t|t-1} = F P_{t-1|t-1} F^T + Q $$
观测更新公式为: $$
K_t = P_{t|t-1} H^T (H P_{t|t-1} H^T + R)^{-1} $
$
$$
x_{t|t} = x_{t|t-1} + K_t (y_t - H x_{t|t-1}) $$
其中:
- 是时间 时刻的预测状态;
-
是预测误差协方差矩阵;
- 是卡尔曼增益;
- 是观测值;
- 是观测模型矩阵。
卡尔曼滤波器的优势在于它能够递归地更新状态估计,并且适用于实时系统。
卡尔曼滤波实现:状态空间模型的递归估计
问题背景:许多实际系统(如 GPS
定位、机器人导航、传感器融合)中,我们无法直接观测系统的真实状态,只能通过带噪声的观测值推断状态。例如,
GPS
位置有误差,温度传感器有噪声,股价的真实价值无法直接观测。卡尔曼滤波通过"预测-更新"的递归框架,结合系统动力学模型和观测模型,在不确定性中做出最优状态估计。
解决思路:卡尔曼滤波的核心是"贝叶斯更新"——每个时间步分为两个阶段:
1)预测步骤:根据系统动力学模型(状态转移矩阵)预测下一状态,并更新不确定性(协方差);
2)更新步骤:当获得新观测时,计算卡尔曼增益(权衡预测和观测的可靠性),融合预测和观测得到最优估计。卡尔曼增益的数学形式 自动平衡"预测不确定性"和"观测不确定性"。
设计考虑: 1.
状态空间模型:需要定义状态转移方程($ x_t = F x_{t-1} +
B u_t + w_t y_t = H
x_t + v_tQR x_0$ 和初始协方差 影响早期估计,通常 设较大表示不确定性高 4.
实时性:卡尔曼滤波是递归的,每个时间步只需 计算(
是状态维度),适合实时应用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
| import numpy as np
def kalman_filter(data, F, B, H, Q, R, initial_state, initial_covariance): """ 手动实现一维卡尔曼滤波器:状态空间模型的递归估计 核心思想:通过"预测-更新"循环,融合系统动力学和观测信息,得到最优状态估计 数学框架: - 状态转移: x_t = F · x_{t-1} + B · u_t + w_t (w_t ~ N(0, Q)) - 观测方程: y_t = H · x_t + v_t (v_t ~ N(0, R)) - 预测步骤: x_{t|t-1} = F · x_{t-1|t-1} + B · u_t P_{t|t-1} = F · P_{t-1|t-1}· F^T + Q - 更新步骤: K_t = P_{t|t-1}· H^T ·(H · P_{t|t-1}· H^T + R)^{-1} x_{t|t} = x_{t|t-1} + K_t ·(y_t - H · x_{t|t-1}) P_{t|t} = (I - K_t · H)· P_{t|t-1} Parameters: ----------- data : array-like, shape (n,) 观测序列(带噪声的测量值) 例如: GPS 位置、传感器读数、股价观测值 F : float or array-like 状态转移矩阵(一维时为标量) - F=1:状态不变(如位置跟踪) - F>1:状态增长(如累积过程) - F<1:状态衰减(如衰减过程) B : float or array-like 控制输入矩阵(一维时为标量) - B=0:无外部控制输入 - B ≠ 0:有外部控制(如加速度、推力) H : float or array-like 观测矩阵(一维时为标量,通常 H=1) - H=1:直接观测状态(如位置观测位置) - H ≠ 1:间接观测(如速度观测位置) Q : float 过程噪声协方差(系统不确定性) - Q 小:系统模型可信度高,预测更依赖模型 - Q 大:系统模型不确定,更依赖观测 - 典型值: 0.001-0.1(取决于状态尺度) R : float 观测噪声协方差(传感器不确定性) - R 小:观测可信度高,更信任观测 - R 大:观测噪声大,更信任预测 - 典型值: 0.01-1.0(取决于观测尺度) initial_state : float 初始状态估计 - 可以是先验知识或第一次观测值 initial_covariance : float 初始状态协方差(不确定性) - 通常设较大(如 1.0)表示初始不确定性高 - 随着观测增加,协方差会逐渐减小 Returns: -------- estimates : list 每个时间步的状态估计序列 长度为 n(与输入数据长度相同) Notes: ------ - 卡尔曼滤波假设噪声是高斯白噪声(均值为 0,协方差 Q 和 R) - 如果噪声非高斯,可以使用扩展卡尔曼滤波( EKF)或无迹卡尔曼滤波( UKF) - 一维实现简化了矩阵运算,多维时需要矩阵乘法 - 卡尔曼增益 K_t 自动平衡预测和观测的权重 Example: -------- >>> # 跟踪一个随机游走过程(真实状态无法直接观测) >>> true_state = np.random.randn(100).cumsum() # 真实状态 >>> noisy_obs = true_state + np.random.randn(100) * 0.5 # 带噪声观测 >>> estimates = kalman_filter(noisy_obs, F=1, B=0, H=1, Q=0.01, R=0.25, ... initial_state=0, initial_covariance=1) >>> # estimates 应该接近 true_state(平滑后的估计) """ n = len(data) state_estimate = initial_state covariance_estimate = initial_covariance estimates = [] for t in range(n): state_predict = F * state_estimate + B covariance_predict = F * covariance_estimate * F + Q kalman_gain = covariance_predict * H / (H * covariance_predict * H + R) innovation = data[t] - H * state_predict state_estimate = state_predict + kalman_gain * innovation covariance_estimate = (1 - kalman_gain * H) * covariance_predict estimates.append(state_estimate) return estimates
np.random.seed(42) true_state = np.random.randn(100).cumsum() noisy_obs = true_state + np.random.randn(100) * 0.5
F = 1 B = 0 H = 1 Q = 0.01 R = 0.25 initial_state = 0 initial_covariance = 1.0
estimates = kalman_filter(noisy_obs, F, B, H, Q, R, initial_state, initial_covariance)
print(f"观测噪声标准差: {np.std(noisy_obs - true_state):.3f}") print(f"滤波后误差标准差: {np.std(np.array(estimates) - true_state):.3f}") print(f"噪声减少比例: {(1 - np.std(np.array(estimates) - true_state) / np.std(noisy_obs - true_state)) * 100:.1f}%")
|
关键点解读:
卡尔曼增益的自动平衡:卡尔曼增益
是算法的核心,它自动平衡预测和观测的权重。如果预测不确定性
大(模型不可靠)而观测不确定性
小(传感器精确),则 接近
1,更信任观测;反之,如果预测不确定性小而观测不确定性大,则 接近
0,更信任预测。这种自适应权重使得卡尔曼滤波在不确定性环境中做出最优估计。
递归更新的优势:卡尔曼滤波是递归的——每个时间步只需要当前观测和上一时刻的状态估计,不需要存储所有历史数据。这使得它适合实时应用(如
GPS 导航、机器人控制),计算复杂度为(
是状态维度),远低于批处理方法(如最小二乘需要)。
不确定性量化:卡尔曼滤波不仅提供状态估计,还提供协方差(不确定性量化)。这允许我们计算置信区间:( 95%置信区间)。这对于风险评估和决策制定至关重要。
常见问题:
| 问题 |
解答 |
| Q: 如何设置 Q 和 R? |
A: Q(过程噪声)反映系统模型的不确定性,
R(观测噪声)反映传感器精度。通常通过经验或最大似然估计设置。如果观测很精确,
R 设小;如果系统模型不确定, Q 设大。 |
| Q: 卡尔曼滤波适用于非线性系统吗? |
A:
标准卡尔曼滤波只适用于线性系统。对于非线性系统,可以使用扩展卡尔曼滤波(
EKF,线性化)或无迹卡尔曼滤波( UKF,使用 sigma 点)。 |
| Q: 初始状态和协方差如何选择? |
A: 初始状态可以用第一次观测值或先验知识。初始协方差通常设较大(如
1.0 或 10.0),表示初始不确定性高;随着观测增加,协方差会自动减小。 |
| Q: 卡尔曼滤波能预测未来吗? |
A: 可以。预测步骤$ x_{t |
| Q: 如何处理缺失观测? |
A:
如果某个时间步没有观测,跳过更新步骤,只执行预测步骤。状态估计继续基于预测,但不确定性会增加(因为没有新信息)。 |
使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| import numpy as np from scipy.linalg import inv
def kalman_filter_2d(observations, F, H, Q, R, x0, P0): """ 二维卡尔曼滤波:跟踪二维位置(如 GPS 坐标) Parameters: ----------- observations : array-like, shape (n, 2) 观测序列:每个元素是[x, y]坐标 F : array-like, shape (2, 2) 状态转移矩阵( 2 × 2) H : array-like, shape (2, 2) 观测矩阵(通常为单位矩阵) Q : array-like, shape (2, 2) 过程噪声协方差矩阵 R : array-like, shape (2, 2) 观测噪声协方差矩阵 x0 : array-like, shape (2,) 初始状态估计 P0 : array-like, shape (2, 2) 初始协方差矩阵 """ n = len(observations) x = x0.copy() P = P0.copy() estimates = [] for t in range(n): x_pred = F @ x P_pred = F @ P @ F.T + Q K = P_pred @ H.T @ inv(H @ P_pred @ H.T + R) innovation = observations[t] - H @ x_pred x = x_pred + K @ innovation P = (np.eye(2) - K @ H) @ P_pred estimates.append(x.copy()) return np.array(estimates)
true_trajectory = np.array([[i*0.1, i*0.05] for i in range(100)]) gps_noise = np.random.multivariate_normal([0, 0], [[1, 0], [0, 1]], 100) gps_observations = true_trajectory + gps_noise
F = np.array([[1, 0], [0, 1]]) H = np.array([[1, 0], [0, 1]]) Q = np.array([[0.01, 0], [0, 0.01]]) R = np.array([[1, 0], [0, 1]]) x0 = gps_observations[0] P0 = np.array([[10, 0], [0, 10]])
filtered_positions = kalman_filter_2d(gps_observations, F, H, Q, R, x0, P0)
import matplotlib.pyplot as plt plt.figure(figsize=(10, 6)) plt.plot(true_trajectory[:, 0], true_trajectory[:, 1], 'g-', label='真实轨迹', linewidth=2) plt.plot(gps_observations[:, 0], gps_observations[:, 1], 'r.', label='GPS 观测', alpha=0.5) plt.plot(filtered_positions[:, 0], filtered_positions[:, 1], 'b-', label='卡尔曼滤波估计', linewidth=2) plt.legend() plt.xlabel('X 坐标') plt.ylabel('Y 坐标') plt.title('GPS 位置跟踪:卡尔曼滤波平滑效果') plt.grid(True, alpha=0.3) plt.show()
|
小白友好补充:用比喻理解传统时间序列模型
🎯 核心概念的生活化比喻
ARIMA =
根据过去预测未来的"记忆系统"
想象你在预测明天的气温:
AR(自回归):根据过去几天的气温预测明天
- "昨天 20 ° C,前天 22 ° C → 明天可能 21 ° C"
- 就像用历史数据找规律
I(差分):处理趋势(让数据"平稳")
- 如果气温一直上升(夏天来了),直接预测不准
- 差分 = 看"变化量"而非"绝对值"
- "今天比昨天高 2 ° C,昨天比前天高 1.5 ° C → 明天可能再高 1.7 °
C"
MA(移动平均):考虑预测误差
- "昨天预测错了 +3 ° C,前天错了 +2 ° C → 今天可能会再错 +2.5 °
C"
- 用过去的"犯错模式"修正预测
组合起来: ARIMA = "我根据过去的值 + 变化趋势 +
过去的犯错模式来预测未来"
SARIMA = ARIMA + 日历效应
场景:预测冰淇淋店每天的销量
- 普通 ARIMA:只看"昨天卖了多少"
- SARIMA:还记得"去年同一天卖了多少"(季节性)
比喻:
- 每年夏天都热 → 季节性周期
- 每周五销量高 → 周期性规律
- SARIMA 就像"有日历的预测系统",知道"夏天 + 周五 = 爆单"
VAR = 多个相关变量的"联动系统"
场景:预测股市和债市
- 股市涨 → 债市往往跌(跷跷板效应)
- VAR 模型:同时预测多个变量,并捕捉它们之间的关系
比喻:
- 传统模型:每个人独立预测自己的体重
- VAR 模型:一家人一起预测,考虑"爸爸吃多了 → 妈妈也会多吃 →
孩子也跟着吃"
GARCH = 波动率的"情绪预测"
场景:预测股市波动
- 平时:股价每天上下 1%(波动小)
- 恐慌时:股价每天上下 5%(波动大)
GARCH
核心:预测"明天会不会剧烈波动"(而非"涨还是跌")
比喻:
- ARIMA:预测明天气温是多少度
- GARCH:预测明天气温波动是剧烈还是平稳
指数平滑 =
"越近越重要"的加权平均
直觉:预测明天气温时,昨天的数据比一个月前的数据更重要
单指数平滑:只看"水平"(当前值)
双指数平滑:看"水平 + 趋势"(斜率)
三指数平滑( Holt-Winters):看"水平 + 趋势 +
季节性"
比喻:
- 单指数:只记得"现在多少"
- 双指数:还记得"增长多快"
- 三指数:还记得"去年这时候什么样"
💡 模型选择决策树
1 2 3 4 5 6 7 8 9 10 11
| 我的数据是什么样的? │ ├─ 只有一个变量? │ ├─ 平稳(没有趋势/季节性) → ARIMA(p,0,q) │ ├─ 有趋势(逐渐上升/下降) → ARIMA(p,1,q) 或双指数平滑 │ ├─ 有季节性(周期性重复) → SARIMA 或 Holt-Winters │ └─ 波动率变化(金融数据) → GARCH │ └─ 多个相关变量? ├─ 变量之间相互影响 → VAR └─ 需要融合先验知识 → 卡尔曼滤波
|
❓ Q&A:新手常见疑问
Q1: ARIMA 的 (p,d,q)
参数怎么选?
快速经验法则:
| 参数 |
含义 |
如何选择 |
| p |
自回归阶数 |
ACF 图衰减慢 → p 可能较大;通常 p=1~3 |
| d |
差分次数 |
看趋势:无趋势 d=0,线性趋势 d=1,二次趋势 d=2 |
| q |
移动平均阶数 |
PACF 图截尾 → q 可能较大;通常 q=1~3 |
实战技巧: 1. 先画图看数据(有趋势?有季节性?) 2.
使用 auto_arima 自动选参数( Python 的
pmdarima 包) 3. 从简单开始:先试 ARIMA(1,1,1) 4. 看
AIC/BIC 指标:越小越好(权衡拟合 vs 复杂度)
Q2:什么时候用
ARIMA,什么时候用深度学习?
ARIMA 更好:
- ✅ 数据量小(<1000 个时间点)
- ✅ 需要可解释性("为什么预测这个值?")
- ✅ 需要置信区间("预测的不确定性有多大?")
- ✅ 有明显的季节性/趋势
深度学习( LSTM/Transformer)更好:
- ✅ 数据量大(>10000 个时间点)
- ✅ 多变量(>10 个特征)
- ✅ 非线性关系复杂
- ✅ 有外部特征可以融合(如新闻、天气)
混合策略:
- 用 ARIMA 做基线(快速验证数据质量)
- 用深度学习追求极致性能
- 集成多个模型( ARIMA + LSTM + Prophet)
Q3:为什么我的 ARIMA
预测总是"一条直线"?
常见原因:
原因 1:数据太平稳,模型学到"不变"
- 症状:预测值 = 最后一个观测值
- 解决:检查是否差分过度( d 太大)
原因 2:参数 p, q 太小
- 症状:模型太简单,捕捉不到变化
- 解决:增加 p 或 q(如 ARIMA(1,1,1) → ARIMA(3,1,3))
原因 3:训练数据太短
- 症状:模型没学到足够规律
- 解决:至少需要 50-100 个时间点
原因 4:未来时间跨度太长
- 症状:预测 1 步准,预测 10 步就是直线
- 解决: ARIMA 适合短期预测( 1-5 步),长期用其他方法
Q4: GARCH 和 ARIMA
有什么区别?
核心差异:
| 维度 |
ARIMA |
GARCH |
| 预测目标 |
预测值(如明天股价) |
预测波动率(如明天涨跌幅度) |
| 应用场景 |
销量、温度、人流量 |
金融市场、风险管理 |
| 关键假设 |
方差恒定 |
方差时变(波动聚集) |
何时用 GARCH:
- 金融数据(股票、汇率、期货)
- 关心"风险"而非"收益"
- 数据有"波动聚集"现象(大波动后继续大波动)
何时用 ARIMA:
Q5: Prophet 和 ARIMA
哪个更好?
Prophet 优势:
- ✅ 自动处理季节性(无需手动选参数)
- ✅ 自动处理异常值(鲁棒性强)
- ✅ 可以加入假日效应(如春节、黑五)
- ✅ 代码简单( 3 行搞定)
ARIMA 优势:
- ✅ 理论基础扎实(统计检验、置信区间)
- ✅ 可解释性强(每个参数有明确含义)
- ✅ 适合学术研究(论文常用)
- ✅ 对数据质量要求更严(强迫你理解数据)
选择建议:
- 工业界快速原型 → Prophet
- 学术研究/理解机理 → ARIMA
- 追求性能 → 都试试,选最好的
Q6:如何判断时间序列是否平稳?平稳性检验有哪些方法?
平稳性的定义:
- 均值平稳:序列的均值不随时间变化
- 方差平稳:序列的方差不随时间变化
- 协方差平稳:任意两个时间点的协方差只依赖于时间间隔,不依赖于具体时间点
检验方法对比:
| 方法 |
原理 |
适用场景 |
Python 实现 |
| ADF 检验 |
单位根检验 |
最常用,适合大多数序列 |
statsmodels.tsa.stattools.adfuller |
| KPSS 检验 |
趋势平稳检验 |
与 ADF 互补,检测趋势 |
statsmodels.tsa.stattools.kpss |
| PP 检验 |
Phillips-Perron 检验 |
对异方差鲁棒 |
statsmodels.tsa.stattools.pptest |
ADF 检验实战:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| from statsmodels.tsa.stattools import adfuller import pandas as pd
def check_stationarity(timeseries): """检查时间序列的平稳性""" result = adfuller(timeseries.dropna()) print('ADF 统计量:', result[0]) print('p-value:', result[1]) print('临界值:') for key, value in result[4].items(): print(f'\t{key}: {value:.3f}') if result[1] <= 0.05: print("✅ 序列是平稳的(拒绝原假设)") return True else: print("❌ 序列非平稳(接受原假设,存在单位根)") return False
data = pd.read_csv('your_data.csv') print("原始数据:") check_stationarity(data['value'])
diff_data = data['value'].diff().dropna() print("\n 一阶差分后:") check_stationarity(diff_data)
|
可视化检验:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import matplotlib.pyplot as plt
def plot_stationarity_check(data): """可视化平稳性检验""" fig, axes = plt.subplots(2, 2, figsize=(15, 10)) axes[0, 0].plot(data) axes[0, 0].set_title('原始时间序列') axes[0, 0].set_ylabel('值') rolling_mean = data.rolling(window=12).mean() rolling_std = data.rolling(window=12).std() axes[0, 1].plot(data, label='原始') axes[0, 1].plot(rolling_mean, label='滚动均值') axes[0, 1].plot(rolling_std, label='滚动标准差') axes[0, 1].set_title('滚动统计量') axes[0, 1].legend() from statsmodels.graphics.tsaplots import plot_acf plot_acf(data, ax=axes[1, 0], lags=40) axes[1, 0].set_title('自相关函数 (ACF)') axes[1, 1].hist(data, bins=50) axes[1, 1].set_title('数据分布') plt.tight_layout() plt.show()
|
平稳化方法: 1. 差分:$ y_t' = y_t
- y_{t-1}(y_t) - (y_{t-1}) y_t' = y_t - y_{t-m} m$ 为季节周期) 4. Box-Cox
变换:稳定方差
Q7: VAR
模型如何处理多个时间序列之间的因果关系?
VAR vs 单变量模型:
| 维度 |
ARIMA |
VAR |
| 变量数量 |
1 个 |
多个() |
| 变量关系 |
只考虑自身历史 |
考虑所有变量的历史 |
| 预测 |
单变量预测 |
联合预测所有变量 |
VAR 模型公式(以 2 变量为例):
其中 表示 对
的影响。
Granger 因果检验:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| from statsmodels.tsa.vector_ar.var_model import VAR from statsmodels.tsa.stattools import grangercausalitytests
data = pd.DataFrame({ 'GDP': gdp_data, '失业率': unemployment_data, '通胀率': inflation_data })
model = VAR(data) results = model.fit(maxlags=4)
print("检验 '失业率' 是否 Granger 导致 'GDP':") grangercausalitytests(data[['GDP', '失业率']], maxlag=4)
|
脉冲响应函数( IRF):
1 2 3 4 5 6 7
| irf = results.irf(10) irf.plot(orth=False)
|
方差分解( Variance Decomposition):
1 2 3 4 5 6 7
| fevd = results.fevd(10) fevd.plot()
|
VAR 模型选择:
1 2 3 4 5 6 7 8 9 10 11
| lag_order = model.select_order(maxlags=10) print(f'最优滞后阶数:{lag_order.selected_orders}')
results = model.fit(lag_order.selected_orders['aic']) print(results.summary())
|
VAR 的限制:
- ❌ 所有变量必须平稳(或协整)
- ❌ 变量数量不能太多(计算复杂度 )
- ❌ 假设线性关系,无法捕捉非线性依赖
Q8:指数平滑法中,如何选择单指数、双指数和三指数平滑?
选择决策树:
1 2 3 4 5 6 7 8 9 10 11
| 数据是否有趋势? │ ├─ 否(平稳) → 单指数平滑 │ └─ 是 │ ├─ 是否有季节性? │ │ │ ├─ 否 → 双指数平滑( Holt 方法) │ │ │ └─ 是 → 三指数平滑( Holt-Winters)
|
单指数平滑( Simple Exponential Smoothing):
适用场景:
- ✅ 数据无趋势、无季节性
- ✅ 短期预测( 1-3 步)
- ✅ 快速基线模型
公式: $$
S_t = Y_t + (1-) S_{t-1} $$
参数选择:
- 越大 →
对最近数据越敏感(适合快速变化的数据)
- 越小 →
更平滑(适合稳定数据)
1 2 3 4 5 6 7 8 9
| from statsmodels.tsa.holtwinters import SimpleExpSmoothing
model = SimpleExpSmoothing(data)
fitted_model = model.fit(optimized=True) print(f'最优 alpha: {fitted_model.params["smoothing_level"]:.3f}')
forecast = fitted_model.forecast(steps=10)
|
双指数平滑( Double Exponential Smoothing / Holt
方法):
适用场景:
- ✅ 数据有线性趋势,但无季节性
- ✅ 中期预测( 5-10 步)
公式:
参数含义:
- :水平平滑系数(控制当前值的影响)
- :趋势平滑系数(控制趋势变化的速度)
1 2 3 4 5 6 7 8 9 10
| from statsmodels.tsa.holtwinters import ExponentialSmoothing
model = ExponentialSmoothing( data, trend='add', seasonal=None ) fitted_model = model.fit() forecast = fitted_model.forecast(steps=10)
|
三指数平滑( Holt-Winters):
适用场景:
- ✅ 数据有趋势 + 季节性
- ✅ 长期预测(>10 步)
公式(加法模型):
季节性类型选择:
| 类型 |
特点 |
适用场景 |
| 加法季节性 |
季节性幅度恒定 |
温度、销量(季节性波动固定) |
| 乘法季节性 |
季节性幅度随水平增长 |
股价、 GDP(季节性波动比例固定) |
1 2 3 4 5 6 7 8 9
| model = ExponentialSmoothing( data, trend='add', seasonal='add', seasonal_periods=12 ) fitted_model = model.fit() forecast = fitted_model.forecast(steps=12)
|
模型对比示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| import pandas as pd import matplotlib.pyplot as plt
dates = pd.date_range('2020-01-01', periods=120, freq='M') trend = np.linspace(100, 200, 120) seasonal = 10 * np.sin(2 * np.pi * np.arange(120) / 12) noise = np.random.randn(120) * 5 data = trend + seasonal + noise
ses_model = SimpleExpSmoothing(data).fit() des_model = ExponentialSmoothing(data, trend='add', seasonal=None).fit() tes_model = ExponentialSmoothing(data, trend='add', seasonal='add', seasonal_periods=12).fit()
plt.figure(figsize=(15, 6)) plt.plot(data, label='原始数据') plt.plot(ses_model.fittedvalues, label='单指数平滑') plt.plot(des_model.fittedvalues, label='双指数平滑') plt.plot(tes_model.fittedvalues, label='三指数平滑') plt.legend() plt.title('指数平滑方法对比') plt.show()
from sklearn.metrics import mean_absolute_error
test_data = data[-12:] ses_pred = ses_model.forecast(steps=12) des_pred = des_model.forecast(steps=12) tes_pred = tes_model.forecast(steps=12)
print(f'单指数平滑 MAE: {mean_absolute_error(test_data, ses_pred):.2f}') print(f'双指数平滑 MAE: {mean_absolute_error(test_data, des_pred):.2f}') print(f'三指数平滑 MAE: {mean_absolute_error(test_data, tes_pred):.2f}')
|
Q9:卡尔曼滤波在时间序列预测中如何应用?它与
ARIMA 有什么区别?
卡尔曼滤波的核心思想:
状态空间模型:
其中:
- :隐藏状态(真实值,不可观测)
- :观测值(带噪声的测量值)
- :过程噪声和观测噪声
与 ARIMA 的区别:
| 维度 |
ARIMA |
卡尔曼滤波 |
| 模型类型 |
统计模型 |
状态空间模型 |
| 输入 |
单一时间序列 |
观测值 + 先验知识 |
| 输出 |
点预测 |
状态估计 + 不确定性(协方差) |
| 实时性 |
批量处理 |
✅ 在线更新(递归) |
| 多源融合 |
❌ |
✅ 可融合多个传感器数据 |
| 可解释性 |
参数可解释 |
状态转移可解释 |
卡尔曼滤波的优势:
- 实时更新:每来一个新观测,立即更新估计
- 不确定性量化:不仅给出预测值,还给出置信区间
- 多源融合:可以融合多个传感器的数据
- 处理缺失值:天然支持缺失观测
Python 实现示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| import numpy as np from filterpy.kalman import KalmanFilter
kf = KalmanFilter(dim_x=2, dim_z=1)
kf.F = np.array([[1., 1.], [0., 1.]])
kf.H = np.array([[1., 0.]])
kf.Q = np.array([[0.1, 0.], [0., 0.1]])
kf.R = np.array([[1.0]])
kf.x = np.array([[0.], [0.]]) kf.P = np.eye(2) * 1000
observations = [10, 12, 15, 18, 20, 22] predictions = [] uncertainties = []
for obs in observations: kf.predict() predictions.append(kf.x[0, 0]) uncertainties.append(np.sqrt(kf.P[0, 0])) kf.update(obs) print(f'观测值: {obs:.1f}, 预测值: {kf.x[0, 0]:.2f}, ' f'不确定性: {np.sqrt(kf.P[0, 0]):.2f}')
future_predictions = [] for _ in range(5): kf.predict() future_predictions.append(kf.x[0, 0]) print(f'未来预测: {kf.x[0, 0]:.2f} ± {np.sqrt(kf.P[0, 0]):.2f}')
|
扩展卡尔曼滤波( EKF)处理非线性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| from filterpy.kalman import ExtendedKalmanFilter
def hx(x): """非线性观测函数""" return x[0]**2
def H_jacobian(x): """观测函数的雅可比矩阵""" return np.array([[2*x[0], 0]])
ekf = ExtendedKalmanFilter(dim_x=2, dim_z=1) ekf.H = H_jacobian ekf.hx = hx
|
应用场景对比:
| 场景 |
推荐模型 |
原因 |
| 股票价格预测 |
ARIMA / GARCH |
历史数据充足,统计模型足够 |
| GPS 定位 |
卡尔曼滤波 |
实时更新,融合多传感器 |
| 温度预测 |
ARIMA / Prophet |
有明显季节性,批量处理即可 |
| 机器人导航 |
卡尔曼滤波 |
实时性要求高,需要不确定性量化 |
| 销量预测 |
ARIMA / 指数平滑 |
历史数据为主,无需实时更新 |
Q10:如何评估和比较不同传统时间序列模型的性能?
评估指标对比:
| 指标 |
公式 |
特点 |
适用场景 |
| MAE |
|
对异常值鲁棒 |
一般预测任务 |
| RMSE |
|
对大误差敏感 |
惩罚大误差 |
| MAPE |
|
百分比误差 |
业务汇报 |
| SMAPE |
|
对称 MAPE |
避免除零问题 |
完整评估流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| import numpy as np import pandas as pd from sklearn.metrics import mean_absolute_error, mean_squared_error import matplotlib.pyplot as plt
def evaluate_model(y_true, y_pred, model_name): """全面评估模型性能""" mae = mean_absolute_error(y_true, y_pred) rmse = np.sqrt(mean_squared_error(y_true, y_pred)) mape = np.mean(np.abs((y_true - y_pred) / y_true)) * 100 direction_accuracy = np.mean( np.sign(np.diff(y_true)) == np.sign(np.diff(y_pred)) ) * 100 print(f'\n{model_name} 评估结果:') print(f' MAE: {mae:.2f}') print(f' RMSE: {rmse:.2f}') print(f' MAPE: {mape:.2f}%') print(f' 方向准确率: {direction_accuracy:.2f}%') return { 'MAE': mae, 'RMSE': rmse, 'MAPE': mape, 'Direction_Accuracy': direction_accuracy }
results = {}
from statsmodels.tsa.arima.model import ARIMA arima_model = ARIMA(train_data, order=(1,1,1)).fit() arima_pred = arima_model.forecast(steps=len(test_data)) results['ARIMA'] = evaluate_model(test_data, arima_pred, 'ARIMA')
from prophet import Prophet prophet_model = Prophet() prophet_model.fit(pd.DataFrame({'ds': train_dates, 'y': train_data})) prophet_forecast = prophet_model.predict(pd.DataFrame({'ds': test_dates})) results['Prophet'] = evaluate_model(test_data, prophet_forecast['yhat'].values, 'Prophet')
from statsmodels.tsa.holtwinters import ExponentialSmoothing es_model = ExponentialSmoothing(train_data, trend='add', seasonal='add', seasonal_periods=12).fit() es_pred = es_model.forecast(steps=len(test_data)) results['Holt-Winters'] = evaluate_model(test_data, es_pred, 'Holt-Winters')
comparison_df = pd.DataFrame(results).T print('\n 模型对比:') print(comparison_df)
|
可视化评估:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| def plot_model_comparison(y_true, predictions_dict): """可视化多个模型的预测结果""" fig, axes = plt.subplots(2, 2, figsize=(15, 10)) axes[0, 0].plot(y_true.index, y_true.values, 'k-', label='真实值', linewidth=2) for name, pred in predictions_dict.items(): axes[0, 0].plot(y_true.index, pred, '--', label=name, alpha=0.7) axes[0, 0].set_title('预测值对比') axes[0, 0].legend() axes[0, 0].grid(True) for name, pred in predictions_dict.items(): residuals = y_true - pred axes[0, 1].hist(residuals, alpha=0.5, label=name, bins=30) axes[0, 1].set_title('残差分布') axes[0, 1].legend() axes[0, 1].axvline(0, color='k', linestyle='--') for name, pred in predictions_dict.items(): residuals = y_true - pred axes[1, 0].plot(y_true.index, residuals, label=name, alpha=0.7) axes[1, 0].set_title('残差时间序列') axes[1, 0].axhline(0, color='k', linestyle='--') axes[1, 0].legend() axes[1, 0].grid(True) from scipy import stats for name, pred in predictions_dict.items(): residuals = y_true - pred stats.probplot(residuals, dist="norm", plot=axes[1, 1]) axes[1, 1].set_title('Q-Q 图(正态性检验)') plt.tight_layout() plt.show()
|
统计检验:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| from scipy import stats
def statistical_tests(y_true, y_pred, model_name): """统计检验:残差是否满足模型假设""" residuals = y_true - y_pred t_stat, p_value = stats.ttest_1samp(residuals, 0) print(f'\n{model_name} 残差检验:') print(f' 残差均值 t 检验: p-value = {p_value:.4f}') if p_value > 0.05: print(' ✅ 残差均值接近 0') else: print(' ❌ 残差均值显著不为 0(模型有偏)') if len(residuals) < 5000: stat, p_norm = stats.shapiro(residuals) print(f' 正态性检验: p-value = {p_norm:.4f}') if p_norm > 0.05: print(' ✅ 残差近似正态分布') else: print(' ❌ 残差非正态分布') from statsmodels.stats.diagnostic import acorr_ljungbox lb_test = acorr_ljungbox(residuals, lags=10, return_df=True) p_lb = lb_test['lb_pvalue'].iloc[-1] print(f' Ljung-Box 检验: p-value = {p_lb:.4f}') if p_lb > 0.05: print(' ✅ 残差无自相关(模型合格)') else: print(' ❌ 残差存在自相关(模型未充分捕捉信息)')
for name, pred in predictions_dict.items(): statistical_tests(test_data, pred, name)
|
模型选择建议:
- 多指标综合评估:不要只看一个指标(如 RMSE),要结合
MAE 、 MAPE 、方向准确率
- 残差诊断:检查残差是否满足模型假设(无自相关、正态分布、均值接近
0)
- 交叉验证:使用时间序列交叉验证(
TimeSeriesSplit)避免过拟合
- 业务指标:结合业务需求(如预测方向比绝对值更重要)
- 模型集成:如果多个模型性能相近,考虑集成(加权平均)
⚠️ 新手常见误区
误区 1:以为 ARIMA
能预测任何时间序列
❌ 错误认知: ARIMA 是万能的
✅ 正确理解:
- ARIMA 假设线性关系,对非线性数据效果差
- ARIMA
假设平稳性(或差分后平稳),对突变数据无能为力
- ARIMA 适合短期预测,长期预测误差累积
不适合 ARIMA 的场景:
- 突发事件(如疫情导致销量暴跌)
- 结构性变化(如政策改变市场规则)
- 高度非线性(如混沌系统)
误区 2:盲目追求复杂模型
❌ 错误认知: ARIMA(5,2,5) 一定比 ARIMA(1,1,1)
好
✅ 正确理解:
- 参数越多 → 过拟合风险越大
- 奥卡姆剃刀原则:优先选简单模型
- 复杂模型需要更多数据支撑
经验:
- 数据 < 100 点: p,q ≤ 2
- 数据 100-500 点: p,q ≤ 3
- 数据 > 500 点: p,q ≤ 5
误区 3:不做数据预处理
❌ 错误认知:直接把原始数据喂给模型
✅ 正确理解:
- 必须做:缺失值处理(插值 or 删除)
- 必须做:异常值处理(孤立森林检测)
- 建议做:标准化(如果多变量)
- 建议做:对数变换(如果方差随均值增长)
误区
4:只看预测准确率,不看置信区间
❌ 错误认知:预测值就是唯一答案
✅ 正确理解:
- 时间序列预测本质是概率预测
- 应该报告:点预测 ± 置信区间
- 例如:"明天销量 100 件, 95% 置信区间 [80, 120]"
📚 实战 Checklist
步骤 1:数据探索(必做)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| import pandas as pd import matplotlib.pyplot as plt
plt.plot(data) plt.title('Raw Time Series') plt.show()
from statsmodels.tsa.stattools import adfuller result = adfuller(data) print(f'ADF Statistic: {result[0]}') print(f'p-value: {result[1]}')
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf plot_acf(data) plot_pacf(data) plt.show()
|
步骤 2:模型选择(按优先级)
1 2 3 4 5
| 1. 先试最简单:单指数平滑或 ARIMA(1,1,1) 2. 有季节性: SARIMA 或 Holt-Winters 3. 多变量: VAR 4. 波动率: GARCH 5. 需要自动化: Prophet
|
步骤 3:模型评估(必做)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| from sklearn.metrics import mean_absolute_error, mean_squared_error
train = data[:int(0.8*len(data))] test = data[int(0.8*len(data)):]
model.fit(train) predictions = model.forecast(steps=len(test))
mae = mean_absolute_error(test, predictions) rmse = mean_squared_error(test, predictions, squared=False) mape = np.mean(np.abs((test - predictions) / test)) * 100
print(f'MAE: {mae:.2f}') print(f'RMSE: {rmse:.2f}') print(f'MAPE: {mape:.2f}%')
|
步骤 4:残差诊断(高级)
1 2 3 4 5 6 7 8 9 10 11 12 13
| residuals = test - predictions
print(f'Residual mean: {residuals.mean():.4f}')
plot_acf(residuals)
from statsmodels.stats.diagnostic import acorr_ljungbox lb_test = acorr_ljungbox(residuals, lags=10) print(lb_test)
|
🎓
总结:传统时间序列模型的"地图"
一句话总结各模型:
| 模型 |
一句话理解 |
最佳场景 |
| ARIMA |
根据过去的值和误差预测未来 |
单变量、短期、平稳(或可差分平稳) |
| SARIMA |
ARIMA + 季节性 |
有周期性(如每周/每年重复) |
| VAR |
多个变量互相预测 |
多变量联动(如股市 + 债市) |
| GARCH |
预测波动率而非值 |
金融市场风险管理 |
| 指数平滑 |
越近越重要的加权平均 |
快速简单的基线模型 |
| Prophet |
自动化的趋势 + 季节性 |
业务预测(销量、流量) |
| 卡尔曼滤波 |
融合观测和先验的递归估计 |
实时系统、传感器融合 |
记忆口诀: > ARIMA 看过去, SARIMA
加季节, VAR 联动多变量, GARCH 关注波动,指数平滑越近越重, Prophet
自动化王者,卡尔曼实时最强
最后的建议:
- 新手:从最简单的开始(单指数平滑 or
ARIMA(1,1,1))
- 进阶:学会看 ACF/PACF 图,理解参数含义
- 高手:集成多模型,根据数据特点选工具
传统模型不是"过时",而是"经典"——理解它们,才能更好地理解深度学习模型在做什么!