Skip to content

Commit bd42933

Browse files
committed
wip
1 parent 2953f10 commit bd42933

File tree

7 files changed

+74
-28
lines changed

7 files changed

+74
-28
lines changed

integration_tests/arrow_file_to_stream.cpp

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,7 @@
44
#include <iostream>
55
#include <vector>
66

7-
#include <sparrow_ipc/deserialize.hpp>
8-
#include <sparrow_ipc/memory_output_stream.hpp>
9-
#include <sparrow_ipc/serializer.hpp>
10-
#include <sparrow_ipc/stream_file_serializer.hpp>
7+
#include "integration_tools.hpp"
118

129
/**
1310
* @brief Reads an Arrow IPC file and outputs the serialized Arrow IPC stream to a file.
@@ -36,37 +33,52 @@ int main(int argc, char* argv[])
3633

3734
try
3835
{
39-
// Read the Arrow file
36+
if (!std::filesystem::exists(input_path))
37+
{
38+
std::cerr << "Error: Input file not found: " << input_path << "\n";
39+
return EXIT_FAILURE;
40+
}
41+
4042
std::ifstream input_file(input_path, std::ios::binary);
41-
if (!input_file)
43+
if (!input_file.is_open())
4244
{
4345
std::cerr << "Error: Could not open input file: " << input_path << "\n";
4446
return EXIT_FAILURE;
4547
}
46-
48+
4749
const std::vector<uint8_t> file_data(
4850
(std::istreambuf_iterator<char>(input_file)),
49-
(std::istreambuf_iterator<char>())
51+
std::istreambuf_iterator<char>()
5052
);
5153
input_file.close();
5254

53-
const auto batches = sparrow_ipc::deserialize_file(file_data);
54-
55-
std::vector<uint8_t> stream_data;
56-
sparrow_ipc::memory_output_stream mem_stream(stream_data);
57-
sparrow_ipc::serializer serializer(mem_stream);
58-
serializer << batches << sparrow_ipc::end_stream;
59-
55+
if (file_data.empty())
56+
{
57+
std::cerr << "Error: Input file is empty.\n";
58+
return EXIT_FAILURE;
59+
}
60+
61+
const std::vector<uint8_t> stream_data = integration_tools::file_to_stream(file_data);
62+
6063
std::ofstream output_file(output_path, std::ios::binary);
61-
if (!output_file)
64+
if (!output_file.is_open())
6265
{
6366
std::cerr << "Error: Could not open output file: " << output_path << "\n";
6467
return EXIT_FAILURE;
6568
}
66-
67-
output_file.write(reinterpret_cast<const char*>(stream_data.data()), static_cast<std::streamsize>(stream_data.size()));
69+
70+
output_file.write(
71+
reinterpret_cast<const char*>(stream_data.data()),
72+
static_cast<std::streamsize>(stream_data.size())
73+
);
6874
output_file.close();
6975

76+
if (!output_file.good())
77+
{
78+
std::cerr << "Error: Failed to write to output file: " << output_path << "\n";
79+
return EXIT_FAILURE;
80+
}
81+
7082
return EXIT_SUCCESS;
7183
}
7284
catch (const std::exception& e)

integration_tests/arrow_json_to_file.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,15 @@ int main(int argc, char* argv[])
3434
try
3535
{
3636
const std::vector<uint8_t> file_data = integration_tools::json_file_to_arrow_file(json_path);
37+
3738
std::ofstream output_file(output_path, std::ios::out | std::ios::binary);
3839
if (!output_file.is_open())
3940
{
4041
std::cerr << "Error: Could not open output file: " << output_path << "\n";
4142
return EXIT_FAILURE;
4243
}
4344

44-
output_file.write(reinterpret_cast<const char*>(file_data.data()), file_data.size());
45+
output_file.write(reinterpret_cast<const char*>(file_data.data()), static_cast<std::streamsize>(file_data.size()));
4546
output_file.close();
4647

4748
if (!output_file.good())

integration_tests/arrow_stream_to_file.cpp

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
#include <filesystem>
33
#include <fstream>
44
#include <iostream>
5-
#include <iterator>
65
#include <vector>
76

87
#include "integration_tools.hpp"
@@ -22,7 +21,6 @@
2221
*/
2322
int main(int argc, char* argv[])
2423
{
25-
// Check command-line arguments
2624
if (argc != 3)
2725
{
2826
std::cerr << "Usage: " << argv[0] << " <input_file_path> <output_file_path>\n";
@@ -60,9 +58,7 @@ int main(int argc, char* argv[])
6058
return EXIT_FAILURE;
6159
}
6260

63-
const std::vector<uint8_t> output_file_data = integration_tools::stream_to_file(
64-
std::span<const uint8_t>(input_stream_data)
65-
);
61+
const std::vector<uint8_t> output_file_data = integration_tools::stream_to_file(input_stream_data);
6662

6763
std::ofstream output_file(output_path, std::ios::out | std::ios::binary);
6864
if (!output_file.is_open())
@@ -71,7 +67,10 @@ int main(int argc, char* argv[])
7167
return EXIT_FAILURE;
7268
}
7369

