Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 47 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,53 @@ streaming:
health_check_interval: 30
```

```python
from quanttradeai.streaming import StreamingGateway

gw = StreamingGateway("config/streaming.yaml")
gw.subscribe_to_trades(["AAPL"], lambda m: print("TRADE", m))
# gw.start_streaming() # blocking
```

## Project Layout
```python
from quanttradeai.streaming import StreamingGateway

gw = StreamingGateway("config/streaming.yaml")
gw.subscribe_to_trades(["AAPL"], lambda m: print("TRADE", m))
# gw.start_streaming() # blocking
```

### Streaming Health Monitoring

Enable advanced monitoring by adding a `streaming_health` section to your config and,
optionally, starting the embedded REST server:

```yaml
streaming_health:
monitoring:
enabled: true
check_interval: 5
thresholds:
max_latency_ms: 100
min_throughput_msg_per_sec: 50
max_queue_depth: 5000
alerts:
enabled: true
channels: ["log", "metrics"]
escalation_threshold: 3
api:
enabled: true
host: "0.0.0.0"
port: 8000
```

Query live status while streaming:

```bash
curl http://localhost:8000/health # readiness probe
curl http://localhost:8000/status # detailed metrics + incidents
curl http://localhost:8000/metrics # Prometheus scrape
```

Common patterns:

- Tune `escalation_threshold` to control alert promotion.
- Increase `max_queue_depth` in high-volume environments.
- Set `circuit_breaker_timeout` to avoid thrashing unstable providers.

## Project Layout

```
quanttradeai/ # Core package
Expand Down
19 changes: 19 additions & 0 deletions config/streaming.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,22 @@ streaming:
buffer_size: 10000
reconnect_attempts: 5
health_check_interval: 30
streaming_health:
monitoring:
enabled: true
check_interval: 5
metrics_retention: 3600
thresholds:
max_latency_ms: 100
min_throughput_msg_per_sec: 50
max_reconnect_attempts: 5
max_queue_depth: 5000
circuit_breaker_timeout: 60
alerts:
enabled: true
channels: ["log", "metrics"]
escalation_threshold: 3
api:
enabled: false
host: "0.0.0.0"
port: 8000
49 changes: 49 additions & 0 deletions docs/api/streaming.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,52 @@ Register it in your runtime (or fork and extend `AdapterMap` in `StreamingGatewa
- Prometheus metrics (`prometheus_client`) track message counts, connection latency, and active connections.
- Optional background health checks ping pooled connections (interval configured via YAML).

### Advanced Health Metrics

- **Message loss detection** surfaces gaps in sequence numbers and reports per-provider drop rates.
- **Queue depth gauges** expose backlog in internal processing buffers.
- **Bandwidth and throughput** statistics track messages per second and bytes processed.
- **Data freshness** timers flag stale feeds when updates stop arriving.

### Alerting & Incident History

- Configurable thresholds escalate repeated warnings to errors after a defined count.
- Incident history is retained in memory for post-mortem analysis and optional export.
- Alert channels include structured logs and Prometheus-compatible metrics.

### Recovery & Circuit Breaking

- Automatic retries use exponential backoff with jitter and respect circuit-breaker timeouts.
- Fallback connectors can be configured for provider outages.

### Health API

When enabled, an embedded REST server exposes:

- `GET /health` – lightweight readiness probe.
- `GET /status` – detailed status including recent incidents.
- `GET /metrics` – Prometheus scrape endpoint.

### Configuration Example

```yaml
streaming_health:
monitoring:
enabled: true
check_interval: 5
thresholds:
max_latency_ms: 100
min_throughput_msg_per_sec: 50
max_queue_depth: 5000
circuit_breaker_timeout: 60
alerts:
enabled: true
channels: ["log", "metrics"]
escalation_threshold: 3
api:
enabled: true
host: "0.0.0.0"
port: 8000
```


42 changes: 42 additions & 0 deletions docs/examples/streaming.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,45 @@ manager.add_adapter(MyAdapter(websocket_url="wss://example"), auth_method="none"
# Use StreamingGateway or call manager.connect_all()/run() directly.
```

## Health Monitoring Configuration

Append the following to `config/streaming.yaml` to enable comprehensive health checks:

```yaml
streaming_health:
monitoring:
enabled: true
check_interval: 5
thresholds:
max_latency_ms: 100
min_throughput_msg_per_sec: 50
max_queue_depth: 5000
circuit_breaker_timeout: 60
alerts:
enabled: true
channels: ["log", "metrics"]
escalation_threshold: 3
api:
enabled: true
host: "0.0.0.0"
port: 8000
```

Start the gateway and query metrics:

```bash
curl http://localhost:8000/status
```

## Alert Threshold Tuning

- Increase `max_latency_ms` or decrease `escalation_threshold` for noisy networks.
- Monitor `max_queue_depth` during peak sessions and adjust to avoid drops.
- Use Prometheus metrics to derive realistic throughput baselines.

## Production Deployment Recommendations

- Run the health API behind an authenticated ingress when exposing publicly.
- Scrape `/metrics` with Prometheus and forward alerts to your incident system.
- Configure fallback providers to ensure continuity during outages.

62 changes: 61 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ dependencies = [
"pybreaker (>=1.4.0,<2.0.0)",
"asyncio-throttle (>=1.0.2,<2.0.0)",
"prometheus-client (>=0.22.1,<0.23.0)",
"typer (>=0.16.1,<0.17.0)"
"typer (>=0.16.1,<0.17.0)",
"fastapi (>=0.115.0,<1.0.0)",
"uvicorn (>=0.30.0,<1.0.0)"
]

[tool.poetry]
Expand Down
20 changes: 11 additions & 9 deletions quanttradeai/streaming/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
"""Streaming infrastructure package."""

from .gateway import StreamingGateway
from .auth_manager import AuthManager
from .rate_limiter import AdaptiveRateLimiter
from .connection_pool import ConnectionPool
from .gateway import StreamingGateway
from .auth_manager import AuthManager
from .rate_limiter import AdaptiveRateLimiter
from .connection_pool import ConnectionPool
from .monitoring import StreamingHealthMonitor

__all__ = [
"StreamingGateway",
"AuthManager",
"AdaptiveRateLimiter",
"ConnectionPool",
]
"StreamingGateway",
"AuthManager",
"AdaptiveRateLimiter",
"ConnectionPool",
"StreamingHealthMonitor",
]
Loading