Skip to content

Commit 3ad3b91

Browse files
committed
DEL: Remove decoding and usage of record_count
1 parent 72fe8da commit 3ad3b91

File tree

10 files changed

+130
-110
lines changed

10 files changed

+130
-110
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## 0.6.0 - TBD
44
- Added support for imbalance schema
55
- Added support for decoding `ts_out` field
6+
- Removed `record_count` from `Metadata`
67
- Changed `Historical::BatchDownload` to return the paths of the downloaded files
78
- Added flags `kSnapshot` and `kMaybeBadBook`
89

include/databento/dbn.hpp

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@ struct Metadata {
4040
UnixNanos end;
4141
// The maximum number of records for the query.
4242
std::uint64_t limit;
43-
// The total number of records.
44-
std::uint64_t record_count;
4543
// The input symbology type.
4644
SType stype_in;
4745
// The output symbology type.
@@ -71,10 +69,9 @@ inline bool operator==(const Metadata& lhs, const Metadata& rhs) {
7169
return lhs.version == rhs.version && lhs.dataset == rhs.dataset &&
7270
lhs.schema == rhs.schema && lhs.start == rhs.start &&
7371
lhs.end == rhs.end && lhs.limit == rhs.limit &&
74-
lhs.record_count == rhs.record_count && lhs.stype_in == rhs.stype_in &&
75-
lhs.stype_out == rhs.stype_out && lhs.symbols == rhs.symbols &&
76-
lhs.partial == rhs.partial && lhs.not_found == rhs.not_found &&
77-
lhs.mappings == rhs.mappings;
72+
lhs.stype_in == rhs.stype_in && lhs.stype_out == rhs.stype_out &&
73+
lhs.symbols == rhs.symbols && lhs.partial == rhs.partial &&
74+
lhs.not_found == rhs.not_found && lhs.mappings == rhs.mappings;
7875
}
7976

8077
std::string ToString(const Metadata& metadata);

include/databento/dbn_decoder.hpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@ class DbnDecoder {
2929

3030
// Should only be called once
3131
Metadata DecodeMetadata();
32-
// Lifetime of returned Record is until next call to ParseRecord.
33-
Record DecodeRecord();
32+
// Lifetime of returned Record is until next call to DecodeRecord. Returns
33+
// nullptr once the end of the input has been reached.
34+
const Record* DecodeRecord();
3435

3536
private:
3637
static std::string DecodeSymbol(
@@ -51,5 +52,6 @@ class DbnDecoder {
5152
std::unique_ptr<IReadable> input_;
5253
std::vector<std::uint8_t> buffer_;
5354
std::size_t buffer_idx_{};
55+
Record current_record_{nullptr};
5456
};
5557
} // namespace databento

src/dbn.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ std::ostream& operator<<(std::ostream& stream, const Metadata& metadata) {
1919
.AddField("start", metadata.start)
2020
.AddField("end", metadata.end)
2121
.AddField("limit", metadata.limit)
22-
.AddField("record_count", metadata.record_count)
2322
.AddField("stype_in", metadata.stype_in)
2423
.AddField("stype_out", metadata.stype_out);
2524

src/dbn_decoder.cpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ databento::Metadata DbnDecoder::DecodeMetadataFields(
105105
res.end =
106106
UnixNanos{std::chrono::nanoseconds{Consume<std::uint64_t>(buffer_it)}};
107107
res.limit = Consume<std::uint64_t>(buffer_it);
108-
res.record_count = Consume<std::uint64_t>(buffer_it);
108+
// skip deprecated record_count
109+
buffer_it += 8;
109110
res.stype_in = static_cast<SType>(Consume<std::uint8_t>(buffer_it));
110111
res.stype_out = static_cast<SType>(Consume<std::uint8_t>(buffer_it));
111112
// skip reserved
@@ -136,23 +137,23 @@ databento::Metadata DbnDecoder::DecodeMetadata() {
136137
}
137138

138139
// assumes ParseMetadata has been called
139-
databento::Record DbnDecoder::DecodeRecord() {
140+
const databento::Record* DbnDecoder::DecodeRecord() {
140141
// need some unread unread_bytes
141142
const auto unread_bytes = buffer_.size() - buffer_idx_;
142143
if (unread_bytes == 0) {
143144
if (FillBuffer() == 0) {
144-
throw DbnResponseError{"Reached end of DBN stream"};
145+
return nullptr;
145146
}
146147
}
147148
// check length
148149
while (buffer_.size() - buffer_idx_ < BufferRecordHeader()->Size()) {
149150
if (FillBuffer() == 0) {
150-
throw DbnResponseError{"Reached end of DBN stream"};
151+
return nullptr;
151152
}
152153
}
153-
Record res{BufferRecordHeader()};
154-
buffer_idx_ += res.Size();
155-
return res;
154+
current_record_ = Record{BufferRecordHeader()};
155+
buffer_idx_ += current_record_.Size();
156+
return &current_record_;
156157
}
157158

158159
size_t DbnDecoder::FillBuffer() {

src/dbn_file_store.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,12 @@ DbnFileStore::DbnFileStore(const std::string& file_path)
1212
void DbnFileStore::Replay(const MetadataCallback& metadata_callback,
1313
const RecordCallback& record_callback) {
1414
auto metadata = parser_.DecodeMetadata();
15-
const auto record_count = metadata.record_count;
1615
if (metadata_callback) {
1716
metadata_callback(std::move(metadata));
1817
}
19-
for (std::size_t i = 0; i < record_count; ++i) {
20-
const auto record = parser_.DecodeRecord();
21-
if (record_callback(record) == KeepGoing::Stop) {
18+
const databento::Record* record;
19+
while ((record = parser_.DecodeRecord()) != nullptr) {
20+
if (record_callback(*record) == KeepGoing::Stop) {
2221
break;
2322
}
2423
}

src/historical.cpp

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1071,14 +1071,12 @@ void Historical::TimeseriesGetRange(const HttplibParams& params,
10711071
try {
10721072
DbnDecoder dbn_decoder{channel};
10731073
Metadata metadata = dbn_decoder.DecodeMetadata();
1074-
const auto record_count = metadata.record_count;
10751074
if (metadata_callback) {
10761075
metadata_callback(std::move(metadata));
10771076
}
1078-
for (auto i = 0UL; i < record_count; ++i) {
1079-
const bool should_stop =
1080-
record_callback(dbn_decoder.DecodeRecord()) == KeepGoing::Stop;
1081-
if (should_stop) {
1077+
const Record* record;
1078+
while ((record = dbn_decoder.DecodeRecord()) != nullptr) {
1079+
if (record_callback(*record) == KeepGoing::Stop) {
10821080
should_continue = false;
10831081
break;
10841082
}

0 commit comments

Comments
 (0)