diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 0dfb57790..776001c8d 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -79,8 +79,12 @@ install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h DESTINATION ${ICEBERG_INSTALL_INCLUDEDIR}/iceberg) if(ICEBERG_BUILD_BUNDLE) - set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc arrow/arrow_fs_file_io.cc - avro/demo_avro.cc avro/avro_schema_util.cc) + set(ICEBERG_BUNDLE_SOURCES + arrow/demo_arrow.cc + arrow/arrow_fs_file_io.cc + avro/demo_avro.cc + avro/avro_schema_util.cc + avro/avro_stream_internal.cc) # Libraries to link with exported libiceberg_bundle.{so,a}. set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS) diff --git a/src/iceberg/avro/avro_stream_internal.cc b/src/iceberg/avro/avro_stream_internal.cc new file mode 100644 index 000000000..0ec603bd0 --- /dev/null +++ b/src/iceberg/avro/avro_stream_internal.cc @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "avro_stream_internal.h" + +#include + +#include + +#include "iceberg/exception.h" + +namespace iceberg::avro { + +AvroInputStream::AvroInputStream( + std::shared_ptr input_stream, int64_t buffer_size) + : input_stream_(std::move(input_stream)), + buffer_size_(buffer_size), + buffer_(buffer_size) {} + +AvroInputStream::~AvroInputStream() = default; + +bool AvroInputStream::next(const uint8_t** data, size_t* len) { + // Return all unconsumed data in the buffer + if (buffer_pos_ < available_bytes_) { + *data = buffer_.data() + buffer_pos_; + *len = available_bytes_ - buffer_pos_; + byte_count_ += available_bytes_ - buffer_pos_; + buffer_pos_ = available_bytes_; + return true; + } + + // Read from the input stream when the buffer is empty + auto result = input_stream_->Read(buffer_.size(), buffer_.data()); + // TODO(xiao.dong) Avro interface requires to return false if an error has occurred or + // reach EOF, so error message can not be raised to the caller, add some log after we + // have a logging system + if (!result.ok() || result.ValueUnsafe() <= 0) { + return false; + } + available_bytes_ = result.ValueUnsafe(); + buffer_pos_ = 0; + + // Return the whole buffer + *data = buffer_.data(); + *len = available_bytes_; + byte_count_ += available_bytes_; + buffer_pos_ = available_bytes_; + + return true; +} + +void AvroInputStream::backup(size_t len) { + if (len > buffer_pos_) { + throw IcebergError( + std::format("Cannot backup {} bytes, only {} bytes available", len, buffer_pos_)); + } + + buffer_pos_ -= len; + byte_count_ -= len; +} + +void AvroInputStream::skip(size_t len) { + // The range to skip is within the buffer + if (buffer_pos_ + len <= available_bytes_) { + buffer_pos_ += len; + byte_count_ += len; + return; + } + + seek(byte_count_ + len); +} + +size_t AvroInputStream::byteCount() const { return byte_count_; } + +void AvroInputStream::seek(int64_t position) { + auto status = input_stream_->Seek(position); + if (!status.ok()) { + throw IcebergError( + std::format("Failed to seek to {}, got {}", position, status.ToString())); + } + + buffer_pos_ = 0; + available_bytes_ = 0; + byte_count_ = position; +} + +AvroOutputStream::AvroOutputStream(std::shared_ptr output_stream, + int64_t buffer_size) + : output_stream_(std::move(output_stream)), + buffer_size_(buffer_size), + buffer_(buffer_size) {} + +AvroOutputStream::~AvroOutputStream() = default; + +bool AvroOutputStream::next(uint8_t** data, size_t* len) { + if (buffer_pos_ > 0) { + flush(); + } + + *data = buffer_.data(); + *len = buffer_.size(); + buffer_pos_ = buffer_.size(); // Assume all will be used until backup is called + + return true; +} + +void AvroOutputStream::backup(size_t len) { + if (len > buffer_pos_) { + throw IcebergError( + std::format("Cannot backup {} bytes, only {} bytes available", len, buffer_pos_)); + } + buffer_pos_ -= len; +} + +uint64_t AvroOutputStream::byteCount() const { return flushed_bytes_ + buffer_pos_; } + +void AvroOutputStream::flush() { + if (buffer_pos_ > 0) { + auto status = output_stream_->Write(buffer_.data(), buffer_pos_); + if (!status.ok()) { + throw IcebergError(std::format("Write failed {}", status.ToString())); + } + flushed_bytes_ += buffer_pos_; + buffer_pos_ = 0; + } + auto status = output_stream_->Flush(); + if (!status.ok()) { + throw IcebergError(std::format("Flush failed {}", status.ToString())); + } +} + +} // namespace iceberg::avro diff --git a/src/iceberg/avro/avro_stream_internal.h b/src/iceberg/avro/avro_stream_internal.h new file mode 100644 index 000000000..bb75b4d8a --- /dev/null +++ b/src/iceberg/avro/avro_stream_internal.h @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +namespace iceberg::avro { + +class AvroInputStream : public ::avro::SeekableInputStream { + public: + explicit AvroInputStream(std::shared_ptr<::arrow::io::RandomAccessFile> input_stream, + int64_t buffer_size); + + ~AvroInputStream() override; + + /// \brief Returns some of available data. + /// \return true if some data is available, false if no more data is available or an + /// error has occurred. + bool next(const uint8_t** data, size_t* len) override; + + /// \brief "Returns" back some of the data to the stream. The returned data must be less + /// than what was obtained in the last call to next(). + void backup(size_t len) override; + + /// \brief Skips number of bytes specified by len. + void skip(size_t len) override; + + /// \brief Returns the number of bytes read from this stream so far. + /// All the bytes made available through next are considered to be used unless, + /// returned back using backup. + size_t byteCount() const override; + + /// \brief Seek to a specific position in the stream. This may invalidate pointers + /// returned from next(). This will also reset byteCount() to the given + /// position. + void seek(int64_t position) override; + + private: + std::shared_ptr<::arrow::io::RandomAccessFile> input_stream_; + const int64_t buffer_size_; + std::vector buffer_; + size_t byte_count_ = 0; // bytes read from the input stream + size_t buffer_pos_ = 0; // next position to read in the buffer + size_t available_bytes_ = 0; // bytes available in the buffer +}; + +class AvroOutputStream : public ::avro::OutputStream { + public: + explicit AvroOutputStream(std::shared_ptr<::arrow::io::OutputStream> output_stream, + int64_t buffer_size); + + ~AvroOutputStream() override; + + /// \brief Returns a buffer that can be written into. + /// On successful return, data has the pointer to the buffer + /// and len has the number of bytes available at data. + bool next(uint8_t** data, size_t* len) override; + + /// \brief "Returns" back to the stream some of the buffer obtained + /// from in the last call to next(). + void backup(size_t len) override; + + /// \brief Number of bytes written so far into this stream. The whole buffer + /// returned by next() is assumed to be written unless some of + /// it was returned using backup(). + uint64_t byteCount() const override; + + /// \brief Flushes any data remaining in the buffer to the stream's underlying + /// store, if any. + void flush() override; + + private: + std::shared_ptr<::arrow::io::OutputStream> output_stream_; + const int64_t buffer_size_; + std::vector buffer_; + size_t buffer_pos_ = 0; // position in the buffer + uint64_t flushed_bytes_ = 0; // bytes flushed to the output stream +}; + +} // namespace iceberg::avro diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 0c3776bea..4a88229f5 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -65,7 +65,7 @@ add_test(NAME util_test COMMAND util_test) if(ICEBERG_BUILD_BUNDLE) add_executable(avro_test) - target_sources(avro_test PRIVATE avro_test.cc avro_schema_test.cc) + target_sources(avro_test PRIVATE avro_test.cc avro_schema_test.cc avro_stream_test.cc) target_link_libraries(avro_test PRIVATE iceberg_bundle_static GTest::gtest_main GTest::gmock) add_test(NAME avro_test COMMAND avro_test) diff --git a/test/avro_stream_test.cc b/test/avro_stream_test.cc new file mode 100644 index 000000000..304e0516e --- /dev/null +++ b/test/avro_stream_test.cc @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include + +#include "iceberg/avro/avro_stream_internal.h" +#include "temp_file_test_base.h" + +namespace iceberg::avro { + +class AVROStreamTest : public TempFileTestBase { + public: + void SetUp() override { + TempFileTestBase::SetUp(); + temp_filepath_ = CreateNewTempFilePath(); + local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>(); + } + + std::shared_ptr CreateOutputStream(const std::string& path, + int64_t buffer_size) { + auto arrow_out_ret = local_fs_->OpenOutputStream(path); + if (!arrow_out_ret.ok()) { + throw std::runtime_error("Failed to open output stream: " + + arrow_out_ret.status().message()); + } + return std::make_shared(std::move(arrow_out_ret.ValueUnsafe()), + buffer_size); + } + + std::shared_ptr CreateInputStream(const std::string& path, + int64_t buffer_size) { + auto arrow_in_ret = local_fs_->OpenInputFile(path); + if (!arrow_in_ret.ok()) { + throw std::runtime_error("Failed to open input stream: " + + arrow_in_ret.status().message()); + } + return std::make_shared(std::move(arrow_in_ret.ValueUnsafe()), + buffer_size); + } + + void WriteDataToStream(const std::shared_ptr& avro_output_stream, + const std::string& data) { + uint8_t* buf; + size_t buf_size; + ASSERT_TRUE(avro_output_stream->next(&buf, &buf_size)); + std::memcpy(buf, data.data(), data.size()); + avro_output_stream->backup(1024 - data.size()); + avro_output_stream->flush(); + } + + void ReadDataFromStream(const std::shared_ptr& avro_input_stream, + std::string& data) { + const uint8_t* buf{}; + size_t len{}; + ASSERT_TRUE(avro_input_stream->next(&buf, &len)); + data = std::string(reinterpret_cast(buf), len); + } + + void CheckStreamEof(const std::shared_ptr& avro_input_stream) { + const uint8_t* buf{}; + size_t len{}; + ASSERT_FALSE(avro_input_stream->next(&buf, &len)); + } + + int64_t buffer_size_ = 1024; + std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_; + std::string temp_filepath_; +}; + +TEST_F(AVROStreamTest, TestAvroBasicStream) { + // Write test data + const std::string test_data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_); + WriteDataToStream(avro_output_stream, test_data); + + auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_); + + const uint8_t* data{}; + size_t len{}; + ASSERT_TRUE(avro_input_stream->next(&data, &len)); + EXPECT_EQ(len, test_data.size()); + + EXPECT_EQ(avro_input_stream->byteCount(), test_data.size()); + EXPECT_EQ(std::string(reinterpret_cast(data), len), test_data); + ASSERT_FALSE(avro_input_stream->next(&data, &len)); +} + +TEST_F(AVROStreamTest, InputStreamBackup) { + // Write test data + const std::string test_data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_); + WriteDataToStream(avro_output_stream, test_data); + + // Create a test input stream + auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_); + + // Read data + const uint8_t* data{}; + size_t len{}; + ASSERT_TRUE(avro_input_stream->next(&data, &len)); + EXPECT_EQ(len, test_data.size()); + + // Backup 10 bytes + const size_t backupSize = 10; + avro_input_stream->backup(backupSize); + + // Check byteCount after backup + EXPECT_EQ(avro_input_stream->byteCount(), test_data.size() - backupSize); + + // Read the backed-up data again + ASSERT_TRUE(avro_input_stream->next(&data, &len)); + EXPECT_EQ(len, backupSize); + EXPECT_EQ(std::string(reinterpret_cast(data), len), + test_data.substr(test_data.size() - backupSize)); + + // Check that we've reached the end of the stream + ASSERT_FALSE(avro_input_stream->next(&data, &len)); +} + +TEST_F(AVROStreamTest, InputStreamSkip) { + // Write test data + const std::string test_data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_); + WriteDataToStream(avro_output_stream, test_data); + + // Create a test input stream + auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_); + + // Skip the first 10 bytes + const size_t skipSize = 10; + avro_input_stream->skip(skipSize); + + // Check byteCount after skip + EXPECT_EQ(avro_input_stream->byteCount(), skipSize); + + // Read the remaining data + const uint8_t* data{}; + size_t len{}; + ASSERT_TRUE(avro_input_stream->next(&data, &len)); + EXPECT_EQ(len, test_data.size() - skipSize); + EXPECT_EQ(std::string(reinterpret_cast(data), len), + test_data.substr(skipSize)); + + // Check that we've reached the end of the stream + ASSERT_FALSE(avro_input_stream->next(&data, &len)); +} + +TEST_F(AVROStreamTest, InputStreamSeek) { + // Write test data + const std::string test_data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_); + WriteDataToStream(avro_output_stream, test_data); + + // Create a test input stream + auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_); + + // Seek to position 15 + const int64_t seekPos = 15; + avro_input_stream->seek(seekPos); + + // Check byteCount after seek + EXPECT_EQ(avro_input_stream->byteCount(), static_cast(seekPos)); + + // Read the remaining data + const uint8_t* data{}; + size_t len{}; + ASSERT_TRUE(avro_input_stream->next(&data, &len)); + EXPECT_EQ(len, test_data.size() - seekPos); + EXPECT_EQ(std::string(reinterpret_cast(data), len), + test_data.substr(seekPos)); + + // Check that we've reached the end of the stream + ASSERT_FALSE(avro_input_stream->next(&data, &len)); +} + +TEST_F(AVROStreamTest, OutputStreamBasic) { + auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_); + + // Test next() + uint8_t* data{}; + size_t len{}; + ASSERT_TRUE(avro_output_stream->next(&data, &len)); + EXPECT_EQ(len, static_cast(buffer_size_)); + + // Write some data + const std::string test_data = "Hello, Avro!"; + std::memcpy(data, test_data.data(), test_data.size()); + + // Backup unused bytes + avro_output_stream->backup(len - test_data.size()); + + // Check byteCount + EXPECT_EQ(avro_output_stream->byteCount(), test_data.size()); + + // Flush the data + avro_output_stream->flush(); + + // Verify the written data + auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_); + std::string read_data; + ReadDataFromStream(avro_input_stream, read_data); + EXPECT_EQ(read_data, test_data); + CheckStreamEof(avro_input_stream); +} + +TEST_F(AVROStreamTest, OutputStreamMultipleWrites) { + auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_); + + // Write first chunk + const std::string chunk1 = "First chunk of data. "; + uint8_t* data1{}; + size_t len1{}; + ASSERT_TRUE(avro_output_stream->next(&data1, &len1)); + std::memcpy(data1, chunk1.data(), chunk1.size()); + avro_output_stream->backup(len1 - chunk1.size()); + + // Write second chunk + const std::string chunk2 = "Second chunk of data."; + uint8_t* data2{}; + size_t len2{}; + ASSERT_TRUE(avro_output_stream->next(&data2, &len2)); + std::memcpy(data2, chunk2.data(), chunk2.size()); + avro_output_stream->backup(len2 - chunk2.size()); + + // Check byteCount + EXPECT_EQ(avro_output_stream->byteCount(), chunk1.size() + chunk2.size()); + + // Flush and close + avro_output_stream->flush(); + + // Verify the written data + std::string expectedData = chunk1 + chunk2; + auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_); + std::string read_data; + ReadDataFromStream(avro_input_stream, read_data); + EXPECT_EQ(read_data, expectedData); + CheckStreamEof(avro_input_stream); +} + +TEST_F(AVROStreamTest, InputStreamLargeData) { + // Create a large test data set (larger than the buffer size) + const size_t dataSize = buffer_size_ * 2.5; + std::vector test_data(dataSize); + for (size_t i = 0; i < dataSize; ++i) { + test_data[i] = static_cast(i % 256); + } + + auto arrow_out_ret = local_fs_->OpenOutputStream(temp_filepath_); + if (!arrow_out_ret.ok()) { + throw std::runtime_error("Failed to open output stream: " + + arrow_out_ret.status().message()); + } + auto arrow_out = arrow_out_ret.ValueUnsafe(); + // Write the test data + auto status = arrow_out->Write(test_data.data(), test_data.size()); + ASSERT_TRUE(status.ok()); + status = arrow_out->Flush(); + ASSERT_TRUE(status.ok()); + status = arrow_out->Close(); + ASSERT_TRUE(status.ok()); + + // Create an AvroInputStream with a smaller buffer + const int64_t smallBufferSize = buffer_size_ / 2; + auto avro_input_stream = CreateInputStream(temp_filepath_, smallBufferSize); + + // Read all the data in chunks + std::vector readData; + const uint8_t* data{}; + size_t len{}; + while (avro_input_stream->next(&data, &len)) { + readData.insert(readData.end(), data, data + len); + } + + // Verify all data was read correctly + EXPECT_EQ(readData.size(), test_data.size()); + EXPECT_EQ(std::memcmp(readData.data(), test_data.data(), test_data.size()), 0); + EXPECT_EQ(avro_input_stream->byteCount(), test_data.size()); +} + +TEST_F(AVROStreamTest, OutputStreamLargeData) { + // Create an AvroOutputStream with a small buffer + const int64_t smallBufferSize = 256; + auto avro_output_stream = CreateOutputStream(temp_filepath_, smallBufferSize); + + // Create a large test data set (larger than the buffer size) + const size_t dataSize = smallBufferSize * 3.5; + std::vector test_data(dataSize); + for (size_t i = 0; i < dataSize; ++i) { + test_data[i] = static_cast(i % 256); + } + + // Write the data in chunks + size_t bytesWritten = 0; + while (bytesWritten < dataSize) { + uint8_t* buffer{}; + size_t bufferSize{}; + ASSERT_TRUE(avro_output_stream->next(&buffer, &bufferSize)); + + size_t bytesToWrite = std::min(bufferSize, dataSize - bytesWritten); + std::memcpy(buffer, test_data.data() + bytesWritten, bytesToWrite); + + if (bytesToWrite < bufferSize) { + avro_output_stream->backup(bufferSize - bytesToWrite); + } + + bytesWritten += bytesToWrite; + } + + // Flush and close + avro_output_stream->flush(); + + // Verify the written data + auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_); + std::vector readData; + const uint8_t* data{}; + size_t len{}; + while (avro_input_stream->next(&data, &len)) { + readData.insert(readData.end(), data, data + len); + } + // Verify all data was read correctly + EXPECT_EQ(readData.size(), test_data.size()); + EXPECT_EQ(std::memcmp(readData.data(), test_data.data(), test_data.size()), 0); + EXPECT_EQ(avro_input_stream->byteCount(), test_data.size()); +} + +} // namespace iceberg::avro