Skip to content

Commit 660a0e8

Browse files
committed
Add deserialize_schema_message
1 parent 733e7c0 commit 660a0e8

File tree

4 files changed

+49
-78
lines changed

4 files changed

+49
-78
lines changed

include/serialize.hpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@
99

1010
namespace sparrow_ipc
1111
{
12-
namespace details
13-
{
14-
SPARROW_IPC_API void serialize_schema_message(const ArrowSchema& arrow_schema, const std::optional<sparrow::key_value_view>& metadata, std::vector<uint8_t>& final_buffer);
15-
SPARROW_IPC_API void serialize_record_batch_message(const ArrowArray& arrow_arr, const std::vector<int64_t>& buffers_sizes, std::vector<uint8_t>& final_buffer);
16-
}
12+
namespace details
13+
{
14+
SPARROW_IPC_API void serialize_schema_message(const ArrowSchema& arrow_schema, const std::optional<sparrow::key_value_view>& metadata, std::vector<uint8_t>& final_buffer);
15+
SPARROW_IPC_API void serialize_record_batch_message(const ArrowArray& arrow_arr, const std::vector<int64_t>& buffers_sizes, std::vector<uint8_t>& final_buffer);
16+
17+
SPARROW_IPC_API void deserialize_schema_message(const uint8_t* buf_ptr, size_t& current_offset, std::optional<std::string>& name, std::optional<std::vector<sparrow::metadata_pair>>& metadata);
18+
}
1719
}

include/serialize_null_array.hpp

Lines changed: 1 addition & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -53,46 +53,9 @@ namespace sparrow_ipc
5353
size_t current_offset = 0;
5454

5555
// I - Deserialize the Schema message
56-
uint32_t schema_meta_len = *(reinterpret_cast<const uint32_t*>(buf_ptr + current_offset));
57-
current_offset += sizeof(uint32_t);
58-
auto schema_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset);
59-
if (schema_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::Schema)
60-
{
61-
throw std::runtime_error("Expected Schema message at the start of the buffer.");
62-
}
63-
auto flatbuffer_schema = static_cast<const org::apache::arrow::flatbuf::Schema*>(schema_message->header());
64-
auto fields = flatbuffer_schema->fields();
65-
if (fields->size() != 1)
66-
{
67-
throw std::runtime_error("Expected schema with exactly one field for null_array.");
68-
}
69-
auto field = fields->Get(0);
70-
if (field->type_type() != org::apache::arrow::flatbuf::Type::Null)
71-
{
72-
throw std::runtime_error("Expected Null type in schema.");
73-
}
74-
7556
std::optional<std::string> name;
76-
if (auto fb_name = field->name())
77-
{
78-
name = std::string(fb_name->c_str(), fb_name->size());
79-
}
80-
8157
std::optional<std::vector<sparrow::metadata_pair>> metadata;
82-
if (auto fb_metadata = field->custom_metadata())
83-
{
84-
if (fb_metadata->size() > 0)
85-
{
86-
metadata = std::vector<sparrow::metadata_pair>();
87-
metadata->reserve(fb_metadata->size());
88-
for (const auto& kv : *fb_metadata)
89-
{
90-
metadata->emplace_back(kv->key()->str(), kv->value()->str());
91-
}
92-
}
93-
}
94-
95-
current_offset += schema_meta_len;
58+
details::deserialize_schema_message(buf_ptr, current_offset, name, metadata);
9659

9760
// II - Deserialize the RecordBatch message
9861
uint32_t batch_meta_len = *(reinterpret_cast<const uint32_t*>(buf_ptr + current_offset));

include/serialize_primitive_array.hpp

Lines changed: 3 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -67,20 +67,9 @@ namespace sparrow_ipc
6767
size_t current_offset = 0;
6868

