Skip to content

Commit 6a77c25

Browse files
authored
Stability and lifecycle improvements (#59)
1 parent 2bb4e92 commit 6a77c25

28 files changed

+495
-142
lines changed

demo/include/demo/simple_components.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,11 @@ class SimplePositionManager : public IPositionManager
195195
update(order, order.quantity);
196196
}
197197

198+
void onOrderPendingCancel(const Order& order) override
199+
{
200+
FLOX_LOG("[position] order pending cancel: id=" << order.id);
201+
}
202+
198203
void onOrderCanceled(const Order& order) override
199204
{
200205
FLOX_LOG("[position] order canceled: id=" << order.id);

docs/components/book/nlevel_order_book.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,32 @@ class NLevelOrderBook : public IOrderBook {
3939
5. **No Dynamic Allocation**
4040
Uses `std::array` of fixed size; fully cache-friendly and allocation-free after construction.
4141
42+
## Market State Helpers
43+
44+
```cpp
45+
[[nodiscard]] bool isCrossed() const noexcept;
46+
[[nodiscard]] std::optional<Price> spread() const noexcept;
47+
[[nodiscard]] std::optional<Price> mid() const noexcept;
48+
```
49+
50+
| Method | Description |
51+
| ----------- | ---------------------------------------------------------------- |
52+
| `isCrossed` | Returns `true` if best bid >= best ask (crossed/locked market). |
53+
| `spread` | Returns ask - bid spread, or `nullopt` if either side is empty. |
54+
| `mid` | Returns midpoint price, or `nullopt` if either side is empty. |
55+
56+
These methods are useful for detecting market anomalies and calculating fair value:
57+
58+
```cpp
59+
if (book.isCrossed()) {
60+
// Handle crossed book condition
61+
}
62+
63+
if (auto mid = book.mid()) {
64+
// Use midpoint for fair value calculations
65+
}
66+
```
67+
4268
## Notes
4369

4470
* Extremely fast and deterministic — suitable for backtests and production.
Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,34 @@
11
# IExchangeConnector
22

3-
`IExchangeConnector` is the abstract interface for real-time market data adapters. It provides lifecycle control and typed callback delivery for `BookUpdateEvent` and `TradeEvent`.
3+
`IExchangeConnector` is the abstract interface for real-time market data adapters. It provides lifecycle control, typed callback delivery for market data events, and error notifications.
44

55
```cpp
6-
class IExchangeConnector : public ISubsystem
6+
class IExchangeConnector : public ISubsystem, public IDrainable
77
{
88
public:
9-
using BookUpdateCallback = std::move_only_function<void(const BookUpdateEvent&)>;
10-
using TradeCallback = std::move_only_function<void(const TradeEvent&)>;
9+
using BookUpdateCallback = MoveOnlyFunction<void(const BookUpdateEvent&)>;
10+
using TradeCallback = MoveOnlyFunction<void(const TradeEvent&)>;
11+
using DisconnectCallback = MoveOnlyFunction<void(std::string_view reason)>;
12+
using SequenceGapCallback = MoveOnlyFunction<void(uint64_t expected, uint64_t received)>;
13+
using StaleDataCallback = MoveOnlyFunction<void(SymbolId symbol, uint64_t lastUpdateMs)>;
1114

1215
virtual ~IExchangeConnector() = default;
1316

17+
bool drain(uint32_t timeoutMs) override { return true; }
18+
1419
virtual std::string exchangeId() const = 0;
1520

1621
virtual void setCallbacks(BookUpdateCallback onBookUpdate, TradeCallback onTrade);
22+
virtual void setErrorCallbacks(DisconnectCallback onDisconnect,
23+
SequenceGapCallback onSequenceGap,
24+
StaleDataCallback onStaleData);
1725

1826
protected:
1927
void emitBookUpdate(const BookUpdateEvent& bu);
2028
void emitTrade(const TradeEvent& t);
29+
void emitDisconnect(std::string_view reason);
30+
void emitSequenceGap(uint64_t expected, uint64_t received);
31+
void emitStaleData(SymbolId symbol, uint64_t lastUpdateMs);
2132
};
2233
```
2334

@@ -27,16 +38,26 @@ protected:
2738

2839
## Responsibilities
2940

30-
| Aspect | Details |
31-
| ------------- | ----------------------------------------------------------------------- |
32-
| Lifecycle | Inherits `start()` and `stop()` from `ISubsystem`. |
33-
| Identity | `exchangeId()` provides a stable identifier for the connector instance. |
34-
| Callbacks | `setCallbacks()` binds downstream handlers for book and trade events. |
35-
| Event Routing | `emitBookUpdate()` and `emitTrade()` dispatch data to subscribers. |
41+
| Aspect | Details |
42+
| --------------- | ----------------------------------------------------------------------- |
43+
| Lifecycle | Inherits `start()` and `stop()` from `ISubsystem`. |
44+
| Draining | Implements `IDrainable` for graceful shutdown with pending operations. |
45+
| Identity | `exchangeId()` provides a stable identifier for the connector instance. |
46+
| Data Callbacks | `setCallbacks()` binds handlers for book and trade events. |
47+
| Error Callbacks | `setErrorCallbacks()` binds handlers for connection/data errors. |
48+
| Event Routing | `emit*()` methods dispatch data and errors to subscribers. |
49+
50+
## Error Callbacks
51+
52+
| Callback | When Called |
53+
| -------------- | -------------------------------------------------------------- |
54+
| onDisconnect | Connection lost or closed unexpectedly. |
55+
| onSequenceGap | Sequence number gap detected (expected vs received). |
56+
| onStaleData | No updates received for a symbol beyond threshold. |
3657

3758
## Notes
3859

39-
* Inherits from `ISubsystem`, enabling unified lifecycle management via `start()` and `stop()`.
40-
* Callbacks use `std::move_only_function` to avoid `std::function` overhead and enable capturing closures with ownership.
60+
* Inherits from `ISubsystem` and `IDrainable`, enabling unified lifecycle and graceful shutdown.
61+
* Callbacks use `MoveOnlyFunction` to avoid `std::function` overhead and enable capturing closures with ownership.
4162
* Implementations must call `emit*()` manually from internal processing (e.g. websocket handler).
4263
* The class is intentionally non-copyable and non-thread-safe — connectors are expected to run in isolated threads.

docs/components/engine/abstract_subsystem.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,31 @@ public:
3030
* Used by core modules like `CandleAggregator`, `Strategy`, `ExecutionTracker`, `SymbolRegistry`, etc.
3131
* Lifecycle is typically orchestrated by the engine or test harness.
3232
* No assumptions about threading — start/stop are always externally coordinated.
33+
34+
---
35+
36+
# IDrainable
37+
38+
`IDrainable` is a separate interface for components with pending async work that must complete before shutdown.
39+
40+
```cpp
41+
class IDrainable
42+
{
43+
public:
44+
virtual ~IDrainable() = default;
45+
virtual bool drain(uint32_t timeoutMs) = 0;
46+
};
47+
```
48+
49+
## Purpose
50+
51+
* Wait for in-flight operations (e.g., pending orders, network requests) to complete.
52+
53+
## When to Implement
54+
55+
Only implement `IDrainable` for components with actual async work:
56+
* Exchange connectors (in-flight order confirmations)
57+
* Order executors (pending order submissions)
58+
* Network transports (pending requests)
59+
60+
Do NOT implement for stateless or synchronous components like strategies, aggregators, or registries.

docs/components/engine/engine_config.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ struct EngineConfig
99
KillSwitchConfig killSwitchConfig;
1010
std::string logLevel = "info";
1111
std::string logFile;
12+
uint32_t drainTimeoutMs = 5000;
1213
};
1314
```
1415

@@ -24,6 +25,7 @@ struct EngineConfig
2425
| killSwitchConfig | Limits for order size, frequency, and loss (see `KillSwitchConfig`). |
2526
| logLevel | Runtime log verbosity (`info`, `debug`, `trace`, etc.). |
2627
| logFile | Optional path to write logs to disk. |
28+
| drainTimeoutMs | Timeout for draining subsystems during shutdown (default: 5000ms). |
2729

2830

2931
## Substructures

docs/components/execution/events/order_event.md

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ struct OrderEvent {
1010
Order order{};
1111
Order newOrder{};
1212
Quantity fillQty{0};
13+
std::string rejectReason;
1314
uint64_t tickSequence = 0;
1415

1516
void dispatchTo(IOrderExecutionListener& listener) const;
@@ -28,6 +29,7 @@ struct OrderEvent {
2829
| order | The primary order involved in the event. |
2930
| newOrder | Used only for `REPLACED` events. |
3031
| fillQty | Quantity filled (used only in `PARTIALLY_FILLED`). |
32+
| rejectReason | Reason string (used only in `REJECTED` events). |
3133
| tickSequence | Event ordering marker for sequencing and backtesting. |
3234
3335
## Dispatch Logic
@@ -38,16 +40,17 @@ void dispatchTo(IOrderExecutionListener& listener) const;
3840

3941
Routes the event to the appropriate method:
4042

41-
| Type | Dispatched Method |
42-
| ------------------ | ---------------------------------------- |
43-
| `SUBMITTED` | `onOrderSubmitted(order)` |
44-
| `ACCEPTED` | `onOrderAccepted(order)` |
45-
| `PARTIALLY_FILLED` | `onOrderPartiallyFilled(order, fillQty)` |
46-
| `FILLED` | `onOrderFilled(order)` |
47-
| `CANCELED` | `onOrderCanceled(order)` |
48-
| `EXPIRED` | `onOrderExpired(order)` |
49-
| `REJECTED` | `onOrderRejected(order, /*reason*/ "")` |
50-
| `REPLACED` | `onOrderReplaced(order, newOrder)` |
43+
| Type | Dispatched Method |
44+
| ------------------ | ---------------------------------------------- |
45+
| `SUBMITTED` | `onOrderSubmitted(order)` |
46+
| `ACCEPTED` | `onOrderAccepted(order)` |
47+
| `PARTIALLY_FILLED` | `onOrderPartiallyFilled(order, fillQty)` |
48+
| `FILLED` | `onOrderFilled(order)` |
49+
| `PENDING_CANCEL` | `onOrderPendingCancel(order)` |
50+
| `CANCELED` | `onOrderCanceled(order)` |
51+
| `EXPIRED` | `onOrderExpired(order)` |
52+
| `REJECTED` | `onOrderRejected(order, rejectReason)` |
53+
| `REPLACED` | `onOrderReplaced(order, newOrder)` |
5154

5255
## Notes
5356

docs/components/util/memory/pool.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,26 @@ h->tickSequence = 123;
6363
* Objects are returned to the pool via an `SPSCQueue<T*>`.
6464
* Backed by a `monotonic_buffer_resource` and `unsynchronized_pool_resource` for internal vector-like allocations.
6565

66+
## Exhaustion Handling
67+
68+
The pool provides callbacks and statistics for monitoring pool usage:
69+
70+
```cpp
71+
pool.setExhaustionCallback([](size_t capacity, size_t inUse) {
72+
LOG_WARN("Pool exhausted: capacity={}, inUse={}", capacity, inUse);
73+
});
74+
```
75+
76+
| Method | Description |
77+
| ------------------ | ----------------------------------------------------- |
78+
| `capacity()` | Returns the pool's maximum capacity. |
79+
| `inUse()` | Returns the number of currently acquired objects. |
80+
| `exhaustionCount()`| Returns how many times `acquire()` failed. |
81+
| `acquireCount()` | Returns total number of successful acquisitions. |
82+
| `releaseCount()` | Returns total number of releases back to pool. |
83+
84+
The exhaustion callback is invoked each time `acquire()` returns `nullopt` due to pool exhaustion.
85+
6686
## Notes
6787

6888
* Zero allocations in steady-state operation.

include/flox/book/nlevel_order_book.h

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,35 @@ class NLevelOrderBook : public IOrderBook
537537

538538
[[nodiscard]] inline Price tickSize() const noexcept { return _tickSize; }
539539

540+
[[nodiscard]] inline bool isCrossed() const noexcept
541+
{
542+
if (_bestBidTick < 0 || _bestAskTick < 0)
543+
{
544+
return false;
545+
}
546+
return _bestBidTick >= _bestAskTick;
547+
}
548+
549+
[[nodiscard]] inline std::optional<Price> spread() const noexcept
550+
{
551+
if (_bestBidTick < 0 || _bestAskTick < 0)
552+
{
553+
return std::nullopt;
554+
}
555+
int64_t spreadTicks = _bestAskTick - _bestBidTick;
556+
return Price::fromRaw(_tickSize.raw() * spreadTicks);
557+
}
558+
559+
[[nodiscard]] inline std::optional<Price> mid() const noexcept
560+
{
561+
if (_bestBidTick < 0 || _bestAskTick < 0)
562+
{
563+
return std::nullopt;
564+
}
565+
int64_t midTick2 = _bestBidTick + _bestAskTick;
566+
return Price::fromRaw((_tickSize.raw() * midTick2) / 2);
567+
}
568+
540569
void clear() noexcept
541570
{
542571
_bids.fill({});

include/flox/connector/abstract_exchange_connector.h

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,28 @@
1111

1212
#include "flox/book/events/book_update_event.h"
1313
#include "flox/book/events/trade_event.h"
14+
#include "flox/common.h"
1415
#include "flox/engine/abstract_subsystem.h"
1516
#include "flox/util/base/move_only_function.h"
1617

1718
#include <string>
19+
#include <string_view>
1820

1921
namespace flox
2022
{
2123

22-
class IExchangeConnector : public ISubsystem
24+
class IExchangeConnector : public ISubsystem, public IDrainable
2325
{
2426
public:
2527
virtual ~IExchangeConnector() = default;
2628

29+
bool drain(uint32_t /*timeoutMs*/) override { return true; }
30+
2731
using BookUpdateCallback = MoveOnlyFunction<void(const BookUpdateEvent&)>;
2832
using TradeCallback = MoveOnlyFunction<void(const TradeEvent&)>;
33+
using DisconnectCallback = MoveOnlyFunction<void(std::string_view reason)>;
34+
using SequenceGapCallback = MoveOnlyFunction<void(uint64_t expected, uint64_t received)>;
35+
using StaleDataCallback = MoveOnlyFunction<void(SymbolId symbol, uint64_t lastUpdateMs)>;
2936

3037
virtual std::string exchangeId() const = 0;
3138

@@ -35,6 +42,15 @@ class IExchangeConnector : public ISubsystem
3542
_onTrade = std::move(onTrade);
3643
}
3744

45+
virtual void setErrorCallbacks(DisconnectCallback onDisconnect,
46+
SequenceGapCallback onSequenceGap,
47+
StaleDataCallback onStaleData)
48+
{
49+
_onDisconnect = std::move(onDisconnect);
50+
_onSequenceGap = std::move(onSequenceGap);
51+
_onStaleData = std::move(onStaleData);
52+
}
53+
3854
protected:
3955
void emitBookUpdate(const BookUpdateEvent& bu)
4056
{
@@ -52,9 +68,36 @@ class IExchangeConnector : public ISubsystem
5268
}
5369
}
5470

71+
void emitDisconnect(std::string_view reason)
72+
{
73+
if (_onDisconnect)
74+
{
75+
_onDisconnect(reason);
76+
}
77+
}
78+
79+
void emitSequenceGap(uint64_t expected, uint64_t received)
80+
{
81+
if (_onSequenceGap)
82+
{
83+
_onSequenceGap(expected, received);
84+
}
85+
}
86+
87+
void emitStaleData(SymbolId symbol, uint64_t lastUpdateMs)
88+
{
89+
if (_onStaleData)
90+
{
91+
_onStaleData(symbol, lastUpdateMs);
92+
}
93+
}
94+
5595
private:
5696
BookUpdateCallback _onBookUpdate;
5797
TradeCallback _onTrade;
98+
DisconnectCallback _onDisconnect;
99+
SequenceGapCallback _onSequenceGap;
100+
StaleDataCallback _onStaleData;
58101
};
59102

60103
} // namespace flox

include/flox/engine/abstract_subsystem.h

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
#pragma once
1111

12+
#include <cstdint>
13+
1214
namespace flox
1315
{
1416

@@ -17,8 +19,15 @@ class ISubsystem
1719
public:
1820
virtual ~ISubsystem() = default;
1921

20-
virtual void start() {};
21-
virtual void stop() {};
22+
virtual void start() {}
23+
virtual void stop() {}
24+
};
25+
26+
class IDrainable
27+
{
28+
public:
29+
virtual ~IDrainable() = default;
30+
virtual bool drain(uint32_t timeoutMs) = 0;
2231
};
2332

2433
} // namespace flox

0 commit comments

Comments
 (0)