@@ -20,22 +20,17 @@ namespace integration_tools
2020{
2121 std::vector<uint8_t > json_file_to_arrow_file (const std::filesystem::path& json_path)
2222 {
23- // Convert JSON file to stream first
24- std::vector<uint8_t > stream_data = json_file_to_stream (json_path);
25-
26- // Then convert stream to file format
23+ const std::vector<uint8_t > stream_data = json_file_to_stream (json_path);
2724 return stream_to_file (std::span<const uint8_t >(stream_data));
2825 }
2926
3027 std::vector<uint8_t > json_file_to_stream (const std::filesystem::path& json_path)
3128 {
32- // Check if the JSON file exists
3329 if (!std::filesystem::exists (json_path))
3430 {
3531 throw std::runtime_error (" JSON file not found: " + json_path.string ());
3632 }
3733
38- // Open and parse the JSON file
3934 std::ifstream json_file (json_path);
4035 if (!json_file.is_open ())
4136 {
@@ -58,15 +53,13 @@ namespace integration_tools
5853
5954 std::vector<uint8_t > json_to_stream (const nlohmann::json& json_data)
6055 {
61- // Get the number of batches
6256 if (!json_data.contains (" batches" ) || !json_data[" batches" ].is_array ())
6357 {
6458 throw std::runtime_error (" JSON file does not contain a 'batches' array" );
6559 }
6660
6761 const size_t num_batches = json_data[" batches" ].size ();
6862
69- // Parse all record batches from JSON
7063 std::vector<sparrow::record_batch> record_batches;
7164 record_batches.reserve (num_batches);
7265
@@ -86,7 +79,6 @@ namespace integration_tools
8679 }
8780 }
8881
89- // Serialize record batches to Arrow IPC stream format
9082 std::vector<uint8_t > stream_data;
9183 sparrow_ipc::memory_output_stream stream (stream_data);
9284 sparrow_ipc::serializer serializer (stream);
@@ -102,7 +94,6 @@ namespace integration_tools
10294 throw std::runtime_error (" Input stream data is empty" );
10395 }
10496
105- // Deserialize the stream to validate it and extract record batches
10697 std::vector<sparrow::record_batch> record_batches;
10798 try
10899 {
@@ -113,7 +104,6 @@ namespace integration_tools
113104 throw std::runtime_error (" Failed to deserialize stream: " + std::string (e.what ()));
114105 }
115106
116- // Re-serialize the record batches to ensure a valid output stream
117107 std::vector<uint8_t > output_stream_data;
118108 sparrow_ipc::memory_output_stream stream (output_stream_data);
119109 sparrow_ipc::stream_file_serializer serializer (stream);
@@ -131,7 +121,6 @@ namespace integration_tools
131121 {
132122 bool all_match = true ;
133123
134- // Check number of columns
135124 if (rb1.nb_columns () != rb2.nb_columns ())
136125 {
137126 if (verbose)
@@ -142,7 +131,6 @@ namespace integration_tools
142131 return false ;
143132 }
144133
145- // Check number of rows
146134 if (rb1.nb_rows () != rb2.nb_rows ())
147135 {
148136 if (verbose)
@@ -153,7 +141,6 @@ namespace integration_tools
153141 return false ;
154142 }
155143
156- // Check column names
157144 const auto & names1 = rb1.names ();
158145 const auto & names2 = rb2.names ();
159146 if (names1.size () != names2.size ())
@@ -180,13 +167,11 @@ namespace integration_tools
180167 }
181168 }
182169
183- // Check each column
184170 for (size_t col_idx = 0 ; col_idx < rb1.nb_columns (); ++col_idx)
185171 {
186172 const auto & col1 = rb1.get_column (col_idx);
187173 const auto & col2 = rb2.get_column (col_idx);
188174
189- // Check column size
190175 if (col1.size () != col2.size ())
191176 {
192177 if (verbose)
@@ -198,7 +183,6 @@ namespace integration_tools
198183 continue ;
199184 }
200185
201- // Check column data type
202186 if (col1.data_type () != col2.data_type ())
203187 {
204188 if (verbose)
@@ -210,7 +194,6 @@ namespace integration_tools
210194 continue ;
211195 }
212196
213- // Check column name
214197 const auto col_name1 = col1.name ();
215198 const auto col_name2 = col2.name ();
216199 if (col_name1 != col_name2)
@@ -222,7 +205,6 @@ namespace integration_tools
222205 }
223206 }
224207
225- // Check each value in the column
226208 for (size_t row_idx = 0 ; row_idx < col1.size (); ++row_idx)
227209 {
228210 if (col1[row_idx] != col2[row_idx])
@@ -250,13 +232,12 @@ namespace integration_tools
250232 std::span<const uint8_t > arrow_file_data
251233 )
252234 {
253- // Check if the JSON file exists
235+
254236 if (!std::filesystem::exists (json_path))
255237 {
256238 throw std::runtime_error (" JSON file not found: " + json_path.string ());
257239 }
258240
259- // Load and parse the JSON file
260241 std::ifstream json_file (json_path);
261242 if (!json_file.is_open ())
262243 {
@@ -274,15 +255,13 @@ namespace integration_tools
274255 }
275256 json_file.close ();
276257
277- // Check for batches in JSON
278258 if (!json_data.contains (" batches" ) || !json_data[" batches" ].is_array ())
279259 {
280260 throw std::runtime_error (" JSON file does not contain a 'batches' array" );
281261 }
282262
283263 const size_t num_batches = json_data[" batches" ].size ();
284264
285- // Parse all record batches from JSON
286265 std::vector<sparrow::record_batch> json_batches;
287266 json_batches.reserve (num_batches);
288267
@@ -302,7 +281,6 @@ namespace integration_tools
302281 }
303282 }
304283
305- // Deserialize the stream
306284 if (arrow_file_data.empty ())
307285 {
308286 throw std::runtime_error (" Stream data is empty" );
@@ -318,13 +296,11 @@ namespace integration_tools
318296 throw std::runtime_error (" Failed to deserialize stream: " + std::string (e.what ()));
319297 }
320298
321- // Compare the number of batches
322299 if (json_batches.size () != stream_batches.size ())
323300 {
324301 return false ;
325302 }
326303
327- // Compare each batch
328304 for (size_t batch_idx = 0 ; batch_idx < json_batches.size (); ++batch_idx)
329305 {
330306 if (!compare_record_batch (json_batches[batch_idx], stream_batches[batch_idx], batch_idx, false ))
0 commit comments