Skip to content

Commit cdd3729

Browse files
committed
wip
1 parent c0f7d03 commit cdd3729

File tree

3 files changed

+156
-102
lines changed

3 files changed

+156
-102
lines changed

examples/write_and_read_streams.cpp

Lines changed: 112 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
#include <algorithm>
22
#include <iostream>
3-
#include <vector>
43
#include <random>
4+
#include <vector>
55

6-
#include <sparrow/record_batch.hpp>
7-
6+
#include <sparrow_ipc/deserialize.hpp>
87
#include <sparrow_ipc/memory_output_stream.hpp>
98
#include <sparrow_ipc/serializer.hpp>
10-
#include <sparrow_ipc/deserialize.hpp>
9+
10+
#include <sparrow/record_batch.hpp>
1111

1212
namespace sp = sparrow;
1313

@@ -19,232 +19,256 @@ std::mt19937 gen(rd());
1919
* Helper function to create a record batch with the same schema but random values
2020
* All batches have: int32 column, float column, bool column, and string column
2121
*/
22-
sp::record_batch create_random_record_batch(size_t num_rows)
22+
sp::record_batch create_random_record_batch(size_t num_rows)
2323
{
2424
std::uniform_int_distribution<int32_t> int_dist(0, 1000);
2525
std::uniform_real_distribution<float> float_dist(-100.0f, 100.0f);
2626
std::uniform_int_distribution<int> bool_dist(0, 1);
27-
27+
2828
// Create integer column with random values
2929
std::vector<int32_t> int_values;
3030
int_values.reserve(num_rows);
31-
for (size_t i = 0; i < num_rows; ++i) {
31+
for (size_t i = 0; i < num_rows; ++i)
32+
{
3233
int_values.push_back(int_dist(gen));
3334
}
34-
auto int_array = sp::primitive_array<int32_t>(std::move(int_values) );
35+
auto int_array = sp::primitive_array<int32_t>(std::move(int_values));
3536

3637
// Create float column with random values
3738
std::vector<float> float_values;
3839
float_values.reserve(num_rows);
39-
for (size_t i = 0; i < num_rows; ++i) {
40+
for (size_t i = 0; i < num_rows; ++i)
41+
{
4042
float_values.push_back(float_dist(gen));
4143
}
4244
auto float_array = sp::primitive_array<float>(std::move(float_values));
4345

4446
// Create boolean column with random values
4547
std::vector<bool> bool_values;
4648
bool_values.reserve(num_rows);
47-
for (size_t i = 0; i < num_rows; ++i) {
49+
for (size_t i = 0; i < num_rows; ++i)
50+
{
4851
bool_values.push_back(static_cast<bool>(bool_dist(gen)));
4952
}
5053
auto bool_array = sp::primitive_array<bool>(std::move(bool_values));
5154

5255
// Create string column with random values
5356
std::vector<std::string> string_values;
5457
string_values.reserve(num_rows);
55-
const std::vector<std::string> sample_strings = {
56-
"alpha", "beta", "gamma", "delta", "epsilon",
57-
"zeta", "eta", "theta", "iota", "kappa"
58-
};
58+
const std::vector<std::string> sample_strings =
59+
{"alpha", "beta", "gamma", "delta", "epsilon", "zeta", "eta", "theta", "iota", "kappa"};
5960
std::uniform_int_distribution<size_t> str_dist(0, sample_strings.size() - 1);
60-
61-
for (size_t i = 0; i < num_rows; ++i) {
61+
62+
for (size_t i = 0; i < num_rows; ++i)
63+
{
6264
string_values.push_back(sample_strings[str_dist(gen)] + "_" + std::to_string(i));
6365
}
6466
auto string_array = sp::string_array(std::move(string_values));
6567

6668
// Create record batch with named columns (same schema for all batches)
67-
return sp::record_batch({
68-
{"id", sp::array(std::move(int_array))},
69-
{"value", sp::array(std::move(float_array))},
70-
{"flag", sp::array(std::move(bool_array))},
71-
{"name", sp::array(std::move(string_array))}
72-
});
69+
return sp::record_batch(
70+
{{"id", sp::array(std::move(int_array))},
71+
{"value", sp::array(std::move(float_array))},
72+
{"flag", sp::array(std::move(bool_array))},
73+
{"name", sp::array(std::move(string_array))}}
74+
);
7375
}
7476

75-
int main()
77+
int main()
7678
{
7779
std::cout << "=== Sparrow IPC Stream Write and Read Example ===\n";
7880
std::cout << "Note: All record batches in a stream must have the same schema.\n\n";
7981

80-
try {
82+
try
83+
{
8184
// Configuration
8285
constexpr size_t num_batches = 5;
8386
constexpr size_t rows_per_batch = 10;
8487

8588
// Step 1: Create several record batches with the SAME schema but random values
8689
std::cout << "1. Creating " << num_batches << " record batches with random values...\n";
8790
std::cout << " Each batch has the same schema: (id: int32, value: float, flag: bool, name: string)\n";
88-
91+
8992
std::vector<sp::record_batch> original_batches;
9093
original_batches.reserve(num_batches);
91-
92-
for (size_t i = 0; i < num_batches; ++i) {
94+
95+
for (size_t i = 0; i < num_batches; ++i)
96+
{
9397
original_batches.push_back(create_random_record_batch(rows_per_batch));
9498
}
9599

96100
std::cout << " Created " << original_batches.size() << " record batches\n";
97-
for(size_t i = 0; i < original_batches.size(); ++i) {
98-
std::cout << std::format("{}\n", original_batches[i]);
101+
for (size_t i = 0; i < original_batches.size(); ++i)
102+
{
103+
std::cout << std::format("{}\n\n", original_batches[i]);
99104
}
100105

101106
// Step 2: Serialize the record batches to a stream
102107
std::cout << "\n2. Serializing record batches to stream...\n";
103-
108+
104109
std::vector<uint8_t> stream_data;
105110
sparrow_ipc::memory_output_stream stream(stream_data);
106111
sparrow_ipc::serializer serializer(stream);
107112

108113
// Serialize all batches using the streaming operator
109114
serializer << original_batches << sparrow_ipc::end_stream;
110-
115+
111116
std::cout << " Serialized stream size: " << stream_data.size() << " bytes\n";
112117

113118
// Step 3: Deserialize the stream back to record batches
114119
std::cout << "\n3. Deserializing stream back to record batches...\n";
115-
116-
auto deserialized_batches = sparrow_ipc::deserialize_stream(
117-
std::span<const uint8_t>(stream_data)
118-
);
119-
120+
121+
auto deserialized_batches = sparrow_ipc::deserialize_stream(stream_data);
122+
120123
std::cout << " Deserialized " << deserialized_batches.size() << " record batches\n";
121124

122125
// Step 4: Verify that original and deserialized data match
123126
std::cout << "\n4. Verifying data integrity...\n";
124-
125-
if (original_batches.size() != deserialized_batches.size()) {
126-
std::cerr << "ERROR: Batch count mismatch! Original: " << original_batches.size()
127+
128+
if (original_batches.size() != deserialized_batches.size())
129+
{
130+
std::cerr << "ERROR: Batch count mismatch! Original: " << original_batches.size()
127131
<< ", Deserialized: " << deserialized_batches.size() << "\n";
128132
return 1;
129133
}
130134

131135
bool all_match = true;
132-
for (size_t batch_idx = 0; batch_idx < original_batches.size(); ++batch_idx) {
136+
for (size_t batch_idx = 0; batch_idx < original_batches.size(); ++batch_idx)
137+
{
133138
const auto& original = original_batches[batch_idx];
134139
const auto& deserialized = deserialized_batches[batch_idx];
135-
140+
136141
// Check basic structure
137-
if (original.nb_columns() != deserialized.nb_columns() ||
138-
original.nb_rows() != deserialized.nb_rows()) {
142+
if (original.nb_columns() != deserialized.nb_columns()
143+
|| original.nb_rows() != deserialized.nb_rows())
144+
{
139145
std::cerr << "ERROR: Batch " << batch_idx << " structure mismatch!\n";
140146
all_match = false;
141147
continue;
142148
}
143149

144150
// Check column names
145-
if(!std::ranges::equal(original.names(), deserialized.names())) {
151+
if (!std::ranges::equal(original.names(), deserialized.names()))
152+
{
146153
std::cerr << "WARNING: Batch " << batch_idx << " column names mismatch!\n";
147154
}
148-
155+
149156
// Check column data
150-
for (size_t col_idx = 0; col_idx < original.nb_columns(); ++col_idx) {
157+
for (size_t col_idx = 0; col_idx < original.nb_columns(); ++col_idx)
158+
{
151159
const auto& orig_col = original.get_column(col_idx);
152160
const auto& deser_col = deserialized.get_column(col_idx);
153-
154-
if (orig_col.data_type() != deser_col.data_type()) {
155-
std::cerr << "ERROR: Batch " << batch_idx << ", column " << col_idx
156-
<< " type mismatch!\n";
161+
162+
if (orig_col.data_type() != deser_col.data_type())
163+
{
164+
std::cerr << "ERROR: Batch " << batch_idx << ", column " << col_idx << " type mismatch!\n";
157165
all_match = false;
158166
continue;
159167
}
160-
168+
161169
// Check values
162-
for (size_t row_idx = 0; row_idx < orig_col.size(); ++row_idx) {
163-
if (orig_col[row_idx] != deser_col[row_idx]) {
164-
std::cerr << "ERROR: Batch " << batch_idx << ", column " << col_idx
165-
<< ", row " << row_idx << " value mismatch!\n";
166-
std::cerr << " Original: " << orig_col[row_idx]
170+
for (size_t row_idx = 0; row_idx < orig_col.size(); ++row_idx)
171+
{
172+
if (orig_col[row_idx] != deser_col[row_idx])
173+
{
174+
std::cerr << "ERROR: Batch " << batch_idx << ", column " << col_idx << ", row "
175+
<< row_idx << " value mismatch!\n";
176+
std::cerr << " Original: " << orig_col[row_idx]
167177
<< ", Deserialized: " << deser_col[row_idx] << "\n";
168178
all_match = false;
169179
}
170180
}
171181
}
172182
}
173-
174-
if (all_match) {
183+
184+
if (all_match)
185+
{
175186
std::cout << " ✓ All data matches perfectly!\n";
176-
} else {
187+
}
188+
else
189+
{
177190
std::cerr << " ✗ Data verification failed!\n";
178191
return 1;
179192
}
180193

181194
// Step 5: Display sample data from the first batch
182195
std::cout << "\n5. Sample data from the first batch:\n";
183196
std::cout << std::format("{}\n", original_batches[0]);
184-
197+
185198
// Step 6: Demonstrate individual serialization vs batch serialization
186199
std::cout << "\n6. Demonstrating individual vs batch serialization...\n";
187-
200+
188201
// Serialize individual batches one by one
189202
std::vector<uint8_t> individual_stream_data;
190203
sparrow_ipc::memory_output_stream individual_stream(individual_stream_data);
191204
sparrow_ipc::serializer individual_serializer(individual_stream);
192-
193-
for (const auto& batch : original_batches) {
205+
206+
for (const auto& batch : original_batches)
207+
{
194208
individual_serializer << batch;
195209
}
196210
individual_serializer << sparrow_ipc::end_stream;
197-
211+
198212
std::cout << " Individual serialization size: " << individual_stream_data.size() << " bytes\n";
199213
std::cout << " Batch serialization size: " << stream_data.size() << " bytes\n";
200-
214+
201215
// Both should produce the same result
202-
auto individual_deserialized = sparrow_ipc::deserialize_stream(
203-
std::span<const uint8_t>(individual_stream_data)
204-
);
205-
206-
if (individual_deserialized.size() == deserialized_batches.size()) {
216+
auto individual_deserialized = sparrow_ipc::deserialize_stream(individual_stream_data);
217+
218+
if (individual_deserialized.size() == deserialized_batches.size())
219+
{
207220
std::cout << " ✓ Individual and batch serialization produce equivalent results\n";
208-
} else {
221+
}
222+
else
223+
{
209224
std::cerr << " ✗ Individual and batch serialization mismatch!\n";
210225
}
211226

212227
// Step 7: Verify schema consistency
213228
std::cout << "\n7. Verifying schema consistency across all batches...\n";
214229
bool schema_consistent = true;
215-
for (size_t i = 1; i < deserialized_batches.size(); ++i) {
216-
if (deserialized_batches[0].nb_columns() != deserialized_batches[i].nb_columns()) {
230+
for (size_t i = 1; i < deserialized_batches.size(); ++i)
231+
{
232+
if (deserialized_batches[0].nb_columns() != deserialized_batches[i].nb_columns())
233+
{
217234
std::cerr << " ERROR: Batch " << i << " has different number of columns!\n";
218235
schema_consistent = false;
219236
}
220-
221-
for (size_t col_idx = 0; col_idx < deserialized_batches[0].nb_columns() && col_idx < deserialized_batches[i].nb_columns(); ++col_idx) {
237+
238+
for (size_t col_idx = 0; col_idx < deserialized_batches[0].nb_columns()
239+
&& col_idx < deserialized_batches[i].nb_columns();
240+
++col_idx)
241+
{
222242
const auto& col0 = deserialized_batches[0].get_column(col_idx);
223243
const auto& col_i = deserialized_batches[i].get_column(col_idx);
224-
225-
if (col0.data_type() != col_i.data_type()) {
226-
std::cerr << " ERROR: Batch " << i << ", column " << col_idx
227-
<< " has different type!\n";
244+
245+
if (col0.data_type() != col_i.data_type())
246+
{
247+
std::cerr << " ERROR: Batch " << i << ", column " << col_idx << " has different type!\n";
228248
schema_consistent = false;
229249
}
230-
231-
if (col0.name() != col_i.name()) {
232-
std::cerr << " ERROR: Batch " << i << ", column " << col_idx
233-
<< " has different name!\n";
250+
251+
if (col0.name() != col_i.name())
252+
{
253+
std::cerr << " ERROR: Batch " << i << ", column " << col_idx << " has different name!\n";
234254
schema_consistent = false;
235255
}
236256
}
237257
}
238-
239-
if (schema_consistent) {
258+
259+
if (schema_consistent)
260+
{
240261
std::cout << " ✓ All batches have consistent schema!\n";
241-
} else {
262+
}
263+
else
264+
{
242265
std::cerr << " ✗ Schema inconsistency detected!\n";
243266
}
244267

245268
std::cout << "\n=== Example completed successfully! ===\n";
246-
247-
} catch (const std::exception& e) {
269+
}
270+
catch (const std::exception& e)
271+
{
248272
std::cerr << "Error: " << e.what() << "\n";
249273
return 1;
250274
}

0 commit comments

Comments
 (0)