-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathbacktest.py
More file actions
202 lines (176 loc) · 6.33 KB
/
backtest.py
File metadata and controls
202 lines (176 loc) · 6.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
"""
Historical backtesting module (v0.8) — signal → price action correlation.
Loads signals from log, fetches historical prices, computes hit rate and correlation.
"""
import json
import re
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Optional
import requests
@dataclass
class BacktestResult:
total_signals: int
buy_signals: int
sell_signals: int
hit_rate_buy: float
hit_rate_sell: float
avg_return_buy_24h: float
avg_return_sell_24h: float
tier1_hit_rate: float
tier2_hit_rate: float
by_asset: dict
def _parse_signal_log(log_path: Path) -> List[dict]:
"""Parse signals.log JSONL format into list of signal dicts."""
if not log_path.exists():
return []
signals = []
for line in log_path.read_text().strip().split("\n"):
if not line.strip():
continue
try:
s = json.loads(line)
signals.append(s)
except json.JSONDecodeError:
continue
return signals
def _fetch_price_history(symbol: str, start_ts: datetime, end_ts: datetime) -> Optional[dict]:
"""
Fetch OHLC from Binance public API (no auth).
Returns {timestamp: price} for 1h candles.
"""
try:
# Binance uses symbol like BTCUSDT
binance_symbol = f"{symbol}USDT"
url = "https://api.binance.com/api/v3/klines"
start_ms = int(start_ts.timestamp() * 1000)
end_ms = int(end_ts.timestamp() * 1000)
params = {"symbol": binance_symbol, "interval": "1h", "startTime": start_ms, "endTime": end_ms, "limit": 500}
r = requests.get(url, params=params, timeout=15)
r.raise_for_status()
data = r.json()
result = {}
for candle in data:
ts = datetime.utcfromtimestamp(candle[0] / 1000)
close = float(candle[4])
result[ts] = close
return result
except Exception:
return None
def _get_price_at(price_history: dict, ts: datetime) -> Optional[float]:
"""Get price at or just before timestamp."""
if not price_history:
return None
sorted_ts = sorted(price_history.keys())
for t in reversed(sorted_ts):
if t <= ts:
return price_history[t]
return price_history.get(sorted_ts[0]) if sorted_ts else None
def _compute_return(price_at_signal: float, price_24h_later: float, direction: str) -> float:
"""Return pct: positive = correct direction."""
if not price_at_signal or price_at_signal == 0:
return 0.0
ret = (price_24h_later - price_at_signal) / price_at_signal * 100
if direction == "SELL":
ret = -ret # For SELL, price down = hit
return ret
def run_backtest(
signal_log_path: str = "logs/signals.log",
lookforward_hours: int = 24,
min_signals: int = 5,
) -> Optional[BacktestResult]:
"""
Run backtest: load signals, fetch prices, compute hit rates.
Hit = BUY signal followed by price up, or SELL followed by price down.
"""
log_path = Path(signal_log_path)
if not log_path.is_absolute():
log_path = Path(__file__).resolve().parent / signal_log_path
signals = _parse_signal_log(log_path)
if len(signals) < min_signals:
return None
buy_hits = 0
buy_total = 0
sell_hits = 0
sell_total = 0
buy_returns = []
sell_returns = []
tier1_hits = 0
tier1_total = 0
tier2_hits = 0
tier2_total = 0
by_asset = {}
for s in signals:
direction = s.get("direction")
if direction not in ("BUY", "SELL"):
continue
ts_str = s.get("timestamp")
if not ts_str:
continue
try:
ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
except Exception:
continue
assets = s.get("assets") or s.get("asset") or []
if isinstance(assets, str):
assets = [assets]
if not assets:
assets = ["BTC"]
tier = s.get("tier", 2)
for asset in assets:
start_ts = ts.replace(tzinfo=None) - timedelta(hours=1)
end_ts = ts.replace(tzinfo=None) + timedelta(hours=lookforward_hours + 1)
prices = _fetch_price_history(asset, start_ts, end_ts)
if not prices:
continue
price_at = _get_price_at(prices, ts.replace(tzinfo=None))
ts_24h = ts.replace(tzinfo=None) + timedelta(hours=lookforward_hours)
price_24h = _get_price_at(prices, ts_24h)
if not price_at or not price_24h:
continue
ret = _compute_return(price_at, price_24h, direction)
hit = (direction == "BUY" and ret > 0) or (direction == "SELL" and ret > 0)
if direction == "BUY":
buy_total += 1
buy_returns.append(ret)
if hit:
buy_hits += 1
else:
sell_total += 1
sell_returns.append(ret)
if hit:
sell_hits += 1
if tier == 1:
tier1_total += 1
if hit:
tier1_hits += 1
else:
tier2_total += 1
if hit:
tier2_hits += 1
if asset not in by_asset:
by_asset[asset] = {"hits": 0, "total": 0}
by_asset[asset]["total"] += 1
if hit:
by_asset[asset]["hits"] += 1
hr_buy = buy_hits / buy_total if buy_total else 0.0
hr_sell = sell_hits / sell_total if sell_total else 0.0
avg_ret_buy = sum(buy_returns) / len(buy_returns) if buy_returns else 0.0
avg_ret_sell = sum(sell_returns) / len(sell_returns) if sell_returns else 0.0
t1_hr = tier1_hits / tier1_total if tier1_total else 0.0
t2_hr = tier2_hits / tier2_total if tier2_total else 0.0
for a, d in by_asset.items():
d["hit_rate"] = d["hits"] / d["total"] if d["total"] else 0.0
return BacktestResult(
total_signals=len(signals),
buy_signals=buy_total,
sell_signals=sell_total,
hit_rate_buy=round(hr_buy, 2),
hit_rate_sell=round(hr_sell, 2),
avg_return_buy_24h=round(avg_ret_buy, 2),
avg_return_sell_24h=round(avg_ret_sell, 2),
tier1_hit_rate=round(t1_hr, 2),
tier2_hit_rate=round(t2_hr, 2),
by_asset=by_asset,
)