Skip to content

Commit 654d96c

Browse files
author
xiao.dong
committed
feat: add avro input&output stream based on arrow stream impl
1 parent 1c4c047 commit 654d96c

File tree

4 files changed

+287
-4
lines changed

4 files changed

+287
-4
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,12 @@ install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h
7979
DESTINATION ${ICEBERG_INSTALL_INCLUDEDIR}/iceberg)
8080

8181
if(ICEBERG_BUILD_BUNDLE)
82-
set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc arrow/arrow_fs_file_io.cc
83-
avro/demo_avro.cc avro/avro_schema_util.cc)
82+
set(ICEBERG_BUNDLE_SOURCES
83+
arrow/demo_arrow.cc
84+
arrow/arrow_fs_file_io.cc
85+
avro/demo_avro.cc
86+
avro/avro_schema_util.cc
87+
avro/avro_stream.cpp)
8488

8589
# Libraries to link with exported libiceberg_bundle.{so,a}.
8690
set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS)

src/iceberg/avro/avro_stream.cpp

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
#include "avro_stream.h"

#include <format>
#include <utility>

#include <arrow/result.h>
#include <iceberg/exception.h>
5+
6+
namespace iceberg::avro {
7+
8+
/// Wraps an Arrow random-access file as an Avro seekable input stream.
///
/// @param input_stream Arrow file to read from; must not be null.
/// @param buffer_size Size in bytes of the internal read buffer; must be positive.
/// @throws IcebergError if `input_stream` is null or `buffer_size` is not positive.
AvroInputStream::AvroInputStream(
    std::shared_ptr<::arrow::io::RandomAccessFile> input_stream, int64_t buffer_size)
    : input_stream_(std::move(input_stream)),
      // Validate before sizing the buffer: a negative value would wrap around to a
      // huge size_t, and zero would make every next() call look like end-of-stream.
      buffer_size_(buffer_size > 0
                       ? buffer_size
                       : throw IcebergError(std::format(
                             "Invalid buffer size {} for AvroInputStream", buffer_size))),
      buffer_(static_cast<size_t>(buffer_size_)) {
  if (input_stream_ == nullptr) {
    throw IcebergError("AvroInputStream requires a non-null input stream");
  }
}

AvroInputStream::~AvroInputStream() = default;
15+
16+
/// Hands out the next chunk of readable bytes.
///
/// If the internal buffer still holds unconsumed bytes (after a backup() or an
/// in-buffer skip()), those are returned; otherwise the buffer is refilled from
/// the underlying file first. Every byte handed out is counted as consumed
/// until it is returned with backup().
///
/// @param data Receives a pointer into the internal buffer.
/// @param len Receives the number of bytes available at `*data`.
/// @return true if data was produced, false once the file yields no more bytes.
/// @throws IcebergError if the underlying read fails.
bool AvroInputStream::next(const uint8_t** data, size_t* len) {
  // Refill only when everything buffered has already been handed out.
  if (buffer_pos_ >= available_bytes_) {
    auto result = input_stream_->Read(static_cast<int64_t>(buffer_.size()), buffer_.data());
    if (!result.ok()) {
      // Report the logical stream position; buffer_pos_ is only an offset
      // inside the buffer and is meaningless to the caller.
      throw IcebergError(
          std::format("Read failed:{} at pos:{}", result.status().ToString(), byte_count_));
    }
    const int64_t bytes_read = result.ValueUnsafe();
    if (bytes_read <= 0) {
      return false;
    }
    available_bytes_ = static_cast<size_t>(bytes_read);
    buffer_pos_ = 0;
  }

  // Hand out all unconsumed bytes in one chunk.
  const size_t remaining = available_bytes_ - buffer_pos_;
  *data = buffer_.data() + buffer_pos_;
  *len = remaining;
  byte_count_ += remaining;
  buffer_pos_ = available_bytes_;
  return true;
}
46+
47+
/// Returns the last `len` bytes handed out by next() to the stream so that they
/// are produced again by the following next() call.
///
/// @throws IcebergError if `len` exceeds the number of consumed bytes that are
///         still held in the buffer.
void AvroInputStream::backup(size_t len) {
  if (len <= buffer_pos_) {
    // Rewind both the buffer cursor and the logical stream position.
    byte_count_ -= len;
    buffer_pos_ -= len;
    return;
  }

  throw IcebergError(
      std::format("Cannot backup {} bytes, only {} bytes available", len, buffer_pos_));
}
56+
57+
void AvroInputStream::skip(size_t len) {
58+
// The range to skip is within the buffer
59+
if (buffer_pos_ + len <= available_bytes_) {
60+
buffer_pos_ += len;
61+
byte_count_ += len;
62+
return;
63+
}
64+
65+
seek(byte_count_ + len);
66+
}
67+
68+
/// Number of bytes consumed so far: bytes handed out by next() or skipped,
/// minus those returned via backup(). Reset to the target position by seek().
size_t AvroInputStream::byteCount() const {
  return byte_count_;
}
69+
70+
/// Repositions the underlying file to the absolute offset `position`.
///
/// Buffered data is discarded, so pointers obtained from earlier next() calls
/// must no longer be used, and byteCount() reports `position` afterwards.
///
/// @throws IcebergError if the underlying file rejects the seek.
void AvroInputStream::seek(int64_t position) {
  if (const auto seek_status = input_stream_->Seek(position); !seek_status.ok()) {
    throw IcebergError(
        std::format("Failed to seek to {}, got {}", position, seek_status.ToString()));
  }

  // The buffer no longer matches the file position; force a refill on next().
  available_bytes_ = 0;
  buffer_pos_ = 0;
  byte_count_ = position;
}
81+
82+
/// Wraps an Arrow output stream as an Avro output stream.
///
/// @param output_stream Arrow stream that receives the written bytes; must not be null.
/// @param buffer_size Size in bytes of the internal write buffer; must be positive.
/// @throws IcebergError if `output_stream` is null or `buffer_size` is not positive.
AvroOutputStream::AvroOutputStream(
    std::shared_ptr<::arrow::io::OutputStream> output_stream, int64_t buffer_size)
    : output_stream_(std::move(output_stream)),
      // Validate before sizing the buffer: a negative value would wrap around to a
      // huge size_t, and zero would make next() hand out empty chunks forever.
      buffer_size_(buffer_size > 0
                       ? buffer_size
                       : throw IcebergError(std::format(
                             "Invalid buffer size {} for AvroOutputStream", buffer_size))),
      buffer_(static_cast<size_t>(buffer_size_)) {
  if (output_stream_ == nullptr) {
    throw IcebergError("AvroOutputStream requires a non-null output stream");
  }
}

AvroOutputStream::~AvroOutputStream() = default;
89+
90+
/// Hands out the internal buffer for the caller to write into.
///
/// Bytes written into the previously handed-out buffer (and not returned via
/// backup()) are first written to the underlying stream. Only a write is
/// issued here; forcing the underlying stream to flush on every buffer
/// rotation is unnecessary and is left to an explicit flush() call.
///
/// @param data Receives a pointer to the start of the writable buffer.
/// @param len Receives the buffer capacity in bytes.
/// @return Always true.
/// @throws IcebergError if writing the pending bytes fails.
bool AvroOutputStream::next(uint8_t** data, size_t* len) {
  if (buffer_pos_ > 0) {
    auto status =
        output_stream_->Write(buffer_.data(), static_cast<int64_t>(buffer_pos_));
    if (!status.ok()) {
      throw IcebergError(std::format("Write failed {}", status.ToString()));
    }
    flushed_bytes_ += buffer_pos_;
  }

  *data = buffer_.data();
  *len = buffer_.size();
  // The whole buffer counts as written until the caller returns part of it
  // through backup().
  buffer_pos_ = buffer_.size();

  return true;
}
101+
102+
/// Gives back the trailing `len` bytes of the buffer obtained from the last
/// next() call, marking them as not written.
///
/// @throws IcebergError if `len` exceeds the number of bytes currently counted
///         as written in the buffer.
void AvroOutputStream::backup(size_t len) {
  if (len <= buffer_pos_) {
    buffer_pos_ -= len;
    return;
  }

  throw IcebergError(
      std::format("Cannot backup {} bytes, only {} bytes available", len, buffer_pos_));
}
109+
110+
/// Total bytes written through this stream: bytes already passed to the
/// underlying stream plus bytes still pending in the buffer.
uint64_t AvroOutputStream::byteCount() const {
  return flushed_bytes_ + buffer_pos_;
}
111+
112+
/// Writes any pending buffered bytes to the underlying stream and then asks
/// that stream to flush.
///
/// @throws IcebergError if either the write or the flush fails.
void AvroOutputStream::flush() {
  const size_t pending = buffer_pos_;
  if (pending > 0) {
    if (auto write_status = output_stream_->Write(buffer_.data(), pending);
        !write_status.ok()) {
      throw IcebergError(std::format("Write failed {}", write_status.ToString()));
    }
    flushed_bytes_ += pending;
    buffer_pos_ = 0;
  }

  // Flush the underlying stream even when nothing was pending.
  if (auto flush_status = output_stream_->Flush(); !flush_status.ok()) {
    throw IcebergError(std::format("Flush failed {}", flush_status.ToString()));
  }
}
126+
127+
} // namespace iceberg::avro

