Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,12 @@ install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h
DESTINATION ${ICEBERG_INSTALL_INCLUDEDIR}/iceberg)

if(ICEBERG_BUILD_BUNDLE)
set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc arrow/arrow_fs_file_io.cc
avro/demo_avro.cc avro/avro_schema_util.cc)
set(ICEBERG_BUNDLE_SOURCES
arrow/demo_arrow.cc
arrow/arrow_fs_file_io.cc
avro/demo_avro.cc
avro/avro_schema_util.cc
avro/avro_stream.cc)

# Libraries to link with exported libiceberg_bundle.{so,a}.
set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS)
Expand Down
148 changes: 148 additions & 0 deletions src/iceberg/avro/avro_stream.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "avro_stream.h"

#include <format>

#include <arrow/result.h>
#include <iceberg/exception.h>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#include <iceberg/exception.h>
#include "iceberg/exception.h"

For iceberg headers, it's better to use double quote style since it's in the current working directory.


namespace iceberg::avro {

AvroInputStream::AvroInputStream(
std::shared_ptr<arrow::io::RandomAccessFile> input_stream, int64_t buffer_size)
: input_stream_(std::move(input_stream)),
buffer_size_(buffer_size),
buffer_(buffer_size) {}

AvroInputStream::~AvroInputStream() = default;

bool AvroInputStream::next(const uint8_t** data, size_t* len) {
// Return all unconsumed data in the buffer
if (buffer_pos_ < available_bytes_) {
*data = buffer_.data() + buffer_pos_;
*len = available_bytes_ - buffer_pos_;
byte_count_ += available_bytes_ - buffer_pos_;
buffer_pos_ = available_bytes_;
return true;
}

// Read from the input stream when the buffer is empty
auto result = input_stream_->Read(buffer_.size(), buffer_.data());
if (!result.ok()) {
throw IcebergError(
std::format("Read failed:{} at pos:{}", result.status().ToString(), buffer_pos_));
}
if (result.ValueUnsafe() <= 0) {
return false;
}
available_bytes_ = result.ValueUnsafe();
buffer_pos_ = 0;

// Return the whole buffer
*data = buffer_.data();
*len = available_bytes_;
byte_count_ += available_bytes_;
buffer_pos_ = available_bytes_;

return true;
}

void AvroInputStream::backup(size_t len) {
if (len > buffer_pos_) {
throw IcebergError(
std::format("Cannot backup {} bytes, only {} bytes available", len, buffer_pos_));
}

buffer_pos_ -= len;
byte_count_ -= len;
}

void AvroInputStream::skip(size_t len) {
// The range to skip is within the buffer
if (buffer_pos_ + len <= available_bytes_) {
buffer_pos_ += len;
byte_count_ += len;
return;
}

seek(byte_count_ + len);
}

size_t AvroInputStream::byteCount() const { return byte_count_; }

void AvroInputStream::seek(int64_t position) {
auto status = input_stream_->Seek(position);
if (!status.ok()) {
throw IcebergError(
std::format("Failed to seek to {}, got {}", position, status.ToString()));
}

buffer_pos_ = 0;
available_bytes_ = 0;
byte_count_ = position;
}

AvroOutputStream::AvroOutputStream(std::shared_ptr<arrow::io::OutputStream> output_stream,
int64_t buffer_size)
: output_stream_(std::move(output_stream)),
buffer_size_(buffer_size),
buffer_(buffer_size) {}

AvroOutputStream::~AvroOutputStream() = default;

bool AvroOutputStream::next(uint8_t** data, size_t* len) {
if (buffer_pos_ > 0) {
flush();
}

*data = buffer_.data();
*len = buffer_.size();
buffer_pos_ = buffer_.size(); // Assume all will be used until backup is called

return true;
}

void AvroOutputStream::backup(size_t len) {
if (len > buffer_pos_) {
throw IcebergError(
std::format("Cannot backup {} bytes, only {} bytes available", len, buffer_pos_));
}
buffer_pos_ -= len;
}

uint64_t AvroOutputStream::byteCount() const { return flushed_bytes_ + buffer_pos_; }

void AvroOutputStream::flush() {
if (buffer_pos_ > 0) {
auto status = output_stream_->Write(buffer_.data(), buffer_pos_);
if (!status.ok()) {
throw IcebergError(std::format("Write failed {}", status.ToString()));
}
flushed_bytes_ += buffer_pos_;
buffer_pos_ = 0;
}
auto status = output_stream_->Flush();
if (!status.ok()) {
throw IcebergError(std::format("Flush failed {}", status.ToString()));
}
}

} // namespace iceberg::avro
98 changes: 98 additions & 0 deletions src/iceberg/avro/avro_stream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <arrow/io/interfaces.h>
#include <avro/Stream.hh>

