Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,12 @@ install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h
DESTINATION ${ICEBERG_INSTALL_INCLUDEDIR}/iceberg)

if(ICEBERG_BUILD_BUNDLE)
set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc arrow/arrow_fs_file_io.cc
avro/demo_avro.cc avro/avro_schema_util.cc)
set(ICEBERG_BUNDLE_SOURCES
arrow/demo_arrow.cc
arrow/arrow_fs_file_io.cc
avro/demo_avro.cc
avro/avro_schema_util.cc
avro/avro_stream.cpp)

# Libraries to link with exported libiceberg_bundle.{so,a}.
set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS)
Expand Down
148 changes: 148 additions & 0 deletions src/iceberg/avro/avro_stream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "avro_stream.h"

#include <format>

#include <arrow/result.h>
#include <iceberg/exception.h>

namespace iceberg::avro {

// Builds an Avro-compatible input stream on top of an Arrow random access file.
//
// input_stream: Arrow file to read from; ownership is shared with the caller.
// buffer_size:  capacity, in bytes, of the internal read buffer. Must be positive.
//
// Throws IcebergError when buffer_size is zero or negative. A zero-sized buffer
// would make every next() call look like end-of-file, and a negative value would
// otherwise be converted to a huge unsigned size and fail inside std::vector with
// an unrelated exception.
AvroInputStream::AvroInputStream(
    std::shared_ptr<::arrow::io::RandomAccessFile> input_stream, int64_t buffer_size)
    : input_stream_(std::move(input_stream)),
      buffer_size_(buffer_size),
      // Validate before sizing the vector so the failure is reported as an
      // IcebergError carrying the offending value.
      buffer_(buffer_size > 0
                  ? static_cast<size_t>(buffer_size)
                  : throw IcebergError(std::format(
                        "Invalid buffer size {} for AvroInputStream, must be positive",
                        buffer_size))) {}

// Defaulted out of line: the shared_ptr and vector members release what they
// hold on their own, so no extra cleanup is performed here.
AvroInputStream::~AvroInputStream() = default;

// Hands out the next run of readable bytes.
//
// On success *data points into the internal buffer and *len holds the number of
// bytes available there; all of them count as consumed until backup() returns
// some. The pointer is only valid until the next call that refills or resets the
// buffer (next(), seek(), or a skip() that leaves the buffered range).
//
// Returns false when the underlying file yields no more bytes.
// Throws IcebergError when the underlying read fails.
bool AvroInputStream::next(const uint8_t** data, size_t* len) {
  // Refill only when everything buffered has already been handed out.
  if (buffer_pos_ >= available_bytes_) {
    auto result = input_stream_->Read(buffer_.size(), buffer_.data());
    if (!result.ok()) {
      // The buffer is fully consumed at this point, so byte_count_ is the logical
      // stream offset at which the read was attempted. buffer_pos_ is merely an
      // index into the buffer and says nothing about where in the file we are.
      throw IcebergError(std::format("Read failed:{} at pos:{}",
                                     result.status().ToString(), byte_count_));
    }

    const auto bytes_read = result.ValueUnsafe();
    if (bytes_read <= 0) {
      return false;
    }
    available_bytes_ = static_cast<size_t>(bytes_read);
    buffer_pos_ = 0;
  }

  // Hand out whatever is left in the buffer, either the tail returned through
  // backup() or the freshly read chunk, and mark it as consumed.
  const size_t remaining = available_bytes_ - buffer_pos_;
  *data = buffer_.data() + buffer_pos_;
  *len = remaining;
  byte_count_ += remaining;
  buffer_pos_ = available_bytes_;
  return true;
}

// Gives `len` already-consumed bytes back to the stream so that the following
// next() call hands them out again.
//
// Throws IcebergError when `len` is larger than the consumed part of the buffer.
void AvroInputStream::backup(size_t len) {
  const bool fits_in_consumed_range = len <= buffer_pos_;
  if (!fits_in_consumed_range) {
    throw IcebergError(
        std::format("Cannot backup {} bytes, only {} bytes available", len, buffer_pos_));
  }

  // Rewind the logical position and the buffer cursor by the same amount.
  byte_count_ -= len;
  buffer_pos_ -= len;
}

// Advances the read position by `len` bytes without handing them to the caller.
void AvroInputStream::skip(size_t len) {
  const size_t target_pos = buffer_pos_ + len;

  // Target lies past the buffered bytes: reposition the underlying file, which
  // also drops the current buffer contents.
  if (target_pos > available_bytes_) {
    seek(byte_count_ + len);
    return;
  }

  // Target is still inside the buffer: just move the cursor forward.
  buffer_pos_ = target_pos;
  byte_count_ += len;
}

// Returns the logical read position: advanced by next() and skip(), reduced by
// backup(), and overwritten with the target position by seek().
size_t AvroInputStream::byteCount() const { return byte_count_; }

// Repositions the underlying file to `position` and discards buffered bytes, so
// pointers previously returned by next() must no longer be used.
//
// Throws IcebergError when the underlying seek fails; in that case the stream
// state is left untouched.
void AvroInputStream::seek(int64_t position) {
  if (auto seek_status = input_stream_->Seek(position); !seek_status.ok()) {
    throw IcebergError(
        std::format("Failed to seek to {}, got {}", position, seek_status.ToString()));
  }

  // Start from an empty buffer at the new logical position.
  byte_count_ = position;
  available_bytes_ = 0;
  buffer_pos_ = 0;
}

// Builds an Avro-compatible output stream on top of an Arrow output stream.
//
// output_stream: Arrow stream that receives the bytes; ownership is shared.
// buffer_size:   capacity, in bytes, of the internal write buffer. Must be positive.
//
// Throws IcebergError when buffer_size is zero or negative. With a zero-sized
// buffer next() would keep returning true with no room to write into, and a
// negative value would otherwise be converted to a huge unsigned size and fail
// inside std::vector with an unrelated exception.
AvroOutputStream::AvroOutputStream(
    std::shared_ptr<::arrow::io::OutputStream> output_stream, int64_t buffer_size)
    : output_stream_(std::move(output_stream)),
      buffer_size_(buffer_size),
      // Validate before sizing the vector so the failure is reported as an
      // IcebergError carrying the offending value.
      buffer_(buffer_size > 0
                  ? static_cast<size_t>(buffer_size)
                  : throw IcebergError(std::format(
                        "Invalid buffer size {} for AvroOutputStream, must be positive",
                        buffer_size))) {}

// Defaulted out of line: flush() is NOT called here, so bytes still sitting in
// the internal buffer are not written by this class on destruction. Call
// flush() first if they must reach the underlying stream.
AvroOutputStream::~AvroOutputStream() = default;

// Hands the caller the whole internal buffer to write into.
//
// Bytes pending from the previous hand-out are pushed downstream first via
// flush(). Always returns true; a failed flush surfaces as an IcebergError.
bool AvroOutputStream::next(uint8_t** data, size_t* len) {
  const bool has_pending_bytes = buffer_pos_ != 0;
  if (has_pending_bytes) {
    flush();
  }

  const size_t capacity = buffer_.size();
  *data = buffer_.data();
  *len = capacity;
  // Treat the full buffer as written; backup() corrects this afterwards.
  buffer_pos_ = capacity;
  return true;
}

// Marks the trailing `len` bytes of the buffer handed out by next() as unused.
//
// Throws IcebergError when `len` exceeds the number of bytes currently counted
// as written in the buffer.
void AvroOutputStream::backup(size_t len) {
  const bool fits_in_written_range = len <= buffer_pos_;
  if (!fits_in_written_range) {
    throw IcebergError(
        std::format("Cannot backup {} bytes, only {} bytes available", len, buffer_pos_));
  }

  buffer_pos_ -= len;
}

// Returns the bytes already written to the underlying stream plus the bytes
// currently counted as written in the internal buffer.
uint64_t AvroOutputStream::byteCount() const { return flushed_bytes_ + buffer_pos_; }

// Writes any buffered bytes to the underlying Arrow stream and then asks that
// stream to flush. The downstream Flush() is issued even when the buffer is
// empty.
//
// Throws IcebergError when either the write or the downstream flush fails.
void AvroOutputStream::flush() {
  const bool has_pending_bytes = buffer_pos_ != 0;
  if (has_pending_bytes) {
    if (auto write_status = output_stream_->Write(buffer_.data(), buffer_pos_);
        !write_status.ok()) {
      throw IcebergError(std::format("Write failed {}", write_status.ToString()));
    }
    // Only account for the bytes once they have been accepted downstream.
    flushed_bytes_ += buffer_pos_;
    buffer_pos_ = 0;
  }

  if (auto flush_status = output_stream_->Flush(); !flush_status.ok()) {
    throw IcebergError(std::format("Flush failed {}", flush_status.ToString()));
  }
}

} // namespace iceberg::avro
118 changes: 118 additions & 0 deletions src/iceberg/avro/avro_stream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <arrow/io/interfaces.h>
#include <avro/Stream.hh>

