Skip to content

Commit 2e96099

Browse files
committed
ADD: Add BlockForStop to C++ client
1 parent 6b0ba00 commit 2e96099

File tree

11 files changed

+133
-86
lines changed

11 files changed

+133
-86
lines changed

CHANGELOG.md

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changelog
22

3+
## 0.10.0 - TBD
4+
5+
#### Enhancements
6+
- Added `LiveThreaded::BlockForStop` to make it easier to wait for one or more records
7+
before closing the session
8+
39
## 0.9.1 - 2023-07-11
410

511
#### Enhancements
@@ -44,18 +50,24 @@
4450
- Added initial support for live data with `LiveBlocking` and `LiveThreaded` clients
4551
- Added support for statistics schema
4652
- Added `SystemMsg` and `ErrorMsg` records for use in live data
47-
- Added `strike_price`, `strike_price_currency`, and `instrument_class` to `InstrumentDefMsg`
53+
- Added `strike_price`, `strike_price_currency`, and `instrument_class` to
54+
`InstrumentDefMsg`
4855
- Added `FixedPx` helper class for formatting fixed prices
4956
- Added configurable log receiver `ILogReceiver`
50-
- Added `instrument_class`, `strike_price`, and `strike_price_currency` to definition schema
51-
- Added additional `condition` variants for `DatasetConditionDetail` (degraded, pending, missing)
52-
- Added additional member `last_modified_date` to `DatasetConditionDetail` Added `has_mixed_schema`, `has_mixed_stype_in`, and `ts_out` to `Metadata` to support live data
57+
- Added `instrument_class`, `strike_price`, and `strike_price_currency` to definition
58+
schema
59+
- Added additional `condition` variants for `DatasetConditionDetail` (degraded, pending,
60+
missing)
61+
- Added additional member `last_modified_date` to `DatasetConditionDetail`
62+
- Added `has_mixed_schema`, `has_mixed_stype_in`, and `ts_out` to `Metadata` to support
63+
live data
5364
- Added optional `compression` parameter to `BatchSubmitJob`
5465

5566
#### Breaking changes
5667
- Removed `related` and `related_security_id` from `InstrumentDefMsg`
5768
- Renamed `BatchJob.cost` to `cost_usd` and value now expressed as US dollars
58-
- Renamed `SType::ProductId` to `SType::InstrumentId` and `SType::Native` to `SType::RawSymbol`
69+
- Renamed `SType::ProductId` to `SType::InstrumentId` and `SType::Native` to
70+
`SType::RawSymbol`
5971
- Renamed `RecordHeader::product_id` to `instrument_id`
6072
- Renamed `InstrumentDefMsg::symbol` to `raw_symbol`
6173
- Renamed `SymbolMapping::native_symbol` to `raw_symbol`

