Skip to content
Open
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ set(SPARROW_IPC_HEADERS
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/config/config.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/config/sparrow_ipc_version.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_array_impl.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/dictionary_cache.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/dictionary_tracker.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/dictionary_utils.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_decimal_array.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_duration_array.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_fixed_size_binary_array.hpp
Expand Down Expand Up @@ -161,6 +164,9 @@ set(SPARROW_IPC_SRC
${SPARROW_IPC_SOURCE_DIR}/compression.cpp
${SPARROW_IPC_SOURCE_DIR}/compression_impl.hpp
${SPARROW_IPC_SOURCE_DIR}/deserialize_fixed_size_binary_array.cpp
${SPARROW_IPC_SOURCE_DIR}/dictionary_cache.cpp
${SPARROW_IPC_SOURCE_DIR}/dictionary_tracker.cpp
${SPARROW_IPC_SOURCE_DIR}/dictionary_utils.cpp
${SPARROW_IPC_SOURCE_DIR}/deserialize_null_array.cpp
${SPARROW_IPC_SOURCE_DIR}/deserialize_utils.cpp
${SPARROW_IPC_SOURCE_DIR}/deserialize.cpp
Expand Down
87 changes: 87 additions & 0 deletions include/sparrow_ipc/dictionary_cache.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2024 Man Group Operations Limited
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <cstdint>
#include <map>
#include <optional>

#include <sparrow/record_batch.hpp>

#include "sparrow_ipc/config/config.hpp"

namespace sparrow_ipc
{
/**
* @brief Caches dictionaries during deserialization.
*
* This class stores dictionaries received from DictionaryBatch messages and
* provides them when reconstructing dictionary-encoded arrays. It handles
* both delta updates (appending to existing dictionaries) and replacement
* (overwriting existing dictionaries).
*
* Dictionaries are stored as single-column record batches and are referenced
* by their integer ID. Multiple fields can share the same dictionary by
* referencing the same ID.
*/
class SPARROW_IPC_API dictionary_cache
{
public:
/**
* @brief Store or update a dictionary.
*
* If is_delta is true and a dictionary with the given ID already exists,
* the new data is appended to the existing dictionary. Otherwise, the
* dictionary is replaced (or inserted if it doesn't exist).
*
* @param id The dictionary ID
* @param batch The dictionary data as a single-column record batch
* @param is_delta Whether to append (true) or replace (false)
* @throws std::invalid_argument if batch doesn't have exactly one column
*/
void store_dictionary(int64_t id, sparrow::record_batch batch, bool is_delta);

/**
* @brief Retrieve a cached dictionary.
*
* @param id The dictionary ID to retrieve
* @return An optional containing the dictionary if found, std::nullopt otherwise
*/
std::optional<std::reference_wrapper<const sparrow::record_batch>> get_dictionary(int64_t id) const;

/**
* @brief Check if a dictionary is cached.
*
* @param id The dictionary ID to check
* @return true if the dictionary exists in the cache, false otherwise
*/
bool contains(int64_t id) const;

/**
* @brief Clear all cached dictionaries.
*/
void clear();

/**
* @brief Get the number of cached dictionaries.
*
* @return The number of dictionaries in the cache
*/
size_t size() const;

private:
std::map<int64_t, sparrow::record_batch> m_dictionaries;
};
}
93 changes: 93 additions & 0 deletions include/sparrow_ipc/dictionary_tracker.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2024 Man Group Operations Limited
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <cstdint>
#include <set>
#include <vector>

#include <sparrow/record_batch.hpp>

#include "sparrow_ipc/config/config.hpp"

namespace sparrow_ipc
{
/**
* @brief Information about a dictionary used for encoding.
*/
struct dictionary_info
{
int64_t id; ///< Dictionary identifier
sparrow::record_batch data; ///< Dictionary values as a single-column record batch
bool is_ordered; ///< Whether dictionary values are ordered
bool is_delta; ///< Whether this is a delta update
};

/**
* @brief Tracks dictionaries during serialization.
*
* This class is responsible for discovering dictionary-encoded fields in record batches,
* extracting their dictionary data, and managing which dictionaries have been emitted
* to the output stream.
*
* Dictionaries must be emitted before any RecordBatch that references them. This class
* ensures proper ordering by tracking emitted dictionary IDs and providing methods to
* determine which dictionaries need to be sent before each record batch.
*/
class SPARROW_IPC_API dictionary_tracker
{
public:
/**
* @brief Extract dictionaries from a record batch.
*
* Scans all columns in the record batch for dictionary-encoded fields.
* Returns a vector of dictionaries that need to be emitted before this
* record batch can be serialized.
*
* @param batch The record batch to scan for dictionaries
* @return Vector of dictionary_info for dictionaries that haven't been emitted yet
*/
std::vector<dictionary_info> extract_dictionaries_from_batch(const sparrow::record_batch& batch);

/**
* @brief Mark a dictionary as emitted.
*
* After a dictionary has been written to the stream, call this method to
* record that it has been emitted. This prevents re-emission of the same
* dictionary for subsequent record batches (unless it's a delta update).
*
* @param id The dictionary ID that was emitted
*/
void mark_emitted(int64_t id);

/**
* @brief Check if a dictionary has been emitted.
*
* @param id The dictionary ID to check
* @return true if the dictionary has been emitted, false otherwise
*/
bool is_emitted(int64_t id) const;

/**
* @brief Reset tracking state.
*
* Clears all tracking information. Useful when starting a new stream.
*/
void reset();

private:
std::set<int64_t> m_emitted_dict_ids; ///< IDs of dictionaries already emitted
};
}
39 changes: 39 additions & 0 deletions include/sparrow_ipc/dictionary_utils.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2024 Man Group Operations Limited
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <cstddef>
#include <cstdint>
#include <optional>
#include <string_view>

#include <sparrow/c_interface.hpp>

#include "sparrow_ipc/config/config.hpp"

namespace sparrow_ipc
{
struct SPARROW_IPC_API dictionary_metadata
{
std::optional<int64_t> id;
bool is_ordered = false;
};

[[nodiscard]] SPARROW_IPC_API int64_t
compute_fallback_dictionary_id(std::string_view field_name, size_t field_index);

[[nodiscard]] SPARROW_IPC_API dictionary_metadata
parse_dictionary_metadata(const ArrowSchema& schema);
}
36 changes: 35 additions & 1 deletion include/sparrow_ipc/flatbuffer_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ namespace sparrow_ipc
[[nodiscard]] ::flatbuffers::Offset<org::apache::arrow::flatbuf::Field> create_field(
flatbuffers::FlatBufferBuilder& builder,
const ArrowSchema& arrow_schema,
std::optional<std::string_view> name_override = std::nullopt
std::optional<std::string_view> name_override = std::nullopt,
std::optional<int64_t> dictionary_id_override = std::nullopt
);

/**
Expand Down Expand Up @@ -363,6 +364,39 @@ namespace sparrow_ipc
std::optional<std::reference_wrapper<CompressionCache>> cache = std::nullopt
);

/**
* @brief Creates a FlatBuffer message for a dictionary batch.
*
* This function serializes a dictionary batch into a FlatBuffer format conforming
* to the Arrow IPC specification. A dictionary batch contains the actual dictionary
* values that are referenced by dictionary-encoded arrays. The dictionary data is
* serialized as a RecordBatch with a single column.
*
* @param dictionary_id The unique identifier for this dictionary, used to match
* dictionary-encoded fields in record batches
* @param record_batch A single-column record batch containing the dictionary values
* @param is_delta If true, the dictionary values should be appended to an existing
* dictionary with the same ID. If false, this replaces any existing
* dictionary with the same ID.
* @param compression Optional: The compression algorithm to be used for the message body
* @param cache Optional: A cache for compressed buffers to avoid recompression if
* compression is enabled. If compression is given, cache should be set as well.
* @return A FlatBufferBuilder containing the complete serialized dictionary batch
* message ready for transmission or storage
* @throws std::invalid_argument if compression is given but not cache, or if the
* record_batch doesn't have exactly one column
*
* @note Dictionary batches must be emitted before any RecordBatch that references them
* @note The returned message uses Arrow IPC format version V5
*/
[[nodiscard]] flatbuffers::FlatBufferBuilder get_dictionary_batch_message_builder(
int64_t dictionary_id,
const sparrow::record_batch& record_batch,
bool is_delta = false,
std::optional<CompressionType> compression = std::nullopt,
std::optional<std::reference_wrapper<CompressionCache>> cache = std::nullopt
);

// Helper function to extract and parse the footer from Arrow IPC file data
[[nodiscard]] SPARROW_IPC_API const org::apache::arrow::flatbuf::Footer* get_footer_from_file_data(std::span<const uint8_t> file_data);
}
28 changes: 28 additions & 0 deletions include/sparrow_ipc/serialize.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,34 @@ namespace sparrow_ipc
std::optional<CompressionType> compression,
std::optional<std::reference_wrapper<CompressionCache>> cache);