namespace iceberg::avro {

/**
* Adapts an arrow::io::RandomAccessFile to the avro::SeekableInputStream
* interface, reading through an internal buffer.
*/
class AvroInputStream : public ::avro::SeekableInputStream {
public:
/**
* Creates a stream reading from input_stream through an internal buffer
* of buffer_size bytes.
*/
explicit AvroInputStream(std::shared_ptr<::arrow::io::RandomAccessFile> input_stream,
int64_t buffer_size);

~AvroInputStream() override;

/**
* Returns some of the available data.
*
* On success, data points into the internal buffer and len holds the
* number of bytes available there.
*
* Returns true if some data is available, false if no more data is
* available. A failed read on the underlying file is reported by
* throwing IcebergError rather than by returning false.
*/
bool next(const uint8_t** data, size_t* len) override;

/**
* "Returns" back some of the data to the stream so that it is handed out
* again by the following call to next(). Throws IcebergError if len
* exceeds the number of bytes consumed from the internal buffer.
*/
void backup(size_t len) override;

/**
* Skips number of bytes specified by len. When the target lies beyond
* the buffered data, the underlying file is repositioned via seek().
*/
void skip(size_t len) override;

/**
* Returns the number of bytes read from this stream so far.
* All the bytes made available through next() are considered
* to be used unless returned back using backup().
*/
size_t byteCount() const override;

/**
* Seek to a specific position in the stream. This discards the buffered
* data, so pointers returned from next() must not be used afterwards.
* This will also reset byteCount() to the given position. Throws
* IcebergError if the underlying seek fails.
*/
void seek(int64_t position) override;

private:
std::shared_ptr<::arrow::io::RandomAccessFile> input_stream_;
const int64_t buffer_size_; // buffer size requested at construction
std::vector<uint8_t> buffer_; // read buffer, sized from buffer_size at construction
size_t byte_count_ = 0; // logical read position reported by byteCount()
size_t buffer_pos_ = 0; // next position to read in the buffer
size_t available_bytes_ = 0; // valid bytes in the buffer from the last read
};

/**
* Adapts an arrow::io::OutputStream to the avro::OutputStream interface,
* writing through an internal buffer.
*/
class AvroOutputStream : public ::avro::OutputStream {
public:
/**
* Creates a stream writing to output_stream through an internal buffer
* of buffer_size bytes.
*/
explicit AvroOutputStream(std::shared_ptr<::arrow::io::OutputStream> output_stream,
int64_t buffer_size);

~AvroOutputStream() override;

/**
* Returns a buffer that can be written into.
* On successful return, data has the pointer to the buffer
* and len has the number of bytes available at data.
*
* Bytes pending from the previously returned buffer are flushed first;
* a failure while doing so is reported by throwing IcebergError.
*/
bool next(uint8_t** data, size_t* len) override;

/**
* "Returns" back to the stream some of the buffer obtained
* from the last call to next(). Throws IcebergError if len exceeds
* the number of bytes currently counted as written in the buffer.
*/
void backup(size_t len) override;

/**
* Number of bytes written so far into this stream. The whole buffer
* returned by next() is assumed to be written unless some of
* it was returned using backup().
*/
uint64_t byteCount() const override;

/**
* Writes any data remaining in the buffer to the underlying Arrow stream
* and then flushes that stream. Throws IcebergError if the write or the
* flush fails.
*/
void flush() override;

private:
std::shared_ptr<::arrow::io::OutputStream> output_stream_;
const int64_t buffer_size_; // buffer size requested at construction
std::vector<uint8_t> buffer_; // write buffer, sized from buffer_size at construction
size_t buffer_pos_ = 0; // bytes of the buffer currently counted as written
uint64_t flushed_bytes_ = 0; // bytes flushed to the output stream
};

} // namespace iceberg::avro
57 changes: 55 additions & 2 deletions test/avro_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,35 @@
* under the License.
*/

