Skip to content

Commit 4615cb5

Browse files
authored
Merge pull request #63 from AKKI0511/analyze-quanttradeai-for-new-feature-implementation
feat(streaming): add live trading orchestrator pipeline
2 parents f05d05f + 19de0ff commit 4615cb5

File tree

8 files changed

+916
-21
lines changed

8 files changed

+916
-21
lines changed

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ Export the key and run the pipeline. A `sentiment_score` column is added when a
140140

141141
## Streaming (Optional)
142142

143-
- CLI: `poetry run quanttradeai live-trade --url wss://... -c config/model_config.yaml`
143+
- CLI: `poetry run quanttradeai live-trade -m models/experiments/<run>/AAPL --config config/model_config.yaml --streaming-config config/streaming.yaml`
144144
- YAML‑driven gateway via `config/streaming.yaml`:
145145

146146
```yaml
@@ -167,6 +167,9 @@ gw.subscribe_to_trades(["AAPL"], lambda m: print("TRADE", m))
167167
- Provider adapters are discovered dynamically via `quanttradeai.streaming.providers.ProviderDiscovery`,
168168
validated with `ProviderConfigValidator`, and monitored through `ProviderHealthMonitor`. See
169169
[docs/api/streaming.md](docs/api/streaming.md) for detailed provider configuration and health tooling.
170+
- The live trading pipeline (`quanttradeai.streaming.live_trading.LiveTradingEngine`) combines the
171+
streaming gateway, feature generation, model inference, risk controls, and optional health API. Use
172+
`--health-api true` to expose `/health` and `/metrics` while streaming.
170173

171174
### Streaming Health Monitoring
172175

docs/quick-reference.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,14 @@ poetry run quanttradeai backtest-model -m models/experiments/<timestamp>/<SYMBOL
2828
-c config/model_config.yaml -b config/backtest_config.yaml \
2929
--cost-bps 5 --slippage-fixed 0.01 --liquidity-max-participation 0.25 \
3030
--skip-validation # optional
31+
32+
# Live trading (streaming)
33+
poetry run quanttradeai live-trade -m models/trained/<SYMBOL> \
34+
-c config/model_config.yaml \
35+
-s config/streaming.yaml \
36+
--risk-config config/risk_config.yaml \
37+
--position-manager-config config/position_manager.yaml \
38+
--initial-capital 1000000 --min-history 220 --history-window 512
3139
```
3240

3341
## 📊 Python API Patterns