namespace iceberg::avro {

class AvroInputStream : public ::avro::SeekableInputStream {
public:
explicit AvroInputStream(std::shared_ptr<::arrow::io::RandomAccessFile> input_stream,
int64_t buffer_size);

~AvroInputStream() override;

/// \brief Returns some of available data.
/// \return true if some data is available, false if no more data is available or an
/// error has occurred.
bool next(const uint8_t** data, size_t* len) override;

/// \brief "Returns" back some of the data to the stream. The returned data must be less
/// than what was obtained in the last call to next().
void backup(size_t len) override;

/// \brief Skips number of bytes specified by len.
void skip(size_t len) override;

/// \brief Returns the number of bytes read from this stream so far.
/// All the bytes made available through next are considered to be used unless,
/// returned back using backup.
size_t byteCount() const override;

/// \brief Seek to a specific position in the stream. This may invalidate pointers
/// returned from next(). This will also reset byteCount() to the given
/// position.
void seek(int64_t position) override;

private:
std::shared_ptr<::arrow::io::RandomAccessFile> input_stream_;
const int64_t buffer_size_;
std::vector<uint8_t> buffer_;
size_t byte_count_ = 0; // bytes read from the input stream
size_t buffer_pos_ = 0; // next position to read in the buffer
size_t available_bytes_ = 0; // bytes available in the buffer
};

class AvroOutputStream : public ::avro::OutputStream {
public:
explicit AvroOutputStream(std::shared_ptr<::arrow::io::OutputStream> output_stream,
int64_t buffer_size);

~AvroOutputStream() override;

/// \brief Returns a buffer that can be written into.
/// On successful return, data has the pointer to the buffer
/// and len has the number of bytes available at data.
bool next(uint8_t** data, size_t* len) override;

/// \brief "Returns" back to the stream some of the buffer obtained
/// from in the last call to next().
void backup(size_t len) override;

/// \brief Number of bytes written so far into this stream. The whole buffer
/// returned by next() is assumed to be written unless some of
/// it was returned using backup().
uint64_t byteCount() const override;

/// \brief Flushes any data remaining in the buffer to the stream's underlying
/// store, if any.
void flush() override;

private:
std::shared_ptr<::arrow::io::OutputStream> output_stream_;
const int64_t buffer_size_;
std::vector<uint8_t> buffer_;
size_t buffer_pos_ = 0; // position in the buffer
uint64_t flushed_bytes_ = 0; // bytes flushed to the output stream
};

} // namespace iceberg::avro
2 changes: 1 addition & 1 deletion test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ add_test(NAME util_test COMMAND util_test)

if(ICEBERG_BUILD_BUNDLE)
add_executable(avro_test)
target_sources(avro_test PRIVATE avro_test.cc avro_schema_test.cc)
target_sources(avro_test PRIVATE avro_test.cc avro_schema_test.cc avro_stream_test.cc)
target_link_libraries(avro_test PRIVATE iceberg_bundle_static GTest::gtest_main
GTest::gmock)
add_test(NAME avro_test COMMAND avro_test)
Expand Down
Loading
Loading