#include <arrow/filesystem/filesystem.h>
#include <arrow/filesystem/localfs.h>
#include <arrow/io/interfaces.h>
#include <arrow/result.h>
#include <gtest/gtest.h>
#include <iceberg/arrow/arrow_fs_file_io.h>
#include <iceberg/avro/avro_stream.h>
#include <iceberg/avro/demo_avro.h>
#include <iceberg/file_reader.h>

#include "matchers.h"
#include "temp_file_test_base.h"

namespace iceberg::avro {

TEST(AVROTest, TestDemoAvro) {
// Fixture for the Avro tests: on top of the temp-file base setup it prepares a
// FileIO backed by the local file system and a fresh temporary file path.
class AVROTest : public TempFileTestBase {
 public:
  void SetUp() override {
    TempFileTestBase::SetUp();

    auto local_fs = std::make_shared<::arrow::fs::LocalFileSystem>();
    file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(local_fs);

    temp_filepath_ = CreateNewTempFilePath();
  }

  // FileIO implementation backed by the local file system.
  std::shared_ptr<iceberg::FileIO> file_io_;
  // Path obtained from CreateNewTempFilePath() during SetUp().
  std::string temp_filepath_;
};

TEST_F(AVROTest, TestDemoAvro) {
std::string expected =
"{\n\
\"type\": \"record\",\n\
Expand All @@ -44,7 +64,40 @@ TEST(AVROTest, TestDemoAvro) {
EXPECT_EQ(avro.print(), expected);
}

TEST(AVROTest, TestDemoAvroReader) {
// Round-trips a small payload: writes it through AvroOutputStream into the
// fixture's temporary file, then reads it back through AvroInputStream and
// checks content, lengths and byte counters.
TEST_F(AVROTest, TestAvroBasicStream) {
  constexpr int64_t kBufferSize = 1024;
  const std::string test_data = "test data";

  auto fs = std::make_shared<::arrow::fs::LocalFileSystem>();

  auto arrow_out_ret = fs->OpenOutputStream(temp_filepath_);
  ASSERT_TRUE(arrow_out_ret.ok());
  auto avro_output_stream = std::make_shared<AvroOutputStream>(
      std::move(arrow_out_ret.ValueUnsafe()), kBufferSize);
  {
    uint8_t* buf = nullptr;
    size_t buf_size = 0;
    ASSERT_TRUE(avro_output_stream->next(&buf, &buf_size));
    // Make sure the payload fits before copying into the returned buffer.
    ASSERT_GE(buf_size, test_data.size());
    std::memcpy(buf, test_data.data(), test_data.size());
    // Give back the unused tail, derived from the length next() actually
    // reported rather than from the configured buffer size.
    avro_output_stream->backup(buf_size - test_data.size());
    EXPECT_EQ(avro_output_stream->byteCount(), test_data.size());
    avro_output_stream->flush();
    // Flushing moves the bytes downstream but must not change the total.
    EXPECT_EQ(avro_output_stream->byteCount(), test_data.size());
  }

  auto arrow_in_ret = fs->OpenInputFile(temp_filepath_);
  ASSERT_TRUE(arrow_in_ret.ok());
  auto avro_input_stream = std::make_shared<AvroInputStream>(
      std::move(arrow_in_ret.ValueUnsafe()), kBufferSize);
  {
    const uint8_t* data = nullptr;
    size_t len = 0;
    ASSERT_TRUE(avro_input_stream->next(&data, &len));
    EXPECT_EQ(len, test_data.size());
    EXPECT_EQ(avro_input_stream->byteCount(), test_data.size());
    EXPECT_EQ(std::string(reinterpret_cast<const char*>(data), len), test_data);

    // The file holds nothing beyond the payload, so the next read hits EOF.
    ASSERT_FALSE(avro_input_stream->next(&data, &len));
  }
}
TEST_F(AVROTest, TestDemoAvroReader) {
auto result = ReaderFactoryRegistry::Create(FileFormatType::kAvro, {});
ASSERT_THAT(result, IsOk());

Expand Down
Loading