Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions include/sparrow/arrow_interface/arrow_array_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

#pragma once


#include "sparrow/arrow_interface/arrow_array_stream/private_data.hpp"
#include "sparrow/c_interface.hpp"
#include "sparrow/c_stream_interface.hpp"
Expand Down Expand Up @@ -128,4 +127,7 @@ namespace sparrow
SPARROW_API const char* get_last_error_from_arrow_array_stream(ArrowArrayStream* stream);

SPARROW_API void fill_arrow_array_stream(ArrowArrayStream& stream);
}

/**
 * @brief Transfers the contents of an `ArrowArrayStream` out of @p source.
 *
 * Every member of @p source is copied into the returned stream, after which
 * @p source is value-initialized, leaving its `release` callback and
 * `private_data` pointer null.
 *
 * @param source Stream to take the contents from; it is reset on return.
 * @return A stream holding the members that were stored in @p source.
 */
SPARROW_API ArrowArrayStream move_array_stream(ArrowArrayStream&& source);
/**
 * @brief Lvalue overload: transfers the contents of @p source.
 *
 * Forwards to the rvalue-reference overload, so @p source is copied into the
 * returned stream and then value-initialized.
 *
 * @param source Stream to take the contents from; it is reset on return.
 * @return A stream holding the members that were stored in @p source.
 */
SPARROW_API ArrowArrayStream move_array_stream(ArrowArrayStream& source);
}
Comment on lines +131 to +133
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The move_array_stream functions lack documentation. These functions appear to implement move semantics for ArrowArrayStream (a C struct whose implicitly generated move constructor only copies its members and leaves the source untouched). Consider adding documentation that explains:

  • What the function does (transfers ownership of the stream's resources)
  • The state of the source after the move (all pointers zeroed)
  • When this should be used instead of regular assignment
  • Any preconditions or postconditions

Example:

/**
 * @brief Move an ArrowArrayStream by transferring ownership of its resources.
 *
 * Performs a shallow copy of the stream structure and zeros out the source.
 * After this operation, the source stream is left in a released-like state
 * with all function pointers and data pointers set to null.
 *
 * @param source The source stream to move from.
 * @return A new ArrowArrayStream with ownership of the source's resources.
 *
 * @post source will have all pointers set to nullptr/null
 */
Suggested change
SPARROW_API ArrowArrayStream move_array_stream(ArrowArrayStream&& source);
SPARROW_API ArrowArrayStream move_array_stream(ArrowArrayStream& source);
}
/**
* @brief Move an ArrowArrayStream by transferring ownership of its resources.
*
* Performs a shallow copy of the stream structure and zeros out the source.
* After this operation, the source stream is left in a released-like state
* with all function pointers and data pointers set to nullptr or zero.
*
* This should be used when you need to transfer ownership of the stream's resources,
* rather than copying. Regular assignment does not transfer ownership and may lead to
* double-free or resource leaks.
*
* @param source The source stream to move from (rvalue reference).
* Must not be released or nullptr.
* @return A new ArrowArrayStream with ownership of the source's resources.
*
* @post source will have all pointers set to nullptr/null/zero.
*
* @see https://arrow.apache.org/docs/format/CStreamInterface.html
*/
SPARROW_API ArrowArrayStream move_array_stream(ArrowArrayStream&& source);
/**
* @brief Move an ArrowArrayStream by transferring ownership of its resources.
*
* Performs a shallow copy of the stream structure and zeros out the source.
* After this operation, the source stream is left in a released-like state
* with all function pointers and data pointers set to nullptr or zero.
*
* This should be used when you need to transfer ownership of the stream's resources,
* rather than copying. Regular assignment does not transfer ownership and may lead to
* double-free or resource leaks.
*
* @param source The source stream to move from (lvalue reference).
* Must not be released or nullptr.
* @return A new ArrowArrayStream with ownership of the source's resources.
*
* @post source will have all pointers set to nullptr/null/zero.
*
* @see https://arrow.apache.org/docs/format/CStreamInterface.html
*/
SPARROW_API ArrowArrayStream move_array_stream(ArrowArrayStream& source);

