Skip to content

Commit a5d9846

Browse files
authored
Handle lz4 compression (#30)
1 parent dae8209 commit a5d9846

31 files changed

+970
-221
lines changed

CMakeLists.txt

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,11 @@ set(SPARROW_IPC_HEADERS
108108
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/chunk_memory_serializer.hpp
109109
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/config/config.hpp
110110
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/config/sparrow_ipc_version.hpp
111+
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/compression.hpp
111112
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_fixedsizebinary_array.hpp
112113
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_primitive_array.hpp
113114
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_utils.hpp
114115
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp
115-
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp
116116
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize.hpp
117117
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/encapsulated_message.hpp
118118
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/flatbuffer_utils.hpp
@@ -132,6 +132,7 @@ set(SPARROW_IPC_SRC
132132
${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_schema.cpp
133133
${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_schema/private_data.cpp
134134
${SPARROW_IPC_SOURCE_DIR}/chunk_memory_serializer.cpp
135+
${SPARROW_IPC_SOURCE_DIR}/compression.cpp
135136
${SPARROW_IPC_SOURCE_DIR}/deserialize_fixedsizebinary_array.cpp
136137
${SPARROW_IPC_SOURCE_DIR}/deserialize_utils.cpp
137138
${SPARROW_IPC_SOURCE_DIR}/deserialize.cpp
@@ -253,6 +254,8 @@ target_link_libraries(sparrow-ipc
253254
PUBLIC
254255
sparrow::sparrow
255256
flatbuffers::flatbuffers
257+
PRIVATE
258+
lz4::lz4
256259
)
257260

258261
# Ensure generated headers are available when building sparrow-ipc
@@ -318,6 +321,25 @@ if (TARGET flatbuffers)
318321
endif()
319322
endif()
320323

324+
if (TARGET lz4)
325+
get_target_property(is_imported lz4 IMPORTED)
326+
if(NOT is_imported)
327+
# This means `lz4` was fetched using FetchContent
328+
# We need to export `lz4` target explicitly
329+
list(APPEND SPARROW_IPC_EXPORTED_TARGETS lz4)
330+
endif()
331+
endif()
332+
333+
if (TARGET lz4_static)
334+
get_target_property(is_imported lz4_static IMPORTED)
335+
if(NOT is_imported)
336+
# `lz4_static` is needed as this is the actual library
337+
# and `lz4` is an interface pointing to it.
338+
# If `lz4_shared` is used instead for some reason, modify this accordingly
339+
list(APPEND SPARROW_IPC_EXPORTED_TARGETS lz4_static)
340+
endif()
341+
endif()
342+
321343
install(TARGETS ${SPARROW_IPC_EXPORTED_TARGETS}
322344
EXPORT ${PROJECT_NAME}-targets)
323345

cmake/Findlz4.cmake

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Find LZ4 library and headers
2+
3+
# This module defines:
4+
# LZ4_FOUND - True if lz4 is found
5+
# LZ4_INCLUDE_DIRS - LZ4 include directories
6+
# LZ4_LIBRARIES - Libraries needed to use LZ4
7+
# LZ4_VERSION - LZ4 version number
8+
#
9+
10+
find_package(PkgConfig)
11+
if(PKG_CONFIG_FOUND)
12+
pkg_check_modules(LZ4 QUIET liblz4)
13+
if(NOT LZ4_FOUND)
14+
message(STATUS "Did not find 'liblz4.pc', trying 'lz4.pc'")
15+
pkg_check_modules(LZ4 QUIET lz4)
16+
endif()
17+
endif()
18+
19+
find_path(LZ4_INCLUDE_DIR lz4.h)
20+
# HINTS ${LZ4_INCLUDEDIR} ${LZ4_INCLUDE_DIRS})
21+
find_library(LZ4_LIBRARY NAMES lz4 liblz4)
22+
# HINTS ${LZ4_LIBDIR} ${LZ4_LIBRARY_DIRS})
23+
24+
include(FindPackageHandleStandardArgs)
25+
find_package_handle_standard_args(lz4 DEFAULT_MSG
26+
LZ4_LIBRARY LZ4_INCLUDE_DIR)
27+
mark_as_advanced(LZ4_INCLUDE_DIR LZ4_LIBRARY)
28+
29+
set(LZ4_LIBRARIES ${LZ4_LIBRARY})
30+
set(LZ4_INCLUDE_DIRS ${LZ4_INCLUDE_DIR})
31+
32+
if(LZ4_FOUND AND NOT TARGET lz4::lz4)
33+
add_library(lz4::lz4 UNKNOWN IMPORTED)
34+
set_target_properties(lz4::lz4 PROPERTIES
35+
IMPORTED_LOCATION "${LZ4_LIBRARIES}"
36+
INTERFACE_INCLUDE_DIRECTORIES "${LZ4_INCLUDE_DIRS}")
37+
if (NOT TARGET LZ4::LZ4 AND TARGET lz4::lz4)
38+
add_library(LZ4::LZ4 ALIAS lz4::lz4)
39+
endif ()
40+
endif()
41+
42+
#TODO add version?

cmake/external_dependencies.cmake

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ endif()
1111

1212
function(find_package_or_fetch)
1313
set(options)
14-
set(oneValueArgs CONAN_PKG_NAME PACKAGE_NAME GIT_REPOSITORY TAG)
15-
set(multiValueArgs)
14+
set(oneValueArgs CONAN_PKG_NAME PACKAGE_NAME GIT_REPOSITORY TAG SOURCE_SUBDIR)
15+
set(multiValueArgs CMAKE_ARGS)
1616
cmake_parse_arguments(PARSE_ARGV 0 arg
1717
"${options}" "${oneValueArgs}" "${multiValueArgs}"
1818
)
@@ -30,14 +30,25 @@ function(find_package_or_fetch)
3030
if(FETCH_DEPENDENCIES_WITH_CMAKE STREQUAL "ON" OR FETCH_DEPENDENCIES_WITH_CMAKE STREQUAL "MISSING")
3131
if(NOT ${actual_pkg_name}_FOUND)
3232
message(STATUS "📦 Fetching ${arg_PACKAGE_NAME}")
33-
FetchContent_Declare(
33+
# Apply CMAKE_ARGS before fetching
34+
foreach(cmake_arg ${arg_CMAKE_ARGS})
35+
string(REGEX MATCH "^([^=]+)=(.*)$" _ ${cmake_arg})
36+
if(CMAKE_MATCH_1)
37+
set(${CMAKE_MATCH_1} ${CMAKE_MATCH_2} CACHE BOOL "" FORCE)
38+
endif()
39+
endforeach()
40+
set(fetch_args
3441
${arg_PACKAGE_NAME}
3542
GIT_SHALLOW TRUE
3643
GIT_REPOSITORY ${arg_GIT_REPOSITORY}
3744
GIT_TAG ${arg_TAG}
3845
GIT_PROGRESS TRUE
3946
SYSTEM
4047
EXCLUDE_FROM_ALL)
48+
if(arg_SOURCE_SUBDIR)
49+
list(APPEND fetch_args SOURCE_SUBDIR ${arg_SOURCE_SUBDIR})
50+
endif()
51+
FetchContent_Declare(${fetch_args})
4152
FetchContent_MakeAvailable(${arg_PACKAGE_NAME})
4253
message(STATUS "\t✅ Fetched ${arg_PACKAGE_NAME}")
4354
else()
@@ -93,6 +104,25 @@ if(NOT TARGET flatbuffers::flatbuffers)
93104
endif()
94105
unset(FLATBUFFERS_BUILD_TESTS CACHE)
95106

107+
# Fetching lz4
108+
# Disable bundled mode to allow shared libraries if needed
109+
# lz4 is built as static by default if bundled
110+
# set(LZ4_BUNDLED_MODE OFF CACHE BOOL "" FORCE)
111+
# set(BUILD_SHARED_LIBS ON CACHE BOOL "" FORCE)
112+
find_package_or_fetch(
113+
PACKAGE_NAME lz4
114+
GIT_REPOSITORY https://github.com/lz4/lz4.git
115+
TAG v1.10.0
116+
SOURCE_SUBDIR build/cmake
117+
CMAKE_ARGS
118+
"LZ4_BUILD_CLI=OFF"
119+
"LZ4_BUILD_LEGACY_LZ4C=OFF"
120+
)
121+
122+
if(NOT TARGET lz4::lz4)
123+
add_library(lz4::lz4 ALIAS lz4)
124+
endif()
125+
96126
if(SPARROW_IPC_BUILD_TESTS)
97127
find_package_or_fetch(
98128
PACKAGE_NAME doctest
@@ -123,10 +153,18 @@ if(SPARROW_IPC_BUILD_TESTS)
123153
)
124154
message(STATUS "\t✅ Fetched arrow-testing")
125155

126-
# Iterate over all the files in the arrow-testing-data source directiory. When it's a gz, extract in place.
127-
file(GLOB_RECURSE arrow_testing_data_targz_files CONFIGURE_DEPENDS
156+
# Collect all the .json.gz files in the cpp-21.0.0 directory
157+
file(GLOB_RECURSE arrow_testing_data_targz_files_cpp_21 CONFIGURE_DEPENDS
128158
"${arrow-testing_SOURCE_DIR}/data/arrow-ipc-stream/integration/cpp-21.0.0/*.json.gz"
129159
)
160+
# Collect all the .json.gz files in the 2.0.0-compression directory
161+
file(GLOB_RECURSE arrow_testing_data_targz_files_compression CONFIGURE_DEPENDS
162+
"${arrow-testing_SOURCE_DIR}/data/arrow-ipc-stream/integration/2.0.0-compression/*.json.gz"
163+
)
164+
165+
# Combine lists of files
166+
list(APPEND arrow_testing_data_targz_files ${arrow_testing_data_targz_files_cpp_21} ${arrow_testing_data_targz_files_compression})
167+
# Iterate over all the files in the arrow-testing-data source directory. When it's a gz, extract in place.
130168
foreach(file_path IN LISTS arrow_testing_data_targz_files)
131169
cmake_path(GET file_path PARENT_PATH parent_dir)
132170
cmake_path(GET file_path STEM filename)
@@ -142,5 +180,4 @@ if(SPARROW_IPC_BUILD_TESTS)
142180
endif()
143181
endif()
144182
endforeach()
145-
146183
endif()

conanfile.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ def configure(self):
4545
def requirements(self):
4646
self.requires("sparrow/1.0.0")
4747
self.requires(f"flatbuffers/{self._flatbuffers_version}")
48+
self.requires("lz4/1.9.4")
49+
#self.requires("zstd/1.5.5")
4850
if self.options.get_safe("build_tests"):
4951
self.test_requires("doctest/2.4.12")
5052

environment-dev.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ dependencies:
88
- cxx-compiler
99
# Libraries dependencies
1010
- flatbuffers
11+
- lz4-c
1112
- nlohmann_json
1213
- sparrow-devel
1314
- sparrow-json-reader
15+
# Testing dependencies
1416
- doctest
1517
# Documentation dependencies
1618
- doxygen
Lines changed: 65 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,86 @@
1-
21
#pragma once
32

4-
#include <vector>
3+
#include <utility>
54

65
#include <sparrow/c_interface.hpp>
6+
#include <sparrow/utils/contracts.hpp>
77

88
#include "sparrow_ipc/config/config.hpp"
9+
#include "sparrow_ipc/arrow_interface/arrow_array/private_data.hpp"
910

1011
namespace sparrow_ipc
1112
{
12-
[[nodiscard]] SPARROW_IPC_API ArrowArray make_non_owning_arrow_array(
13+
SPARROW_IPC_API void release_arrow_array_children_and_dictionary(ArrowArray* array);
14+
15+
template <ArrowPrivateData T>
16+
void arrow_array_release(ArrowArray* array)
17+
{
18+
SPARROW_ASSERT_TRUE(array != nullptr)
19+
SPARROW_ASSERT_TRUE(array->release == std::addressof(arrow_array_release<T>))
20+
21+
SPARROW_ASSERT_TRUE(array->private_data != nullptr);
22+
23+
delete static_cast<T*>(array->private_data);
24+
array->private_data = nullptr;
25+
array->buffers = nullptr; // The buffers were deleted with the private data
26+
27+
release_arrow_array_children_and_dictionary(array);
28+
array->release = nullptr;
29+
}
30+
31+
template <ArrowPrivateData T, typename Arg>
32+
void fill_arrow_array(
33+
ArrowArray& array,
1334
int64_t length,
1435
int64_t null_count,
1536
int64_t offset,
16-
std::vector<std::uint8_t*>&& buffers,
1737
size_t children_count,
1838
ArrowArray** children,
19-
ArrowArray* dictionary
20-
);
39+
ArrowArray* dictionary,
40+
Arg&& private_data_arg
41+
)
42+
{
43+
SPARROW_ASSERT_TRUE(length >= 0);
44+
SPARROW_ASSERT_TRUE(null_count >= -1);
45+
SPARROW_ASSERT_TRUE(offset >= 0);
2146

22-
SPARROW_IPC_API void release_non_owning_arrow_array(ArrowArray* array);
47+
array.length = length;
48+
array.null_count = null_count;
49+
array.offset = offset;
50+
array.n_children = static_cast<int64_t>(children_count);
51+
array.children = children;
52+
array.dictionary = dictionary;
2353

24-
SPARROW_IPC_API void fill_non_owning_arrow_array(
25-
ArrowArray& array,
54+
auto private_data = new T(std::forward<Arg>(private_data_arg));
55+
array.private_data = private_data;
56+
array.n_buffers = private_data->n_buffers();
57+
array.buffers = private_data->buffers_ptrs();
58+
59+
array.release = &arrow_array_release<T>;
60+
}
61+
62+
template <ArrowPrivateData T, typename Arg>
63+
[[nodiscard]] ArrowArray make_arrow_array(
2664
int64_t length,
2765
int64_t null_count,
2866
int64_t offset,
29-
std::vector<std::uint8_t*>&& buffers,
3067
size_t children_count,
3168
ArrowArray** children,
32-
ArrowArray* dictionary
33-
);
34-
}
69+
ArrowArray* dictionary,
70+
Arg&& private_data_arg
71+
)
72+
{
73+
ArrowArray array{};
74+
fill_arrow_array<T>(
75+
array,
76+
length,
77+
null_count,
78+
offset,
79+
children_count,
80+
children,
81+
dictionary,
82+
std::forward<Arg>(private_data_arg)
83+
);
84+
return array;
85+
}
86+
}
Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,33 @@
11
#pragma once
2-
2+
#include <concepts>
33
#include <cstdint>
4+
#include <span>
5+
#include <variant>
46
#include <vector>
57

68
#include "sparrow_ipc/config/config.hpp"
79

810
namespace sparrow_ipc
911
{
10-
class non_owning_arrow_array_private_data
12+
template <typename T>
13+
concept ArrowPrivateData = requires(T& t)
1114
{
12-
public:
15+
{ t.buffers_ptrs() } -> std::same_as<const void**>;
16+
{ t.n_buffers() } -> std::convertible_to<std::size_t>;
17+
};
1318

14-
explicit constexpr non_owning_arrow_array_private_data(std::vector<std::uint8_t*>&& buffers_pointers)
15-
: m_buffers_pointers(std::move(buffers_pointers))
16-
{
17-
}
19+
class arrow_array_private_data
20+
{
21+
public:
22+
using optionally_owned_buffer = std::variant<std::vector<uint8_t>, std::span<const uint8_t>>;
23+
explicit arrow_array_private_data(std::vector<optionally_owned_buffer>&& buffers);
1824

1925
[[nodiscard]] SPARROW_IPC_API const void** buffers_ptrs() noexcept;
26+
[[nodiscard]] SPARROW_IPC_API std::size_t n_buffers() const noexcept;
2027

2128
private:
2229

23-
std::vector<std::uint8_t*> m_buffers_pointers;
30+
std::vector<optionally_owned_buffer> m_buffers;
31+
std::vector<const void*> m_buffer_pointers;
2432
};
2533
}

include/sparrow_ipc/arrow_interface/arrow_array_schema_common_release.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
namespace sparrow_ipc
1010
{
11+
// TODO Find a way to use sparrow internals directly and avoid duplicated code
1112
/**
1213
* Release the children and dictionary of an `ArrowArray` or `ArrowSchema`.
1314
*
@@ -20,7 +21,7 @@ namespace sparrow_ipc
2021
{
2122
using private_data_type = std::conditional_t<
2223
std::same_as<T, ArrowArray>,
23-
non_owning_arrow_array_private_data,
24+
arrow_array_private_data,
2425
non_owning_arrow_schema_private_data>;
2526
if (t.release == nullptr)
2627
{

0 commit comments

Comments
 (0)