Files
btc_price_anany/main.py
riba2534 d480712b40 fix: 全面修复代码质量和报告准确性问题
代码修复 (16 个模块):
- GARCH 模型统一改用 t 分布 + 收敛检查 (returns/volatility/anomaly)
- KS 检验替换为 Lilliefors 检验 (returns)
- 修复数据泄漏: StratifiedKFold→TimeSeriesSplit, scaler 逐折 fit (anomaly)
- 前兆标签 shift(-1) 预测次日异常 (anomaly)
- PSD 归一化加入采样频率和单边谱×2 (fft)
- AR(1) 红噪声基线经验缩放 (fft)
- 盒计数法独立 x/y 归一化, MF-DFA q=0 (fractal)
- ADF 平稳性检验 + 移除双重 Bonferroni (causality)
- R/S Hurst 添加 R² 拟合优度 (hurst)
- Prophet 递推预测避免信息泄露 (time_series)
- IC 计算过滤零信号, 中性形态 hit_rate=NaN (indicators/patterns)
- 聚类阈值自适应化 (clustering)
- 日历效应前后半段稳健性检查 (calendar)
- 证据评分标准文本与代码对齐 (visualization)
- 核心管道 NaN/空值防护 (data_loader/preprocessing/main)

报告修复 (docs/REPORT.md, 15 处):
- 标度指数 H_scaling 与 Hurst 指数消歧
- GBM 6 个月概率锥数值重算
- CLT 限定、减半措辞弱化、情景概率逻辑修正
- GPD 形状参数解读修正、异常 AUC 证据降级

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 01:07:50 +08:00

233 lines
10 KiB
Python

