Skip to content

Commit f98fa0a

Browse files
authored
add RELAXED/ADAPTIVE backoff modes to reduce CPU usage (#76)
1 parent 8127787 commit f98fa0a

File tree

6 files changed

+197
-13
lines changed

6 files changed

+197
-13
lines changed

docs/components/engine/engine_config.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ namespace config
9191
inline constexpr size_t DEFAULT_EVENTBUS_CAPACITY = 4096;
9292
inline constexpr size_t DEFAULT_EVENTBUS_MAX_CONSUMERS = 128;
9393

94+
// Connector pool capacity (must be > EventBus capacity to prevent exhaustion)
95+
inline constexpr size_t DEFAULT_CONNECTOR_POOL_CAPACITY = 8191;
96+
9497
// CPU Affinity Priority Constants
9598
inline constexpr int ISOLATED_CORE_PRIORITY_BOOST = 5;
9699
inline constexpr int DEFAULT_REALTIME_PRIORITY = 80;
@@ -112,6 +115,9 @@ These can be overridden via preprocessor defines:
112115
- `FLOX_DEFAULT_EVENTBUS_CAPACITY`
113116
- `FLOX_DEFAULT_EVENTBUS_MAX_CONSUMERS`
114117
- `FLOX_DEFAULT_ORDER_TRACKER_CAPACITY`
118+
- `FLOX_DEFAULT_CONNECTOR_POOL_CAPACITY`
119+
120+
**Important:** `DEFAULT_CONNECTOR_POOL_CAPACITY` must be greater than `DEFAULT_EVENTBUS_CAPACITY`. EventBus only reclaims events on wrap-around, so if pool capacity ≤ bus capacity, the pool will exhaust before any events are returned.
115121

116122
## Notes
117123

docs/components/util/memory/pool.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,24 @@ pool.setExhaustionCallback([](size_t capacity, size_t inUse) {
8383

8484
The exhaustion callback is invoked each time `acquire()` returns `nullopt` due to pool exhaustion.
8585

86+
## Sizing Guidelines
87+
88+
When using pools with `EventBus`, the pool capacity **must be greater than** the EventBus capacity:
89+
90+
```cpp
91+
// Correct: pool capacity (8191) > bus capacity (4096)
92+
Pool<BookUpdateEvent, 8191> pool;
93+
EventBus<Handle<BookUpdateEvent>, 4096> bus;
94+
95+
// Incorrect: will cause pool exhaustion
96+
Pool<BookUpdateEvent, 4096> pool; // Same as bus = will exhaust!
97+
EventBus<Handle<BookUpdateEvent>, 4096> bus;
98+
```
99+
100+
**Why?** EventBus only reclaims events when the ring buffer wraps around. If pool capacity ≤ bus capacity, all pool slots will be in-flight before any can be returned.
101+
102+
The default `config::DEFAULT_CONNECTOR_POOL_CAPACITY` (8191) is chosen to satisfy this constraint when paired with `DEFAULT_EVENTBUS_CAPACITY` (4096).
103+
86104
## Notes
87105

88106
* Zero allocations in steady-state operation.

docs/reference/api/engine/engine_config.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ The header also defines compile-time constants that can be overridden via prepro
7373
#ifndef FLOX_DEFAULT_ORDER_TRACKER_CAPACITY
7474
#define FLOX_DEFAULT_ORDER_TRACKER_CAPACITY 4096
7575
#endif
76+
77+
#ifndef FLOX_DEFAULT_CONNECTOR_POOL_CAPACITY
78+
#define FLOX_DEFAULT_CONNECTOR_POOL_CAPACITY 8191
79+
#endif
7680
```
7781

7882
### `config` Namespace Constants
@@ -83,6 +87,9 @@ namespace config {
8387
inline constexpr size_t DEFAULT_EVENTBUS_CAPACITY = 4096;
8488
inline constexpr size_t DEFAULT_EVENTBUS_MAX_CONSUMERS = 128;
8589

90+
// Connector pool capacity (must be > EventBus capacity)
91+
inline constexpr size_t DEFAULT_CONNECTOR_POOL_CAPACITY = 8191;
92+
8693
// CPU Affinity Priority Constants
8794
inline constexpr int ISOLATED_CORE_PRIORITY_BOOST = 5;
8895
inline constexpr int DEFAULT_REALTIME_PRIORITY = 80;
@@ -104,6 +111,7 @@ namespace config {
104111
|----------|-------|-------------|
105112
| `DEFAULT_EVENTBUS_CAPACITY` | 4096 | Ring buffer size for EventBus |
106113
| `DEFAULT_EVENTBUS_MAX_CONSUMERS` | 128 | Maximum subscribers per bus |
114+
| `DEFAULT_CONNECTOR_POOL_CAPACITY` | 8191 | Pool capacity for exchange connectors (must be > EventBus capacity) |
107115
| `ISOLATED_CORE_PRIORITY_BOOST` | 5 | Priority boost for isolated cores |
108116
| `DEFAULT_REALTIME_PRIORITY` | 80 | Default RT priority for threads |
109117
| `FALLBACK_REALTIME_PRIORITY` | 90 | Fallback RT priority |

docs/reference/utilities.md

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -265,22 +265,71 @@ public:
265265

266266
**Header:** `flox/util/performance/busy_backoff.h`
267267

268-
Adaptive backoff strategy for spin loops.
268+
Configurable backoff strategy for spin loops with support for different deployment environments.
269269

270270
```cpp
271-
class BusyBackoff
271+
enum class BackoffMode
272272
{
273-
public:
274-
void pause(); // Spin, yield, or sleep based on iteration count
273+
AGGRESSIVE, // Dedicated colo: busy-spin with CPU pause, minimal yields
274+
RELAXED, // Shared VPS/cloud: early sleep, minimal CPU burn
275+
ADAPTIVE // Auto-adjust: starts aggressive, backs off under contention
276+
};
277+
278+
namespace config
279+
{
280+
inline BackoffMode defaultBackoffMode = BackoffMode::ADAPTIVE;
281+
}
282+
283+
struct BusyBackoff
284+
{
285+
explicit BusyBackoff(BackoffMode mode = config::defaultBackoffMode);
286+
void pause(); // Spin, yield, or sleep based on mode and iteration count
275287
void reset(); // Reset iteration counter
276288
};
277289
```
278290

279-
### Strategy
291+
### Backoff Modes
292+
293+
| Mode | CPU Usage | Latency | Use Case |
294+
|------|-----------|---------|----------|
295+
| `AGGRESSIVE` | ~100% | Lowest | Dedicated colo, bare metal, isolated cores |
296+
| `RELAXED` | <5% | +100-500µs | Shared VPS, cloud, monitoring-only |
297+
| `ADAPTIVE` | Variable | Variable | General purpose, good default |
298+
299+
### Mode Details
300+
301+
**AGGRESSIVE** (for dedicated hardware):
302+
1. First 2048 iterations: CPU pause instruction
303+
2. Beyond: yield(), then reset
280304

281-
1. First ~100 iterations: CPU pause instruction
282-
2. Next ~100 iterations: `std::this_thread::yield()`
283-
3. Beyond: Short sleep (microseconds)
305+
**RELAXED** (for shared infrastructure):
306+
1. First 8 iterations: CPU pause
307+
2. Next 8 iterations: yield()
308+
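3. Beyond: sleep(100µs) on every call; once the iteration counter reaches 64, each call sleeps an additional 500µs (≈600µs total per call)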
309+
310+
**ADAPTIVE** (auto-adjusting):
311+
1. First 128 iterations: CPU pause (burst handling)
312+
2. 128-512: yield() (medium contention)
313+
3. 512-2048: sleep(10µs)
314+
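4. Beyond: sleep(100µs), then reset the iteration counter to 512 (not zero), so the next calls resume in the 10µs-sleep phase instead of spinning again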
315+
316+
### Configuration
317+
318+
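Set the global default at startup, before constructing any `EventBus` or `BusyBackoff` (each one reads the default when it is constructed):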
319+
```cpp
320+
// For cloud/VPS deployments
321+
flox::config::defaultBackoffMode = flox::BackoffMode::RELAXED;
322+
323+
// For dedicated colo
324+
flox::config::defaultBackoffMode = flox::BackoffMode::AGGRESSIVE;
325+
```
326+
327+
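Or configure per EventBus, before its consumer threads are launched (each thread copies the mode when it is created, so later changes do not affect running consumers):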
328+
```cpp
329+
EventBus<MyEvent> bus;
330+
bus.setBackoffMode(BackoffMode::RELAXED);
331+
bus.start();
332+
```
284333

285334
---
286335

include/flox/util/eventing/event_bus.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,9 @@ class EventBus : public ISubsystem
161161
auto* l = _consumers[i].listener;
162162
auto required = _consumers[i].required;
163163
auto coreIdx = _consumers[i].coreIndex;
164+
auto backoffMode = _backoffMode;
164165

165-
_consumers[i].thread.emplace([this, i, l, required, coreIdx]
166+
_consumers[i].thread.emplace([this, i, l, required, coreIdx, backoffMode]
166167
{
167168
#if FLOX_CPU_AFFINITY_ENABLED
168169
auto threadCpuAffinity = performance::createCpuAffinity();
@@ -214,9 +215,9 @@ class EventBus : public ISubsystem
214215
if (_active.fetch_sub(1, std::memory_order_acq_rel) == 1) _cv.notify_one();
215216
}
216217

217-
BusyBackoff backoff;
218+
BusyBackoff backoff(backoffMode);
218219
int64_t next = -1;
219-
220+
220221
while (_running.load(std::memory_order_acquire))
221222
{
222223
const int64_t seq = next + 1;
@@ -339,6 +340,8 @@ class EventBus : public ISubsystem
339340
uint32_t consumerCount() const { return _consumerCount.load(std::memory_order_acquire); }
340341
void enableDrainOnStop() { _drainOnStop = true; }
341342

343+
void setBackoffMode(BackoffMode mode) { _backoffMode = mode; }
344+
342345
#if FLOX_CPU_AFFINITY_ENABLED
343346
// ---------- CPU Affinity / RT priority ----------
344347
void setAffinityConfig(const AffinityConfig& cfg)
@@ -608,6 +611,7 @@ class EventBus : public ISubsystem
608611
std::atomic<uint32_t> _active{0};
609612

610613
bool _drainOnStop{false};
614+
BackoffMode _backoffMode{config::defaultBackoffMode};
611615

612616
#if FLOX_CPU_AFFINITY_ENABLED
613617
// CPU affinity / RT

include/flox/util/performance/busy_backoff.h

Lines changed: 101 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,63 @@
77
* license information.
88
*/
99

10+
#pragma once
11+
12+
#include <chrono>
1013
#include <thread>
1114

1215
#if defined(__x86_64__) || defined(_M_X64)
1316
#include <immintrin.h>
1417
#define FLOX_CPU_PAUSE() _mm_pause()
1518
#elif defined(__aarch64__) || defined(_M_ARM64)
16-
#define FLOX_CPU_PAUSE() __asm__ __volatile__("yield" ::: "memory")
19+
#define FLOX_CPU_PAUSE() __asm__ __volatile__("yield" :: \
20+
: "memory")
1721
#else
1822
#define FLOX_CPU_PAUSE() ((void)0)
1923
#endif
2024

2125
namespace flox
2226
{
2327

28+
enum class BackoffMode
29+
{
30+
AGGRESSIVE, ///< Dedicated colo: busy-spin with CPU pause, minimal yields
31+
RELAXED, ///< Shared VPS/cloud: early sleep, minimal CPU burn
32+
ADAPTIVE ///< Auto-adjust: starts aggressive, backs off under contention
33+
};
34+
35+
namespace config
36+
{
37+
inline BackoffMode defaultBackoffMode = BackoffMode::ADAPTIVE;
38+
} // namespace config
39+
2440
struct BusyBackoff
2541
{
2642
int spins = 0;
43+
BackoffMode mode;
44+
45+
explicit BusyBackoff(BackoffMode m = config::defaultBackoffMode) : mode(m) {}
2746

2847
inline void pause()
48+
{
49+
switch (mode)
50+
{
51+
case BackoffMode::AGGRESSIVE:
52+
pauseAggressive();
53+
break;
54+
case BackoffMode::RELAXED:
55+
pauseRelaxed();
56+
break;
57+
case BackoffMode::ADAPTIVE:
58+
pauseAdaptive();
59+
break;
60+
}
61+
}
62+
63+
inline void reset() { spins = 0; }
64+
65+
private:
66+
inline void pauseAggressive()
2967
{
3068
if (spins < 2048)
3169
{
@@ -45,7 +83,68 @@ struct BusyBackoff
4583
}
4684
}
4785

48-
inline void reset() { spins = 0; }
86+
inline void pauseRelaxed()
87+
{
88+
if (spins < 8)
89+
{
90+
// Brief spin for immediate data
91+
FLOX_CPU_PAUSE();
92+
++spins;
93+
return;
94+
}
95+
96+
if (spins < 16)
97+
{
98+
// Quick yield
99+
std::this_thread::yield();
100+
++spins;
101+
return;
102+
}
103+
104+
// Sleep to release CPU - 100us is enough for market data which arrives
105+
// at most every few milliseconds per symbol
106+
std::this_thread::sleep_for(std::chrono::microseconds(100));
107+
if (spins < 64)
108+
{
109+
++spins;
110+
}
111+
else
112+
{
113+
// Increase sleep duration for sustained idle periods
114+
std::this_thread::sleep_for(std::chrono::microseconds(500));
115+
}
116+
}
117+
118+
inline void pauseAdaptive()
119+
{
120+
if (spins < 128)
121+
{
122+
// Initial aggressive phase for low-latency burst handling
123+
FLOX_CPU_PAUSE();
124+
++spins;
125+
return;
126+
}
127+
128+
if (spins < 512)
129+
{
130+
// Medium contention: yield to other threads
131+
std::this_thread::yield();
132+
++spins;
133+
return;
134+
}
135+
136+
if (spins < 2048)
137+
{
138+
// High contention: short sleep
139+
std::this_thread::sleep_for(std::chrono::microseconds(10));
140+
++spins;
141+
return;
142+
}
143+
144+
// Sustained contention: longer sleep, reset cycle
145+
std::this_thread::sleep_for(std::chrono::microseconds(100));
146+
spins = 512; // Reset to medium level, not zero
147+
}
49148
};
50149

51150
} // namespace flox

0 commit comments

Comments
 (0)