|
| 1 | +#include <algorithm> |
| 2 | +#include <iostream> |
| 3 | +#include <vector> |
| 4 | +#include <random> |
| 5 | + |
| 6 | +#include <sparrow/record_batch.hpp> |
| 7 | + |
| 8 | +#include <sparrow_ipc/memory_output_stream.hpp> |
| 9 | +#include <sparrow_ipc/serializer.hpp> |
| 10 | +#include <sparrow_ipc/deserialize.hpp> |
| 11 | + |
| 12 | +namespace sp = sparrow; |
| 13 | + |
| 14 | +// Random number generator |
| 15 | +std::random_device rd; |
| 16 | +std::mt19937 gen(rd()); |
| 17 | + |
| 18 | +/** |
| 19 | + * Helper function to create a record batch with the same schema but random values |
| 20 | + * All batches have: int32 column, float column, bool column, and string column |
| 21 | + */ |
| 22 | +sp::record_batch create_random_record_batch(size_t num_rows) |
| 23 | +{ |
| 24 | + std::uniform_int_distribution<int32_t> int_dist(0, 1000); |
| 25 | + std::uniform_real_distribution<float> float_dist(-100.0f, 100.0f); |
| 26 | + std::uniform_int_distribution<int> bool_dist(0, 1); |
| 27 | + |
| 28 | + // Create integer column with random values |
| 29 | + std::vector<int32_t> int_values; |
| 30 | + int_values.reserve(num_rows); |
| 31 | + for (size_t i = 0; i < num_rows; ++i) { |
| 32 | + int_values.push_back(int_dist(gen)); |
| 33 | + } |
| 34 | + auto int_array = sp::primitive_array<int32_t>(std::move(int_values) ); |
| 35 | + |
| 36 | + // Create float column with random values |
| 37 | + std::vector<float> float_values; |
| 38 | + float_values.reserve(num_rows); |
| 39 | + for (size_t i = 0; i < num_rows; ++i) { |
| 40 | + float_values.push_back(float_dist(gen)); |
| 41 | + } |
| 42 | + auto float_array = sp::primitive_array<float>(std::move(float_values)); |
| 43 | + |
| 44 | + // Create boolean column with random values |
| 45 | + std::vector<bool> bool_values; |
| 46 | + bool_values.reserve(num_rows); |
| 47 | + for (size_t i = 0; i < num_rows; ++i) { |
| 48 | + bool_values.push_back(static_cast<bool>(bool_dist(gen))); |
| 49 | + } |
| 50 | + auto bool_array = sp::primitive_array<bool>(std::move(bool_values)); |
| 51 | + |
| 52 | + // Create string column with random values |
| 53 | + std::vector<std::string> string_values; |
| 54 | + string_values.reserve(num_rows); |
| 55 | + const std::vector<std::string> sample_strings = { |
| 56 | + "alpha", "beta", "gamma", "delta", "epsilon", |
| 57 | + "zeta", "eta", "theta", "iota", "kappa" |
| 58 | + }; |
| 59 | + std::uniform_int_distribution<size_t> str_dist(0, sample_strings.size() - 1); |
| 60 | + |
| 61 | + for (size_t i = 0; i < num_rows; ++i) { |
| 62 | + string_values.push_back(sample_strings[str_dist(gen)] + "_" + std::to_string(i)); |
| 63 | + } |
| 64 | + auto string_array = sp::string_array(std::move(string_values)); |
| 65 | + |
| 66 | + // Create record batch with named columns (same schema for all batches) |
| 67 | + return sp::record_batch({ |
| 68 | + {"id", sp::array(std::move(int_array))}, |
| 69 | + {"value", sp::array(std::move(float_array))}, |
| 70 | + {"flag", sp::array(std::move(bool_array))}, |
| 71 | + {"name", sp::array(std::move(string_array))} |
| 72 | + }); |
| 73 | +} |
| 74 | + |
| 75 | +int main() |
| 76 | +{ |
| 77 | + std::cout << "=== Sparrow IPC Stream Write and Read Example ===\n"; |
| 78 | + std::cout << "Note: All record batches in a stream must have the same schema.\n\n"; |
| 79 | + |
| 80 | + try { |
| 81 | + // Configuration |
| 82 | + constexpr size_t num_batches = 5; |
| 83 | + constexpr size_t rows_per_batch = 10; |
| 84 | + |
| 85 | + // Step 1: Create several record batches with the SAME schema but random values |
| 86 | + std::cout << "1. Creating " << num_batches << " record batches with random values...\n"; |
| 87 | + std::cout << " Each batch has the same schema: (id: int32, value: float, flag: bool, name: string)\n"; |
| 88 | + |
| 89 | + std::vector<sp::record_batch> original_batches; |
| 90 | + original_batches.reserve(num_batches); |
| 91 | + |
| 92 | + for (size_t i = 0; i < num_batches; ++i) { |
| 93 | + original_batches.push_back(create_random_record_batch(rows_per_batch)); |
| 94 | + } |
| 95 | + |
| 96 | + std::cout << " Created " << original_batches.size() << " record batches\n"; |
| 97 | + for(size_t i = 0; i < original_batches.size(); ++i) { |
| 98 | + std::cout << std::format("{}\n", original_batches[i]); |
| 99 | + } |
| 100 | + |
| 101 | + // Step 2: Serialize the record batches to a stream |
| 102 | + std::cout << "\n2. Serializing record batches to stream...\n"; |
| 103 | + |
| 104 | + std::vector<uint8_t> stream_data; |
| 105 | + sparrow_ipc::memory_output_stream stream(stream_data); |
| 106 | + sparrow_ipc::serializer serializer(stream); |
| 107 | + |
| 108 | + // Serialize all batches using the streaming operator |
| 109 | + serializer << original_batches << sparrow_ipc::end_stream; |
| 110 | + |
| 111 | + std::cout << " Serialized stream size: " << stream_data.size() << " bytes\n"; |
| 112 | + |
| 113 | + // Step 3: Deserialize the stream back to record batches |
| 114 | + std::cout << "\n3. Deserializing stream back to record batches...\n"; |
| 115 | + |
| 116 | + auto deserialized_batches = sparrow_ipc::deserialize_stream( |
| 117 | + std::span<const uint8_t>(stream_data) |
| 118 | + ); |
| 119 | + |
| 120 | + std::cout << " Deserialized " << deserialized_batches.size() << " record batches\n"; |
| 121 | + |
| 122 | + // Step 4: Verify that original and deserialized data match |
| 123 | + std::cout << "\n4. Verifying data integrity...\n"; |
| 124 | + |
| 125 | + if (original_batches.size() != deserialized_batches.size()) { |
| 126 | + std::cerr << "ERROR: Batch count mismatch! Original: " << original_batches.size() |
| 127 | + << ", Deserialized: " << deserialized_batches.size() << "\n"; |
| 128 | + return 1; |
| 129 | + } |
| 130 | + |
| 131 | + bool all_match = true; |
| 132 | + for (size_t batch_idx = 0; batch_idx < original_batches.size(); ++batch_idx) { |
| 133 | + const auto& original = original_batches[batch_idx]; |
| 134 | + const auto& deserialized = deserialized_batches[batch_idx]; |
| 135 | + |
| 136 | + // Check basic structure |
| 137 | + if (original.nb_columns() != deserialized.nb_columns() || |
| 138 | + original.nb_rows() != deserialized.nb_rows()) { |
| 139 | + std::cerr << "ERROR: Batch " << batch_idx << " structure mismatch!\n"; |
| 140 | + all_match = false; |
| 141 | + continue; |
| 142 | + } |
| 143 | + |
| 144 | + // Check column names |
| 145 | + if(!std::ranges::equal(original.names(), deserialized.names())) { |
| 146 | + std::cerr << "WARNING: Batch " << batch_idx << " column names mismatch!\n"; |
| 147 | + } |
| 148 | + |
| 149 | + // Check column data |
| 150 | + for (size_t col_idx = 0; col_idx < original.nb_columns(); ++col_idx) { |
| 151 | + const auto& orig_col = original.get_column(col_idx); |
| 152 | + const auto& deser_col = deserialized.get_column(col_idx); |
| 153 | + |
| 154 | + if (orig_col.data_type() != deser_col.data_type()) { |
| 155 | + std::cerr << "ERROR: Batch " << batch_idx << ", column " << col_idx |
| 156 | + << " type mismatch!\n"; |
| 157 | + all_match = false; |
| 158 | + continue; |
| 159 | + } |
| 160 | + |
| 161 | + // Check values |
| 162 | + for (size_t row_idx = 0; row_idx < orig_col.size(); ++row_idx) { |
| 163 | + if (orig_col[row_idx] != deser_col[row_idx]) { |
| 164 | + std::cerr << "ERROR: Batch " << batch_idx << ", column " << col_idx |
| 165 | + << ", row " << row_idx << " value mismatch!\n"; |
| 166 | + std::cerr << " Original: " << orig_col[row_idx] |
| 167 | + << ", Deserialized: " << deser_col[row_idx] << "\n"; |
| 168 | + all_match = false; |
| 169 | + } |
| 170 | + } |
| 171 | + } |
| 172 | + } |
| 173 | + |
| 174 | + if (all_match) { |
| 175 | + std::cout << " ✓ All data matches perfectly!\n"; |
| 176 | + } else { |
| 177 | + std::cerr << " ✗ Data verification failed!\n"; |
| 178 | + return 1; |
| 179 | + } |
| 180 | + |
| 181 | + // Step 5: Display sample data from the first batch |
| 182 | + std::cout << "\n5. Sample data from the first batch:\n"; |
| 183 | + std::cout << std::format("{}\n", original_batches[0]); |
| 184 | + |
| 185 | + // Step 6: Demonstrate individual serialization vs batch serialization |
| 186 | + std::cout << "\n6. Demonstrating individual vs batch serialization...\n"; |
| 187 | + |
| 188 | + // Serialize individual batches one by one |
| 189 | + std::vector<uint8_t> individual_stream_data; |
| 190 | + sparrow_ipc::memory_output_stream individual_stream(individual_stream_data); |
| 191 | + sparrow_ipc::serializer individual_serializer(individual_stream); |
| 192 | + |
| 193 | + for (const auto& batch : original_batches) { |
| 194 | + individual_serializer << batch; |
| 195 | + } |
| 196 | + individual_serializer << sparrow_ipc::end_stream; |
| 197 | + |
| 198 | + std::cout << " Individual serialization size: " << individual_stream_data.size() << " bytes\n"; |
| 199 | + std::cout << " Batch serialization size: " << stream_data.size() << " bytes\n"; |
| 200 | + |
| 201 | + // Both should produce the same result |
| 202 | + auto individual_deserialized = sparrow_ipc::deserialize_stream( |
| 203 | + std::span<const uint8_t>(individual_stream_data) |
| 204 | + ); |
| 205 | + |
| 206 | + if (individual_deserialized.size() == deserialized_batches.size()) { |
| 207 | + std::cout << " ✓ Individual and batch serialization produce equivalent results\n"; |
| 208 | + } else { |
| 209 | + std::cerr << " ✗ Individual and batch serialization mismatch!\n"; |
| 210 | + } |
| 211 | + |
| 212 | + // Step 7: Verify schema consistency |
| 213 | + std::cout << "\n7. Verifying schema consistency across all batches...\n"; |
| 214 | + bool schema_consistent = true; |
| 215 | + for (size_t i = 1; i < deserialized_batches.size(); ++i) { |
| 216 | + if (deserialized_batches[0].nb_columns() != deserialized_batches[i].nb_columns()) { |
| 217 | + std::cerr << " ERROR: Batch " << i << " has different number of columns!\n"; |
| 218 | + schema_consistent = false; |
| 219 | + } |
| 220 | + |
| 221 | + for (size_t col_idx = 0; col_idx < deserialized_batches[0].nb_columns() && col_idx < deserialized_batches[i].nb_columns(); ++col_idx) { |
| 222 | + const auto& col0 = deserialized_batches[0].get_column(col_idx); |
| 223 | + const auto& col_i = deserialized_batches[i].get_column(col_idx); |
| 224 | + |
| 225 | + if (col0.data_type() != col_i.data_type()) { |
| 226 | + std::cerr << " ERROR: Batch " << i << ", column " << col_idx |
| 227 | + << " has different type!\n"; |
| 228 | + schema_consistent = false; |
| 229 | + } |
| 230 | + |
| 231 | + if (col0.name() != col_i.name()) { |
| 232 | + std::cerr << " ERROR: Batch " << i << ", column " << col_idx |
| 233 | + << " has different name!\n"; |
| 234 | + schema_consistent = false; |
| 235 | + } |
| 236 | + } |
| 237 | + } |
| 238 | + |
| 239 | + if (schema_consistent) { |
| 240 | + std::cout << " ✓ All batches have consistent schema!\n"; |
| 241 | + } else { |
| 242 | + std::cerr << " ✗ Schema inconsistency detected!\n"; |
| 243 | + } |
| 244 | + |
| 245 | + std::cout << "\n=== Example completed successfully! ===\n"; |
| 246 | + |
| 247 | + } catch (const std::exception& e) { |
| 248 | + std::cerr << "Error: " << e.what() << "\n"; |
| 249 | + return 1; |
| 250 | + } |
| 251 | + |
| 252 | + return 0; |
| 253 | +} |
0 commit comments