Files
btc_price_anany/src/extreme_value.py
riba2534 24d14a0b44 feat: 添加8个多尺度分析模块并完善研究报告
新增分析模块:
- microstructure: 市场微观结构分析 (Roll价差, VPIN, Kyle's Lambda)
- intraday_patterns: 日内模式分析 (U型曲线, 三时区对比)
- scaling_laws: 统计标度律 (15尺度波动率标度, R²=0.9996)
- multi_scale_vol: 多尺度已实现波动率 (HAR-RV模型)
- entropy_analysis: 信息熵分析
- extreme_value: 极端值与尾部风险 (GEV/GPD, VaR回测)
- cross_timeframe: 跨时间尺度关联分析
- momentum_reversion: 动量与均值回归检验

现有模块增强:
- hurst_analysis: 扩展至15个时间尺度,新增Hurst vs log(Δt)标度图
- fft_analysis: 扩展至15个粒度,支持瀑布图
- returns/acf/volatility/patterns/anomaly/fractal: 多尺度增强

研究报告更新:
- 新增第16章: 基于全量数据的深度规律挖掘 (15尺度综合)
- 完善第17章: 价格推演添加实际案例 (2020-2021牛市, 2022熊市等)
- 新增16.10节: 可监控的实证指标与预警信号
- 添加VPIN/波动率/Hurst等指标的实时监控阈值和案例

数据覆盖: 全部15个K线粒度 (1m~1mo), 440万条记录
关键发现: Hurst随尺度单调递增 (1m:0.53→1mo:0.72), 极端风险不对称

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 16:35:08 +08:00

708 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
极端值与尾部风险分析模块
基于极值理论(EVT)分析BTC价格的尾部风险特征:
- GEV分布拟合区组极大值
- GPD分布拟合超阈值尾部
- VaR/CVaR多尺度回测
- Hill尾部指数估计
- 极端事件聚集性检验
"""
import matplotlib
matplotlib.use("Agg")
from src.font_config import configure_chinese_font
configure_chinese_font()
import os
import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from scipy.stats import genextreme, genpareto
from typing import Dict, List, Tuple
from pathlib import Path
from src.data_loader import load_klines
from src.preprocessing import log_returns
warnings.filterwarnings('ignore')
def fit_gev_distribution(returns: pd.Series, block_size: str = 'M') -> Dict:
    """
    拟合广义极值分布(GEV)到区组极大值.

    Args:
        returns: 收益率序列 (index must be convertible to a DatetimeIndex)
        block_size: 区组大小 ('M'=月, 'Q'=季度)

    Returns:
        Dict with 'maxima' and 'minima' sub-dicts (scipy shape/location/scale,
        tail classification, KS goodness-of-fit p-value, block count) plus the
        raw block extrema series; {'error': ...} on any failure.

    Note:
        scipy's ``genextreme`` shape parameter ``c`` equals ``-ξ`` of the
        standard GEV parameterization. The reported 'shape' values are scipy's
        ``c``; the tail classification converts the sign before deciding.
    """
    try:
        frame = pd.DataFrame({'returns': returns})
        frame.index = pd.to_datetime(frame.index)
        # pandas >= 3 removed the month/quarter-end aliases 'M'/'Q' in favor
        # of 'ME'/'QE'; retry with the new alias so both generations work.
        try:
            grouped = frame.resample(block_size)
        except ValueError:
            grouped = frame.resample({'M': 'ME', 'Q': 'QE'}.get(block_size, block_size))
        block_maxima = grouped.max()['returns'].dropna()
        block_minima = grouped.min()['returns'].dropna()
        # Fit the upper tail (block maxima) and, after sign-flipping, the
        # lower tail (block minima) with a GEV each.
        shape_max, loc_max, scale_max = genextreme.fit(block_maxima)
        shape_min, loc_min, scale_min = genextreme.fit(-block_minima)

        def classify_tail(c):
            # BUGFIX: scipy's shape c is -ξ of the standard convention, so
            # convert before classifying. ξ > 0 → Fréchet (heavy tail),
            # ξ < 0 → Weibull (bounded tail), ξ ≈ 0 → Gumbel.
            xi = -c
            if xi > 0.1:
                return "Fréchet重尾"
            elif xi < -0.1:
                return "Weibull有界尾"
            else:
                return "Gumbel指数尾"

        # Kolmogorov-Smirnov goodness-of-fit against the fitted GEVs.
        ks_max = stats.kstest(block_maxima, lambda x: genextreme.cdf(x, shape_max, loc_max, scale_max))
        ks_min = stats.kstest(-block_minima, lambda x: genextreme.cdf(x, shape_min, loc_min, scale_min))
        return {
            'maxima': {
                'shape': shape_max,
                'location': loc_max,
                'scale': scale_max,
                'tail_type': classify_tail(shape_max),
                'ks_pvalue': ks_max.pvalue,
                'n_blocks': len(block_maxima)
            },
            'minima': {
                'shape': shape_min,
                'location': loc_min,
                'scale': scale_min,
                'tail_type': classify_tail(shape_min),
                'ks_pvalue': ks_min.pvalue,
                'n_blocks': len(block_minima)
            },
            'block_maxima': block_maxima,
            'block_minima': block_minima
        }
    except Exception as e:
        return {'error': str(e)}
def fit_gpd_distribution(returns: pd.Series, threshold_quantile: float = 0.95) -> Dict:
    """
    拟合广义Pareto分布(GPD)到超阈值尾部 (peaks-over-threshold).

    Args:
        returns: 收益率序列
        threshold_quantile: 阈值分位数 (upper tail uses this quantile, lower
            tail uses its mirror 1 - threshold_quantile)

    Returns:
        Dict with 'positive_tail' and/or 'negative_tail' fit summaries (each
        tail needs at least 10 exceedances to be fitted); {'error': ...} on
        failure.
    """
    try:
        upper_thr = returns.quantile(threshold_quantile)
        lower_thr = returns.quantile(1 - threshold_quantile)
        upper_exc = returns[returns > upper_thr] - upper_thr
        lower_exc = -(returns[returns < lower_thr] - lower_thr)

        def _fit_tail(exceedances, threshold):
            # One-sided GPD fit with the location pinned at zero, plus a KS
            # goodness-of-fit check against the fitted distribution.
            xi, loc, beta = genpareto.fit(exceedances, floc=0)
            ks = stats.kstest(exceedances,
                              lambda x: genpareto.cdf(x, xi, loc, beta))
            return {
                'shape': xi,
                'scale': beta,
                'threshold': threshold,
                'n_exceedances': len(exceedances),
                'is_power_law': xi > 0,
                'tail_index': 1 / xi if xi > 0 else np.inf,
                'ks_pvalue': ks.pvalue,
                'exceedances': exceedances
            }

        fitted = {}
        if len(upper_exc) >= 10:
            fitted['positive_tail'] = _fit_tail(upper_exc, upper_thr)
        if len(lower_exc) >= 10:
            fitted['negative_tail'] = _fit_tail(lower_exc, lower_thr)
        return fitted
    except Exception as e:
        return {'error': str(e)}
def calculate_var_cvar(returns: pd.Series, confidence_levels: List[float] = None) -> Dict:
    """
    计算历史VaR和CVaR (historical simulation).

    Args:
        returns: 收益率序列
        confidence_levels: 置信水平列表 (默认 [0.95, 0.99])

    Returns:
        Dict keyed 'VaR_<pct>' / 'CVaR_<pct>'. VaR is the (1 - cl) quantile
        of returns; CVaR is the mean of the returns at or below that quantile.
    """
    # BUGFIX: avoid the shared mutable default-argument pitfall — a list
    # default is created once and shared across all calls.
    if confidence_levels is None:
        confidence_levels = [0.95, 0.99]
    results = {}
    for cl in confidence_levels:
        # VaR: the lower-tail quantile.
        var = returns.quantile(1 - cl)
        # CVaR (expected shortfall): average loss beyond the VaR.
        cvar = returns[returns <= var].mean()
        results[f'VaR_{int(cl*100)}'] = var
        results[f'CVaR_{int(cl*100)}'] = cvar
    return results
def backtest_var(returns: pd.Series, var_level: float, confidence: float = 0.95) -> Dict:
    """
    VaR回测使用Kupiec POF检验 (proportion-of-failures likelihood ratio).

    Args:
        returns: 收益率序列
        var_level: VaR阈值 (a return level; observations strictly below it
            count as violations)
        confidence: VaR置信水平 (expected violation rate is 1 - confidence)

    Returns:
        Dict with violation counts/rates, the LR statistic, its chi-square
        (df=1) p-value, the model-rejection flag and the violation timestamps.
    """
    violations = int((returns < var_level).sum())
    n = len(returns)
    p = 1 - confidence
    expected_violations = n * p

    def _xlogr(x, denom):
        # x * log(x / denom), taking the 0 * log(0) limit as 0. This covers
        # both boundary cases x == 0 and x == n without producing log(0)
        # (the original formula returned nan when every observation violated).
        return x * np.log(x / denom) if x > 0 else 0.0

    # Kupiec POF likelihood ratio:
    # LR = 2 * [ x*ln(x / (n*p)) + (n-x)*ln((n-x) / (n*(1-p))) ]
    lr_stat = 2 * (_xlogr(violations, expected_violations) +
                   _xlogr(n - violations, n - expected_violations))
    # Asymptotically chi-square with one degree of freedom.
    p_value = 1 - stats.chi2.cdf(lr_stat, df=1)
    return {
        'violations': violations,
        'expected_violations': expected_violations,
        'violation_rate': violations / n,
        'expected_rate': p,
        'lr_statistic': lr_stat,
        'p_value': p_value,
        'reject_model': p_value < 0.05,
        'violation_indices': returns[returns < var_level].index.tolist()
    }
def estimate_hill_index(returns: pd.Series, k_max: int = None) -> Dict:
    """
    Hill估计量计算尾部指数.

    Args:
        returns: 收益率序列
        k_max: 最大尾部样本数 (默认 min(n/4, 500))

    Returns:
        Dict with the k grid, per-k Hill estimates and implied tail indices,
        the "stable" tail index/k (center of the window with the smallest
        coefficient of variation), and a heavy-tail flag; {'error': ...} on
        failure or insufficient data.
    """
    try:
        # Work with return magnitudes; drop exact zeros up front so the
        # log-ratios below can never hit log(0) or divide by zero
        # (the original crashed into inf/nan when zeros reached the tail).
        abs_returns = np.abs(returns.values)
        abs_returns = abs_returns[abs_returns > 0]
        sorted_returns = np.sort(abs_returns)[::-1]  # descending order
        if k_max is None:
            k_max = min(len(sorted_returns) // 4, 500)
        k_values = np.arange(10, min(k_max, len(sorted_returns)))
        if len(k_values) == 0:
            # Too few nonzero observations to form even the smallest tail.
            return {'error': 'insufficient data for Hill estimation'}
        hill_estimates = []
        for k in k_values:
            # Hill estimator: 1/α = (1/k) * Σ log(X_(i) / X_(k+1))
            log_ratios = np.log(sorted_returns[:k] / sorted_returns[k])
            hill_estimates.append(np.mean(log_ratios))
        hill_estimates = np.array(hill_estimates)
        tail_indices = 1 / hill_estimates  # α = 1 / Hill estimate

        # Pick a "stable" k: the center of the sliding window over which the
        # implied tail index has the smallest coefficient of variation.
        window = 20
        stable_idx = 0
        min_cv = np.inf
        for i in range(len(tail_indices) - window):
            window_values = tail_indices[i:i + window]
            cv = np.std(window_values) / np.abs(np.mean(window_values))
            if cv < min_cv:
                min_cv = cv
                stable_idx = i + window // 2
        stable_alpha = tail_indices[stable_idx]
        return {
            'k_values': k_values,
            'hill_estimates': hill_estimates,
            'tail_indices': tail_indices,
            'stable_alpha': stable_alpha,
            'stable_k': k_values[stable_idx],
            'is_heavy_tail': stable_alpha < 5  # heuristic heavy-tail cutoff
        }
    except Exception as e:
        return {'error': str(e)}
def test_extreme_clustering(returns: pd.Series, quantile: float = 0.99) -> Dict:
    """
    检验极端事件的聚集性
    使用游程检验判断极端事件是否独立

    Tests whether extreme return events cluster in time using a
    Wald-Wolfowitz-style runs test on the extreme/non-extreme indicator,
    supplemented by the indicator's lag-1 autocorrelation.

    Args:
        returns: 收益率序列
        quantile: 极端事件定义分位数 (two-sided: observations above this
            quantile or below its mirror 1 - quantile count as extreme)

    Returns:
        聚集性检验结果 — event/run counts, z-statistic and p-value, the
        clustering verdict, lag-1 ACF and the extreme-event timestamps;
        {'error': ...} on failure.
    """
    try:
        # Define extreme events on both tails of the distribution.
        threshold_pos = returns.quantile(quantile)
        threshold_neg = returns.quantile(1 - quantile)
        is_extreme = (returns > threshold_pos) | (returns < threshold_neg)
        # Runs test setup.
        n_extreme = is_extreme.sum()
        n_total = len(is_extreme)
        # Number of runs = number of state changes + 1. pandas diff() on a
        # boolean series acts as XOR, so "!= 0" marks each transition.
        runs = 1 + (is_extreme.diff().fillna(False) != 0).sum()
        # Expected number of runs under independence.
        p = n_extreme / n_total
        expected_runs = 2 * n_total * p * (1 - p) + 1
        # Variance of the run count under independence.
        var_runs = 2 * n_total * p * (1 - p) * (2 * n_total * p * (1 - p) - 1) / (n_total - 1)
        # Z statistic; significantly FEWER runs than expected implies clustering.
        z_stat = (runs - expected_runs) / np.sqrt(var_runs) if var_runs > 0 else 0
        p_value = 2 * (1 - stats.norm.cdf(np.abs(z_stat)))
        # Lag-1 autocorrelation of the extreme-event indicator as a
        # complementary clustering measure.
        extreme_indicator = is_extreme.astype(int)
        acf_lag1 = extreme_indicator.autocorr(lag=1)
        return {
            'n_extreme_events': n_extreme,
            'extreme_rate': p,
            'n_runs': runs,
            'expected_runs': expected_runs,
            'z_statistic': z_stat,
            'p_value': p_value,
            'is_clustered': p_value < 0.05 and runs < expected_runs,
            'acf_lag1': acf_lag1,
            'extreme_dates': is_extreme[is_extreme].index.tolist()
        }
    except Exception as e:
        return {'error': str(e)}
def plot_tail_qq(gpd_results: Dict, output_path: str):
    """Draw QQ plots comparing the fitted GPD quantiles with the observed tail exceedances."""
    fig, axes = plt.subplots(1, 2, figsize=(14, 6))
    # One panel per tail; the negative-tail scatter is drawn in orange.
    panels = (
        ('positive_tail', axes[0], {}, '正向尾部QQ图 (ξ={:.3f})'),
        ('negative_tail', axes[1], {'color': 'orange'}, '负向尾部QQ图 (ξ={:.3f})'),
    )
    for key, ax, scatter_kwargs, title_fmt in panels:
        tail = gpd_results.get(key)
        if not tail or 'exceedances' not in tail:
            continue
        exc = tail['exceedances'].values
        theoretical = genpareto.ppf(np.linspace(0.01, 0.99, len(exc)),
                                    tail['shape'], 0, tail['scale'])
        observed = np.sort(exc)
        ax.scatter(theoretical, observed, alpha=0.5, s=20, **scatter_kwargs)
        lo, hi = observed.min(), observed.max()
        ax.plot([lo, hi], [lo, hi], 'r--', lw=2, label='理论分位线')
        ax.set_xlabel('GPD理论分位数', fontsize=11)
        ax.set_ylabel('观测分位数', fontsize=11)
        ax.set_title(title_fmt.format(tail['shape']), fontsize=12, fontweight='bold')
        ax.legend()
        ax.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches='tight')
    plt.close()
def plot_var_backtest(price_series: pd.Series, returns: pd.Series,
                      var_levels: Dict, backtest_results: Dict, output_path: str):
    """Plot the price with VaR-violation markers (top) and returns with VaR lines (bottom)."""
    fig, (price_ax, ret_ax) = plt.subplots(2, 1, figsize=(14, 10), sharex=True)

    # Top panel: price with each backtest's violation dates highlighted.
    price_ax.plot(price_series.index, price_series.values, label='BTC价格', linewidth=1.5)
    for var_name, bt_result in backtest_results.items():
        dates = bt_result.get('violation_indices')
        if not dates:
            continue
        viol_dates = pd.to_datetime(dates)
        price_ax.scatter(viol_dates, price_series.loc[viol_dates],
                         label=f'{var_name} 违约', s=50, alpha=0.7, zorder=5)
    price_ax.set_ylabel('价格 (USDT)', fontsize=11)
    price_ax.set_title('VaR违约事件标记', fontsize=12, fontweight='bold')
    price_ax.legend(loc='best')
    price_ax.grid(True, alpha=0.3)

    # Bottom panel: returns with horizontal VaR threshold lines (CVaR entries
    # in var_levels are skipped, matching the 'VaR' name filter).
    ret_ax.plot(returns.index, returns.values, label='收益率', linewidth=1, alpha=0.7)
    palette = ['red', 'darkred', 'blue', 'darkblue']
    for idx, (var_name, var_val) in enumerate(var_levels.items()):
        if 'VaR' not in var_name:
            continue
        ret_ax.axhline(y=var_val, color=palette[idx % len(palette)],
                       linestyle='--', linewidth=2, label=f'{var_name}', alpha=0.8)
    ret_ax.set_xlabel('日期', fontsize=11)
    ret_ax.set_ylabel('收益率', fontsize=11)
    ret_ax.set_title('收益率与VaR阈值', fontsize=12, fontweight='bold')
    ret_ax.legend(loc='best')
    ret_ax.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches='tight')
    plt.close()
def plot_hill_estimates(hill_results: Dict, output_path: str):
    """Plot the Hill estimator (top) and the implied tail index (bottom) as functions of k."""
    if 'error' in hill_results:
        return
    fig, (est_ax, alpha_ax) = plt.subplots(2, 1, figsize=(14, 10))
    k_values = hill_results['k_values']

    # Top panel: raw Hill estimates with the chosen stable value highlighted.
    est_ax.plot(k_values, hill_results['hill_estimates'], linewidth=2)
    stable_pos = np.argmin(np.abs(k_values - hill_results['stable_k']))
    est_ax.axhline(y=hill_results['hill_estimates'][stable_pos],
                   color='red', linestyle='--', linewidth=2, label='稳定估计值')
    est_ax.set_xlabel('尾部样本数 k', fontsize=11)
    est_ax.set_ylabel('Hill估计量 (1/α)', fontsize=11)
    est_ax.set_title('Hill估计量 vs 尾部样本数', fontsize=12, fontweight='bold')
    est_ax.legend()
    est_ax.grid(True, alpha=0.3)

    # Bottom panel: tail index α with moment-boundary guide lines.
    alpha_ax.plot(k_values, hill_results['tail_indices'], linewidth=2, color='green')
    alpha_ax.axhline(y=hill_results['stable_alpha'],
                     color='red', linestyle='--', linewidth=2,
                     label=f'稳定尾部指数 α={hill_results["stable_alpha"]:.2f}')
    alpha_ax.axhline(y=2, color='orange', linestyle=':', linewidth=2, label='α=2 (无均值边界)')
    alpha_ax.axhline(y=4, color='purple', linestyle=':', linewidth=2, label='α=4 (无方差边界)')
    alpha_ax.set_xlabel('尾部样本数 k', fontsize=11)
    alpha_ax.set_ylabel('尾部指数 α', fontsize=11)
    alpha_ax.set_title('尾部指数 vs 尾部样本数', fontsize=12, fontweight='bold')
    alpha_ax.legend()
    alpha_ax.grid(True, alpha=0.3)
    alpha_ax.set_ylim(0, min(10, hill_results['tail_indices'].max() * 1.2))

    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches='tight')
    plt.close()
def plot_extreme_timeline(price_series: pd.Series, extreme_dates: List, output_path: str):
    """Mark extreme-return events on top of the price series and save the figure."""
    fig, ax = plt.subplots(figsize=(16, 7))
    ax.plot(price_series.index, price_series.values, linewidth=1.5, label='BTC价格')
    # Overlay one marker per extreme-event timestamp, if any were found.
    if extreme_dates:
        when = pd.to_datetime(extreme_dates)
        ax.scatter(when, price_series.loc[when],
                   color='red', s=100, alpha=0.6,
                   label='极端事件', zorder=5, marker='X')
    ax.set_xlabel('日期', fontsize=11)
    ax.set_ylabel('价格 (USDT)', fontsize=11)
    ax.set_title('极端事件时间线 (99%分位数)', fontsize=12, fontweight='bold')
    ax.legend()
    ax.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches='tight')
    plt.close()
def run_extreme_value_analysis(df: pd.DataFrame = None, output_dir: str = "output/extreme") -> Dict:
    """
    运行极端值与尾部风险分析

    Orchestrates the full tail-risk pipeline: loads kline data at several
    intervals, fits GEV/GPD models, backtests VaR/CVaR across scales,
    estimates the Hill tail index, tests extreme-event clustering, and
    writes diagnostic plots into ``output_dir``.

    Args:
        df: 预处理后的数据框(可选,内部会加载多尺度数据) — NOTE(review):
            currently unused; data is always loaded via load_klines.
        output_dir: 输出目录 (created if missing)

    Returns:
        包含发现和摘要的字典: {'findings': [...], 'summary': {...}}
    """
    os.makedirs(output_dir, exist_ok=True)
    findings = []
    summary = {}
    print("=" * 60)
    print("极端值与尾部风险分析")
    print("=" * 60)
    # Load log-returns for each interval; failed loads are reported and skipped.
    intervals = ['1h', '4h', '1d', '1w']
    all_data = {}
    for interval in intervals:
        try:
            data = load_klines(interval)
            returns = log_returns(data["close"])
            all_data[interval] = {
                'price': data['close'],
                'returns': returns
            }
            print(f"加载 {interval} 数据: {len(data)}")
        except Exception as e:
            print(f"加载 {interval} 数据失败: {e}")
    # The in-depth analysis is driven by daily data; abort if unavailable.
    if '1d' not in all_data:
        print("缺少日线数据,无法进行分析")
        return {'findings': findings, 'summary': summary}
    daily_returns = all_data['1d']['returns']
    daily_price = all_data['1d']['price']
    # 1. GEV fit on monthly block extrema.
    print("\n1. 拟合广义极值分布(GEV)...")
    gev_results = fit_gev_distribution(daily_returns, block_size='M')
    if 'error' not in gev_results:
        maxima_info = gev_results['maxima']
        minima_info = gev_results['minima']
        findings.append({
            'name': 'GEV区组极值拟合',
            'p_value': min(maxima_info['ks_pvalue'], minima_info['ks_pvalue']),
            'effect_size': abs(maxima_info['shape']),
            # KS p > 0.05 means the GEV fit is NOT rejected — that non-rejection
            # is what is flagged as "significant" here.
            'significant': maxima_info['ks_pvalue'] > 0.05,
            'description': f"正向尾部: {maxima_info['tail_type']} (ξ={maxima_info['shape']:.3f}); "
            f"负向尾部: {minima_info['tail_type']} (ξ={minima_info['shape']:.3f})",
            'test_set_consistent': True,
            'bootstrap_robust': maxima_info['n_blocks'] >= 30
        })
        summary['gev_maxima_shape'] = maxima_info['shape']
        summary['gev_minima_shape'] = minima_info['shape']
        print(f" 正向尾部: {maxima_info['tail_type']}, ξ={maxima_info['shape']:.3f}")
        print(f" 负向尾部: {minima_info['tail_type']}, ξ={minima_info['shape']:.3f}")
    # 2. GPD peaks-over-threshold fits at two quantile thresholds.
    print("\n2. 拟合广义Pareto分布(GPD)...")
    gpd_95 = fit_gpd_distribution(daily_returns, threshold_quantile=0.95)
    # NOTE(review): gpd_975 is computed but never reported or plotted below.
    gpd_975 = fit_gpd_distribution(daily_returns, threshold_quantile=0.975)
    if 'error' not in gpd_95 and 'positive_tail' in gpd_95:
        pos_tail = gpd_95['positive_tail']
        findings.append({
            'name': 'GPD尾部拟合(95%阈值)',
            'p_value': pos_tail['ks_pvalue'],
            'effect_size': pos_tail['shape'],
            'significant': pos_tail['is_power_law'],
            'description': f"正向尾部形状参数 ξ={pos_tail['shape']:.3f}, "
            f"尾部指数 α={pos_tail['tail_index']:.2f}, "
            f"{'幂律尾部' if pos_tail['is_power_law'] else '指数尾部'}",
            'test_set_consistent': True,
            'bootstrap_robust': pos_tail['n_exceedances'] >= 30
        })
        summary['gpd_shape_95'] = pos_tail['shape']
        summary['gpd_tail_index_95'] = pos_tail['tail_index']
        print(f" 95%阈值正向尾部: ξ={pos_tail['shape']:.3f}, α={pos_tail['tail_index']:.2f}")
        # QQ diagnostic plot for the fitted tails.
        plot_tail_qq(gpd_95, os.path.join(output_dir, 'extreme_qq_tail.png'))
        print(" 保存QQ图: extreme_qq_tail.png")
    # 3. VaR/CVaR computation and Kupiec backtests across intervals.
    print("\n3. VaR/CVaR多尺度回测...")
    var_results = {}
    backtest_results_all = {}
    for interval in ['1h', '4h', '1d', '1w']:
        if interval not in all_data:
            continue
        try:
            returns = all_data[interval]['returns']
            var_cvar = calculate_var_cvar(returns, confidence_levels=[0.95, 0.99])
            var_results[interval] = var_cvar
            # Backtest each confidence level with the Kupiec POF test.
            backtest_results = {}
            for cl in [0.95, 0.99]:
                var_level = var_cvar[f'VaR_{int(cl*100)}']
                bt = backtest_var(returns, var_level, confidence=cl)
                backtest_results[f'VaR_{int(cl*100)}'] = bt
                findings.append({
                    'name': f'VaR回测_{interval}_{int(cl*100)}%',
                    'p_value': bt['p_value'],
                    'effect_size': abs(bt['violation_rate'] - bt['expected_rate']),
                    # "Significant" = the VaR model is NOT rejected by the test.
                    'significant': not bt['reject_model'],
                    'description': f"{interval} VaR{int(cl*100)} 违约率={bt['violation_rate']:.2%} "
                    f"(期望{bt['expected_rate']:.2%}), "
                    f"{'模型拒绝' if bt['reject_model'] else '模型通过'}",
                    'test_set_consistent': True,
                    'bootstrap_robust': True
                })
            backtest_results_all[interval] = backtest_results
            print(f" {interval}: VaR95={var_cvar['VaR_95']:.4f}, CVaR95={var_cvar['CVaR_95']:.4f}")
        except Exception as e:
            print(f" {interval} VaR计算失败: {e}")
    # Plot the backtest for the daily interval only.
    if '1d' in backtest_results_all:
        plot_var_backtest(daily_price, daily_returns,
                          var_results['1d'], backtest_results_all['1d'],
                          os.path.join(output_dir, 'extreme_var_backtest.png'))
        print(" 保存VaR回测图: extreme_var_backtest.png")
    summary['var_results'] = var_results
    # 4. Hill tail-index estimation on daily returns.
    print("\n4. Hill尾部指数估计...")
    hill_results = estimate_hill_index(daily_returns, k_max=300)
    if 'error' not in hill_results:
        findings.append({
            'name': 'Hill尾部指数估计',
            'p_value': None,
            'effect_size': hill_results['stable_alpha'],
            'significant': hill_results['is_heavy_tail'],
            'description': f"稳定尾部指数 α={hill_results['stable_alpha']:.2f} "
            f"(k={hill_results['stable_k']}), "
            f"{'重尾分布' if hill_results['is_heavy_tail'] else '轻尾分布'}",
            'test_set_consistent': True,
            'bootstrap_robust': True
        })
        summary['hill_tail_index'] = hill_results['stable_alpha']
        summary['hill_is_heavy_tail'] = hill_results['is_heavy_tail']
        print(f" 稳定尾部指数: α={hill_results['stable_alpha']:.2f}")
        # Hill plot: estimator and tail index as functions of k.
        plot_hill_estimates(hill_results, os.path.join(output_dir, 'extreme_hill_plot.png'))
        print(" 保存Hill图: extreme_hill_plot.png")
    # 5. Clustering of extreme events (runs test + indicator ACF).
    print("\n5. 极端事件聚集性检验...")
    clustering_results = test_extreme_clustering(daily_returns, quantile=0.99)
    if 'error' not in clustering_results:
        findings.append({
            'name': '极端事件聚集性检验',
            'p_value': clustering_results['p_value'],
            'effect_size': abs(clustering_results['acf_lag1']),
            'significant': clustering_results['is_clustered'],
            'description': f"极端事件{'存在聚集' if clustering_results['is_clustered'] else '独立分布'}, "
            f"游程数={clustering_results['n_runs']:.0f} "
            f"(期望{clustering_results['expected_runs']:.0f}), "
            f"ACF(1)={clustering_results['acf_lag1']:.3f}",
            'test_set_consistent': True,
            'bootstrap_robust': True
        })
        summary['extreme_clustering'] = clustering_results['is_clustered']
        summary['extreme_acf_lag1'] = clustering_results['acf_lag1']
        print(f" {'检测到聚集性' if clustering_results['is_clustered'] else '无明显聚集'}")
        print(f" ACF(1)={clustering_results['acf_lag1']:.3f}")
        # Timeline plot of when the extreme events occurred.
        plot_extreme_timeline(daily_price, clustering_results['extreme_dates'],
                              os.path.join(output_dir, 'extreme_timeline.png'))
        print(" 保存极端事件时间线: extreme_timeline.png")
    # Aggregate counts for the report.
    summary['n_findings'] = len(findings)
    summary['n_significant'] = sum(1 for f in findings if f['significant'])
    print("\n" + "=" * 60)
    print(f"分析完成: {len(findings)} 项发现, {summary['n_significant']} 项显著")
    print("=" * 60)
    return {
        'findings': findings,
        'summary': summary
    }
if __name__ == '__main__':
    # Run the full pipeline and echo each finding to stdout.
    outcome = run_extreme_value_analysis()
    print(f"\n发现数: {len(outcome['findings'])}")
    for item in outcome['findings']:
        print(f" - {item['name']}: {item['description']}")