From 654d96c8ca1abe89b82622d44468952f5b25a272 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Tue, 20 May 2025 15:16:12 +0800 Subject: [PATCH 1/6] feat: add avro input&output stream based on arrow stream impl --- src/iceberg/CMakeLists.txt | 8 +- src/iceberg/avro/avro_stream.cpp | 127 +++++++++++++++++++++++++++++++ src/iceberg/avro/avro_stream.h | 99 ++++++++++++++++++++++++ test/avro_test.cc | 57 +++++++++++++- 4 files changed, 287 insertions(+), 4 deletions(-) create mode 100644 src/iceberg/avro/avro_stream.cpp create mode 100644 src/iceberg/avro/avro_stream.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 0dfb57790..d949560c3 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -79,8 +79,12 @@ install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h DESTINATION ${ICEBERG_INSTALL_INCLUDEDIR}/iceberg) if(ICEBERG_BUILD_BUNDLE) - set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc arrow/arrow_fs_file_io.cc - avro/demo_avro.cc avro/avro_schema_util.cc) + set(ICEBERG_BUNDLE_SOURCES + arrow/demo_arrow.cc + arrow/arrow_fs_file_io.cc + avro/demo_avro.cc + avro/avro_schema_util.cc + avro/avro_stream.cpp) # Libraries to link with exported libiceberg_bundle.{so,a}. 
set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS) diff --git a/src/iceberg/avro/avro_stream.cpp b/src/iceberg/avro/avro_stream.cpp new file mode 100644 index 000000000..e002a9a06 --- /dev/null +++ b/src/iceberg/avro/avro_stream.cpp @@ -0,0 +1,127 @@ +#include "avro_stream.h" + +#include +#include + +namespace iceberg::avro { + +AvroInputStream::AvroInputStream( + std::shared_ptr input_stream, int64_t buffer_size) + : input_stream_(std::move(input_stream)), + buffer_size_(buffer_size), + buffer_(buffer_size) {} + +AvroInputStream::~AvroInputStream() = default; + +bool AvroInputStream::next(const uint8_t** data, size_t* len) { + // Return all unconsumed data in the buffer + if (buffer_pos_ < available_bytes_) { + *data = buffer_.data() + buffer_pos_; + *len = available_bytes_ - buffer_pos_; + byte_count_ += available_bytes_ - buffer_pos_; + buffer_pos_ = available_bytes_; + return true; + } + + // Read from the input stream when the buffer is empty + auto result = input_stream_->Read(buffer_.size(), buffer_.data()); + if (!result.ok()) { + throw IcebergError( + std::format("Read failed:{} at pos:{}", result.status().ToString(), buffer_pos_)); + } + if (result.ValueUnsafe() <= 0) { + return false; + } + available_bytes_ = result.ValueUnsafe(); + buffer_pos_ = 0; + + // Return the whole buffer + *data = buffer_.data(); + *len = available_bytes_; + byte_count_ += available_bytes_; + buffer_pos_ = available_bytes_; + + return true; +} + +void AvroInputStream::backup(size_t len) { + if (len > buffer_pos_) { + throw IcebergError( + std::format("Cannot backup {} bytes, only {} bytes available", len, buffer_pos_)); + } + + buffer_pos_ -= len; + byte_count_ -= len; +} + +void AvroInputStream::skip(size_t len) { + // The range to skip is within the buffer + if (buffer_pos_ + len <= available_bytes_) { + buffer_pos_ += len; + byte_count_ += len; + return; + } + + seek(byte_count_ + len); +} + +size_t AvroInputStream::byteCount() const { return byte_count_; } + +void 
AvroInputStream::seek(int64_t position) { + auto status = input_stream_->Seek(position); + if (!status.ok()) { + throw IcebergError( + std::format("Failed to seek to {}, got {}", position, status.ToString())); + } + + buffer_pos_ = 0; + available_bytes_ = 0; + byte_count_ = position; +} + +AvroOutputStream::AvroOutputStream(std::shared_ptr output_stream, + int64_t buffer_size) + : output_stream_(std::move(output_stream)), + buffer_size_(buffer_size), + buffer_(buffer_size) {} + +AvroOutputStream::~AvroOutputStream() = default; + +bool AvroOutputStream::next(uint8_t** data, size_t* len) { + if (buffer_pos_ > 0) { + flush(); + } + + *data = buffer_.data(); + *len = buffer_.size(); + buffer_pos_ = buffer_.size(); // Assume all will be used until backup is called + + return true; +} + +void AvroOutputStream::backup(size_t len) { + if (len > buffer_pos_) { + throw IcebergError( + std::format("Cannot backup {} bytes, only {} bytes available", len, buffer_pos_)); + } + buffer_pos_ -= len; +} + +uint64_t AvroOutputStream::byteCount() const { return flushed_bytes_ + buffer_pos_; } + +void AvroOutputStream::flush() { + if (buffer_pos_ > 0) { + auto status = output_stream_->Write(buffer_.data(), buffer_pos_); + if (!status.ok()) { + throw IcebergError(std::format("Write failed {}", status.ToString())); + } + flushed_bytes_ += buffer_pos_; + buffer_pos_ = 0; + } + auto status = output_stream_->Flush(); + if (!status.ok()) { + throw IcebergError(std::format("Flush failed {}", status.ToString())); + } +} + +} // namespace iceberg::avro diff --git a/src/iceberg/avro/avro_stream.h b/src/iceberg/avro/avro_stream.h new file mode 100644 index 000000000..5ea26dd55 --- /dev/null +++ b/src/iceberg/avro/avro_stream.h @@ -0,0 +1,99 @@ +#pragma once + +#include +#include + +namespace iceberg::avro { + +class AvroInputStream : public ::avro::SeekableInputStream { + public: + explicit AvroInputStream(std::shared_ptr<::arrow::io::RandomAccessFile> input_stream, + int64_t buffer_size); + + 
~AvroInputStream() override; + + /** + * Returns some of available data. + * + * Returns true if some data is available, false if no more data is + * available or an error has occurred. + */ + bool next(const uint8_t** data, size_t* len) override; + + /** + * "Returns" back some of the data to the stream. The returned + * data must be less than what was obtained in the last call to + * next(). + */ + void backup(size_t len) override; + + /** + * Skips number of bytes specified by len. + */ + void skip(size_t len) override; + + /** + * Returns the number of bytes read from this stream so far. + * All the bytes made available through next are considered + * to be used unless, returned back using backup. + */ + size_t byteCount() const override; + + /** + * Seek to a specific position in the stream. This may invalidate pointers + * returned from next(). This will also reset byteCount() to the given + * position. + */ + void seek(int64_t position) override; + + private: + std::shared_ptr<::arrow::io::RandomAccessFile> input_stream_; + const int64_t buffer_size_; + std::vector buffer_; + size_t byte_count_ = 0; // bytes read from the input stream + size_t buffer_pos_ = 0; // next position to read in the buffer + size_t available_bytes_ = 0; // bytes available in the buffer +}; + +class AvroOutputStream : public ::avro::OutputStream { + public: + explicit AvroOutputStream(std::shared_ptr<::arrow::io::OutputStream> output_stream, + int64_t buffer_size); + + ~AvroOutputStream() override; + + /** + * Returns a buffer that can be written into. + * On successful return, data has the pointer to the buffer + * and len has the number of bytes available at data. + */ + bool next(uint8_t** data, size_t* len) override; + + /** + * "Returns" back to the stream some of the buffer obtained + * from in the last call to next(). + */ + void backup(size_t len) override; + + /** + * Number of bytes written so far into this stream. 
The whole buffer + * returned by next() is assumed to be written unless some of + * it was returned using backup(). + */ + uint64_t byteCount() const override; + + /** + * Flushes any data remaining in the buffer to the stream's underlying + * store, if any. + */ + void flush() override; + + private: + std::shared_ptr<::arrow::io::OutputStream> output_stream_; + const int64_t buffer_size_; + std::vector buffer_; + size_t buffer_pos_ = 0; // position in the buffer + uint64_t flushed_bytes_ = 0; // bytes flushed to the output stream +}; + +} // namespace iceberg::avro diff --git a/test/avro_test.cc b/test/avro_test.cc index 5ffcbc01a..f6e5f637a 100644 --- a/test/avro_test.cc +++ b/test/avro_test.cc @@ -17,15 +17,35 @@ * under the License. */ +#include +#include +#include +#include #include +#include +#include #include #include #include "matchers.h" +#include "temp_file_test_base.h" namespace iceberg::avro { -TEST(AVROTest, TestDemoAvro) { +class AVROTest : public TempFileTestBase { + public: + void SetUp() override { + TempFileTestBase::SetUp(); + file_io_ = std::make_shared( + std::make_shared<::arrow::fs::LocalFileSystem>()); + temp_filepath_ = CreateNewTempFilePath(); + } + + std::shared_ptr file_io_; + std::string temp_filepath_; +}; + +TEST_F(AVROTest, TestDemoAvro) { std::string expected = "{\n\ \"type\": \"record\",\n\ @@ -44,7 +64,40 @@ TEST(AVROTest, TestDemoAvro) { EXPECT_EQ(avro.print(), expected); } -TEST(AVROTest, TestDemoAvroReader) { +TEST_F(AVROTest, TestAvroBasicStream) { + auto fs = std::make_shared<::arrow::fs::LocalFileSystem>(); + std::cout << temp_filepath_ << std::endl; + auto arrow_out_ret = fs->OpenOutputStream(temp_filepath_); + ASSERT_TRUE(arrow_out_ret.ok()); + auto avro_output_stream = + std::make_shared(std::move(arrow_out_ret.ValueUnsafe()), 1024); + std::string test_data = "test data"; + { + uint8_t* buf; + size_t buf_size; + ASSERT_TRUE(avro_output_stream->next(&buf, &buf_size)); + std::memcpy(buf, test_data.data(), test_data.size()); 
+ avro_output_stream->backup(1024 - test_data.size()); + avro_output_stream->flush(); + } + + auto arrow_in_ret = fs->OpenInputFile(temp_filepath_); + ASSERT_TRUE(arrow_in_ret.ok()); + auto avro_input_stream = + std::make_shared(std::move(arrow_in_ret.ValueUnsafe()), 1024); + { + const uint8_t* data{}; + size_t len{}; + ASSERT_TRUE(avro_input_stream->next(&data, &len)); + EXPECT_EQ(len, test_data.size()); + + EXPECT_EQ(avro_input_stream->byteCount(), test_data.size()); + EXPECT_EQ(std::string(reinterpret_cast(data), len), test_data); + std::cout << std::string(reinterpret_cast(data), len) << std::endl; + ASSERT_FALSE(avro_input_stream->next(&data, &len)); + } +} +TEST_F(AVROTest, TestDemoAvroReader) { auto result = ReaderFactoryRegistry::Create(FileFormatType::kAvro, {}); ASSERT_THAT(result, IsOk()); From 9b2f08014389e83c47e16bd038a5b60a1bb70393 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Tue, 20 May 2025 15:32:25 +0800 Subject: [PATCH 2/6] add license header --- src/iceberg/avro/avro_stream.cpp | 19 +++++++++++++++++++ src/iceberg/avro/avro_stream.h | 19 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/src/iceberg/avro/avro_stream.cpp b/src/iceberg/avro/avro_stream.cpp index e002a9a06..15e0cc7f2 100644 --- a/src/iceberg/avro/avro_stream.cpp +++ b/src/iceberg/avro/avro_stream.cpp @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + #include "avro_stream.h" #include diff --git a/src/iceberg/avro/avro_stream.h b/src/iceberg/avro/avro_stream.h index 5ea26dd55..1d8e34d5a 100644 --- a/src/iceberg/avro/avro_stream.h +++ b/src/iceberg/avro/avro_stream.h @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + #pragma once #include From 52f073f4a2036e5403a39a65a70038832bcec9ec Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Tue, 20 May 2025 15:46:01 +0800 Subject: [PATCH 3/6] fix std::format --- src/iceberg/avro/avro_stream.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/iceberg/avro/avro_stream.cpp b/src/iceberg/avro/avro_stream.cpp index 15e0cc7f2..6720719ee 100644 --- a/src/iceberg/avro/avro_stream.cpp +++ b/src/iceberg/avro/avro_stream.cpp @@ -19,6 +19,8 @@ #include "avro_stream.h" +#include + #include #include From 6b07e7ff11dc8ba8a93b057fca276801b0ae047d Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Tue, 20 May 2025 17:41:24 +0800 Subject: [PATCH 4/6] add more case and independ stream test --- src/iceberg/CMakeLists.txt | 2 +- .../avro/{avro_stream.cpp => avro_stream.cc} | 0 src/iceberg/avro/avro_stream.h | 64 ++-- test/CMakeLists.txt | 2 +- test/avro_stream_test.cc | 352 ++++++++++++++++++ test/avro_test.cc | 57 +-- 6 files changed, 378 insertions(+), 99 deletions(-) rename src/iceberg/avro/{avro_stream.cpp => avro_stream.cc} (100%) create mode 100644 test/avro_stream_test.cc diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index d949560c3..ab71b5c4b 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -84,7 +84,7 @@ if(ICEBERG_BUILD_BUNDLE) arrow/arrow_fs_file_io.cc avro/demo_avro.cc avro/avro_schema_util.cc - avro/avro_stream.cpp) + avro/avro_stream.cc) # Libraries to link with exported libiceberg_bundle.{so,a}. 
set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS) diff --git a/src/iceberg/avro/avro_stream.cpp b/src/iceberg/avro/avro_stream.cc similarity index 100% rename from src/iceberg/avro/avro_stream.cpp rename to src/iceberg/avro/avro_stream.cc diff --git a/src/iceberg/avro/avro_stream.h b/src/iceberg/avro/avro_stream.h index 1d8e34d5a..bb75b4d8a 100644 --- a/src/iceberg/avro/avro_stream.h +++ b/src/iceberg/avro/avro_stream.h @@ -31,38 +31,26 @@ class AvroInputStream : public ::avro::SeekableInputStream { ~AvroInputStream() override; - /** - * Returns some of available data. - * - * Returns true if some data is available, false if no more data is - * available or an error has occurred. - */ + /// \brief Returns some of available data. + /// \return true if some data is available, false if no more data is available or an + /// error has occurred. bool next(const uint8_t** data, size_t* len) override; - /** - * "Returns" back some of the data to the stream. The returned - * data must be less than what was obtained in the last call to - * next(). - */ + /// \brief "Returns" back some of the data to the stream. The returned data must be less + /// than what was obtained in the last call to next(). void backup(size_t len) override; - /** - * Skips number of bytes specified by len. - */ + /// \brief Skips number of bytes specified by len. void skip(size_t len) override; - /** - * Returns the number of bytes read from this stream so far. - * All the bytes made available through next are considered - * to be used unless, returned back using backup. - */ + /// \brief Returns the number of bytes read from this stream so far. + /// All the bytes made available through next are considered to be used unless, + /// returned back using backup. size_t byteCount() const override; - /** - * Seek to a specific position in the stream. This may invalidate pointers - * returned from next(). This will also reset byteCount() to the given - * position. 
- */ + /// \brief Seek to a specific position in the stream. This may invalidate pointers + /// returned from next(). This will also reset byteCount() to the given + /// position. void seek(int64_t position) override; private: @@ -81,30 +69,22 @@ class AvroOutputStream : public ::avro::OutputStream { ~AvroOutputStream() override; - /** - * Returns a buffer that can be written into. - * On successful return, data has the pointer to the buffer - * and len has the number of bytes available at data. - */ + /// \brief Returns a buffer that can be written into. + /// On successful return, data has the pointer to the buffer + /// and len has the number of bytes available at data. bool next(uint8_t** data, size_t* len) override; - /** - * "Returns" back to the stream some of the buffer obtained - * from in the last call to next(). - */ + /// \brief "Returns" back to the stream some of the buffer obtained + /// from in the last call to next(). void backup(size_t len) override; - /** - * Number of bytes written so far into this stream. The whole buffer - * returned by next() is assumed to be written unless some of - * it was returned using backup(). - */ + /// \brief Number of bytes written so far into this stream. The whole buffer + /// returned by next() is assumed to be written unless some of + /// it was returned using backup(). uint64_t byteCount() const override; - /** - * Flushes any data remaining in the buffer to the stream's underlying - * store, if any. - */ + /// \brief Flushes any data remaining in the buffer to the stream's underlying + /// store, if any. 
void flush() override; private: diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 0c3776bea..4a88229f5 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -65,7 +65,7 @@ add_test(NAME util_test COMMAND util_test) if(ICEBERG_BUILD_BUNDLE) add_executable(avro_test) - target_sources(avro_test PRIVATE avro_test.cc avro_schema_test.cc) + target_sources(avro_test PRIVATE avro_test.cc avro_schema_test.cc avro_stream_test.cc) target_link_libraries(avro_test PRIVATE iceberg_bundle_static GTest::gtest_main GTest::gmock) add_test(NAME avro_test COMMAND avro_test) diff --git a/test/avro_stream_test.cc b/test/avro_stream_test.cc new file mode 100644 index 000000000..6f004f54e --- /dev/null +++ b/test/avro_stream_test.cc @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include +#include +#include +#include +#include + +#include "temp_file_test_base.h" + +namespace iceberg::avro { + +class AVROStreamTest : public TempFileTestBase { + public: + void SetUp() override { + TempFileTestBase::SetUp(); + file_io_ = std::make_shared( + std::make_shared<::arrow::fs::LocalFileSystem>()); + temp_filepath_ = CreateNewTempFilePath(); + local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>(); + } + + std::shared_ptr CreateOutputStream(const std::string& path, + int64_t buffer_size) { + std::cout << "CreateOutputStream" << path << std::endl; + auto arrow_out_ret = local_fs_->OpenOutputStream(path); + if (!arrow_out_ret.ok()) { + throw std::runtime_error("Failed to open output stream: " + + arrow_out_ret.status().message()); + } + return std::make_shared(std::move(arrow_out_ret.ValueUnsafe()), + buffer_size); + } + + std::shared_ptr CreateInputStream(const std::string& path, + int64_t buffer_size) { + std::cout << "CreateInputStream" << path << std::endl; + auto arrow_in_ret = local_fs_->OpenInputFile(path); + if (!arrow_in_ret.ok()) { + throw std::runtime_error("Failed to open input stream: " + + arrow_in_ret.status().message()); + } + return std::make_shared(std::move(arrow_in_ret.ValueUnsafe()), + buffer_size); + } + + void WriteDataToStream(const std::shared_ptr& avro_output_stream, + const std::string& data) { + uint8_t* buf; + size_t buf_size; + ASSERT_TRUE(avro_output_stream->next(&buf, &buf_size)); + std::memcpy(buf, data.data(), data.size()); + avro_output_stream->backup(1024 - data.size()); + avro_output_stream->flush(); + } + + void ReadDataFromStream(const std::shared_ptr& avro_input_stream, + std::string& data) { + const uint8_t* buf{}; + size_t len{}; + ASSERT_TRUE(avro_input_stream->next(&buf, &len)); + data = std::string(reinterpret_cast(buf), len); + } + + void CheckStreamEof(const std::shared_ptr& avro_input_stream) { + const uint8_t* buf{}; + size_t len{}; + ASSERT_FALSE(avro_input_stream->next(&buf, &len)); + } + 
+ int64_t buffer_size_ = 1024; + std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_; + std::shared_ptr file_io_; + std::string temp_filepath_; +}; + +TEST_F(AVROStreamTest, TestAvroBasicStream) { + // Write test data + const std::string test_data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_); + WriteDataToStream(avro_output_stream, test_data); + + auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_); + { + const uint8_t* data{}; + size_t len{}; + ASSERT_TRUE(avro_input_stream->next(&data, &len)); + EXPECT_EQ(len, test_data.size()); + + EXPECT_EQ(avro_input_stream->byteCount(), test_data.size()); + EXPECT_EQ(std::string(reinterpret_cast(data), len), test_data); + std::cout << std::string(reinterpret_cast(data), len) << std::endl; + ASSERT_FALSE(avro_input_stream->next(&data, &len)); + } +} + +TEST_F(AVROStreamTest, InputStreamBackup) { + // Write test data + const std::string test_data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_); + WriteDataToStream(avro_output_stream, test_data); + + // Create a test input stream + auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_); + + // Read data + const uint8_t* data{}; + size_t len{}; + ASSERT_TRUE(avro_input_stream->next(&data, &len)); + EXPECT_EQ(len, test_data.size()); + + // Backup 10 bytes + const size_t backupSize = 10; + avro_input_stream->backup(backupSize); + + // Check byteCount after backup + EXPECT_EQ(avro_input_stream->byteCount(), test_data.size() - backupSize); + + // Read the backed-up data again + ASSERT_TRUE(avro_input_stream->next(&data, &len)); + EXPECT_EQ(len, backupSize); + EXPECT_EQ(std::string(reinterpret_cast(data), len), + test_data.substr(test_data.size() - backupSize)); // NOLINT + + // Check that we've reached the end of the stream + ASSERT_FALSE(avro_input_stream->next(&data, &len)); +} + +TEST_F(AVROStreamTest, 
InputStreamSkip) { + // Write test data + const std::string test_data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_); + WriteDataToStream(avro_output_stream, test_data); + + // Create a test input stream + auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_); + + // Skip the first 10 bytes + const size_t skipSize = 10; + avro_input_stream->skip(skipSize); + + // Check byteCount after skip + EXPECT_EQ(avro_input_stream->byteCount(), skipSize); + + // Read the remaining data + const uint8_t* data{}; + size_t len{}; + ASSERT_TRUE(avro_input_stream->next(&data, &len)); + EXPECT_EQ(len, test_data.size() - skipSize); + EXPECT_EQ(std::string(reinterpret_cast(data), len), + test_data.substr(skipSize)); // NOLINT + + // Check that we've reached the end of the stream + ASSERT_FALSE(avro_input_stream->next(&data, &len)); +} + +TEST_F(AVROStreamTest, InputStreamSeek) { + // Write test data + const std::string test_data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_); + WriteDataToStream(avro_output_stream, test_data); + + // Create a test input stream + auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_); + + // Seek to position 15 + const int64_t seekPos = 15; + avro_input_stream->seek(seekPos); + + // Check byteCount after seek + EXPECT_EQ(avro_input_stream->byteCount(), static_cast(seekPos)); + + // Read the remaining data + const uint8_t* data{}; + size_t len{}; + ASSERT_TRUE(avro_input_stream->next(&data, &len)); + EXPECT_EQ(len, test_data.size() - seekPos); + EXPECT_EQ(std::string(reinterpret_cast(data), len), + test_data.substr(seekPos)); // NOLINT + + // Check that we've reached the end of the stream + ASSERT_FALSE(avro_input_stream->next(&data, &len)); +} + +TEST_F(AVROStreamTest, OutputStreamBasic) { + auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_); + + // Test next() + uint8_t* 
data{}; + size_t len{}; + ASSERT_TRUE(avro_output_stream->next(&data, &len)); + EXPECT_EQ(len, static_cast(buffer_size_)); + + // Write some data + const std::string test_data = "Hello, Avro!"; + std::memcpy(data, test_data.data(), test_data.size()); + + // Backup unused bytes + avro_output_stream->backup(len - test_data.size()); + + // Check byteCount + EXPECT_EQ(avro_output_stream->byteCount(), test_data.size()); + + // Flush the data + avro_output_stream->flush(); + + // Verify the written data + auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_); + std::string read_data; + ReadDataFromStream(avro_input_stream, read_data); + EXPECT_EQ(read_data, test_data); + CheckStreamEof(avro_input_stream); +} + +TEST_F(AVROStreamTest, OutputStreamMultipleWrites) { + auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_); + + // Write first chunk + const std::string chunk1 = "First chunk of data. "; + uint8_t* data1{}; + size_t len1{}; + ASSERT_TRUE(avro_output_stream->next(&data1, &len1)); + std::memcpy(data1, chunk1.data(), chunk1.size()); + avro_output_stream->backup(len1 - chunk1.size()); + + // Write second chunk + const std::string chunk2 = "Second chunk of data."; + uint8_t* data2{}; + size_t len2{}; + ASSERT_TRUE(avro_output_stream->next(&data2, &len2)); + std::memcpy(data2, chunk2.data(), chunk2.size()); + avro_output_stream->backup(len2 - chunk2.size()); + + // Check byteCount + EXPECT_EQ(avro_output_stream->byteCount(), chunk1.size() + chunk2.size()); + + // Flush and close + avro_output_stream->flush(); + + // Verify the written data + std::string expectedData = chunk1 + chunk2; + auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_); + std::string read_data; + ReadDataFromStream(avro_input_stream, read_data); + EXPECT_EQ(read_data, expectedData); + CheckStreamEof(avro_input_stream); +} + +TEST_F(AVROStreamTest, InputStreamLargeData) { + // Create a large test data set (larger than the buffer size) + 
const size_t dataSize = buffer_size_ * 2.5; + std::vector test_data(dataSize); + for (size_t i = 0; i < dataSize; ++i) { + test_data[i] = static_cast(i % 256); + } + + auto arrow_out_ret = local_fs_->OpenOutputStream(temp_filepath_); + if (!arrow_out_ret.ok()) { + throw std::runtime_error("Failed to open output stream: " + + arrow_out_ret.status().message()); + } + auto arrow_out = arrow_out_ret.ValueUnsafe(); + // Write the test data + auto status = arrow_out->Write(test_data.data(), test_data.size()); + ASSERT_TRUE(status.ok()); + status = arrow_out->Flush(); + ASSERT_TRUE(status.ok()); + status = arrow_out->Close(); + ASSERT_TRUE(status.ok()); + + // Create an AvroInputStream with a smaller buffer + const int64_t smallBufferSize = buffer_size_ / 2; + auto avro_input_stream = CreateInputStream(temp_filepath_, smallBufferSize); + + // Read all the data in chunks + std::vector readData; + const uint8_t* data{}; + size_t len{}; + while (avro_input_stream->next(&data, &len)) { + readData.insert(readData.end(), data, data + len); + } + + // Verify all data was read correctly + EXPECT_EQ(readData.size(), test_data.size()); + EXPECT_EQ(std::memcmp(readData.data(), test_data.data(), test_data.size()), 0); + EXPECT_EQ(avro_input_stream->byteCount(), test_data.size()); +} + +TEST_F(AVROStreamTest, OutputStreamLargeData) { + // Create an AvroOutputStream with a small buffer + const int64_t smallBufferSize = 256; + auto avro_output_stream = CreateOutputStream(temp_filepath_, smallBufferSize); + + // Create a large test data set (larger than the buffer size) + const size_t dataSize = smallBufferSize * 3.5; + std::vector test_data(dataSize); + for (size_t i = 0; i < dataSize; ++i) { + test_data[i] = static_cast(i % 256); + } + + // Write the data in chunks + size_t bytesWritten = 0; + while (bytesWritten < dataSize) { + uint8_t* buffer{}; + size_t bufferSize{}; + ASSERT_TRUE(avro_output_stream->next(&buffer, &bufferSize)); + + size_t bytesToWrite = std::min(bufferSize, 
dataSize - bytesWritten); + std::memcpy(buffer, test_data.data() + bytesWritten, bytesToWrite); + + if (bytesToWrite < bufferSize) { + avro_output_stream->backup(bufferSize - bytesToWrite); + } + + bytesWritten += bytesToWrite; + } + + // Flush and close + avro_output_stream->flush(); + + // Verify the written data + auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_); + std::vector readData; + const uint8_t* data{}; + size_t len{}; + while (avro_input_stream->next(&data, &len)) { + readData.insert(readData.end(), data, data + len); + } + // Verify all data was read correctly + EXPECT_EQ(readData.size(), test_data.size()); + EXPECT_EQ(std::memcmp(readData.data(), test_data.data(), test_data.size()), 0); + EXPECT_EQ(avro_input_stream->byteCount(), test_data.size()); +} + +} // namespace iceberg::avro diff --git a/test/avro_test.cc b/test/avro_test.cc index f6e5f637a..5ffcbc01a 100644 --- a/test/avro_test.cc +++ b/test/avro_test.cc @@ -17,35 +17,15 @@ * under the License. 
*/ -#include -#include -#include -#include #include -#include -#include #include #include #include "matchers.h" -#include "temp_file_test_base.h" namespace iceberg::avro { -class AVROTest : public TempFileTestBase { - public: - void SetUp() override { - TempFileTestBase::SetUp(); - file_io_ = std::make_shared( - std::make_shared<::arrow::fs::LocalFileSystem>()); - temp_filepath_ = CreateNewTempFilePath(); - } - - std::shared_ptr file_io_; - std::string temp_filepath_; -}; - -TEST_F(AVROTest, TestDemoAvro) { +TEST(AVROTest, TestDemoAvro) { std::string expected = "{\n\ \"type\": \"record\",\n\ @@ -64,40 +44,7 @@ TEST_F(AVROTest, TestDemoAvro) { EXPECT_EQ(avro.print(), expected); } -TEST_F(AVROTest, TestAvroBasicStream) { - auto fs = std::make_shared<::arrow::fs::LocalFileSystem>(); - std::cout << temp_filepath_ << std::endl; - auto arrow_out_ret = fs->OpenOutputStream(temp_filepath_); - ASSERT_TRUE(arrow_out_ret.ok()); - auto avro_output_stream = - std::make_shared(std::move(arrow_out_ret.ValueUnsafe()), 1024); - std::string test_data = "test data"; - { - uint8_t* buf; - size_t buf_size; - ASSERT_TRUE(avro_output_stream->next(&buf, &buf_size)); - std::memcpy(buf, test_data.data(), test_data.size()); - avro_output_stream->backup(1024 - test_data.size()); - avro_output_stream->flush(); - } - - auto arrow_in_ret = fs->OpenInputFile(temp_filepath_); - ASSERT_TRUE(arrow_in_ret.ok()); - auto avro_input_stream = - std::make_shared(std::move(arrow_in_ret.ValueUnsafe()), 1024); - { - const uint8_t* data{}; - size_t len{}; - ASSERT_TRUE(avro_input_stream->next(&data, &len)); - EXPECT_EQ(len, test_data.size()); - - EXPECT_EQ(avro_input_stream->byteCount(), test_data.size()); - EXPECT_EQ(std::string(reinterpret_cast(data), len), test_data); - std::cout << std::string(reinterpret_cast(data), len) << std::endl; - ASSERT_FALSE(avro_input_stream->next(&data, &len)); - } -} -TEST_F(AVROTest, TestDemoAvroReader) { +TEST(AVROTest, TestDemoAvroReader) { auto result = 
ReaderFactoryRegistry::Create(FileFormatType::kAvro, {}); ASSERT_THAT(result, IsOk()); From ca3b4606ffb52a285ee36d720f482ab6a802d875 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Wed, 21 May 2025 10:14:12 +0800 Subject: [PATCH 5/6] fix header include&other comments --- src/iceberg/CMakeLists.txt | 2 +- ...avro_stream.cc => avro_stream_internal.cc} | 5 +-- .../{avro_stream.h => avro_stream_internal.h} | 0 test/avro_stream_test.cc | 32 +++++++------------ 4 files changed, 16 insertions(+), 23 deletions(-) rename src/iceberg/avro/{avro_stream.cc => avro_stream_internal.cc} (98%) rename src/iceberg/avro/{avro_stream.h => avro_stream_internal.h} (100%) diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index ab71b5c4b..776001c8d 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -84,7 +84,7 @@ if(ICEBERG_BUILD_BUNDLE) arrow/arrow_fs_file_io.cc avro/demo_avro.cc avro/avro_schema_util.cc - avro/avro_stream.cc) + avro/avro_stream_internal.cc) # Libraries to link with exported libiceberg_bundle.{so,a}. set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS) diff --git a/src/iceberg/avro/avro_stream.cc b/src/iceberg/avro/avro_stream_internal.cc similarity index 98% rename from src/iceberg/avro/avro_stream.cc rename to src/iceberg/avro/avro_stream_internal.cc index 6720719ee..90f12408e 100644 --- a/src/iceberg/avro/avro_stream.cc +++ b/src/iceberg/avro/avro_stream_internal.cc @@ -17,12 +17,13 @@ * under the License. 
*/ -#include "avro_stream.h" +#include "avro_stream_internal.h" #include #include -#include + +#include "iceberg/exception.h" namespace iceberg::avro { diff --git a/src/iceberg/avro/avro_stream.h b/src/iceberg/avro/avro_stream_internal.h similarity index 100% rename from src/iceberg/avro/avro_stream.h rename to src/iceberg/avro/avro_stream_internal.h diff --git a/test/avro_stream_test.cc b/test/avro_stream_test.cc index 6f004f54e..304e0516e 100644 --- a/test/avro_stream_test.cc +++ b/test/avro_stream_test.cc @@ -20,9 +20,8 @@ #include #include #include -#include -#include +#include "iceberg/avro/avro_stream_internal.h" #include "temp_file_test_base.h" namespace iceberg::avro { @@ -31,15 +30,12 @@ class AVROStreamTest : public TempFileTestBase { public: void SetUp() override { TempFileTestBase::SetUp(); - file_io_ = std::make_shared( - std::make_shared<::arrow::fs::LocalFileSystem>()); temp_filepath_ = CreateNewTempFilePath(); local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>(); } std::shared_ptr CreateOutputStream(const std::string& path, int64_t buffer_size) { - std::cout << "CreateOutputStream" << path << std::endl; auto arrow_out_ret = local_fs_->OpenOutputStream(path); if (!arrow_out_ret.ok()) { throw std::runtime_error("Failed to open output stream: " + @@ -51,7 +47,6 @@ class AVROStreamTest : public TempFileTestBase { std::shared_ptr CreateInputStream(const std::string& path, int64_t buffer_size) { - std::cout << "CreateInputStream" << path << std::endl; auto arrow_in_ret = local_fs_->OpenInputFile(path); if (!arrow_in_ret.ok()) { throw std::runtime_error("Failed to open input stream: " + @@ -87,7 +82,6 @@ class AVROStreamTest : public TempFileTestBase { int64_t buffer_size_ = 1024; std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_; - std::shared_ptr file_io_; std::string temp_filepath_; }; @@ -98,17 +92,15 @@ TEST_F(AVROStreamTest, TestAvroBasicStream) { WriteDataToStream(avro_output_stream, test_data); auto avro_input_stream = 
CreateInputStream(temp_filepath_, buffer_size_); - { - const uint8_t* data{}; - size_t len{}; - ASSERT_TRUE(avro_input_stream->next(&data, &len)); - EXPECT_EQ(len, test_data.size()); - EXPECT_EQ(avro_input_stream->byteCount(), test_data.size()); - EXPECT_EQ(std::string(reinterpret_cast(data), len), test_data); - std::cout << std::string(reinterpret_cast(data), len) << std::endl; - ASSERT_FALSE(avro_input_stream->next(&data, &len)); - } + const uint8_t* data{}; + size_t len{}; + ASSERT_TRUE(avro_input_stream->next(&data, &len)); + EXPECT_EQ(len, test_data.size()); + + EXPECT_EQ(avro_input_stream->byteCount(), test_data.size()); + EXPECT_EQ(std::string(reinterpret_cast(data), len), test_data); + ASSERT_FALSE(avro_input_stream->next(&data, &len)); } TEST_F(AVROStreamTest, InputStreamBackup) { @@ -137,7 +129,7 @@ TEST_F(AVROStreamTest, InputStreamBackup) { ASSERT_TRUE(avro_input_stream->next(&data, &len)); EXPECT_EQ(len, backupSize); EXPECT_EQ(std::string(reinterpret_cast(data), len), - test_data.substr(test_data.size() - backupSize)); // NOLINT + test_data.substr(test_data.size() - backupSize)); // Check that we've reached the end of the stream ASSERT_FALSE(avro_input_stream->next(&data, &len)); @@ -165,7 +157,7 @@ TEST_F(AVROStreamTest, InputStreamSkip) { ASSERT_TRUE(avro_input_stream->next(&data, &len)); EXPECT_EQ(len, test_data.size() - skipSize); EXPECT_EQ(std::string(reinterpret_cast(data), len), - test_data.substr(skipSize)); // NOLINT + test_data.substr(skipSize)); // Check that we've reached the end of the stream ASSERT_FALSE(avro_input_stream->next(&data, &len)); @@ -193,7 +185,7 @@ TEST_F(AVROStreamTest, InputStreamSeek) { ASSERT_TRUE(avro_input_stream->next(&data, &len)); EXPECT_EQ(len, test_data.size() - seekPos); EXPECT_EQ(std::string(reinterpret_cast(data), len), - test_data.substr(seekPos)); // NOLINT + test_data.substr(seekPos)); // Check that we've reached the end of the stream ASSERT_FALSE(avro_input_stream->next(&data, &len)); From 
111bf4ac85e5e71ec6aa37091003e0ddefe2a2ef Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Wed, 21 May 2025 16:52:57 +0800 Subject: [PATCH 6/6] fix next() return value --- src/iceberg/avro/avro_stream_internal.cc | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/iceberg/avro/avro_stream_internal.cc b/src/iceberg/avro/avro_stream_internal.cc index 90f12408e..0ec603bd0 100644 --- a/src/iceberg/avro/avro_stream_internal.cc +++ b/src/iceberg/avro/avro_stream_internal.cc @@ -47,11 +47,10 @@ bool AvroInputStream::next(const uint8_t** data, size_t* len) { // Read from the input stream when the buffer is empty auto result = input_stream_->Read(buffer_.size(), buffer_.data()); - if (!result.ok()) { - throw IcebergError( - std::format("Read failed:{} at pos:{}", result.status().ToString(), buffer_pos_)); - } - if (result.ValueUnsafe() <= 0) { + // TODO(xiao.dong) The Avro interface requires returning false on error or at EOF, so + // the error message cannot be raised to the caller; add logging here once we have a + // logging system + if (!result.ok() || result.ValueUnsafe() <= 0) { return false; } available_bytes_ = result.ValueUnsafe();