Copilot uses AI. Check for mistakes.
21 changes: 11 additions & 10 deletions include/sparrow/arrow_interface/arrow_array_stream_proxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ namespace sparrow
arrow_array_stream_proxy(const arrow_array_stream_proxy&) = delete;
arrow_array_stream_proxy& operator=(const arrow_array_stream_proxy&) = delete;

SPARROW_API
arrow_array_stream_proxy(arrow_array_stream_proxy&& other) noexcept;
SPARROW_API
arrow_array_stream_proxy& operator=(arrow_array_stream_proxy&& other) noexcept;

/**
Expand All @@ -111,6 +113,12 @@ namespace sparrow
* released. This ensures proper cleanup of all Arrow C interface objects.
*/
SPARROW_API ~arrow_array_stream_proxy();

/**
* Check whether the proxy has ownership of its internal `ArrowArrayStream`.
*/
SPARROW_API bool owns_stream() const;

/**
* @brief Export the stream pointer.
*
Expand Down Expand Up @@ -143,7 +151,7 @@ namespace sparrow
requires layout<std::ranges::range_value_t<R>>
void push(R&& arrays)
{
arrow_array_stream_private_data& private_data = *get_private_data();
arrow_array_stream_private_data& private_data = get_private_data();

// Check if we need to create schema from first array
if (private_data.schema() == nullptr)
Expand Down Expand Up @@ -234,18 +242,11 @@ namespace sparrow
*/
void throw_if_immutable() const;

/**
* @brief Gets the private data (const version).
*
* @return Const pointer to the stream's private data.
*/
[[nodiscard]] SPARROW_API const arrow_array_stream_private_data* get_private_data() const;

/**
* @brief Gets the private data (mutable version).
*
* @return Mutable reference to the stream's private data.
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation states "Mutable pointer to the stream's private data" but the return type was changed from pointer to reference. Update the documentation to say "Mutable reference to the stream's private data".

Suggested change
* @return Mutable pointer to the stream's private data.
* @return Mutable reference to the stream's private data.

Copilot uses AI. Check for mistakes.
*/
[[nodiscard]] SPARROW_API arrow_array_stream_private_data* get_private_data();
[[nodiscard]] SPARROW_API arrow_array_stream_private_data& get_private_data();
};
}
}
14 changes: 13 additions & 1 deletion src/arrow_interface/arrow_array_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,16 @@ namespace sparrow
stream.release = &release_arrow_array_stream;
stream.private_data = new arrow_array_stream_private_data();
}
}

/**
 * Hands the contents of `source` over to a newly returned stream.
 *
 * The returned stream receives a member-wise copy of `source`; `source` is
 * then value-initialized so that none of its members refer to the
 * transferred state any more.
 */
ArrowArrayStream move_array_stream(ArrowArrayStream&& source)
{
    // Member-wise copy: the result now carries the callbacks and private data.
    ArrowArrayStream moved(source);

    // Reset the source so it no longer refers to the transferred state.
    source = ArrowArrayStream{};

    return moved;
}

/**
 * Lvalue convenience overload: delegates to the rvalue-reference overload,
 * which copies `source` into the result and resets `source`.
 */
ArrowArrayStream move_array_stream(ArrowArrayStream& source)
{
    ArrowArrayStream moved = move_array_stream(std::move(source));
    return moved;
}
}
24 changes: 14 additions & 10 deletions src/arrow_interface/arrow_array_stream_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace sparrow
}

arrow_array_stream_proxy::arrow_array_stream_proxy(ArrowArrayStream&& stream)
: m_stream(stream)
: m_stream(move_array_stream(stream))
{
}

Expand Down Expand Up @@ -89,6 +89,11 @@ namespace sparrow
}
}