/**
* @brief Serializes a dictionary batch into a binary format following the Arrow IPC specification.
*
* This function serializes a dictionary batch containing the actual values referenced by
* dictionary-encoded fields. The dictionary data is provided as a single-column record batch.
* The serialized output follows the Arrow IPC encapsulated message format.
*
* @param dictionary_id The unique identifier for this dictionary, used to match with
* dictionary-encoded fields in record batches
* @param record_batch A single-column record batch containing the dictionary values
* @param is_delta If true, this dictionary should be appended to an existing dictionary
* with the same ID. If false, it replaces any existing dictionary.
* @param stream The output stream where the serialized dictionary batch will be written
* @param compression Optional: The compression type to use when serializing
* @param cache Optional: A cache to store and retrieve compressed buffers, avoiding recompression.
* If compression is given, cache should be set as well.
* @return Information about the serialized dictionary batch (metadata and body lengths)
* @throws std::invalid_argument if record_batch doesn't have exactly one column
* @note Dictionary batches must be emitted before any RecordBatch that references them
*/
SPARROW_IPC_API serialized_record_batch_info
serialize_dictionary_batch(int64_t dictionary_id,
const sparrow::record_batch& record_batch,
bool is_delta,
any_output_stream& stream,
std::optional<CompressionType> compression,
std::optional<std::reference_wrapper<CompressionCache>> cache);