src/iceberg/avro/avro_stream.h

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
#pragma once
2+
3+
#include <arrow/io/interfaces.h>
4+
#include <avro/Stream.hh>
5+
6+
namespace iceberg::avro {
7+
8+
class AvroInputStream : public ::avro::SeekableInputStream {
9+
public:
10+
explicit AvroInputStream(std::shared_ptr<::arrow::io::RandomAccessFile> input_stream,
11+
int64_t buffer_size);
12+
13+
~AvroInputStream() override;
14+
15+
/**
16+
* Returns some of available data.
17+
*
18+
* Returns true if some data is available, false if no more data is
19+
* available or an error has occurred.
20+
*/
21+
bool next(const uint8_t** data, size_t* len) override;
22+
23+
/**
24+
* "Returns" back some of the data to the stream. The returned
25+
* data must be less than what was obtained in the last call to
26+
* next().
27+
*/
28+
void backup(size_t len) override;
29+
30+
/**
31+
* Skips number of bytes specified by len.
32+
*/
33+
void skip(size_t len) override;
34+
35+
/**
36+
* Returns the number of bytes read from this stream so far.
37+
* All the bytes made available through next are considered
38+
* to be used unless, returned back using backup.
39+
*/
40+
size_t byteCount() const override;
41+
42+
/**
43+
* Seek to a specific position in the stream. This may invalidate pointers
44+
* returned from next(). This will also reset byteCount() to the given
45+
* position.
46+
*/
47+
void seek(int64_t position) override;
48+
49+
private:
50+
std::shared_ptr<::arrow::io::RandomAccessFile> input_stream_;
51+
const int64_t buffer_size_;
52+
std::vector<uint8_t> buffer_;
53+
size_t byte_count_ = 0; // bytes read from the input stream
54+
size_t buffer_pos_ = 0; // next position to read in the buffer
55+
size_t available_bytes_ = 0; // bytes available in the buffer
56+
};
57+
58+
class AvroOutputStream : public ::avro::OutputStream {
59+
public:
60+
explicit AvroOutputStream(std::shared_ptr<::arrow::io::OutputStream> output_stream,
61+
int64_t buffer_size);
62+
63+
~AvroOutputStream() override;
64+
65+
/**
66+
* Returns a buffer that can be written into.
67+
* On successful return, data has the pointer to the buffer
68+
* and len has the number of bytes available at data.
69+
*/
70+
bool next(uint8_t** data, size_t* len) override;
71+
72+
/**
73+
* "Returns" back to the stream some of the buffer obtained
74+
* from in the last call to next().
75+
*/
76+
void backup(size_t len) override;
77+
78+
/**
79+
* Number of bytes written so far into this stream. The whole buffer
80+
* returned by next() is assumed to be written unless some of
81+
* it was returned using backup().
82+
*/
83+
uint64_t byteCount() const override;
84+
85+
/**
86+
* Flushes any data remaining in the buffer to the stream's underlying
87+
* store, if any.
88+
*/
89+
void flush() override;
90+
91+
private:
92+
std::shared_ptr<::arrow::io::OutputStream> output_stream_;
93+
const int64_t buffer_size_;
94+
std::vector<uint8_t> buffer_;
95+
size_t buffer_pos_ = 0; // position in the buffer
96+
uint64_t flushed_bytes_ = 0; // bytes flushed to the output stream
97+
};
98+
99+
} // namespace iceberg::avro