bool arrow_array_stream_proxy::owns_stream() const
{
return std::holds_alternative<ArrowArrayStream>(m_stream);
}

ArrowArrayStream* arrow_array_stream_proxy::get_stream_ptr()
{
if (std::holds_alternative<ArrowArrayStream*>(m_stream))
Expand All @@ -113,16 +118,10 @@ namespace sparrow
}
}

const arrow_array_stream_private_data* arrow_array_stream_proxy::get_private_data() const
{
throw_if_immutable();
return static_cast<const arrow_array_stream_private_data*>(get_stream_ptr()->private_data);
}

arrow_array_stream_private_data* arrow_array_stream_proxy::get_private_data()
arrow_array_stream_private_data& arrow_array_stream_proxy::get_private_data()
{
throw_if_immutable();
return static_cast<arrow_array_stream_private_data*>(get_stream_ptr()->private_data);
return *static_cast<arrow_array_stream_private_data*>(get_stream_ptr()->private_data);
}

ArrowArrayStream* arrow_array_stream_proxy::export_stream()
Expand All @@ -145,6 +144,11 @@ namespace sparrow
std::optional<array> arrow_array_stream_proxy::pop()
{
ArrowArrayStream* stream = get_stream_ptr();
if (!stream)
{
throw std::runtime_error("ArrowArrayStream pointer is null");
}

ArrowArray array{};
if (int err = stream->get_next(stream, &array); err != 0)
{
Expand Down Expand Up @@ -190,4 +194,4 @@ namespace sparrow
throw std::runtime_error("ArrowArrayStream private data is not initialized");
}
}
}
}
194 changes: 153 additions & 41 deletions test/test_arrow_array_stream_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,24 +74,127 @@ namespace sparrow

