时间序列预测中的三体模型和三体预测法是由楼润平教授提出的一种专门针对经济管理领域的时间序列预测方法。该方法通过捕捉时间序列的三个核心特征(趋势、波动和惯性)来构建预测模型,具有简洁实用、泛化能力强等特点。
三体模型的核心构成
三体模型的基本数学表达式为:

其中:
n
是当前时刻的预测值;
n
是常数项,
用于捕捉时间序列的线性趋势(
为时间变量,从1开始递增);
n
是0-1虚拟变量,用于描述时间序列的波动特征(例如季节性、突发事件等);
n
是滞后项(
),用于捕捉时间序列的惯性特征(即历史值对当前值的影响);
n
为误差项。
三体预测法的实施步骤
三体预测法是一套系统化的建模流程,主要包括以下步骤:
1.数据观察与可视化:绘制时间序列的散点图或走势图,初步判断趋势特征(如线性、非线性或周期性)。
2.趋势处理:若序列无明显线性趋势,则引入虚拟变量(如季节性哑变量)来捕捉波动。
3.模型构建:
l首先进行多元回归分析,纳入时间变量
和虚拟变量
。
l检查拟合优度
:若
,需剔除不显著的变量;若
,则进一步引入滞后因变量(
)以增强模型对惯性的捕捉。
4.模型确定:根据显著性检验(如p值)和预测误差指标(如平均相对预测误差MRPE),选择最优模型。
5.预测与验证:使用测试集评估模型性能,若MRPE低于10%则认为预测表现优秀。