test/avro_test.cc

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,35 @@
1717
* under the License.
1818
*/
1919

20+
#include <arrow/filesystem/filesystem.h>
21+
#include <arrow/filesystem/localfs.h>
22+
#include <arrow/io/interfaces.h>
23+
#include <arrow/result.h>
2024
#include <gtest/gtest.h>
25+
#include <iceberg/arrow/arrow_fs_file_io.h>
26+
#include <iceberg/avro/avro_stream.h>
2127
#include <iceberg/avro/demo_avro.h>
2228
#include <iceberg/file_reader.h>
2329

2430
#include "matchers.h"
31+
#include "temp_file_test_base.h"
2532

2633
namespace iceberg::avro {
2734

28-
TEST(AVROTest, TestDemoAvro) {
35+
class AVROTest : public TempFileTestBase {
36+
public:
37+
void SetUp() override {
38+
TempFileTestBase::SetUp();
39+
file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(
40+
std::make_shared<::arrow::fs::LocalFileSystem>());
41+
temp_filepath_ = CreateNewTempFilePath();
42+
}
43+
44+
std::shared_ptr<iceberg::FileIO> file_io_;
45+
std::string temp_filepath_;
46+
};
47+
48+
TEST_F(AVROTest, TestDemoAvro) {
2949
std::string expected =
3050
"{\n\
3151
\"type\": \"record\",\n\
@@ -44,7 +64,40 @@ TEST(AVROTest, TestDemoAvro) {
4464
EXPECT_EQ(avro.print(), expected);
4565
}
4666

47-
TEST(AVROTest, TestDemoAvroReader) {
67+
// Round-trips a small payload: writes it through AvroOutputStream into a
// temporary file, then reads it back through AvroInputStream and checks the
// content, the byte counters and the end-of-stream signal.
TEST_F(AVROTest, TestAvroBasicStream) {
  constexpr int64_t kBufferSize = 1024;
  const std::string test_data = "test data";

  auto fs = std::make_shared<::arrow::fs::LocalFileSystem>();

  auto arrow_out_ret = fs->OpenOutputStream(temp_filepath_);
  ASSERT_TRUE(arrow_out_ret.ok());
  auto avro_output_stream = std::make_shared<AvroOutputStream>(
      std::move(arrow_out_ret.ValueUnsafe()), kBufferSize);
  {
    uint8_t* buf = nullptr;
    size_t buf_size = 0;
    ASSERT_TRUE(avro_output_stream->next(&buf, &buf_size));
    // Never copy more than the stream actually handed out.
    ASSERT_GE(buf_size, test_data.size());
    std::memcpy(buf, test_data.data(), test_data.size());
    // Give back the unused tail, based on the length next() reported rather
    // than a hard-coded buffer size.
    avro_output_stream->backup(buf_size - test_data.size());
    EXPECT_EQ(avro_output_stream->byteCount(), test_data.size());
    avro_output_stream->flush();
    EXPECT_EQ(avro_output_stream->byteCount(), test_data.size());
  }

  auto arrow_in_ret = fs->OpenInputFile(temp_filepath_);
  ASSERT_TRUE(arrow_in_ret.ok());
  auto avro_input_stream = std::make_shared<AvroInputStream>(
      std::move(arrow_in_ret.ValueUnsafe()), kBufferSize);
  {
    const uint8_t* data{};
    size_t len{};
    ASSERT_TRUE(avro_input_stream->next(&data, &len));
    EXPECT_EQ(len, test_data.size());

    EXPECT_EQ(avro_input_stream->byteCount(), test_data.size());
    EXPECT_EQ(std::string(reinterpret_cast<const char*>(data), len), test_data);
    // The file holds nothing beyond the payload, so the next read hits EOF.
    ASSERT_FALSE(avro_input_stream->next(&data, &len));
  }
}
100+
TEST_F(AVROTest, TestDemoAvroReader) {
48101
auto result = ReaderFactoryRegistry::Create(FileFormatType::kAvro, {});
49102
ASSERT_THAT(result, IsOk());
50103

0 commit comments

Comments
 (0)