本文详细介绍了几种经典的时间序列模型,包括非深度学习模型的公式和方法。涵盖的模型包括ARIMA、SARIMA、VAR、GARCH、指数平滑、Prophet和卡尔曼滤波器。希望通过详细的数学公式和解释,帮助理解这些模型背后的数学逻辑及其在实际时间序列数据中的应用场景。
ARIMA(AutoRegressive
Integrated Moving Average)
ARIMA模型由三部分组成:
- AR
(Autoregressive):自回归部分表示序列与其过去值之间的线性关系。
- I
(Integrated):差分部分用于处理非平稳性数据,通过进行差分操作使其成为平稳序列。
- MA (Moving
Average):移动平均部分通过过去的残差(白噪声)来预测当前值。
ARIMA模型用三个参数来表示:
- :自回归项的阶数,表示回溯多少个时间步长。
- :差分的阶数,表示进行多少次差分以获得平稳序列。
- :移动平均项的阶数,表示回溯多少个过去的白噪声。
模型的核心公式为: 其中: -
是当前时间的值; - 是常数项; - 是自回归系数; - 是误差项(白噪声); - 是移动平均系数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| import numpy as np
def difference(series, order): """执行d阶差分操作""" diff_series = [] for i in range(order, len(series)): value = series[i] - series[i - order] diff_series.append(value) return diff_series
def ar_model(series, ar_order, ar_coeffs): """自回归模型计算""" ar_part = 0.0 for i in range(ar_order): ar_part += ar_coeffs[i] * series[-(i + 1)] return ar_part
def ma_model(errors, ma_order, ma_coeffs): """移动平均模型计算""" ma_part = 0.0 for i in range(ma_order): ma_part += ma_coeffs[i] * errors[-(i + 1)] return ma_part
def arima(series, ar_order, ma_order, diff_order, ar_coeffs, ma_coeffs): """手动实现ARIMA""" series_diff = difference(series, diff_order) errors = [] predictions = [] for t in range(len(series_diff)): if t >= max(ar_order, ma_order): ar_part = ar_model(series_diff[:t], ar_order, ar_coeffs) ma_part = ma_model(errors[:t], ma_order, ma_coeffs) predicted_value = ar_part + ma_part else: predicted_value = 0.0 actual_value = series_diff[t] error = actual_value - predicted_value errors.append(error) predictions.append(predicted_value) return predictions
data = np.random.randn(100).cumsum() ar_order, ma_order, diff_order = 2, 2, 1 ar_coeffs = [0.5, -0.25] ma_coeffs = [0.4, 0.3] predictions = arima(data, ar_order, ma_order, diff_order, ar_coeffs, ma_coeffs) print(predictions)
|
SARIMA(Seasonal ARIMA)
SARIMA是在ARIMA模型的基础上扩展出来的,能够处理具有季节性周期的时间序列。它的基本形式为SARIMA(p,
d, q)(P, D, Q, m),其中前面的表示非季节性部分,后面的表示季节性部分。
- :季节性自回归阶数。
- :季节性差分次数。
- :季节性移动平均阶数。
- :季节性周期的长度。
SARIMA的公式如下: 其中: - 是滞后算子; -
和
分别是季节性自回归和移动平均系数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| import numpy as np
def seasonal_difference(series, season_lag): """处理季节性差分""" diff_series = [] for i in range(season_lag, len(series)): value = series[i] - series[i - season_lag] diff_series.append(value) return diff_series
def sarima(series, ar_order, ma_order, diff_order, seasonal_order, seasonal_lag, ar_coeffs, ma_coeffs, seasonal_ar_coeffs, seasonal_ma_coeffs): """手动实现SARIMA""" series_diff = difference(series, diff_order) series_season_diff = seasonal_difference(series_diff, seasonal_lag) errors = [] predictions = [] for t in range(len(series_season_diff)): if t >= max(ar_order, ma_order, seasonal_order): ar_part = ar_model(series_season_diff[:t], ar_order, ar_coeffs) ma_part = ma_model(errors[:t], ma_order, ma_coeffs) seasonal_ar_part = ar_model(series_season_diff[:t], seasonal_order, seasonal_ar_coeffs) seasonal_ma_part = ma_model(errors[:t], seasonal_order, seasonal_ma_coeffs) predicted_value = ar_part + ma_part + seasonal_ar_part + seasonal_ma_part else: predicted_value = 0.0 actual_value = series_season_diff[t] error = actual_value - predicted_value errors.append(error) predictions.append(predicted_value) return predictions
data = np.random.randn(100).cumsum() ar_order, ma_order, diff_order = 2, 2, 1 seasonal_order, seasonal_lag = 2, 12 ar_coeffs = [0.5, -0.25] ma_coeffs = [0.4, 0.3] seasonal_ar_coeffs = [0.3, -0.2] seasonal_ma_coeffs = [0.2, 0.1] predictions = sarima(data, ar_order, ma_order, diff_order, seasonal_order, seasonal_lag, ar_coeffs, ma_coeffs, seasonal_ar_coeffs, seasonal_ma_coeffs) print(predictions)
|
VAR(Vector AutoRegressive
Model)
VAR模型用于多变量时间序列的建模。它假设每个时间序列不仅与自身的过去值有关,还与其他序列的过去值有关。
VAR模型的数学表达式为: 其中: -
是包含多个时间序列的向量; -
是滞后系数矩阵; -
是误差向量。
每个向量表示多个时间序列的当前值,是描述各序列之间相互关系的系数矩阵。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| import numpy as np
def var_model(data, lag): """手动实现VAR模型,处理2个时间序列""" n = len(data) coefficients = np.zeros((2, lag)) errors = [] predictions = [] for t in range(lag, n): y1 = data[t][0] y2 = data[t][1] pred_y1 = sum([coefficients[0][i] * data[t - i - 1][0] for i in range(lag)]) pred_y2 = sum([coefficients[1][i] * data[t - i - 1][1] for i in range(lag)]) error_y1 = y1 - pred_y1 error_y2 = y2 - pred_y2 errors.append((error_y1, error_y2)) predictions.append((pred_y1, pred_y2)) return predictions
data = np.column_stack((np.random.randn(100).cumsum(), np.random.randn(100).cumsum())) lag = 2
predictions = var_model(data, lag) print(predictions)
|
GARCH(Generalized
Autoregressive Conditional Heteroskedasticity)
GARCH模型用于建模时间序列中的条件异方差,特别适用于金融数据中的波动性预测。GARCH模型将过去的残差和过去的方差用于预测未来的方差。
GARCH(1, 1)模型的核心公式为: 其中: -
是时间的条件方差; -
是模型的参数; -
是残差序列。
该模型通过递归计算的方式来预测序列的波动性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| import numpy as np
def garch_model(returns, alpha0, alpha1, beta1): """手动实现GARCH(1,1)模型""" n = len(returns) variances = np.zeros(n) errors = np.zeros(n) for t in range(1, n): variances[t] = alpha0 + alpha1 * errors[t-1]**2 + beta1 * variances[t-1] errors[t] = returns[t] - np.mean(returns) return variances
returns = np.random.randn(100) alpha0, alpha1, beta1 = 0.1, 0.3, 0.6 variances = garch_model(returns, alpha0, alpha1, beta1) print(variances)
|
Exponential
Smoothing(指数平滑)
指数平滑是一种用于时间序列数据的平滑技术,它对较新的数据点赋予更高的权重,常用于平稳序列的预测。指数平滑模型有多种形式,包括单指数平滑、双指数平滑和三指数平滑(Holt-Winters方法)。
单指数平滑
单指数平滑公式为: 其中: -
是平滑后的值; -
是平滑系数,;
- 是当前的观测值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| def single_exponential_smoothing(data, alpha): """手动实现单指数平滑""" smoothed_data = [data[0]] for t in range(1, len(data)): smoothed_value = alpha * data[t] + (1 - alpha) * smoothed_data[-1] smoothed_data.append(smoothed_value) return smoothed_data
data = np.random.randn(100).cumsum() alpha = 0.2 smoothed_data = single_exponential_smoothing(data, alpha) print(smoothed_data)
|
双指数平滑(用于趋势)
双指数平滑通过引入趋势项来预测具有线性趋势的序列。公式为: 其中: - 是趋势项; -
是趋势平滑系数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| def double_exponential_smoothing(data, alpha, beta): """手动实现双指数平滑(带趋势)""" level = data[0] trend = data[1] - data[0] smoothed_data = [level] for t in range(1, len(data)): new_level = alpha * data[t] + (1 - alpha) * (level + trend) new_trend = beta * (new_level - level) + (1 - beta) * trend level, trend = new_level, new_trend smoothed_data.append(level + trend) return smoothed_data
data = np.random.randn(100).cumsum() alpha, beta = 0.2, 0.1 smoothed_data = double_exponential_smoothing(data, alpha, beta) print(smoothed_data)
|
三指数平滑(Holt-Winters方法)
三指数平滑(Holt-Winters方法)是一种扩展的指数平滑技术,专门用于处理带有趋势和季节性波动的时间序列数据。它在预测时不仅考虑序列的当前水平和趋势,还能够捕捉到周期性的季节变化。该方法通常用于金融、零售和能源等领域,帮助预测具有重复模式的时间序列数据。
模型组成部分
Holt-Winters方法由以下三部分组成:
- 水平(Level):表示当前的估计水平值。
- 趋势(Trend):表示时间序列的线性趋势。
- 季节性(Seasonality):表示时间序列中的季节性周期波动。
每个部分的更新基于前一时间步的估计,类似于单指数和平滑和双指数平滑。三指数平滑的目标是捕捉到这些成分的动态变化,并使用它们来进行未来的预测。
公式
在 Holt-Winters
方法中,我们有以下三个核心公式来分别更新水平、趋势和季节性成分。
水平更新公式(Level equation):
其中:
- 是当前时刻 的水平估计;
-
是当前时刻的实际观察值;
- 是个周期前的季节性因子(为季节周期长度);
- 是水平平滑系数,。
趋势更新公式(Trend equation):
其中:
季节性更新公式(Seasonal equation):
其中:
预测公式
一旦我们通过以上公式得到了当前时刻
的水平、趋势和季节性成分,可以通过以下公式预测未来的值。
对于未来 时刻的预测:
其中:
- 是 时刻的预测值;
- 是当前时刻 的水平估计;
- 是当前时刻 的趋势估计;
- 是与 时刻相对应的季节性因子。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| import numpy as np
def holt_winters(data, alpha, beta, gamma, season_length, n_preds): """ Holt-Winters三指数平滑算法实现 data: 输入时间序列数据 alpha: 水平平滑系数 beta: 趋势平滑系数 gamma: 季节性平滑系数 season_length: 每个季节的长度(周期) n_preds: 要预测的时间步数 """ L = [np.mean(data[:season_length])] T = [(np.mean(data[season_length:2*season_length]) - np.mean(data[:season_length])) / season_length] S = [data[i] / L[0] for i in range(season_length)] predictions = [] for t in range(len(data)): if t >= season_length: L_t = alpha * (data[t] / S[t - season_length]) + (1 - alpha) * (L[-1] + T[-1]) T_t = beta * (L_t - L[-1]) + (1 - beta) * T[-1] S_t = gamma * (data[t] / L_t) + (1 - gamma) * S[t - season_length] L.append(L_t) T.append(T_t) S.append(S_t) else: L.append(L[-1]) T.append(T[-1]) S.append(S[t]) for h in range(1, n_preds+1): predictions.append((L[-1] + h * T[-1]) * S[-season_length + h % season_length]) return predictions
data = np.array([112, 118, 132, 129, 121, 135, 148, 148, 136, 119, 104, 118, 115, 126, 141, 135, 125, 149, 170, 170, 158, 133, 114, 140]) alpha = 0.2 beta = 0.1 gamma = 0.3 season_length = 12 n_preds = 12 predictions = holt_winters(data, alpha, beta, gamma, season_length, n_preds) print(predictions)
|
Prophet
Prophet是Facebook开发的一种时间序列预测模型,能够处理具有季节性、趋势和假日效应的复杂时间序列数据。
Prophet模型可以分为三部分:趋势、季节性和假日效应。模型的形式为:
其中: -
是趋势部分,用于捕捉长期变化; - 是季节性部分,表示周期性变化; -
是假日效应; - 是误差项。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| from fbprophet import Prophet import pandas as pd
data = pd.DataFrame({ 'ds': pd.date_range(start='2020-01-01', periods=100, freq='D'), 'y': np.random.randn(100).cumsum() })
model = Prophet() model.fit(data)
future = model.make_future_dataframe(periods=10) forecast = model.predict(future) print(forecast[['ds', 'yhat']].tail())
|
Kalman Filter(卡尔曼滤波)
卡尔曼滤波是一种用于对时变系统进行递归估计的算法,广泛应用于噪声较大的系统中,用于平滑和预测。卡尔曼滤波的关键是通过对状态变量的估计和观测值之间的差异来更新估计。
卡尔曼滤波器的状态更新公式为: 观测更新公式为: 其中: -
是时间时刻的预测状态; - 是预测误差协方差矩阵; - 是卡尔曼增益; - 是观测值; - 是观测模型矩阵。
卡尔曼滤波器的优势在于它能够递归地更新状态估计,并且适用于实时系统。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import numpy as np
def kalman_filter(data, F, B, H, Q, R, initial_state, initial_covariance): """手动实现一维卡尔曼滤波器""" n = len(data) state_estimate = initial_state covariance_estimate = initial_covariance estimates = [] for t in range(n): state_predict = F * state_estimate + B covariance_predict = F * covariance_estimate * F + Q kalman_gain = covariance_predict * H / (H * covariance_predict * H + R) state_estimate = state_predict + kalman_gain * (data[t] - H * state_predict) covariance_estimate = (1 - kalman_gain * H) * covariance_predict estimates.append(state_estimate) return estimates
data = np.random.randn(100).cumsum() F, B, H = 1, 0, 1 Q, R = 1e-5, 0.1 initial_state, initial_covariance = 0, 1 estimates = kalman_filter(data, F, B, H, Q, R, initial_state, initial_covariance) print(estimates)
|