74-
output_file.write(reinterpret_cast<const char*>(output_file_data.data()), output_file_data.size());
70+
output_file.write(
71+
reinterpret_cast<const char*>(output_file_data.data()),
72+
static_cast<std::streamsize>(output_file_data.size())
73+
);
7574
output_file.close();
7675

7776
if (!output_file.good())

integration_tests/include/integration_tools.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,15 @@ namespace integration_tools
5252
*/
5353
std::vector<uint8_t> stream_to_file(std::span<const uint8_t> input_stream_data);
5454

55+
/**
56+
* @brief Reads an Arrow IPC file and re-serializes it to stream format.
57+
*
58+
* @param input_file_data Binary Arrow IPC file data
59+
* @return Vector of bytes containing the re-serialized Arrow IPC stream format
60+
* @throws std::runtime_error if the file cannot be deserialized
61+
*/
62+
std::vector<uint8_t> file_to_stream(std::span<const uint8_t> input_file_data);
63+
5564
/**
5665
* @brief Validates that a JSON file and an Arrow file contain identical data.
5766
*

integration_tests/src/integration_tools.cpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,31 @@ namespace integration_tools
112112
return output_stream_data;
113113
}
114114

115+
std::vector<uint8_t> file_to_stream(std::span<const uint8_t> input_file_data)
116+
{
117+
if (input_file_data.empty())
118+
{
119+
throw std::runtime_error("Input file data is empty");
120+
}
121+
122+
std::vector<sparrow::record_batch> record_batches;
123+
try
124+
{
125+
record_batches = sparrow_ipc::deserialize_file(input_file_data);
126+
}
127+
catch (const std::exception& e)
128+
{
129+
throw std::runtime_error("Failed to deserialize file: " + std::string(e.what()));
130+
}
131+
132+
std::vector<uint8_t> output_stream_data;
133+
sparrow_ipc::memory_output_stream stream(output_stream_data);
134+
sparrow_ipc::serializer serializer(stream);
135+
serializer << record_batches << sparrow_ipc::end_stream;
136+
137+
return output_stream_data;
138+
}
139+
115140
bool compare_record_batch(
116141
const sparrow::record_batch& rb1,
117142
const sparrow::record_batch& rb2,

integration_tests/test_integration_tools.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ TEST_SUITE("Integration Tools Tests")
184184

185185
REQUIRE(json_data.contains("batches"));
186186
const size_t expected_batch_count = json_data["batches"].size();
187-
REQUIRE_GT(expected_batch_count, 0);
187+
REQUIRE_EQ(expected_batch_count, 2);
188188

189189
// Step 1: JSON -> Arrow file
190190
const std::vector<uint8_t> arrow_file_data = integration_tools::json_file_to_arrow_file(json_file);

src/stream_file_serializer.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,11 @@ namespace sparrow_ipc
7575
const auto fields_vec = create_children(footer_builder, record_batch);
7676
const auto schema_offset = org::apache::arrow::flatbuf::CreateSchema(
7777
footer_builder,
78-
org::apache::arrow::flatbuf::Endianness::Little,
78+
org::apache::arrow::flatbuf::Endianness::Little, // TODO: make configurable
7979
fields_vec
8080
);
8181

82-
// Create empty dictionaries vector
82+
// Create empty dictionaries vector // TODO: Support dictionaries if needed
8383
auto dictionaries_fb = footer_builder.CreateVectorOfStructs(
8484
std::vector<org::apache::arrow::flatbuf::Block>{}
8585
);

0 commit comments

Comments
 (0)