Skip to content

Commit 37f8471

Browse files
committed
FIX: Fix Zstd decoding and file reading
1 parent e365fb5 commit 37f8471

File tree

9 files changed

+115 additions, -31 deletions

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1 +1,2 @@
11
build
2+
!*.zst

include/databento/detail/file_stream.hpp

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,8 @@
22

33
#include <cstddef> // size_t
44
#include <cstdint> // uint8_t
5-
#include <fstream>
5+
#include <fstream> // ifstream
6+
#include <string>
67

78
#include "databento/ireadable.hpp"
89

include/databento/detail/zstd_stream.hpp

Lines changed: 3 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -13,8 +13,9 @@ namespace databento {
1313
namespace detail {
1414
class ZstdStream : public IReadable {
1515
public:
16-
explicit ZstdStream(std::unique_ptr<IReadable> input,
17-
std::vector<std::uint8_t>&& in_buffer);
16+
explicit ZstdStream(std::unique_ptr<IReadable> input);
17+
ZstdStream(std::unique_ptr<IReadable> input,
18+
std::vector<std::uint8_t>&& in_buffer);
1819

1920
// Read exactly `length` bytes into `buffer`.
2021
void ReadExact(std::uint8_t* buffer, std::size_t length) override;
@@ -28,8 +29,6 @@ class ZstdStream : public IReadable {
2829
std::size_t read_suggestion_;
2930
std::vector<std::uint8_t> in_buffer_;
3031
ZSTD_inBuffer z_in_buffer_;
31-
// std::vector<std::uint8_t> out_buffer_;
32-
// ZSTD_outBuffer z_out_buffer_;
3332
};
3433
} // namespace detail
3534
} // namespace databento

src/detail/file_stream.cpp

Lines changed: 11 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
#include "databento/detail/file_stream.hpp"
22

33
#include <ios> // streamsize
4+
#include <sstream>
45

56
#include "databento/exceptions.hpp"
67

@@ -14,12 +15,17 @@ FileStream::FileStream(const std::string& file_path) : stream_{file_path} {
1415
}
1516

1617
void FileStream::ReadExact(std::uint8_t* buffer, std::size_t length) {
17-
stream_.read(reinterpret_cast<char*>(buffer),
18-
static_cast<std::streamsize>(length));
18+
const auto size = ReadSome(buffer, length);
19+
if (size != length) {
20+
std::ostringstream err_msg;
21+
err_msg << "Unexpected end of file, expected " << length << " bytes, got "
22+
<< size;
23+
throw DbnResponseError{err_msg.str()};
24+
}
1925
}
2026

2127
std::size_t FileStream::ReadSome(std::uint8_t* buffer, std::size_t max_length) {
22-
return static_cast<std::size_t>(
23-
stream_.readsome(reinterpret_cast<char*>(buffer),
24-
static_cast<std::streamsize>(max_length)));
28+
stream_.read(reinterpret_cast<char*>(buffer),
29+
static_cast<std::streamsize>(max_length));
30+
return static_cast<std::size_t>(stream_.gcount());
2531
}

src/detail/zstd_stream.cpp

Lines changed: 31 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,9 @@
77

88
using databento::detail::ZstdStream;
99

10+
ZstdStream::ZstdStream(std::unique_ptr<IReadable> input)
11+
: ZstdStream{std::move(input), {}} {}
12+
1013
ZstdStream::ZstdStream(std::unique_ptr<IReadable> input,
1114
std::vector<std::uint8_t>&& in_buffer)
1215
: input_{std::move(input)},
@@ -31,26 +34,33 @@ void ZstdStream::ReadExact(std::uint8_t* buffer, std::size_t length) {
3134

3235
size_t ZstdStream::ReadSome(std::uint8_t* buffer, std::size_t max_length) {
3336
ZSTD_outBuffer z_out_buffer{buffer, max_length, 0};
34-
const auto unread_input = z_in_buffer_.size - z_in_buffer_.pos;
35-
if (unread_input > 0) {
36-
std::copy(
37-
in_buffer_.cbegin() + static_cast<std::ptrdiff_t>(z_in_buffer_.pos),
38-
in_buffer_.cend(), in_buffer_.begin());
39-
}
40-
const auto new_size = unread_input + read_suggestion_;
41-
if (new_size != in_buffer_.size()) {
42-
in_buffer_.resize(new_size);
43-
z_in_buffer_.src = in_buffer_.data();
44-
}
45-
z_in_buffer_.size = unread_input + input_->ReadSome(&in_buffer_[unread_input],
46-
read_suggestion_);
47-
z_in_buffer_.pos = 0;
48-
49-
read_suggestion_ =
50-
::ZSTD_decompressStream(z_dstream_.get(), &z_out_buffer, &z_in_buffer_);
51-
if (::ZSTD_isError(read_suggestion_)) {
52-
throw DbnResponseError{std::string{"Zstd error decompressing record: "} +
53-
::ZSTD_getErrorName(read_suggestion_)};
54-
}
37+
std::size_t read_size = 0;
38+
do {
39+
const auto unread_input = z_in_buffer_.size - z_in_buffer_.pos;
40+
if (unread_input > 0) {
41+
std::copy(
42+
in_buffer_.cbegin() + static_cast<std::ptrdiff_t>(z_in_buffer_.pos),
43+
in_buffer_.cend(), in_buffer_.begin());
44+
}
45+
if (read_suggestion_ == 0) {
46+
// next frame
47+
read_suggestion_ = ::ZSTD_initDStream(z_dstream_.get());
48+
}
49+
const auto new_size = unread_input + read_suggestion_;
50+
if (new_size != in_buffer_.size()) {
51+
in_buffer_.resize(new_size);
52+
z_in_buffer_.src = in_buffer_.data();
53+
}
54+
read_size = input_->ReadSome(&in_buffer_[unread_input], read_suggestion_);
55+
z_in_buffer_.size = unread_input + read_size;
56+
z_in_buffer_.pos = 0;
57+
58+
read_suggestion_ =
59+
::ZSTD_decompressStream(z_dstream_.get(), &z_out_buffer, &z_in_buffer_);
60+
if (::ZSTD_isError(read_suggestion_)) {
61+
throw DbnResponseError{std::string{"Zstd error decompressing: "} +
62+
::ZSTD_getErrorName(read_suggestion_)};
63+
}
64+
} while (z_out_buffer.pos == 0 && read_size > 0);
5565
return z_out_buffer.pos;
5666
}

test/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,7 @@ set(
2929
src/datetime_tests.cpp
3030
src/dbn_tests.cpp
3131
src/dbn_decoder_tests.cpp
32+
src/file_stream_tests.cpp
3233
src/flag_set_tests.cpp
3334
src/historical_tests.cpp
3435
src/live_tests.cpp
@@ -44,6 +45,7 @@ set(
4445
src/stream_op_helper_tests.cpp
4546
src/symbology_tests.cpp
4647
src/tcp_client_tests.cpp
48+
src/zstd_stream_tests.cpp
4749
)
4850
add_executable(${PROJECT_NAME} ${test_headers} ${test_sources})
4951
find_package(Threads REQUIRED)
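multi-frame.definition.zst (file name inferred from the new ZstdStream test and the `!*.zst` .gitignore rule; presumably under test/data/ — the page omitted the path)

1.85 KB
Binary file not shown.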

test/src/file_stream_tests.cpp

Lines changed: 33 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,33 @@
1+
#include <gtest/gtest.h>
2+
3+
#include "databento/detail/file_stream.hpp"
4+
#include "databento/exceptions.hpp"
5+
6+
namespace databento {
7+
namespace detail {
8+
namespace test {
9+
TEST(FileStreamTests, TestReadExactInsufficient) {
10+
const std::string file_path = TEST_BUILD_DIR "/data/test_data.ohlcv-1d.dbn";
11+
databento::detail::FileStream target{file_path};
12+
std::vector<std::uint8_t> buffer(1024); // File is less than 1KiB
13+
try {
14+
target.ReadExact(buffer.data(), buffer.size());
15+
FAIL() << "Expected throw";
16+
} catch (const databento::Exception& exc) {
17+
ASSERT_STREQ(exc.what(),
18+
"Unexpected end of file, expected 1024 bytes, got 206");
19+
}
20+
}
21+
22+
TEST(FileStreamTests, TestReadSomeLessThanMax) {
23+
const std::string file_path = TEST_BUILD_DIR "/data/test_data.ohlcv-1d.dbn";
24+
databento::detail::FileStream target{file_path};
25+
std::vector<std::uint8_t> buffer(1024); // File is less than 1KiB
26+
const auto read_size = target.ReadSome(buffer.data(), buffer.size());
27+
ASSERT_GT(read_size, 0);
28+
ASSERT_TRUE(std::any_of(buffer.cbegin(), buffer.cend(),
29+
[](std::uint8_t byte) { return byte != 0; }));
30+
}
31+
} // namespace test
32+
} // namespace detail
33+
} // namespace databento

test/src/zstd_stream_tests.cpp

Lines changed: 32 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,32 @@
1+
#include <gtest/gtest.h>
2+
3+
#include <cstddef>
4+
#include <cstdint>
5+
#include <memory>
6+
7+
#include "databento/detail/file_stream.hpp"
8+
#include "databento/detail/zstd_stream.hpp"
9+
#include "databento/enums.hpp"
10+
#include "databento/ireadable.hpp"
11+
#include "databento/record.hpp"
12+
13+
namespace databento {
14+
namespace detail {
15+
namespace test {
16+
TEST(ZstdStreamTests, TestMultiFrameFiles) {
17+
constexpr auto kRecordCount = 8;
18+
const std::string file_path =
19+
TEST_BUILD_DIR "/data/multi-frame.definition.zst";
20+
21+
databento::detail::ZstdStream target{std::unique_ptr<databento::IReadable>{
22+
new databento::detail::FileStream{file_path}}};
23+
for (std::size_t i = 0; i < kRecordCount; ++i) {
24+
databento::InstrumentDefMsg def_msg;
25+
target.ReadExact(reinterpret_cast<std::uint8_t*>(&def_msg),
26+
sizeof(def_msg));
27+
EXPECT_EQ(def_msg.hd.rtype, databento::rtype::InstrumentDef);
28+
}
29+
}
30+
} // namespace test
31+
} // namespace detail
32+
} // namespace databento

0 commit comments

Comments
 (0)