=== 参数敏感性分析结果 ===
数据量: 144 条
训练集: 115 条, 测试集: 29 条
不同滞后阶数的评估结果:
————————————————————
滞后阶数 MRPE(%) MAE RMSE 预测表现
————————————————————
1 14.3552 0.5325 0.6123 良好
2 14.5003 0.5401 0.6215 良好
3 13.6205 0.5032 0.5807 良好
4 14.8208 0.5556 0.6568 良好
5 14.7162 0.5512 0.6511 良好
————————————————————
=== 三体预测法 vs ARIMA 算法对比评估结果 ===
训练集: 2011年1月 – 2020年12月 (120 个月)
测试集: 2021年1月 – 2022年12月 (24 个月)
=====================================
预测任务1: 2021年1-8月
=====================================
指标 三体预测法 ARIMA
————————————————————
MRPE (%) 12.0959 8.4186
MAE 0.4118 0.3386
RMSE 0.5027 0.3741
=====================================
预测任务2: 2021年1-12月
=====================================
指标 三体预测法 ARIMA
————————————————————
MRPE (%) 12.4121 8.1410
MAE 0.4341 0.3197
RMSE 0.5138 0.3618
=====================================
算法性能总结
=====================================
2021年1-8月:
ARIMA表现更好 (MRPE更低)
2021年1-12月:
ARIMA表现更好 (MRPE更低)
代码如下:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error, mean_squared_error
import warnings
import os
import sys
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.stattools import adfuller
warnings.filterwarnings('ignore')
# 设置Windows控制台编码为UTF-8
if sys.platform == 'win32':
try:
sys.stdout.reconfigure(encoding='utf-8')
sys.stderr.reconfigure(encoding='utf-8')
except:
pass
# 设置matplotlib支持中文
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'Arial Unicode MS']
plt.rcParams['axes.unicode_minus'] = False
class ThreeBodyPredictor:
"""
三体预测法实现类
基于楼润平教授提出的三体模型:趋势、波动、惯性
"""
def __init__(self, seasonal_period=12, max_lags=3, r2_threshold=0.9):
"""
初始化参数
Parameters:
-----------
seasonal_period : int, 季节性周期(默认12个月)
max_lags : int, 最大滞后阶数(默认3阶)
r2_threshold : float, R^2阈值(默认0.9)
"""
self.seasonal_period = seasonal_period
self.max_lags = max_lags
self.r2_threshold = r2_threshold
self.model = None
self.feature_names = None
self.is_fitted = False
def create_features(self, data):
"""
创建特征矩阵:时间趋势、季节性虚拟变量、滞后项
Parameters:
-----------
data : array-like, 时间序列数据
Returns:
--------
X : array, 特征矩阵
y : array, 目标变量
feature_names : list, 特征名称
"""
n = len(data)
# 时间趋势特征
t = np.arange(1, n + 1).reshape(-1, 1)
# 季节性虚拟变量
seasonal_dummies = []
seasonal_names = []
for i in range(1, self.seasonal_period):
dummy = (np.arange(n) % self.seasonal_period == i).astype(int)
seasonal_dummies.append(dummy)
seasonal_names.append(f'seasonal_{i}')
# 滞后项特征
lag_features = []
lag_names = []
for lag in range(1, self.max_lags + 1):
lag_data = np.concatenate([np.full(lag, np.nan), data[:-lag]])
lag_features.append(lag_data)
lag_names.append(f'lag_{lag}')
# 组合所有特征
all_features = [t] + seasonal_dummies + lag_features
feature_names = ['trend'] + seasonal_names + lag_names
# 转换为数组
X = np.column_stack(all_features)
y = data
# 移除包含NaN的行(由于滞后项)
valid_mask = ~np.any(np.isnan(X), axis=1)
X = X[valid_mask]
y = y[valid_mask]
return X, y, feature_names
def calculate_mrpe(self, y_true, y_pred):
"""
计算平均相对预测误差 (MRPE)
Parameters:
-----------
y_true : array, 真实值
y_pred : array, 预测值
Returns:
--------
mrpe : float, 平均相对预测误差百分比
"""
relative_errors = np.abs((y_true - y_pred) / y_true)
mrpe = np.mean(relative_errors) * 100
return mrpe
def fit(self, data, verbose=True, return_base_info=False):
"""
训练三体预测模型
Parameters:
-----------
data : array-like, 时间序列数据
verbose : bool, 是否输出训练过程信息
return_base_info : bool, 是否返回第一步基础模型信息
Returns:
--------
base_info : dict, 如果return_base_info=True,返回第一步模型信息
"""
data = np.array(data)
# 第一步:创建基础特征(趋势 + 季节性)
X_base, y_base, feature_names = self.create_features(data)
base_features_mask = [name.startswith(('trend', 'seasonal'))
for name in feature_names]
# 初始模型(仅趋势和季节性)
X_current = X_base[:, base_features_mask]
model_base = LinearRegression()
model_base.fit(X_current, y_base)
y_pred_base = model_base.predict(X_current)
r2_base = model_base.score(X_current, y_base)
if verbose:
print("=== 三体预测法建模过程 ===")
print(f"第一步:基础模型 R^2 = {r2_base:.4f}")
# 保存第一步模型信息用于绘图
base_info = None
if return_base_info:
base_info = {
'model': model_base,
'X': X_current,
'y': y_base,
'y_pred': y_pred_base,
'r2': r2_base,
'feature_names': [feature_names[i] for i in range(len(feature_names)) if base_features_mask[i]]
}
# 第二步:根据R^2决定是否引入滞后项
if r2_base < self.r2_threshold:
if verbose:
print("R^2 < 0.9,尝试剔除不显著特征...")
# 简化:这里我们使用所有特征,实际应用中可以进行特征选择
X_final = X_base
self.feature_names = feature_names
else:
if verbose:
print("R^2 >= 0.9,引入滞后项特征...")
# 引入滞后项
X_final = X_base
self.feature_names = feature_names
# 训练最终模型
self.model = LinearRegression()
self.model.fit(X_final, y_base)
self.is_fitted = True
# 训练集评估
y_pred_train = self.model.predict(X_final)
mrpe_train = self.calculate_mrpe(y_base, y_pred_train)
r2_final = self.model.score(X_final, y_base)
if verbose:
print(f"最终模型 R^2 = {r2_final:.4f}")
print(f"训练集 MRPE = {mrpe_train:.2f}%")
print("模型训练完成!")
if return_base_info:
return self, base_info
return self
def predict(self, data, forecast_steps=1):
"""
进行预测
Parameters:
-----------
data : array-like, 历史数据
forecast_steps : int, 预测步长
Returns:
--------
predictions : array, 预测结果
"""
if not self.is_fitted:
raise ValueError("模型尚未训练,请先调用fit方法")
data = np.array(data)
predictions = []
current_data = data.copy()
for step in range(forecast_steps):
# 为当前时间点创建特征
n = len(current_data)
t = np.array([[n + 1]]) # 下一个时间点
# 季节性虚拟变量
seasonal_idx = (n) % self.seasonal_period
seasonal_dummy = np.zeros((1, self.seasonal_period - 1))
if seasonal_idx > 0:
seasonal_dummy[0, seasonal_idx - 1] = 1
# 滞后项
lags = []
for lag in range(1, self.max_lags + 1):
if n - lag >= 0:
lags.append(current_data[n - lag])
else:
# 如果历史数据不足,使用最后一个可用值
lags.append(current_data[-1])
# 将lags转换为numpy数组并reshape为(1, max_lags)
lags = np.array(lags).reshape(1, -1)
# 组合特征
X_new = np.column_stack([t, seasonal_dummy, lags])
# 预测
y_pred = self.model.predict(X_new)[0]
predictions.append(y_pred)
# 更新数据用于下一步预测
current_data = np.append(current_data, y_pred)
return np.array(predictions)
def evaluate(self, train_data, test_data, verbose=True, return_base_info=False):
"""
模型评估
Parameters:
-----------
train_data : array, 训练数据
test_data : array, 测试数据
verbose : bool, 是否输出详细信息
return_base_info : bool, 是否返回第一步模型信息
Returns:
--------
metrics : dict, 评估指标
predictions : array, 预测值
base_info : dict, 如果return_base_info=True,返回第一步模型信息
"""
# 在训练集上拟合模型
if return_base_info:
self, base_info = self.fit(train_data, verbose=False, return_base_info=True)
else:
self.fit(train_data, verbose=False)
base_info = None
# 预测测试集
predictions = self.predict(train_data, len(test_data))
# 计算评估指标
mrpe = self.calculate_mrpe(test_data, predictions)
mae = mean_absolute_error(test_data, predictions)
rmse = np.sqrt(mean_squared_error(test_data, predictions))
metrics = {
'MRPE': mrpe,
'MAE': mae,
'RMSE': rmse,
'预测表现': '优秀' if mrpe < 10 else '良好' if mrpe < 20 else '一般'
}
if verbose:
print("
=== 模型评估结果 ===")
for metric, value in metrics.items():
if metric == '预测表现':
print(f"{metric}: {value}")
else:
print(f"{metric}: {value:.4f}")
if return_base_info:
return metrics, predictions, base_info
return metrics, predictions
class ARIMAPredictor:
"""
ARIMA预测模型封装类
"""
def __init__(self, order=(1, 1, 1), seasonal_order=(1, 1, 1, 12)):
"""
初始化ARIMA模型
Parameters:
-----------
order : tuple, (p, d, q) 非季节性ARIMA参数
seasonal_order : tuple, (P, D, Q, s) 季节性ARIMA参数
"""
self.order = order
self.seasonal_order = seasonal_order
self.model = None
self.is_fitted = False
def fit(self, data, verbose=True):
"""
训练ARIMA模型
Parameters:
-----------
data : array-like, 时间序列数据
verbose : bool, 是否输出训练信息
"""
try:
# 尝试使用SARIMA(季节性ARIMA)
from statsmodels.tsa.statespace.sarimax import SARIMAX
self.model = SARIMAX(data, order=self.order,
seasonal_order=self.seasonal_order,
enforce_stationarity=False,
enforce_invertibility=False)
self.model = self.model.fit(disp=False)
self.is_fitted = True
if verbose:
print(f"ARIMA模型训练完成")
print(f"AIC: {self.model.aic:.4f}")
except Exception as e:
# 如果SARIMA失败,尝试使用普通ARIMA
if verbose:
print(f"SARIMA训练失败,尝试普通ARIMA: {e}")
try:
self.model = ARIMA(data, order=self.order)
self.model = self.model.fit()
self.is_fitted = True
if verbose:
print(f"ARIMA模型训练完成")
print(f"AIC: {self.model.aic:.4f}")
except Exception as e2:
if verbose:
print(f"ARIMA训练失败: {e2}")
raise
def predict(self, steps=1, start=None):
"""
进行预测
Parameters:
-----------
steps : int, 预测步数
start : int, 预测起始位置(可选)
Returns:
--------
predictions : array, 预测结果
"""
if not self.is_fitted:
raise ValueError("模型尚未训练,请先调用fit方法")
try:
forecast = self.model.forecast(steps=steps)
result = forecast.values if hasattr(forecast, 'values') else forecast
return np.array(result).flatten()
except:
# 如果forecast失败,使用predict
try:
if start is None:
start = len(self.model.model.endog)
predictions = self.model.predict(start=start, end=start+steps-1)
result = predictions.values if hasattr(predictions, 'values') else predictions
return np.array(result).flatten()
except:
# 最后的备选方案
if hasattr(self.model, 'forecast'):
forecast = self.model.forecast(steps=steps)
return np.array(forecast).flatten()
else:
raise ValueError("无法进行预测")
def evaluate(self, train_data, test_data, verbose=True):
"""
模型评估
Parameters:
-----------
train_data : array, 训练数据
test_data : array, 测试数据
Returns:
--------
metrics : dict, 评估指标
predictions : array, 预测值
"""
# 训练模型
self.fit(train_data, verbose=False)
# 预测测试集
predictions = self.predict(steps=len(test_data))
# 计算评估指标
mrpe = self._calculate_mrpe(test_data, predictions)
mae = mean_absolute_error(test_data, predictions)
rmse = np.sqrt(mean_squared_error(test_data, predictions))
metrics = {
'MRPE': mrpe,
'MAE': mae,
'RMSE': rmse,
'预测表现': '优秀' if mrpe < 10 else '良好' if mrpe < 20 else '一般'
}
if verbose:
print("
=== ARIMA模型评估结果 ===")
for metric, value in metrics.items():
if metric == '预测表现':
print(f"{metric}: {value}")
else:
print(f"{metric}: {value:.4f}")
return metrics, predictions
def _calculate_mrpe(self, y_true, y_pred):
"""计算平均相对预测误差"""
relative_errors = np.abs((y_true - y_pred) / y_true)
mrpe = np.mean(relative_errors) * 100
return mrpe
def plot_results(train_data, test_data, predictions, title="三体预测法结果", save_path=None):
"""
绘制预测结果图
Parameters:
-----------
train_data : array, 训练数据
test_data : array, 测试数据
predictions : array, 预测值
title : str, 图表标题
save_path : str, 保存路径,如果为None则不保存
"""
plt.figure(figsize=(12, 6))
# 训练集
train_idx = np.arange(len(train_data))
plt.plot(train_idx, train_data, 'b-', label='训练数据', linewidth=2)
# 测试集和预测
test_idx = np.arange(len(train_data), len(train_data) + len(test_data))
plt.plot(test_idx, test_data, 'g-', label='测试数据', linewidth=2, marker='o', markersize=4)
plt.plot(test_idx, predictions, 'r--', label='预测值', linewidth=2, marker='s', markersize=4)
# 添加分界线
plt.axvline(x=len(train_data), color='gray', linestyle='--', alpha=0.7, label='训练/测试分界')
plt.xlabel('时间')
plt.ylabel('数值')
plt.title(title)
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
print(f"图表已保存至: {save_path}")
plt.show()
def plot_base_regression(data, base_info, save_path=None):
"""
绘制第一步线性回归模型图
Parameters:
-----------
data : array, 原始数据
base_info : dict, 第一步模型信息
save_path : str, 保存路径
"""
model_base = base_info['model']
y_base = base_info['y']
y_pred_base = base_info['y_pred']
r2_base = base_info['r2']
# 时间索引(由于create_features中移除了NaN行,所以y_base的长度可能小于原始数据)
time_idx = np.arange(len(y_base))
plt.figure(figsize=(12, 6))
# 绘制原始数据
plt.plot(time_idx, y_base, 'b-', label='原始数据', linewidth=2, alpha=0.7, marker='o', markersize=3)
# 绘制回归预测线
plt.plot(time_idx, y_pred_base, 'r--', label='线性回归拟合', linewidth=2)
# 构建公式字符串
coef = model_base.coef_
intercept = model_base.intercept_
feature_names = base_info['feature_names']
formula_parts = []
if abs(intercept) > 1e-6:
formula_parts.append(f"{intercept:.4f}")
for i, (name, c) in enumerate(zip(feature_names, coef)):
if abs(c) > 1e-6:
sign = '+' if c >= 0 else ''
formula_parts.append(f"{sign}{c:.4f}*{name}")
formula = "y = " + " ".join(formula_parts)
if len(formula) > 100:
# 如果公式太长,截断并换行
formula_lines = []
current_line = ""
for part in formula_parts:
if len(current_line + part) > 60:
if current_line:
formula_lines.append(current_line)
current_line = part
else:
current_line += (" " + part if current_line else part)
if current_line:
formula_lines.append(current_line)
formula = "y = " + "
".join(formula_lines)
# 添加文本信息
textstr = f"线性回归模型:
{formula}
R^2 = {r2_base:.4f}"
plt.text(0.02, 0.98, textstr, transform=plt.gca().transAxes,
fontsize=9, verticalalignment='top',
bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8))
plt.xlabel('时间')
plt.ylabel('数值')
plt.title('第一步:线性回归模型(趋势+季节性)')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
print(f"图表已保存至: {save_path}")
plt.show()
def plot_future_predictions(data, future_predictions, save_path=None):
"""
绘制未来预测结果图
Parameters:
-----------
data : array, 历史数据
future_predictions : array, 未来预测值
save_path : str, 保存路径
"""
plt.figure(figsize=(12, 6))
# 历史数据
hist_idx = np.arange(len(data))
plt.plot(hist_idx, data, 'b-', label='历史数据', linewidth=2)
# 未来预测
future_idx = np.arange(len(data), len(data) + len(future_predictions))
plt.plot(future_idx, future_predictions, 'r--', label='未来预测', linewidth=2, marker='o', markersize=5)
# 添加分界线
plt.axvline(x=len(data), color='gray', linestyle='--', alpha=0.7, label='历史/预测分界')
plt.xlabel('时间')
plt.ylabel('数值')
plt.title('未来预测结果')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
print(f"图表已保存至: {save_path}")
plt.show()
def plot_comparison(true_data, tb_predictions, arima_predictions, title="算法对比", save_path=None):
"""
绘制真实值、三体预测和ARIMA预测的对比图
Parameters:
-----------
true_data : array, 真实值
tb_predictions : array, 三体预测值
arima_predictions : array, ARIMA预测值
title : str, 图表标题
save_path : str, 保存路径
"""
plt.figure(figsize=(14, 6))
# 时间索引
time_idx = np.arange(len(true_data))
# 绘制三条线
plt.plot(time_idx, true_data, 'g-', label='真实值', linewidth=2.5, marker='o', markersize=6)
plt.plot(time_idx, tb_predictions, 'b--', label='三体预测法', linewidth=2, marker='s', markersize=5)
plt.plot(time_idx, arima_predictions, 'r--', label='ARIMA', linewidth=2, marker='^', markersize=5)
plt.xlabel('时间(月份)', fontsize=12)
plt.ylabel('数值', fontsize=12)
plt.title(title, fontsize=14)
plt.legend(fontsize=11)
plt.grid(True, alpha=0.3)
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
print(f"对比图已保存至: {save_path}")
plt.show()
# 附加功能:批量测试不同参数
def parameter_sensitivity_analysis(save_results=True):
"""
参数敏感性分析
Parameters:
-----------
save_results : bool, 是否保存结果
"""
print("
=== 参数敏感性分析 ===")
# 创建输出目录
output_dir = "output"
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# 从CSV文件读取数据
df = pd.read_csv('时间序列数据.csv', encoding='utf-8')
data = df['数值'].values
split_idx = int(len(data) * 0.8)
train_data = data[:split_idx]
test_data = data[split_idx:]
# 测试不同的滞后阶数
lag_results = []
for max_lags in [1, 2, 3, 4, 5]:
predictor = ThreeBodyPredictor(max_lags=max_lags)
metrics, _ = predictor.evaluate(train_data, test_data, verbose=False)
lag_results.append({
'滞后阶数': max_lags,
'MRPE': metrics['MRPE'],
'MAE': metrics['MAE'],
'RMSE': metrics['RMSE'],
'预测表现': metrics['预测表现']
})
print("不同滞后阶数的MRPE:")
for result in lag_results:
print(f"滞后{result['滞后阶数']}阶: MRPE = {result['MRPE']:.2f}%")
# 保存文本结果
if save_results:
result_file = os.path.join(output_dir, '05_敏感性分析结果.txt')
with open(result_file, 'w', encoding='utf-8') as f:
f.write("=== 参数敏感性分析结果 ===
")
f.write(f"数据量: {len(data)} 条
")
f.write(f"训练集: {len(train_data)} 条, 测试集: {len(test_data)} 条
")
f.write("不同滞后阶数的评估结果:
")
f.write("-" * 60 + "
")
f.write(f"{'滞后阶数':<10} {'MRPE(%)':<15} {'MAE':<15} {'RMSE':<15} {'预测表现':<10}
")
f.write("-" * 60 + "
")
for result in lag_results:
f.write(f"{result['滞后阶数']:<10} {result['MRPE']:<15.4f} "
f"{result['MAE']:<15.4f} {result['RMSE']:<15.4f} {result['预测表现']:<10}
")
f.write("-" * 60 + "
")
print(f"
敏感性分析结果已保存至: {result_file}")
# 绘制可视化图表
if save_results:
lags = [r['滞后阶数'] for r in lag_results]
mrpe_values = [r['MRPE'] for r in lag_results]
mae_values = [r['MAE'] for r in lag_results]
rmse_values = [r['RMSE'] for r in lag_results]
# 创建子图
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
# 左图:MRPE
ax1.plot(lags, mrpe_values, 'o-', linewidth=2, markersize=8, color='blue')
ax1.set_xlabel('滞后阶数', fontsize=12)
ax1.set_ylabel('MRPE (%)', fontsize=12)
ax1.set_title('不同滞后阶数的MRPE变化', fontsize=14)
ax1.grid(True, alpha=0.3)
ax1.set_xticks(lags)
# 在点上标注数值
for lag, mrpe in zip(lags, mrpe_values):
ax1.annotate(f'{mrpe:.2f}%', (lag, mrpe),
textcoords="offset points", xytext=(0,10), ha='center', fontsize=9)
# 右图:MAE和RMSE(归一化后显示)
# 归一化以便在同一图上比较
max_mae = max(mae_values) if max(mae_values) > 0 else 1
max_rmse = max(rmse_values) if max(rmse_values) > 0 else 1
mae_norm = [v / max_mae for v in mae_values]
rmse_norm = [v / max_rmse for v in rmse_values]
ax2.plot(lags, mae_norm, 's-', linewidth=2, markersize=8, label='MAE (归一化)', color='green')
ax2.plot(lags, rmse_norm, '^-', linewidth=2, markersize=8, label='RMSE (归一化)', color='red')
ax2.set_xlabel('滞后阶数', fontsize=12)
ax2.set_ylabel('归一化误差', fontsize=12)
ax2.set_title('不同滞后阶数的MAE和RMSE变化', fontsize=14)
ax2.legend()
ax2.grid(True, alpha=0.3)
ax2.set_xticks(lags)
plt.tight_layout()
chart_path = os.path.join(output_dir, '05_敏感性分析图表.png')
plt.savefig(chart_path, dpi=300, bbox_inches='tight')
print(f"敏感性分析图表已保存至: {chart_path}")
plt.show()
# 示例使用
if __name__ == "__main__":
print("=== 三体预测法 vs ARIMA 对比分析 ===
")
# 创建输出目录
output_dir = "output"
if not os.path.exists(output_dir):
os.makedirs(output_dir)
print(f"创建输出目录: {output_dir}")
# 1. 从CSV文件读取数据
print("1. 从CSV文件读取时间序列数据...")
try:
df = pd.read_csv('时间序列数据.csv', encoding='utf-8')
# 提取数值列和时间列
data = df['数值'].values
time_col = df['时间'].values
print(f"成功读取 {len(data)} 条数据")
# 找到2020年12月的位置(训练集结束)
# 数据从2011年1月开始,每个月一条,所以2020年12月是第120条(索引119)
# 2021年1月是第121条(索引120)
train_end_idx = 120 # 2011年1月到2020年12月共120个月
train_data = data[:train_end_idx]
test_data = data[train_end_idx:] # 2021年1月到2022年12月
print(f"训练集: {len(train_data)} 条 (2011年1月 - 2020年12月)")
print(f"测试集: {len(test_data)} 条 (2021年1月 - 2022年12月)")
# 提取2021年的数据用于预测和评估
# 2021年1月到8月是测试集的前8个
# 2021年1月到12月是测试集的前12个
test_2021_01_08 = test_data[:8] # 2021年1-8月
test_2021_01_12 = test_data[:12] # 2021年1-12月
except FileNotFoundError:
print("错误: 找不到文件 '时间序列数据.csv'")
raise
except Exception as e:
print(f"读取CSV文件时出错: {e}")
raise
# 2. 数据可视化并保存
print("
2. 数据可视化...")
plt.figure(figsize=(12, 5))
plt.plot(data, 'b-', linewidth=1.5)
plt.axvline(x=train_end_idx, color='r', linestyle='--', linewidth=2, label='训练/测试分界 (2020年12月)')
plt.xlabel('时间(月份)', fontsize=11)
plt.ylabel('数值', fontsize=11)
plt.title('时间序列数据 (2011年1月 - 2022年12月)', fontsize=12)
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(os.path.join(output_dir, '01_原始数据.png'), dpi=300, bbox_inches='tight')
print(f"原始数据图已保存至: {os.path.join(output_dir, '01_原始数据.png')}")
plt.show()
# 3. 训练三体预测模型
print("
" + "="*60)
print("3. 训练三体预测模型...")
print("="*60)
tb_predictor = ThreeBodyPredictor(seasonal_period=12, max_lags=3)
tb_predictor.fit(train_data, verbose=True)
# 4. 训练ARIMA模型
print("
" + "="*60)
print("4. 训练ARIMA模型...")
print("="*60)
arima_predictor = ARIMAPredictor(order=(1, 1, 1), seasonal_order=(1, 1, 1, 12))
arima_predictor.fit(train_data, verbose=True)
# 5. 预测任务1:2021年1-8月
print("
" + "="*60)
print("5. 预测任务1:2021年1-8月")
print("="*60)
# 三体预测法预测
tb_pred_01_08 = tb_predictor.predict(train_data, forecast_steps=8)
tb_metrics_01_08 = {}
tb_metrics_01_08['MRPE'] = tb_predictor.calculate_mrpe(test_2021_01_08, tb_pred_01_08)
tb_metrics_01_08['MAE'] = mean_absolute_error(test_2021_01_08, tb_pred_01_08)
tb_metrics_01_08['RMSE'] = np.sqrt(mean_squared_error(test_2021_01_08, tb_pred_01_08))
print("
三体预测法结果 (2021年1-8月):")
print(f" MRPE: {tb_metrics_01_08['MRPE']:.4f}%")
print(f" MAE: {tb_metrics_01_08['MAE']:.4f}")
print(f" RMSE: {tb_metrics_01_08['RMSE']:.4f}")
# ARIMA预测
arima_pred_01_08 = arima_predictor.predict(steps=8)
arima_metrics_01_08 = {}
arima_metrics_01_08['MRPE'] = arima_predictor._calculate_mrpe(test_2021_01_08, arima_pred_01_08)
arima_metrics_01_08['MAE'] = mean_absolute_error(test_2021_01_08, arima_pred_01_08)
arima_metrics_01_08['RMSE'] = np.sqrt(mean_squared_error(test_2021_01_08, arima_pred_01_08))
print("
ARIMA结果 (2021年1-8月):")
print(f" MRPE: {arima_metrics_01_08['MRPE']:.4f}%")
print(f" MAE: {arima_metrics_01_08['MAE']:.4f}")
print(f" RMSE: {arima_metrics_01_08['RMSE']:.4f}")
# 6. 预测任务2:2021年1-12月
print("
" + "="*60)
print("6. 预测任务2:2021年1-12月")
print("="*60)
# 三体预测法预测
tb_pred_01_12 = tb_predictor.predict(train_data, forecast_steps=12)
tb_metrics_01_12 = {}
tb_metrics_01_12['MRPE'] = tb_predictor.calculate_mrpe(test_2021_01_12, tb_pred_01_12)
tb_metrics_01_12['MAE'] = mean_absolute_error(test_2021_01_12, tb_pred_01_12)
tb_metrics_01_12['RMSE'] = np.sqrt(mean_squared_error(test_2021_01_12, tb_pred_01_12))
print("
三体预测法结果 (2021年1-12月):")
print(f" MRPE: {tb_metrics_01_12['MRPE']:.4f}%")
print(f" MAE: {tb_metrics_01_12['MAE']:.4f}")
print(f" RMSE: {tb_metrics_01_12['RMSE']:.4f}")
# ARIMA预测
arima_pred_01_12 = arima_predictor.predict(steps=12)
arima_metrics_01_12 = {}
arima_metrics_01_12['MRPE'] = arima_predictor._calculate_mrpe(test_2021_01_12, arima_pred_01_12)
arima_metrics_01_12['MAE'] = mean_absolute_error(test_2021_01_12, arima_pred_01_12)
arima_metrics_01_12['RMSE'] = np.sqrt(mean_squared_error(test_2021_01_12, arima_pred_01_12))
print("
ARIMA结果 (2021年1-12月):")
print(f" MRPE: {arima_metrics_01_12['MRPE']:.4f}%")
print(f" MAE: {arima_metrics_01_12['MAE']:.4f}")
print(f" RMSE: {arima_metrics_01_12['RMSE']:.4f}")
# 7. 绘制2021年1-12月对比图
print("
" + "="*60)
print("7. 绘制2021年1-12月对比图")
print("="*60)
plot_comparison(test_2021_01_12, tb_pred_01_12, arima_pred_01_12,
title="2021年1-12月预测对比:三体预测法 vs ARIMA",
save_path=os.path.join(output_dir, '06_2021年预测对比图.png'))
# 8. 保存评估结果
print("
" + "="*60)
print("8. 保存评估结果")
print("="*60)
result_file = os.path.join(output_dir, '07_算法对比评估结果.txt')
with open(result_file, 'w', encoding='utf-8') as f:
f.write("=== 三体预测法 vs ARIMA 算法对比评估结果 ===
")
f.write(f"训练集: 2011年1月 - 2020年12月 ({len(train_data)} 个月)
")
f.write(f"测试集: 2021年1月 - 2022年12月 ({len(test_data)} 个月)
")
f.write("="*60 + "
")
f.write("预测任务1: 2021年1-8月
")
f.write("="*60 + "
")
f.write(f"{'指标':<15} {'三体预测法':<20} {'ARIMA':<20}
")
f.write("-"*60 + "
")
f.write(f"{'MRPE (%)':<15} {tb_metrics_01_08['MRPE']:<20.4f} {arima_metrics_01_08['MRPE']:<20.4f}
")
f.write(f"{'MAE':<15} {tb_metrics_01_08['MAE']:<20.4f} {arima_metrics_01_08['MAE']:<20.4f}
")
f.write(f"{'RMSE':<15} {tb_metrics_01_08['RMSE']:<20.4f} {arima_metrics_01_08['RMSE']:<20.4f}
")
f.write("
" + "="*60 + "
")
f.write("预测任务2: 2021年1-12月
")
f.write("="*60 + "
")
f.write(f"{'指标':<15} {'三体预测法':<20} {'ARIMA':<20}
")
f.write("-"*60 + "
")
f.write(f"{'MRPE (%)':<15} {tb_metrics_01_12['MRPE']:<20.4f} {arima_metrics_01_12['MRPE']:<20.4f}
")
f.write(f"{'MAE':<15} {tb_metrics_01_12['MAE']:<20.4f} {arima_metrics_01_12['MAE']:<20.4f}
")
f.write(f"{'RMSE':<15} {tb_metrics_01_12['RMSE']:<20.4f} {arima_metrics_01_12['RMSE']:<20.4f}
")
# 判断哪个算法更好
f.write("
" + "="*60 + "
")
f.write("算法性能总结
")
f.write("="*60 + "
")
f.write("2021年1-8月:
")
if tb_metrics_01_08['MRPE'] < arima_metrics_01_08['MRPE']:
f.write(" 三体预测法表现更好 (MRPE更低)
")
else:
f.write(" ARIMA表现更好 (MRPE更低)
")
f.write("
2021年1-12月:
")
if tb_metrics_01_12['MRPE'] < arima_metrics_01_12['MRPE']:
f.write(" 三体预测法表现更好 (MRPE更低)
")
else:
f.write(" ARIMA表现更好 (MRPE更低)
")
print(f"评估结果已保存至: {result_file}")
print("
" + "="*60)
print("所有任务完成!")
print("="*60)
