Skip to content

Commit cb44bdc

Browse files
dongxiao1198 and xiao.dong authored
feat: add avro input&output stream based on arrow stream impl (#105)
Support a basic Avro input/output stream built on top of the Arrow stream implementation. --------- Co-authored-by: xiao.dong <[email protected]>
1 parent 70cb244 commit cb44bdc

File tree

5 files changed

+597
-3
lines changed

5 files changed

+597
-3
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,12 @@ install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h
8181
DESTINATION ${ICEBERG_INSTALL_INCLUDEDIR}/iceberg)
8282

8383
if(ICEBERG_BUILD_BUNDLE)
84-
set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc arrow/arrow_fs_file_io.cc
85-
avro/demo_avro.cc avro/avro_schema_util.cc)
84+
set(ICEBERG_BUNDLE_SOURCES
85+
arrow/demo_arrow.cc
86+
arrow/arrow_fs_file_io.cc
87+
avro/demo_avro.cc
88+
avro/avro_schema_util.cc
89+
avro/avro_stream_internal.cc)
8690

8791
# Libraries to link with exported libiceberg_bundle.{so,a}.
8892
set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS)
src/iceberg/avro/avro_stream_internal.cc

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "avro_stream_internal.h"
21+
22+
#include <format>
23+
24+
#include <arrow/result.h>
25+
26+
#include "iceberg/exception.h"
27+
28+
namespace iceberg::avro {
29+
30+
/// \brief Wraps an Arrow random-access file as a buffered Avro input stream.
/// \param input_stream Arrow file to read from; the shared_ptr is moved into the stream.
/// \param buffer_size Size in bytes of the internal read buffer; must be positive.
/// \throws IcebergError if buffer_size is zero or negative.
AvroInputStream::AvroInputStream(
    std::shared_ptr<::arrow::io::RandomAccessFile> input_stream, int64_t buffer_size)
    : input_stream_(std::move(input_stream)),
      buffer_size_(buffer_size),
      // Validate before allocating: a negative size would wrap to a huge size_t, and a
      // zero-sized buffer would make every read come back empty, which next() reports
      // as end of stream.
      buffer_(buffer_size > 0
                  ? static_cast<size_t>(buffer_size)
                  : throw IcebergError(std::format(
                        "Invalid buffer size {} for AvroInputStream, must be positive",
                        buffer_size))) {}
35+
36+
// Defaulted destructor: no explicit cleanup is performed here.
AvroInputStream::~AvroInputStream() = default;
37+
38+
bool AvroInputStream::next(const uint8_t** data, size_t* len) {
39+
// Return all unconsumed data in the buffer
40+
if (buffer_pos_ < available_bytes_) {
41+
*data = buffer_.data() + buffer_pos_;
42+
*len = available_bytes_ - buffer_pos_;
43+
byte_count_ += available_bytes_ - buffer_pos_;
44+
buffer_pos_ = available_bytes_;
45+
return true;
46+
}
47+
48+
// Read from the input stream when the buffer is empty
49+
auto result = input_stream_->Read(buffer_.size(), buffer_.data());
50+
// TODO(xiao.dong) Avro interface requires to return false if an error has occurred or
51+
// reach EOF, so error message can not be raised to the caller, add some log after we
52+
// have a logging system
53+
if (!result.ok() || result.ValueUnsafe() <= 0) {
54+
return false;
55+
}
56+
available_bytes_ = result.ValueUnsafe();
57+
buffer_pos_ = 0;
58+
59+
// Return the whole buffer
60+
*data = buffer_.data();
61+
*len = available_bytes_;
62+
byte_count_ += available_bytes_;
63+
buffer_pos_ = available_bytes_;
64+
65+
return true;
66+
}
67+
68+
// Gives the last `len` consumed bytes back to the stream so the following next()
// returns them again. Throws IcebergError when more bytes are returned than have been
// consumed from the current buffer.
void AvroInputStream::backup(size_t len) {
  const bool fits_in_consumed = len <= buffer_pos_;
  if (!fits_in_consumed) {
    throw IcebergError(
        std::format("Cannot backup {} bytes, only {} bytes available", len, buffer_pos_));
  }

  byte_count_ -= len;
  buffer_pos_ -= len;
}
77+
78+
void AvroInputStream::skip(size_t len) {
79+
// The range to skip is within the buffer
80+
if (buffer_pos_ + len <= available_bytes_) {
81+
buffer_pos_ += len;
82+
byte_count_ += len;
83+
return;
84+
}
85+
86+
seek(byte_count_ + len);
87+
}
88+
89+
// Reports the running count of bytes handed out by next() or skipped, minus bytes
// returned through backup(); seek() resets it to the sought position.
size_t AvroInputStream::byteCount() const {
  return byte_count_;
}
90+
91+
// Repositions the underlying file to `position` and discards all buffered data, so
// the next call to next() reads from the new position. Throws IcebergError when the
// underlying Seek() fails; stream state is left untouched in that case.
void AvroInputStream::seek(int64_t position) {
  if (const auto seek_status = input_stream_->Seek(position); !seek_status.ok()) {
    throw IcebergError(
        std::format("Failed to seek to {}, got {}", position, seek_status.ToString()));
  }

  // Drop the buffer and restart byte accounting at the new position.
  byte_count_ = position;
  available_bytes_ = 0;
  buffer_pos_ = 0;
}
102+
103+
/// \brief Wraps an Arrow output stream as a buffered Avro output stream.
/// \param output_stream Arrow stream to write to; the shared_ptr is moved into the stream.
/// \param buffer_size Size in bytes of the internal write buffer; must be positive.
/// \throws IcebergError if buffer_size is zero or negative.
AvroOutputStream::AvroOutputStream(
    std::shared_ptr<::arrow::io::OutputStream> output_stream, int64_t buffer_size)
    : output_stream_(std::move(output_stream)),
      buffer_size_(buffer_size),
      // Validate before allocating: a negative size would wrap to a huge size_t, and a
      // zero-sized buffer would make next() hand out zero-length spans indefinitely.
      buffer_(buffer_size > 0
                  ? static_cast<size_t>(buffer_size)
                  : throw IcebergError(std::format(
                        "Invalid buffer size {} for AvroOutputStream, must be positive",
                        buffer_size))) {}
108+
109+
// Defaulted destructor: it does not call flush(), so bytes still held in the internal
// buffer are not written to the underlying stream by destruction itself.
// NOTE(review): callers presumably need to flush() before destroying — confirm.
AvroOutputStream::~AvroOutputStream() = default;
110+
111+
bool AvroOutputStream::next(uint8_t** data, size_t* len) {
112+
if (buffer_pos_ > 0) {
113+
flush();
114+
}
115+
116+
*data = buffer_.data();
117+
*len = buffer_.size();
118+
buffer_pos_ = buffer_.size(); // Assume all will be used until backup is called
119+
120+
return true;
121+
}
122+
123+
// Returns the trailing `len` bytes of the buffer handed out by next() as unused.
// Throws IcebergError when `len` exceeds the number of bytes currently counted as
// written in the buffer.
void AvroOutputStream::backup(size_t len) {
  const bool fits_in_written = len <= buffer_pos_;
  if (!fits_in_written) {
    throw IcebergError(
        std::format("Cannot backup {} bytes, only {} bytes available", len, buffer_pos_));
  }

  buffer_pos_ -= len;
}
130+
131+
// Total bytes written so far: those already pushed to the underlying stream plus
// those currently counted as written in the internal buffer.
uint64_t AvroOutputStream::byteCount() const {
  return flushed_bytes_ + buffer_pos_;
}
132+
133+
// Writes any buffered bytes to the underlying stream and then flushes that stream.
// The underlying Flush() is issued even when the buffer is empty. Throws IcebergError
// when either the write or the flush fails.
void AvroOutputStream::flush() {
  if (const size_t pending = buffer_pos_; pending != 0) {
    const auto write_status = output_stream_->Write(buffer_.data(), pending);
    if (!write_status.ok()) {
      throw IcebergError(std::format("Write failed {}", write_status.ToString()));
    }
    // Bookkeeping is updated only after the write has succeeded.
    flushed_bytes_ += pending;
    buffer_pos_ = 0;
  }

  if (const auto flush_status = output_stream_->Flush(); !flush_status.ok()) {
    throw IcebergError(std::format("Flush failed {}", flush_status.ToString()));
  }
}
147+
148+
} // namespace iceberg::avro
src/iceberg/avro/avro_stream_internal.h

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <arrow/io/interfaces.h>
23+
#include <avro/Stream.hh>
24+
25+
namespace iceberg::avro {
26+
27+
class AvroInputStream : public ::avro::SeekableInputStream {
28+
public:
29+
explicit AvroInputStream(std::shared_ptr<::arrow::io::RandomAccessFile> input_stream,
30+
int64_t buffer_size);
31+
32+
~AvroInputStream() override;
33+
34+
/// \brief Returns some of available data.
35+
/// \return true if some data is available, false if no more data is available or an
36+
/// error has occurred.
37+
bool next(const uint8_t** data, size_t* len) override;
38+
39+
/// \brief "Returns" back some of the data to the stream. The returned data must be less
40+
/// than what was obtained in the last call to next().
41+
void backup(size_t len) override;
42+
43+
/// \brief Skips number of bytes specified by len.
44+
void skip(size_t len) override;
45+
46+
/// \brief Returns the number of bytes read from this stream so far.
47+
/// All the bytes made available through next are considered to be used unless,
48+
/// returned back using backup.
49+
size_t byteCount() const override;
50+
51+
/// \brief Seek to a specific position in the stream. This may invalidate pointers
52+
/// returned from next(). This will also reset byteCount() to the given
53+
/// position.
54+
void seek(int64_t position) override;
55+
56+
private:
57+
std::shared_ptr<::arrow::io::RandomAccessFile> input_stream_;
58+
const int64_t buffer_size_;
59+
std::vector<uint8_t> buffer_;
60+
size_t byte_count_ = 0; // bytes read from the input stream
61+
size_t buffer_pos_ = 0; // next position to read in the buffer
62+
size_t available_bytes_ = 0; // bytes available in the buffer
63+
};
64+
65+
class AvroOutputStream : public ::avro::OutputStream {
66+
public:
67+
explicit AvroOutputStream(std::shared_ptr<::arrow::io::OutputStream> output_stream,
68+
int64_t buffer_size);
69+
70+
~AvroOutputStream() override;
71+
72+
/// \brief Returns a buffer that can be written into.
73+
/// On successful return, data has the pointer to the buffer
74+
/// and len has the number of bytes available at data.
75+
bool next(uint8_t** data, size_t* len) override;
76+
77+
/// \brief "Returns" back to the stream some of the buffer obtained
78+
/// from in the last call to next().
79+
void backup(size_t len) override;
80+
81+
/// \brief Number of bytes written so far into this stream. The whole buffer
82+
/// returned by next() is assumed to be written unless some of
83+
/// it was returned using backup().
84+
uint64_t byteCount() const override;
85+
86+
/// \brief Flushes any data remaining in the buffer to the stream's underlying
87+
/// store, if any.
88+
void flush() override;
89+
90+
private:
91+
std::shared_ptr<::arrow::io::OutputStream> output_stream_;
92+
const int64_t buffer_size_;
93+
std::vector<uint8_t> buffer_;
94+
size_t buffer_pos_ = 0; // position in the buffer
95+
uint64_t flushed_bytes_ = 0; // bytes flushed to the output stream
96+
};
97+
98+
} // namespace iceberg::avro

test/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ add_test(NAME util_test COMMAND util_test)
6565

6666
if(ICEBERG_BUILD_BUNDLE)
6767
add_executable(avro_test)
68-
target_sources(avro_test PRIVATE avro_test.cc avro_schema_test.cc)
68+
target_sources(avro_test PRIVATE avro_test.cc avro_schema_test.cc avro_stream_test.cc)
6969
target_link_libraries(avro_test PRIVATE iceberg_bundle_static GTest::gtest_main
7070
GTest::gmock)
7171
add_test(NAME avro_test COMMAND avro_test)

0 commit comments

Comments
 (0)