poetry.lock

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quanttradeai/cli.py

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,14 +169,65 @@ def cmd_backtest_model(
169169

170170
@app.command("live-trade")
171171
def cmd_live_trade(
172-
url: str = typer.Option(..., "--url", help="WebSocket URL for streaming data"),
173-
config: str = typer.Option(
174-
"config/model_config.yaml", "-c", "--config", help="Path to config file"
172+
model_path: str = typer.Option(..., "-m", "--model-path", help="Saved model dir"),
173+
model_config: str = typer.Option(
174+
"config/model_config.yaml",
175+
"-c",
176+
"--config",
177+
help="Path to model configuration file",
178+
),
179+
streaming_config: str = typer.Option(
180+
"config/streaming.yaml",
181+
"-s",
182+
"--streaming-config",
183+
help="Path to streaming configuration file",
184+
),
185+
risk_config: Optional[str] = typer.Option(
186+
"config/risk_config.yaml",
187+
"--risk-config",
188+
help="Risk management configuration (drawdown/turnover)",
189+
),
190+
position_manager_config: Optional[str] = typer.Option(
191+
"config/position_manager.yaml",
192+
"--position-manager-config",
193+
help="Position manager configuration for live execution",
194+
),
195+
health_api: Optional[bool] = typer.Option(
196+
None,
197+
"--health-api",
198+
help="Override health API enable flag in streaming config (true/false)",
199+
),
200+
initial_capital: float = typer.Option(
201+
1_000_000.0, "--initial-capital", help="Starting capital for sizing"
202+
),
203+
history_window: int = typer.Option(
204+
512, "--history-window", help="History window for feature generation"
205+
),
206+
min_history_for_features: int = typer.Option(
207+
220,
208+
"--min-history",
209+
help="Minimum bars required before running inference",
210+
),
211+
stop_loss_pct: float = typer.Option(
212+
0.01, "--stop-loss-pct", help="Stop loss percentage applied to new trades"
175213
),
176214
):
177215
"""Run real-time trading pipeline using streaming input."""
178216

179-
asyncio.run(run_live_pipeline(config, url))
217+
asyncio.run(
218+
run_live_pipeline(
219+
model_config=model_config,
220+
model_path=model_path,
221+
streaming_config=streaming_config,
222+
risk_config=risk_config,
223+
position_manager_config=position_manager_config,
224+
enable_health_api=health_api,
225+
initial_capital=initial_capital,
226+
history_window=history_window,
227+
min_history_for_features=min_history_for_features,
228+
stop_loss_pct=stop_loss_pct,
229+
)
230+
)
180231

181232

182233
@app.command("validate-config")

quanttradeai/main.py

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323

2424
from quanttradeai.data.loader import DataLoader
2525
from quanttradeai.data.processor import DataProcessor
26-
from quanttradeai.data.datasource import WebSocketDataSource
2726
from quanttradeai.backtest.backtester import simulate_trades, compute_metrics
2827
from quanttradeai.models.classifier import MomentumClassifier
2928
from quanttradeai.trading.drawdown_guard import DrawdownGuard
@@ -387,21 +386,36 @@ def evaluate_model(
387386
json.dump(results, f, indent=4)
388387

389388

390-
async def run_live_pipeline(config_path: str, url: str) -> None:
391-
"""Run a minimal real-time pipeline using WebSocket input.
392-
393-
Example
394-
-------
395-
>>> # asyncio.run(run_live_pipeline("config/model_config.yaml", "wss://example"))
396-
"""
397-
398-
processor = DataProcessor()
399-
loader = DataLoader(config_path, data_source=WebSocketDataSource(url))
400-
401-
def handle(df: pd.DataFrame) -> None:
402-
logger.info("Received update:\n%s", df.tail(1))
403-
404-
await loader.stream_data(processor, callback=handle)
389+
async def run_live_pipeline(
390+
*,
391+
model_config: str = "config/model_config.yaml",
392+
model_path: str,
393+
streaming_config: str = "config/streaming.yaml",
394+
risk_config: str | None = "config/risk_config.yaml",
395+
position_manager_config: str | None = "config/position_manager.yaml",
396+
enable_health_api: bool | None = None,
397+
initial_capital: float = 1_000_000.0,
398+
history_window: int = 512,
399+
min_history_for_features: int = 220,
400+
stop_loss_pct: float = 0.01,
401+
) -> None:
402+
"""Run end-to-end real-time trading using the streaming gateway."""
403+
404+
from quanttradeai.streaming.live_trading import LiveTradingEngine
405+
406+
engine = LiveTradingEngine(
407+
model_config=model_config,
408+
model_path=model_path,
409+
streaming_config=streaming_config,
410+
risk_config=risk_config,
411+
position_manager_config=position_manager_config,
412+
enable_health_api=enable_health_api,
413+
initial_capital=initial_capital,
414+
history_window=history_window,
415+
min_history_for_features=min_history_for_features,
416+
stop_loss_pct=stop_loss_pct,
417+
)
418+
await engine.start()
405419

406420

407421
def _load_execution_cfg(

quanttradeai/streaming/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
ProviderRegistry,
1313
StreamingProviderAdapter,
1414
)
15+
from .live_trading import LiveTradingEngine
1516

1617
__all__ = [
1718
"StreamingGateway",
@@ -24,4 +25,5 @@
2425
"ProviderHealthMonitor",
2526
"ProviderRegistry",
2627
"StreamingProviderAdapter",
28+
"LiveTradingEngine",
2729
]

0 commit comments

Comments
 (0)