Skip to content

Commit c1fa667

Browse files
authored
Merge pull request #38 from AKKI0511/implement-streaming-connection-health-monitoring
feat(streaming): add connection health monitoring
2 parents cf75d9e + 11e7992 commit c1fa667

File tree

16 files changed

+955
-111
lines changed

16 files changed

+955
-111
lines changed

README.md

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -148,15 +148,53 @@ streaming:
148148
health_check_interval: 30
149149
```
150150

151-
```python
152-
from quanttradeai.streaming import StreamingGateway
153-
154-
gw = StreamingGateway("config/streaming.yaml")
155-
gw.subscribe_to_trades(["AAPL"], lambda m: print("TRADE", m))
156-
# gw.start_streaming() # blocking
157-
```
158-
159-
## Project Layout
151+
```python
152+
from quanttradeai.streaming import StreamingGateway
153+
154+
gw = StreamingGateway("config/streaming.yaml")
155+
gw.subscribe_to_trades(["AAPL"], lambda m: print("TRADE", m))
156+
# gw.start_streaming() # blocking
157+
```
158+
159+
### Streaming Health Monitoring
160+
161+
Enable advanced monitoring by adding a `streaming_health` section to your config and,
162+
optionally, starting the embedded REST server:
163+
164+
```yaml
165+
streaming_health:
166+
monitoring:
167+
enabled: true
168+
check_interval: 5
169+
thresholds:
170+
max_latency_ms: 100
171+
min_throughput_msg_per_sec: 50
172+
max_queue_depth: 5000
173+
alerts:
174+
enabled: true
175+
channels: ["log", "metrics"]
176+
escalation_threshold: 3
177+
api:
178+
enabled: true
179+
host: "0.0.0.0"
180+
port: 8000
181+
```
182+
183+
Query live status while streaming:
184+
185+
```bash
186+
curl http://localhost:8000/health # readiness probe
187+
curl http://localhost:8000/status # detailed metrics + incidents
188+
curl http://localhost:8000/metrics # Prometheus scrape
189+
```
190+
191+
Common patterns:
192+
193+
- Tune `escalation_threshold` to control alert promotion.
194+
- Increase `max_queue_depth` in high-volume environments.
195+
- Set `circuit_breaker_timeout` to avoid thrashing unstable providers.
196+
197+
## Project Layout
160198

161199
```
162200
quanttradeai/ # Core package

config/streaming.yaml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,22 @@ streaming:
1515
buffer_size: 10000
1616
reconnect_attempts: 5
1717
health_check_interval: 30
18+
streaming_health:
19+
monitoring:
20+
enabled: true
21+
check_interval: 5
22+
metrics_retention: 3600
23+
thresholds:
24+
max_latency_ms: 100
25+
min_throughput_msg_per_sec: 50
26+
max_reconnect_attempts: 5
27+
max_queue_depth: 5000
28+
circuit_breaker_timeout: 60
29+
alerts:
30+
enabled: true
31+
channels: ["log", "metrics"]
32+
escalation_threshold: 3
33+
api:
34+
enabled: false
35+
host: "0.0.0.0"
36+
port: 8000