6969
// I - Deserialize the Schema message
70-
uint32_t schema_meta_len = *(reinterpret_cast<const uint32_t*>(buf_ptr + current_offset));
71-
current_offset += sizeof(uint32_t);
72-
auto schema_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset);
73-
if (schema_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::Schema)
74-
{
75-
throw std::runtime_error("Expected Schema message at the start of the buffer.");
76-
}
77-
auto flatbuffer_schema = static_cast<const org::apache::arrow::flatbuf::Schema*>(schema_message->header());
78-
auto fields = flatbuffer_schema->fields();
79-
if (fields->size() != 1)
80-
{
81-
throw std::runtime_error("Expected schema with exactly one field for primitive_array.");
82-
}
83-
current_offset += schema_meta_len;
70+
std::optional<std::string> name;
71+
std::optional<std::vector<sparrow::metadata_pair>> metadata;
72+
details::deserialize_schema_message(buf_ptr, current_offset, name, metadata);
8473

8574
// II - Deserialize the RecordBatch message
8675
uint32_t batch_meta_len = *(reinterpret_cast<const uint32_t*>(buf_ptr + current_offset));
@@ -110,27 +99,6 @@ namespace sparrow_ipc
11099
uint8_t* data_buffer_copy = new uint8_t[data_len];
111100
memcpy(data_buffer_copy, body_ptr + buffers_meta->Get(1)->offset(), data_len);
112101

113-
// Get name
114-
std::optional<std::string> name;
115-
const flatbuffers::String* fb_name_flatbuffer = fields->Get(0)->name();
116-
if (fb_name_flatbuffer)
117-
{
118-
name = std::string(fb_name_flatbuffer->c_str(), fb_name_flatbuffer->size());
119-
}
120-
121-
// Handle metadata
122-
std::optional<std::vector<sparrow::metadata_pair>> metadata;
123-
auto fb_metadata = fields->Get(0)->custom_metadata();
124-
if (fb_metadata && !fb_metadata->empty())
125-
{
126-
metadata = std::vector<sparrow::metadata_pair>();
127-
metadata->reserve(fb_metadata->size());
128-
for (const auto& kv : *fb_metadata)
129-
{
130-
// TODO use str() instead of c_str()
131-
metadata->emplace_back(kv->key()->c_str(), kv->value()->c_str());
132-
}
133-
}
134102

135103
auto data = sparrow::u8_buffer<T>(reinterpret_cast<T*>(data_buffer_copy), node_meta->length());
136104
auto bitmap = sparrow::validity_bitmap(validity_buffer_copy, node_meta->length());

src/serialize.cpp

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,5 +158,43 @@ namespace sparrow_ipc
158158
dst += buffers_sizes[i];
159159
}
160160
}
161+
162+
void deserialize_schema_message(const uint8_t* buf_ptr, size_t& current_offset, std::optional<std::string>& name, std::optional<std::vector<sparrow::metadata_pair>>& metadata)
163+
{
164+
uint32_t schema_meta_len = *(reinterpret_cast<const uint32_t*>(buf_ptr + current_offset));
165+
current_offset += sizeof(uint32_t);
166+
auto schema_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset);
167+
if (schema_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::Schema)
168+
{
169+
throw std::runtime_error("Expected Schema message at the start of the buffer.");
170+
}
171+
auto flatbuffer_schema = static_cast<const org::apache::arrow::flatbuf::Schema*>(schema_message->header());
172+
auto fields = flatbuffer_schema->fields();
173+
if (fields->size() != 1)
174+
{
175+
throw std::runtime_error("Expected schema with exactly one field.");
176+
}
177+
178+
auto field = fields->Get(0);
179+
180+
// Get name
181+
if (const auto fb_name = field->name())
182+
{
183+
name = fb_name->str();
184+
}
185+
186+
// Handle metadata
187+
auto fb_metadata = field->custom_metadata();
188+
if (fb_metadata && !fb_metadata->empty())
189+
{
190+
metadata = std::vector<sparrow::metadata_pair>();
191+
metadata->reserve(fb_metadata->size());
192+
for (const auto& kv : *fb_metadata)
193+
{
194+
metadata->emplace_back(kv->key()->str(), kv->value()->str());
195+
}
196+
}
197+
current_offset += schema_meta_len;
198+
}
161199
} // namespace details
162200
} // namespace sparrow-ipc

0 commit comments

Comments
 (0)