Skip to content

Commit 29214a5

Browse files
authored
BinaryLog: add rotation callback (#71)
1 parent a9d8e0c commit 29214a5

File tree

3 files changed

+78
-6
lines changed

3 files changed

+78
-6
lines changed

docs/reference/api/replay/binary_log_writer.md

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,24 @@
33
`BinaryLogWriter` writes market data to binary log segments. It handles segment rotation, compression, indexing, and CRC checksums.
44

55
```cpp
6+
// Callback for custom segment naming on rotation
7+
using RotationCallback = std::filesystem::path (*)(
8+
void* user_data,
9+
const std::filesystem::path& output_dir,
10+
uint32_t segment_number);
11+
612
struct WriterConfig {
713
std::filesystem::path output_dir;
8-
std::string output_filename; // Optional: override generated name
14+
std::string output_filename; // Optional: first segment name
915
uint64_t max_segment_bytes{256ull << 20}; // 256 MB default
1016
uint64_t buffer_size{64ull << 10}; // 64 KB buffer
1117
uint8_t exchange_id{0};
1218
bool sync_on_rotate{true};
1319
bool create_index{true};
1420
uint16_t index_interval{1000}; // Events per index entry
1521
CompressionType compression{CompressionType::None};
22+
RotationCallback rotation_callback{nullptr}; // Custom naming on rotation
23+
void* rotation_user_data{nullptr}; // User data for callback
1624
};
1725

1826
class BinaryLogWriter {
@@ -44,14 +52,16 @@ public:
4452
| Field | Default | Description |
4553
|-------|---------|-------------|
4654
| `output_dir` | - | Directory for segment files. |
47-
| `output_filename` | - | Optional fixed filename (disables rotation). |
55+
| `output_filename` | - | Optional filename for first segment. |
4856
| `max_segment_bytes` | 256 MB | Maximum segment size before rotation. |
4957
| `buffer_size` | 64 KB | Internal write buffer size. |
5058
| `exchange_id` | 0 | Exchange identifier in segment header. |
5159
| `sync_on_rotate` | true | Sync to disk on segment rotation. |
5260
| `create_index` | true | Build seek index in segments. |
5361
| `index_interval` | 1000 | Events between index entries. |
5462
| `compression` | None | Compression type (`None` or `LZ4`). |
63+
| `rotation_callback` | nullptr | Custom callback for segment naming on rotation. |
64+
| `rotation_user_data` | nullptr | User data passed to rotation callback. |
5565
5666
## Methods
5767
@@ -107,6 +117,47 @@ writer.flush();
107117
writer.close();
108118
```
109119
120+
## Custom Rotation Callback
121+
122+
By default, rotated segments use timestamp-based names. To customize naming:
123+
124+
```cpp
125+
// Context for the callback
126+
struct MyContext {
127+
std::string prefix;
128+
int sequence{1};
129+
};
130+
131+
// Callback function (must be a plain function, not a lambda with captures)
132+
std::filesystem::path my_rotation_cb(
133+
void* user_data,
134+
const std::filesystem::path& output_dir,
135+
uint32_t segment_number)
136+
{
137+
auto* ctx = static_cast<MyContext*>(user_data);
138+
std::ostringstream fname;
139+
fname << ctx->prefix << "_" << std::setfill('0') << std::setw(3)
140+
<< (ctx->sequence + segment_number - 1) << ".floxlog";
141+
return output_dir / fname.str();
142+
}
143+
144+
// Usage
145+
MyContext ctx{"2025-01-15", 1};
146+
147+
replay::WriterConfig config{
148+
.output_dir = "/data/market",
149+
.output_filename = "2025-01-15_001.floxlog", // First segment
150+
.max_segment_bytes = 256ull << 20,
151+
.rotation_callback = my_rotation_cb,
152+
.rotation_user_data = &ctx, // Must outlive writer!
153+
};
154+
155+
replay::BinaryLogWriter writer(config);
156+
// When segment rotates, callback generates "2025-01-15_002.floxlog", etc.
157+
```
158+
159+
**Important:** The `rotation_user_data` pointer must remain valid for the lifetime of the writer.
160+
110161
## Notes
111162

112163
* Thread-safe via internal mutex.

include/flox/replay/writers/binary_log_writer.h

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,26 @@
2121
namespace flox::replay
2222
{
2323

24+
using RotationCallback = std::filesystem::path (*)(void* user_data,
25+
const std::filesystem::path& output_dir,
26+
uint32_t segment_number);
27+
2428
struct WriterConfig
2529
{
2630
std::filesystem::path output_dir;
27-
std::string output_filename; // If set, use this filename instead of generating one
31+
std::string output_filename; // If set, use this filename for first segment
2832
uint64_t max_segment_bytes{256ull << 20};
2933
uint64_t buffer_size{64ull << 10};
3034
uint8_t exchange_id{0};
3135
bool sync_on_rotate{true};
3236
bool create_index{true};
3337
uint16_t index_interval{kDefaultIndexInterval};
3438
CompressionType compression{CompressionType::None};
39+
40+
/// Optional callback for custom segment naming on rotation.
41+
/// If not set, rotated segments use timestamp-based names.
42+
RotationCallback rotation_callback{nullptr};
43+
void* rotation_user_data{nullptr};
3544
};
3645

3746
struct WriterStats
@@ -76,7 +85,7 @@ class BinaryLogWriter
7685
void updateSegmentHeader();
7786
void writeIndex();
7887
void closeInternal();
79-
std::filesystem::path generateSegmentPath() const;
88+
std::filesystem::path generateSegmentPath();
8089

8190
bool isCompressed() const { return _config.compression != CompressionType::None; }
8291

@@ -89,6 +98,7 @@ class BinaryLogWriter
8998

9099
SegmentHeader _segment_header{};
91100
uint64_t _segment_bytes{0};
101+
uint32_t _segment_number{0}; // current segment number (1-based after first open)
92102
bool _header_written{false};
93103

94104
std::vector<IndexEntry> _index_entries;

src/replay/binary_log_writer.cpp

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,24 @@ BinaryLogWriter& BinaryLogWriter::operator=(BinaryLogWriter&& other) noexcept
6868
return *this;
6969
}
7070

71-
std::filesystem::path BinaryLogWriter::generateSegmentPath() const
71+
std::filesystem::path BinaryLogWriter::generateSegmentPath()
7272
{
73-
if (!_config.output_filename.empty())
73+
++_segment_number;
74+
75+
// First segment: use output_filename if provided
76+
if (_segment_number == 1 && !_config.output_filename.empty())
7477
{
7578
return _config.output_dir / _config.output_filename;
7679
}
7780

81+
// Use rotation callback if provided
82+
if (_config.rotation_callback)
83+
{
84+
return _config.rotation_callback(_config.rotation_user_data, _config.output_dir,
85+
_segment_number);
86+
}
87+
88+
// Default: timestamp-based naming
7889
using namespace std::chrono;
7990
auto now = system_clock::now();
8091
auto ns = duration_cast<nanoseconds>(now.time_since_epoch()).count();

0 commit comments

Comments
 (0)