docs/api/streaming.md

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,3 +90,52 @@ Register it in your runtime (or fork and extend `AdapterMap` in `StreamingGatewa
9090
- Prometheus metrics (`prometheus_client`) track message counts, connection latency, and active connections.
9191
- Optional background health checks ping pooled connections (interval configured via YAML).
9292

93+
### Advanced Health Metrics
94+
95+
- **Message loss detection** surfaces gaps in sequence numbers and reports per-provider drop rates.
96+
- **Queue depth gauges** expose backlog in internal processing buffers.
97+
- **Bandwidth and throughput** statistics track messages per second and bytes processed.
98+
- **Data freshness** timers flag stale feeds when updates stop arriving.
99+
100+
### Alerting & Incident History
101+
102+
- Configurable thresholds escalate repeated warnings to errors after a defined count.
103+
- Incident history is retained in memory for post-mortem analysis and optional export.
104+
- Alert channels include structured logs and Prometheus-compatible metrics.
105+
106+
### Recovery & Circuit Breaking
107+
108+
- Automatic retries use exponential backoff with jitter and respect circuit-breaker timeouts.
109+
- Fallback connectors can be configured for provider outages.
110+
111+
### Health API
112+
113+
When enabled, an embedded REST server exposes:
114+
115+
- `GET /health` – lightweight readiness probe.
116+
- `GET /status` – detailed status including recent incidents.
117+
- `GET /metrics` – Prometheus scrape endpoint.
118+
119+
### Configuration Example
120+
121+
```yaml
122+
streaming_health:
123+
monitoring:
124+
enabled: true
125+
check_interval: 5
126+
thresholds:
127+
max_latency_ms: 100
128+
min_throughput_msg_per_sec: 50
129+
max_queue_depth: 5000
130+
circuit_breaker_timeout: 60
131+
alerts:
132+
enabled: true
133+
channels: ["log", "metrics"]
134+
escalation_threshold: 3
135+
api:
136+
enabled: true
137+
host: "0.0.0.0"
138+
port: 8000
139+
```
140+
141+

docs/examples/streaming.md

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,3 +75,45 @@ manager.add_adapter(MyAdapter(websocket_url="wss://example"), auth_method="none"
7575
# Use StreamingGateway or call manager.connect_all()/run() directly.
7676
```
7777

78+
## Health Monitoring Configuration
79+
80+
Append the following to `config/streaming.yaml` to enable comprehensive health checks:
81+
82+
```yaml
83+
streaming_health:
84+
monitoring:
85+
enabled: true
86+
check_interval: 5
87+
thresholds:
88+
max_latency_ms: 100
89+
min_throughput_msg_per_sec: 50
90+
max_queue_depth: 5000
91+
circuit_breaker_timeout: 60
92+
alerts:
93+
enabled: true
94+
channels: ["log", "metrics"]
95+
escalation_threshold: 3
96+
api:
97+
enabled: true
98+
host: "0.0.0.0"
99+
port: 8000
100+
```
101+
102+
Start the gateway and query metrics:
103+
104+
```bash
105+
curl http://localhost:8000/status
106+
```
107+
108+
## Alert Threshold Tuning
109+
110+
- Increase `max_latency_ms` or decrease `escalation_threshold` for noisy networks.
111+
- Monitor `max_queue_depth` during peak sessions and adjust to avoid drops.
112+
- Use Prometheus metrics to derive realistic throughput baselines.
113+
114+
## Production Deployment Recommendations
115+
116+
- Run the health API behind an authenticated ingress when exposing publicly.
117+
- Scrape `/metrics` with Prometheus and forward alerts to your incident system.
118+
- Configure fallback providers to ensure continuity during outages.
119+

poetry.lock

Lines changed: 61 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ dependencies = [
3131
"pybreaker (>=1.4.0,<2.0.0)",
3232
"asyncio-throttle (>=1.0.2,<2.0.0)",
3333
"prometheus-client (>=0.22.1,<0.23.0)",
34-
"typer (>=0.16.1,<0.17.0)"
34+
"typer (>=0.16.1,<0.17.0)",
35+
"fastapi (>=0.115.0,<1.0.0)",
36+
"uvicorn (>=0.30.0,<1.0.0)"
3537
]
3638

3739
[tool.poetry]

quanttradeai/streaming/__init__.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
"""Streaming infrastructure package."""
22

3-
from .gateway import StreamingGateway
4-
from .auth_manager import AuthManager
5-
from .rate_limiter import AdaptiveRateLimiter
6-
from .connection_pool import ConnectionPool
3+
from .gateway import StreamingGateway
4+
from .auth_manager import AuthManager
5+
from .rate_limiter import AdaptiveRateLimiter
6+
from .connection_pool import ConnectionPool
7+
from .monitoring import StreamingHealthMonitor
78

89
__all__ = [
9-
"StreamingGateway",
10-
"AuthManager",
11-
"AdaptiveRateLimiter",
12-
"ConnectionPool",
13-
]
10+
"StreamingGateway",
11+
"AuthManager",
12+
"AdaptiveRateLimiter",
13+
"ConnectionPool",
14+
"StreamingHealthMonitor",
15+
]

0 commit comments

Comments
 (0)