/**
* @brief Serializes a schema message for a record batch into a byte buffer.
*
Expand Down
33 changes: 32 additions & 1 deletion include/sparrow_ipc/serializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "sparrow_ipc/any_output_stream.hpp"
#include "sparrow_ipc/compression.hpp"
#include "sparrow_ipc/dictionary_tracker.hpp"
#include "sparrow_ipc/serialize.hpp"
#include "sparrow_ipc/serialize_utils.hpp"

Expand Down Expand Up @@ -105,7 +106,20 @@ namespace sparrow_ipc
m_stream.size(),
[&compressed_buffers_cache, this](size_t acc, const sparrow::record_batch& rb)
{
return acc + calculate_record_batch_message_size(rb, m_compression, compressed_buffers_cache);
size_t dictionaries_size = 0;
const auto dictionaries = m_dict_tracker.extract_dictionaries_from_batch(rb);
for (const auto& dict_info : dictionaries)
{
dictionaries_size += calculate_record_batch_message_size(
dict_info.data,
m_compression,
compressed_buffers_cache
);
}

return acc
+ dictionaries_size
+ calculate_record_batch_message_size(rb, m_compression, compressed_buffers_cache);
}
)
+ (m_schema_received ? 0 : calculate_schema_message_size(*record_batches.begin()));
Expand All @@ -126,6 +140,22 @@ namespace sparrow_ipc
{
throw std::invalid_argument("Record batch schema does not match serializer schema");
}

// Extract and emit dictionaries before the record batch
auto dictionaries = m_dict_tracker.extract_dictionaries_from_batch(rb);
for (const auto& dict_info : dictionaries)
{
serialize_dictionary_batch(
dict_info.id,
dict_info.data,
dict_info.is_delta,
m_stream,
m_compression,
compressed_buffers_cache
);
m_dict_tracker.mark_emitted(dict_info.id);
}

serialize_record_batch(rb, m_stream, m_compression, compressed_buffers_cache);
}
}
Expand Down Expand Up @@ -218,6 +248,7 @@ namespace sparrow_ipc
any_output_stream m_stream;
bool m_ended{false};
std::optional<CompressionType> m_compression;
dictionary_tracker m_dict_tracker;
};

inline serializer& end_stream(serializer& serializer)
Expand Down
Loading
Loading