#!/usr/bin/env python3
"""BTC/USDT 价格规律性全面分析 — 主入口
串联执行所有分析模块,输出结果到 output/ 目录。
每个模块独立运行,单个模块失败不影响其他模块。
用法:
python3 main.py # 运行全部模块
python3 main.py --modules fft wavelet # 只运行指定模块
python3 main.py --list # 列出所有可用模块
"""
import sys
import time
import argparse
import traceback
from pathlib import Path
from collections import OrderedDict
# 确保 src 在路径中
ROOT = Path(__file__).parent
sys.path.insert(0, str(ROOT))
from src.data_loader import load_klines, load_daily, load_hourly, validate_data
from src.preprocessing import add_derived_features
# ── 模块注册表 ─────────────────────────────────────────────
def _import_module(name):
"""延迟导入分析模块,避免启动时全部加载"""
import importlib
return importlib.import_module(f"src.{name}")
# (模块key, 显示名称, 源模块名, 入口函数名, 是否需要hourly数据)
MODULE_REGISTRY = OrderedDict([
("fft", ("FFT频谱分析", "fft_analysis", "run_fft_analysis", False)),
("wavelet", ("小波变换分析", "wavelet_analysis", "run_wavelet_analysis", False)),
("acf", ("ACF/PACF分析", "acf_analysis", "run_acf_analysis", False)),
("returns", ("收益率分布分析", "returns_analysis", "run_returns_analysis", False)),
("volatility", ("波动率聚集分析", "volatility_analysis", "run_volatility_analysis", False)),
("hurst", ("Hurst指数分析", "hurst_analysis", "run_hurst_analysis", False)),
("fractal", ("分形维度分析", "fractal_analysis", "run_fractal_analysis", False)),
("power_law", ("幂律增长分析", "power_law_analysis", "run_power_law_analysis", False)),
("volume_price", ("量价关系分析", "volume_price_analysis", "run_volume_price_analysis", False)),
("calendar", ("日历效应分析", "calendar_analysis", "run_calendar_analysis", True)),
("halving", ("减半周期分析", "halving_analysis", "run_halving_analysis", False)),
("indicators", ("技术指标验证", "indicators", "run_indicators_analysis", False)),
("patterns", ("K线形态分析", "patterns", "run_patterns_analysis", False)),
("clustering", ("市场状态聚类", "clustering", "run_clustering_analysis", False)),
("time_series", ("时序预测", "time_series", "run_time_series_analysis", False)),
("causality", ("因果检验", "causality", "run_causality_analysis", False)),
("anomaly", ("异常检测", "anomaly", "run_anomaly_analysis", False)),
# === 新增8个扩展模块 ===
("microstructure", ("市场微观结构", "microstructure", "run_microstructure_analysis", False)),
("intraday", ("日内模式分析", "intraday_patterns", "run_intraday_analysis", False)),
("scaling", ("统计标度律", "scaling_laws", "run_scaling_analysis", False)),
("multiscale_vol", ("多尺度波动率", "multi_scale_vol", "run_multiscale_vol_analysis", False)),
("entropy", ("信息熵分析", "entropy_analysis", "run_entropy_analysis", False)),
("extreme", ("极端值分析", "extreme_value", "run_extreme_value_analysis", False)),
("cross_tf", ("跨尺度关联", "cross_timeframe", "run_cross_timeframe_analysis", False)),
("momentum_rev", ("动量均值回归", "momentum_reversion", "run_momentum_reversion_analysis", False)),
])
OUTPUT_DIR = ROOT / "output"
def run_single_module(key, df, df_hourly, output_base):
"""
运行单个分析模块
Returns
-------
dict or None
模块返回的结果字典,失败返回 None
"""
display_name, mod_name, func_name, needs_hourly = MODULE_REGISTRY[key]
module_output = str(output_base / key)
Path(module_output).mkdir(parents=True, exist_ok=True)
print(f"\n{'='*60}")
print(f" [{key}] {display_name}")
print(f"{'='*60}")
try:
mod = _import_module(mod_name)
func = getattr(mod, func_name)
if needs_hourly and df_hourly is None:
print(f" [{key}] 跳过(需要小时数据但未加载)")
return {"status": "skipped", "error": "小时数据未加载", "findings": []}
if needs_hourly:
result = func(df, df_hourly, module_output)
else:
result = func(df, module_output)
if result is None:
result = {"status": "completed", "findings": []}
result.setdefault("status", "success")
print(f" [{key}] 完成 ✓")
return result
except Exception as e:
print(f" [{key}] 失败 ✗: {e}")
traceback.print_exc()
return {"status": "error", "error": str(e), "findings": []}
def main():
parser = argparse.ArgumentParser(description="BTC/USDT 价格规律性全面分析")
parser.add_argument("--modules", nargs="*", default=None,
help="指定要运行的模块 (默认运行全部)")
parser.add_argument("--list", action="store_true",
help="列出所有可用模块")
parser.add_argument("--start", type=str, default=None,
help="数据起始日期, 如 2020-01-01")
parser.add_argument("--end", type=str, default=None,
help="数据结束日期, 如 2025-12-31")
args = parser.parse_args()
if args.list:
print("\n可用分析模块:")
print("-" * 50)
for key, (name, _, _, _) in MODULE_REGISTRY.items():
print(f" {key:<15} {name}")
print()
return
# ── 1. 加载数据 ──────────────────────────────────────
print("=" * 60)
print(" BTC/USDT 价格规律性全面分析")
print("=" * 60)
print("\n[1/3] 加载日线数据...")
df_daily = load_daily(start=args.start, end=args.end)
report = validate_data(df_daily, "1d")
print(f" 行数: {report['rows']}")
print(f" 日期范围: {report['date_range']}")
print(f" 价格范围: {report['price_range']}")
print("\n[2/3] 添加衍生特征...")
df = add_derived_features(df_daily)
print(f" 特征列: {list(df.columns)}")
print("\n[3/3] 加载小时数据 (日历效应需要)...")
try:
df_hourly_raw = load_hourly(start=args.start, end=args.end)
df_hourly = add_derived_features(df_hourly_raw)
print(f" 小时数据行数: {len(df_hourly)}")
except Exception as e:
print(f" 小时数据加载失败 (日历效应小时分析将跳过): {e}")
df_hourly = None
# ── 2. 确定要运行的模块 ──────────────────────────────
if args.modules:
modules_to_run = []
for m in args.modules:
if m in MODULE_REGISTRY:
modules_to_run.append(m)
else:
print(f" 警告: 未知模块 '{m}', 跳过")
else:
modules_to_run = list(MODULE_REGISTRY.keys())
print(f"\n将运行 {len(modules_to_run)} 个分析模块:")
for m in modules_to_run:
print(f" - {m}: {MODULE_REGISTRY[m][0]}")
# ── 3. 逐一运行模块 ─────────────────────────────────
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
all_results = {}
timings = {}
for key in modules_to_run:
t0 = time.time()
result = run_single_module(key, df, df_hourly, OUTPUT_DIR)
elapsed = time.time() - t0
timings[key] = elapsed
if result is not None:
all_results[key] = result
print(f" 耗时: {elapsed:.1f}s")
# ── 4. 生成综合报告 ──────────────────────────────────
print(f"\n{'='*60}")
print(" 生成综合分析报告")
print(f"{'='*60}")
from src.visualization import generate_summary_dashboard, plot_price_overview
# 价格概览图
plot_price_overview(df_daily, str(OUTPUT_DIR))
# 综合仪表盘
dashboard_result = generate_summary_dashboard(all_results, str(OUTPUT_DIR))
# ── 5. 打印执行摘要 ──────────────────────────────────
print(f"\n{'='*60}")
print(" 执行摘要")
print(f"{'='*60}")
success = sum(1 for r in all_results.values() if r.get("status") == "success")
failed = sum(1 for r in all_results.values() if r.get("status") == "error")
total_time = sum(timings.values())
print(f"\n 模块总数: {len(modules_to_run)}")
print(f" 成功: {success}")
print(f" 失败: {failed}")
print(f" 总耗时: {total_time:.1f}s")
print(f"\n 各模块耗时:")
for key, t in sorted(timings.items(), key=lambda x: -x[1]):
status = all_results.get(key, {}).get("status", "unknown")
mark = "" if status == "success" else ""
print(f" {mark} {key:<15} {t:>8.1f}s")
print(f"\n 输出目录: {OUTPUT_DIR.resolve()}")
if dashboard_result:
print(f" 综合报告: {dashboard_result.get('report_path', 'N/A')}")
print(f" 仪表盘图: {dashboard_result.get('dashboard_path', 'N/A')}")
print(f" JSON结果: {dashboard_result.get('json_path', 'N/A')}")
print(f"\n{'='*60}")
print(" 分析完成!")
print(f"{'='*60}\n")
if __name__ == "__main__":
main()