TEST_SUITE("arrow_array_stream_proxy")
{
TEST_CASE("constructor - default")
TEST_CASE("constructor")
{
arrow_array_stream_proxy proxy;
ArrowArrayStream* aas = proxy.export_stream();
REQUIRE_NE(aas, nullptr);
aas->release(aas);
delete aas;
SUBCASE("default")
{
arrow_array_stream_proxy proxy;
ArrowArrayStream* aas = proxy.export_stream();
REQUIRE_NE(aas, nullptr);
aas->release(aas);
delete aas;
}

SUBCASE("pointer")
{
ArrowArrayStream* stream = new ArrowArrayStream;
fill_arrow_array_stream(*stream);
arrow_array_stream_proxy proxy(stream);
ArrowArrayStream* aas = proxy.export_stream();
REQUIRE_NE(aas, nullptr);
aas->release(aas);
delete aas;
}

SUBCASE("move")
{
ArrowArrayStream stream{};
fill_arrow_array_stream(stream);
arrow_array_stream_proxy proxy(std::move(stream));
REQUIRE_EQ(stream.private_data, nullptr);
REQUIRE_EQ(stream.release, nullptr);
ArrowArrayStream* aas = proxy.export_stream();
REQUIRE_NE(aas, nullptr);
aas->release(aas);
delete aas;
}
}

TEST_CASE("constructor - from existing stream")
TEST_CASE("move semantics")
{
ArrowArrayStream* stream = new ArrowArrayStream;
fill_arrow_array_stream(*stream);
arrow_array_stream_proxy proxy(stream);
ArrowArrayStream* aas = proxy.export_stream();
REQUIRE_NE(aas, nullptr);
aas->release(aas);
delete aas;
SUBCASE("move constructor")
{
{
ArrowArrayStream stream{};
fill_arrow_array_stream(stream);
arrow_array_stream_proxy src(std::move(stream));
arrow_array_stream_proxy dst(std::move(src));

auto* src_str = src.export_stream();
auto* dst_str = dst.export_stream();
REQUIRE_EQ(src_str->release, nullptr);
REQUIRE_NE(dst_str->release, nullptr);
delete src_str;
delete dst_str;
}
Comment on lines +117 to +129
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test has potential for memory leaks if assertions fail. The src_str and dst_str pointers are allocated by export_stream() but may not be deleted if a REQUIRE_EQ or REQUIRE_NE assertion fails before reaching the delete statements. Consider using RAII (e.g., std::unique_ptr) to ensure proper cleanup even when tests fail.

Example:

std::unique_ptr<ArrowArrayStream> src_str(src.export_stream());
std::unique_ptr<ArrowArrayStream> dst_str(dst.export_stream());
REQUIRE_EQ(src_str->release, nullptr);
REQUIRE_NE(dst_str->release, nullptr);

Copilot uses AI. Check for mistakes.

{
ArrowArrayStream stream{};
fill_arrow_array_stream(stream);
arrow_array_stream_proxy src(&stream);
arrow_array_stream_proxy dst(std::move(src));

auto* src_str = src.export_stream();
auto* dst_str = dst.export_stream();
REQUIRE_EQ(src_str, nullptr);
REQUIRE_NE(dst_str, nullptr);
}
Comment on lines +131 to +141
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test case has a memory leak. The stack-allocated stream is filled with fill_arrow_array_stream(stream) which allocates private_data using new. The stream is passed by pointer to the proxy (not owned), and when the test ends, the stack-allocated stream goes out of scope without calling its release function to free the private_data.

You should add cleanup code before the closing brace:

if (stream.release != nullptr)
{
    stream.release(&stream);
}

Copilot uses AI. Check for mistakes.
}

SUBCASE("move assignment")
{
{
auto test_array = make_test_primitive_array<int32_t>(10);
ArrowArrayStream stream;
fill_arrow_array_stream(stream);
arrow_array_stream_proxy src(std::move(stream));
src.push(std::move(test_array));

ArrowArrayStream stream2;
fill_arrow_array_stream(stream2);
arrow_array_stream_proxy dst(std::move(stream2));
dst = std::move(src);

auto* src_str = src.export_stream();
REQUIRE_EQ(src_str->release, nullptr);
auto dst_arr = dst.pop();
REQUIRE(dst_arr.has_value());
delete src_str;
}

Comment on lines +147 to +164
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the previous test case, this has potential for memory leaks if assertions fail. The src_str pointer allocated on line 159 may not be deleted if REQUIRE_EQ on line 160 fails. Consider using RAII (e.g., std::unique_ptr) to ensure proper cleanup:

std::unique_ptr<ArrowArrayStream> src_str(src.export_stream());
REQUIRE_EQ(src_str->release, nullptr);

Copilot uses AI. Check for mistakes.
{
auto test_array = make_test_primitive_array<int32_t>(10);
ArrowArrayStream stream;
fill_arrow_array_stream(stream);
arrow_array_stream_proxy src(&stream);
src.push(std::move(test_array));

ArrowArrayStream stream2;
fill_arrow_array_stream(stream2);
arrow_array_stream_proxy dst(&stream2);
dst = std::move(src);

auto* src_str = src.export_stream();
REQUIRE_EQ(src_str, nullptr);
auto dst_arr = dst.pop();
REQUIRE(dst_arr.has_value());
}
}
Comment on lines +166 to +182
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test case has memory leaks. Both stream and stream2 are stack-allocated and filled with fill_arrow_array_stream() which allocates private_data. When the test ends, these streams go out of scope without calling their release functions to free the private_data.

You should add cleanup code before the closing brace:

if (stream.release != nullptr)
{
    stream.release(&stream);
}
if (stream2.release != nullptr)
{
    stream2.release(&stream2);
}

Copilot uses AI. Check for mistakes.
}

TEST_CASE("owns_stream")
{
{
arrow_array_stream_proxy proxy{};
REQUIRE(proxy.owns_stream());
}

{
ArrowArrayStream stream{};
fill_arrow_array_stream(stream);
arrow_array_stream_proxy proxy(&stream);
REQUIRE(!proxy.owns_stream());
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test case has a memory leak. The stack-allocated stream is filled with fill_arrow_array_stream(stream) which allocates private_data. The stream is passed by pointer to the proxy (not owned), and when the test ends, the stack-allocated stream goes out of scope without calling its release function to free the private_data.

You should add cleanup code before the closing brace:

if (stream.release != nullptr)
{
    stream.release(&stream);
}
Suggested change
REQUIRE(!proxy.owns_stream());
REQUIRE(!proxy.owns_stream());
if (stream.release != nullptr)
{
stream.release(&stream);
}

Copilot uses AI. Check for mistakes.
}
}

TEST_CASE("export_stream")
Expand All @@ -110,7 +213,16 @@ namespace sparrow

TEST_CASE("push and pop - single int32 array")
{
SUBCASE("single array")
SUBCASE("empty stream")
{
ArrowArrayStream stream{};
fill_arrow_array_stream(stream);
arrow_array_stream_proxy src(&stream);
arrow_array_stream_proxy dst(std::move(src));
CHECK_THROWS_AS(src.pop(), std::runtime_error);
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test case has a memory leak. The stack-allocated stream is filled with fill_arrow_array_stream(stream) which allocates private_data. After being passed to src by pointer and then moved to dst, the original stream goes out of scope without calling its release function to free the private_data.

You should add cleanup code before the closing brace:

if (stream.release != nullptr)
{
    stream.release(&stream);
}
Suggested change
CHECK_THROWS_AS(src.pop(), std::runtime_error);
CHECK_THROWS_AS(src.pop(), std::runtime_error);
if (stream.release != nullptr)
{
stream.release(&stream);
}

Copilot uses AI. Check for mistakes.
}

SUBCASE("single int32 array")
{
arrow_array_stream_proxy proxy;
auto test_array = make_test_primitive_array<int32_t>(10);
Expand All @@ -119,35 +231,35 @@ namespace sparrow
REQUIRE(result.has_value());
CHECK_EQ(result->size(), 10);
}
}

TEST_CASE("push and pop - multiple arrays")
{
arrow_array_stream_proxy proxy;

// Create and push multiple arrays (schema created from first array)
std::vector<primitive_array<int32_t>> arrays;
arrays.push_back(make_test_primitive_array<int32_t>(5, 0));
arrays.push_back(make_test_primitive_array<int32_t>(7, 10));
arrays.push_back(make_test_primitive_array<int32_t>(3, 20));

for (auto& arr : arrays)
SUBCASE("multiple arrays")
{
proxy.push(std::move(arr));
}

// Pop all arrays
const auto result1 = proxy.pop();
REQUIRE(result1.has_value());
CHECK_EQ(result1->size(), 5);

auto result2 = proxy.pop();
REQUIRE(result2.has_value());
CHECK_EQ(result2->size(), 7);
arrow_array_stream_proxy proxy;

const auto result3 = proxy.pop();
REQUIRE(result3.has_value());
CHECK_EQ(result3->size(), 3);
// Create and push multiple arrays (schema created from first array)
std::vector<primitive_array<int32_t>> arrays;
arrays.push_back(make_test_primitive_array<int32_t>(5, 0));
arrays.push_back(make_test_primitive_array<int32_t>(7, 10));
arrays.push_back(make_test_primitive_array<int32_t>(3, 20));

for (auto& arr : arrays)
{
proxy.push(std::move(arr));
}

// Pop all arrays
const auto result1 = proxy.pop();
REQUIRE(result1.has_value());
CHECK_EQ(result1->size(), 5);

auto result2 = proxy.pop();
REQUIRE(result2.has_value());
CHECK_EQ(result2->size(), 7);

const auto result3 = proxy.pop();
REQUIRE(result3.has_value());
CHECK_EQ(result3->size(), 3);
}
}

TEST_CASE("end of stream")
Expand Down
Loading