README.md

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,8 @@ using namespace databento;
8686
int main() {
8787
std::unordered_map<std::uint32_t, std::string> symbol_mappings;
8888

89-
auto client = LiveBuilder{}
90-
.SetKey("$YOUR_API_KEY")
91-
.SetDataset("GLBX.MDP3")
92-
.BuildThreaded();
89+
auto client =
90+
LiveBuilder{}.SetKeyFromEnv().SetDataset("GLBX.MDP3").BuildThreaded();
9391

9492
auto handler = [&symbol_mappings](const Record& rec) {
9593
if (rec.Holds<TradeMsg>()) {
@@ -99,8 +97,7 @@ int main() {
9997
<< '\n';
10098
} else if (rec.Holds<SymbolMappingMsg>()) {
10199
auto mapping = rec.Get<SymbolMappingMsg>();
102-
symbol_mappings[mapping.hd.instrument_id] =
103-
mapping.stype_out_symbol.data();
100+
symbol_mappings[mapping.hd.instrument_id] = mapping.STypeOutSymbol();
104101
}
105102
return KeepGoing::Continue;
106103
};

example/live/readme.cpp

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,8 @@ using namespace databento;
1313
int main() {
1414
std::unordered_map<std::uint32_t, std::string> symbol_mappings;
1515

16-
auto client = LiveBuilder{}
17-
.SetKey("$YOUR_API_KEY")
18-
.SetDataset("GLBX.MDP3")
19-
.BuildThreaded();
16+
auto client =
17+
LiveBuilder{}.SetKeyFromEnv().SetDataset("GLBX.MDP3").BuildThreaded();
2018

2119
auto handler = [&symbol_mappings](const Record& rec) {
2220
if (rec.Holds<TradeMsg>()) {
@@ -26,8 +24,7 @@ int main() {
2624
<< '\n';
2725
} else if (rec.Holds<SymbolMappingMsg>()) {
2826
auto mapping = rec.Get<SymbolMappingMsg>();
29-
symbol_mappings[mapping.hd.instrument_id] =
30-
mapping.stype_out_symbol.data();
27+
symbol_mappings[mapping.hd.instrument_id] = mapping.STypeOutSymbol();
3128
}
3229
return KeepGoing::Continue;
3330
};

example/live/simple.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ int main() {
4040
};
4141
auto record_callback = [&symbol_mappings](const databento::Record& rec) {
4242
using databento::RType;
43-
switch (rec.Header().rtype) {
43+
switch (rec.RType()) {
4444
case RType::Mbo: {
4545
auto ohlcv = rec.Get<databento::WithTsOut<databento::MboMsg>>();
4646
std::cout << "Received tick for "
@@ -58,7 +58,7 @@ int main() {
5858
auto mapping = rec.Get<databento::SymbolMappingMsg>();
5959
std::cout << "Received symbol mapping: " << mapping << '\n';
6060
symbol_mappings.emplace(mapping.hd.instrument_id,
61-
mapping.stype_in_symbol.data());
61+
mapping.STypeInSymbol());
6262
break;
6363
}
6464
case RType::System: {
@@ -75,7 +75,7 @@ int main() {
7575
}
7676
default: {
7777
std::cerr << "Received unknown record with rtype " << std::hex
78-
<< static_cast<std::uint16_t>(rec.Header().rtype) << '\n';
78+
<< static_cast<std::uint16_t>(rec.RType()) << '\n';
7979
}
8080
}
8181
return databento::KeepGoing::Continue;

include/databento/live_threaded.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include <chrono>
34
#include <functional> // function
45
#include <memory> // unique_ptr
56
#include <string>
@@ -76,6 +77,10 @@ class LiveThreaded {
7677
ExceptionCallback exception_callback);
7778
// Closes the current connection, and attempts to reconnect to the gateway.
7879
void Reconnect();
80+
// Blocking wait with an optional timeout for the session to close when the
81+
// record_callback or the exception_callback return Stop.
82+
void BlockForStop();
83+
KeepGoing BlockForStop(std::chrono::milliseconds timeout);
7984

8085
private:
8186
struct Impl;

src/live_threaded.cpp

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
#include <atomic>
44
#include <chrono> // milliseconds
5+
#include <condition_variable>
56
#include <exception>
7+
#include <mutex>
68
#include <sstream>
79
#include <thread>
810
#include <utility> // forward, move, swap
@@ -19,8 +21,18 @@ struct LiveThreaded::Impl {
1921
: log_receiver{log_receiver},
2022
blocking{log_receiver, std::forward<A>(args)...} {}
2123

24+
void NotifyOfStop() {
25+
const std::lock_guard<std::mutex> lock{last_cb_ret_mutex};
26+
last_cb_ret = KeepGoing::Stop;
27+
last_cb_ret_cv.notify_all();
28+
}
29+
2230
ILogReceiver* log_receiver;
31+
// Set to false when destructor is called
2332
std::atomic<bool> keep_going{true};
33+
KeepGoing last_cb_ret{KeepGoing::Continue};
34+
std::mutex last_cb_ret_mutex;
35+
std::condition_variable last_cb_ret_cv;
2436
LiveBlocking blocking;
2537
};
2638

@@ -111,6 +123,27 @@ void LiveThreaded::Start(MetadataCallback metadata_callback,
111123

112124
void LiveThreaded::Reconnect() { impl_->blocking.Reconnect(); }
113125

126+
void LiveThreaded::BlockForStop() {
127+
std::unique_lock<std::mutex> lock{impl_->last_cb_ret_mutex};
128+
auto* impl = impl_.get();
129+
// wait for stop
130+
impl_->last_cb_ret_cv.wait(
131+
lock, [impl] { return impl->last_cb_ret == KeepGoing::Stop; });
132+
}
133+
134+
databento::KeepGoing LiveThreaded::BlockForStop(
135+
std::chrono::milliseconds timeout) {
136+
std::unique_lock<std::mutex> lock{impl_->last_cb_ret_mutex};
137+
auto* impl = impl_.get();
138+
// wait for stop
139+
if (impl_->last_cb_ret_cv.wait_for(lock, timeout, [impl] {
140+
return impl->last_cb_ret == KeepGoing::Stop;
141+
})) {
142+
return KeepGoing::Stop;
143+
}
144+
return KeepGoing::Continue;
145+
}
146+
114147
void LiveThreaded::ProcessingThread(Impl* impl,
115148
MetadataCallback&& metadata_callback,
116149
RecordCallback&& record_callback,
@@ -146,6 +179,7 @@ void LiveThreaded::ProcessingThread(Impl* impl,
146179
if (rec) {
147180
if (record_cb(*rec) == KeepGoing::Stop) {
148181
impl->blocking.Stop();
182+
impl->NotifyOfStop();
149183
return;
150184
}
151185
} // else timeout
@@ -155,6 +189,7 @@ void LiveThreaded::ProcessingThread(Impl* impl,
155189
ExceptionAction::Restart) {
156190
break; // break out of NextRecord loop, to restart Start loop
157191
} else {
192+
impl->NotifyOfStop();
158193
return;
159194
}
160195
}

test/include/mock/mock_lsg_server.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class MockLsgServer {
2828
void Authenticate();
2929
void Subscribe(const std::vector<std::string>& symbols, Schema schema,
3030
SType stype);
31-
void Start(Schema schema);
31+
void Start();
3232
std::size_t Send(const std::string& msg);
3333
::ssize_t UncheckedSend(const std::string& msg);
3434
template <typename Rec>

test/src/dbn_decoder_tests.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -707,8 +707,8 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeDefinition) {
707707
const auto& ch_def1 = ch_record1->Get<InstrumentDefMsg>();
708708
const auto& f_def1 = f_record1->Get<InstrumentDefMsg>();
709709
EXPECT_EQ(ch_def1, f_def1);
710-
EXPECT_STREQ(ch_def1.exchange.data(), "XNAS");
711-
EXPECT_STREQ(ch_def1.raw_symbol.data(), "MSFT");
710+
EXPECT_STREQ(ch_def1.Exchange(), "XNAS");
711+
EXPECT_STREQ(ch_def1.RawSymbol(), "MSFT");
712712
EXPECT_EQ(ch_def1.security_update_action, SecurityUpdateAction::Add);
713713
EXPECT_EQ(ch_def1.min_lot_size_round_lot, 100);
714714
EXPECT_EQ(ch_def1.instrument_class, InstrumentClass::Stock);
@@ -723,8 +723,8 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeDefinition) {
723723
const auto& ch_def2 = ch_record2->Get<InstrumentDefMsg>();
724724
const auto& f_def2 = f_record2->Get<InstrumentDefMsg>();
725725
EXPECT_EQ(ch_def2, f_def2);
726-
EXPECT_STREQ(ch_def2.exchange.data(), "XNAS");
727-
EXPECT_STREQ(ch_def2.raw_symbol.data(), "MSFT");
726+
EXPECT_STREQ(ch_def2.Exchange(), "XNAS");
727+
EXPECT_STREQ(ch_def2.RawSymbol(), "MSFT");
728728
EXPECT_EQ(ch_def2.security_update_action, SecurityUpdateAction::Add);
729729
EXPECT_EQ(ch_def2.min_lot_size_round_lot, 100);
730730
EXPECT_EQ(ch_def2.instrument_class, InstrumentClass::Stock);

test/src/live_blocking_tests.cpp

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ class LiveBlockingTests : public testing::Test {
3131
}
3232

3333
static constexpr auto kKey = "32-character-with-lots-of-filler";
34+
static constexpr auto kLocalhost = "127.0.0.1";
3435

3536
std::unique_ptr<ILogReceiver> logger_{new NullLogReceiver};
3637
};
@@ -44,26 +45,24 @@ TEST_F(LiveBlockingTests, TestAuthentication) {
4445
}};
4546

4647
const LiveBlocking target{logger_.get(), kKey,
47-
dataset::kXnasItch, "127.0.0.1",
48+
dataset::kXnasItch, kLocalhost,
4849
mock_server.Port(), kTsOut};
4950
}
5051

5152
TEST_F(LiveBlockingTests, TestStart) {
5253
constexpr auto kTsOut = false;
53-
constexpr auto kSchema = Schema::Mbo;
5454
const mock::MockLsgServer mock_server{dataset::kGlbxMdp3, kTsOut,
5555
[](mock::MockLsgServer& self) {
5656
self.Accept();
5757
self.Authenticate();
58-
self.Start(kSchema);
58+
self.Start();
5959
}};
6060

61-
LiveBlocking target{logger_.get(), kKey,
62-
dataset::kGlbxMdp3, "127.0.0.1",
61+
LiveBlocking target{logger_.get(), kKey, dataset::kGlbxMdp3, kLocalhost,
6362
mock_server.Port(), kTsOut};
6463
const auto metadata = target.Start();
6564
EXPECT_EQ(metadata.version, 1);
66-
EXPECT_EQ(metadata.schema, kSchema);
65+
EXPECT_TRUE(metadata.has_mixed_schema);
6766
EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3);
6867
}
6968

@@ -81,7 +80,7 @@ TEST_F(LiveBlockingTests, TestSubscribe) {
8180
self.Subscribe(kSymbols, kSchema, kSType);
8281
}};
8382

84-
LiveBlocking target{logger_.get(), kKey, kDataset, "127.0.0.1",
83+
LiveBlocking target{logger_.get(), kKey, kDataset, kLocalhost,
8584
mock_server.Port(), kTsOut};
8685
target.Subscribe(kSymbols, kSchema, kSType);
8786
}
@@ -107,7 +106,7 @@ TEST_F(LiveBlockingTests, TestSubscriptionChunking) {
107106
}
108107
}};
109108

110-
LiveBlocking target{logger_.get(), kKey, kDataset, "127.0.0.1",
109+
LiveBlocking target{logger_.get(), kKey, kDataset, kLocalhost,
111110
mock_server.Port(), kTsOut};
112111
const std::vector<std::string> kSymbols(kSymbolCount, kSymbol);
113112
target.Subscribe(kSymbols, kSchema, kSType);
@@ -126,8 +125,7 @@ TEST_F(LiveBlockingTests, TestNextRecord) {
126125
}
127126
}};
128127

129-
LiveBlocking target{logger_.get(), kKey,
130-
dataset::kXnasItch, "127.0.0.1",
128+
LiveBlocking target{logger_.get(), kKey, dataset::kXnasItch, kLocalhost,
131129
mock_server.Port(), kTsOut};
132130
for (size_t i = 0; i < kRecCount; ++i) {
133131
const auto rec = target.NextRecord();
@@ -177,8 +175,7 @@ TEST_F(LiveBlockingTests, TestNextRecordTimeout) {
177175
self.SendRecord(kRec);
178176
}};
179177

180-
LiveBlocking target{logger_.get(), kKey,
181-
dataset::kXnasItch, "127.0.0.1",
178+
LiveBlocking target{logger_.get(), kKey, dataset::kXnasItch, kLocalhost,
182179
mock_server.Port(), kTsOut};
183180
{
184181
// wait for server to send first record to avoid flaky timeouts
@@ -232,8 +229,7 @@ TEST_F(LiveBlockingTests, TestNextRecordPartialRead) {
232229
send_remaining_cv);
233230
}};
234231

235-
LiveBlocking target{logger_.get(), kKey,
236-
dataset::kGlbxMdp3, "127.0.0.1",
232+
LiveBlocking target{logger_.get(), kKey, dataset::kGlbxMdp3, kLocalhost,
237233
mock_server.Port(), kTsOut};
238234
auto rec = target.NextRecord();
239235
ASSERT_TRUE(rec.Holds<MboMsg>());
@@ -276,8 +272,7 @@ TEST_F(LiveBlockingTests, TestNextRecordWithTsOut) {
276272
}
277273
}};
278274

279-
LiveBlocking target{logger_.get(), kKey,
280-
dataset::kXnasItch, "127.0.0.1",
275+
LiveBlocking target{logger_.get(), kKey, dataset::kXnasItch, kLocalhost,
281276
mock_server.Port(), kTsOut};
282277
for (size_t i = 0; i < kRecCount; ++i) {
283278
const auto rec = target.NextRecord();
@@ -323,7 +318,7 @@ TEST_F(LiveBlockingTests, TestStop) {
323318
}; // namespace databento
324319

325320
LiveBlocking target{logger_.get(), kKey,
326-
dataset::kXnasItch, "127.0.0.1",
321+
dataset::kXnasItch, kLocalhost,
327322
mock_server->Port(), kTsOut};
328323
ASSERT_EQ(target.NextRecord().Get<WithTsOut<TradeMsg>>(), kRec);
329324
target.Stop();
@@ -335,8 +330,8 @@ TEST_F(LiveBlockingTests, TestStop) {
335330

336331
TEST_F(LiveBlockingTests, TestConnectWhenGatewayNotUp) {
337332
constexpr auto kTsOut = true;
338-
ASSERT_THROW(LiveBlocking(logger_.get(), kKey, dataset::kXnasItch,
339-
"127.0.0.1", 80, kTsOut),
333+
ASSERT_THROW(LiveBlocking(logger_.get(), kKey, dataset::kXnasItch, kLocalhost,
334+
80, kTsOut),
340335
databento::TcpError);
341336
}
342337

@@ -380,11 +375,11 @@ TEST_F(LiveBlockingTests, TestReconnect) {
380375
self.Accept();
381376
self.Authenticate();
382377
self.Subscribe(kAllSymbols, Schema::Trades, SType::RawSymbol);
383-
self.Start(Schema::Trades);
378+
self.Start();
384379
self.SendRecord(kRec);
385380
}}};
386381
LiveBlocking target{logger_.get(), kKey,
387-
dataset::kXnasItch, "127.0.0.1",
382+
dataset::kXnasItch, kLocalhost,
388383
mock_server->Port(), kTsOut};
389384
// Tell server to close connection
390385
{
@@ -401,7 +396,7 @@ TEST_F(LiveBlockingTests, TestReconnect) {
401396
target.Reconnect();
402397
target.Subscribe(kAllSymbols, Schema::Trades, SType::RawSymbol);
403398
const auto metadata = target.Start();
404-
ASSERT_EQ(metadata.schema, Schema::Trades);
399+
EXPECT_TRUE(metadata.has_mixed_schema);
405400
const auto rec = target.NextRecord();
406401
ASSERT_TRUE(rec.Holds<TradeMsg>());
407402
ASSERT_EQ(rec.Get<TradeMsg>(), kRec);

0 commit comments

Comments
 (0)