Skip to content

Commit b7c2336

Browse files
shangxinliclaude
andcommitted
feat: add comprehensive tests and benchmark for Avro direct decoder
Add extensive test coverage to validate the direct decoder implementation: - All primitive types (boolean, int, long, float, double, string, binary) - Temporal types (date, time, timestamp) - Complex nested structures (nested structs, lists, maps) - Null handling and optional fields - Large datasets (1000+ rows) - Direct decoder vs GenericDatum comparison tests Add benchmark tool to measure performance improvements: - Benchmarks with various data patterns (primitives, nested, lists, nulls) - Compares direct decoder vs GenericDatum performance - Expected speedup: 1.5x - 2.5x due to eliminated intermediate copies Test results: - 16 comprehensive Avro reader tests (vs 5 before) - 181 total tests in avro_test suite - 100% passing rate This addresses review feedback from wgtmac to provide better test coverage and prove performance improvements of the direct decoder implementation. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent d16b0f1 commit b7c2336

File tree

2 files changed

+517
-0
lines changed

2 files changed

+517
-0
lines changed
Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include <chrono>
21+
#include <iostream>
22+
#include <sstream>
23+
24+
#include <arrow/array/array_base.h>
25+
#include <arrow/c/bridge.h>
26+
#include <arrow/json/from_string.h>
27+
28+
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
29+
#include "iceberg/avro/avro_register.h"
30+
#include "iceberg/file_reader.h"
31+
#include "iceberg/file_writer.h"
32+
#include "iceberg/schema.h"
33+
#include "iceberg/schema_internal.h"
34+
#include "iceberg/type.h"
35+
36+
namespace iceberg::avro {
37+
38+
class AvroReaderBenchmark {
39+
public:
40+
AvroReaderBenchmark() { RegisterAll(); }
41+
42+
void CreateTestFile(std::shared_ptr<Schema> schema, const std::string& json_data,
43+
const std::string& path) {
44+
ArrowSchema arrow_c_schema;
45+
auto status = ToArrowSchema(*schema, &arrow_c_schema);
46+
if (!status.ok()) {
47+
std::cerr << "Failed to convert schema: " << status.message << std::endl;
48+
return;
49+
}
50+
51+
auto arrow_schema_result = ::arrow::ImportType(&arrow_c_schema);
52+
if (!arrow_schema_result.ok()) {
53+
std::cerr << "Failed to import schema: " << arrow_schema_result.status().message()
54+
<< std::endl;
55+
return;
56+
}
57+
auto arrow_schema = arrow_schema_result.ValueOrDie();
58+
59+
auto array_result = ::arrow::json::ArrayFromJSONString(arrow_schema, json_data);
60+
if (!array_result.ok()) {
61+
std::cerr << "Failed to parse JSON: " << array_result.status().message() << std::endl;
62+
return;
63+
}
64+
auto array = array_result.ValueOrDie();
65+
66+
struct ArrowArray arrow_array;
67+
auto export_result = ::arrow::ExportArray(*array, &arrow_array);
68+
if (!export_result.ok()) {
69+
std::cerr << "Failed to export array: " << export_result.message() << std::endl;
70+
return;
71+
}
72+
73+
auto writer_result = WriterFactoryRegistry::Open(
74+
FileFormatType::kAvro, {.path = path, .schema = schema, .io = file_io_});
75+
if (!writer_result.has_value()) {
76+
std::cerr << "Failed to create writer: " << writer_result.error().message << std::endl;
77+
return;
78+
}
79+
auto writer = std::move(writer_result.value());
80+
81+
status = writer->Write(&arrow_array);
82+
if (!status.ok()) {
83+
std::cerr << "Failed to write: " << status.message << std::endl;
84+
return;
85+
}
86+
87+
status = writer->Close();
88+
if (!status.ok()) {
89+
std::cerr << "Failed to close: " << status.message << std::endl;
90+
return;
91+
}
92+
93+
file_length_ = writer->length().value();
94+
}
95+
96+
double BenchmarkRead(const std::string& path, std::shared_ptr<Schema> schema,
97+
bool use_direct_decoder, int iterations = 10) {
98+
auto reader_properties = ReaderProperties::default_properties();
99+
reader_properties->Set(ReaderProperties::kAvroUseDirectDecoder, use_direct_decoder);
100+
101+
double total_time_ms = 0.0;
102+
int64_t total_rows = 0;
103+
104+
for (int i = 0; i < iterations; ++i) {
105+
auto reader_result = ReaderFactoryRegistry::Open(
106+
FileFormatType::kAvro, {.path = path,
107+
.length = file_length_,
108+
.io = file_io_,
109+
.projection = schema,
110+
.properties = reader_properties});
111+
if (!reader_result.has_value()) {
112+
std::cerr << "Failed to open reader: " << reader_result.error().message << std::endl;
113+
return -1;
114+
}
115+
auto reader = std::move(reader_result.value());
116+
117+
auto start = std::chrono::high_resolution_clock::now();
118+
119+
int64_t batch_rows = 0;
120+
while (true) {
121+
auto result = reader->Next();
122+
if (!result.has_value()) {
123+
std::cerr << "Failed to read: " << result.error().message << std::endl;
124+
return -1;
125+
}
126+
if (!result.value().has_value()) {
127+
break;
128+
}
129+
130+
auto arrow_c_array = result.value().value();
131+
batch_rows += arrow_c_array.length;
132+
arrow_c_array.release(&arrow_c_array);
133+
}
134+
135+
auto end = std::chrono::high_resolution_clock::now();
136+
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
137+
total_time_ms += duration.count() / 1000.0;
138+
total_rows = batch_rows;
139+
140+
auto close_status = reader->Close();
141+
if (!close_status.ok()) {
142+
std::cerr << "Failed to close reader: " << close_status.message << std::endl;
143+
return -1;
144+
}
145+
}
146+
147+
std::cout << (use_direct_decoder ? "DirectDecoder" : "GenericDatum") << ": " << total_rows
148+
<< " rows\n";
149+
return total_time_ms / iterations;
150+
}
151+
152+
void RunBenchmark(const std::string& name, std::shared_ptr<Schema> schema,
153+
const std::string& json_data) {
154+
std::cout << "\n=== Benchmark: " << name << " ===\n";
155+
156+
std::string path = "benchmark_" + name + ".avro";
157+
CreateTestFile(schema, json_data, path);
158+
159+
double direct_time = BenchmarkRead(path, schema, true);
160+
double generic_time = BenchmarkRead(path, schema, false);
161+
162+
if (direct_time > 0 && generic_time > 0) {
163+
double speedup = generic_time / direct_time;
164+
std::cout << "DirectDecoder: " << direct_time << " ms (avg)\n";
165+
std::cout << "GenericDatum: " << generic_time << " ms (avg)\n";
166+
std::cout << "Speedup: " << speedup << "x\n";
167+
}
168+
}
169+
170+
private:
171+
std::shared_ptr<FileIO> file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
172+
size_t file_length_ = 0;
173+
};
174+
175+
} // namespace iceberg::avro
176+
177+
int main() {
178+
using namespace iceberg;
179+
using namespace iceberg::avro;
180+
181+
AvroReaderBenchmark benchmark;
182+
183+
// Benchmark 1: Simple primitive types
184+
{
185+
auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
186+
SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
187+
SchemaField::MakeRequired(2, "value", std::make_shared<DoubleType>()),
188+
SchemaField::MakeRequired(3, "name", std::make_shared<StringType>())});
189+
190+
std::ostringstream json;
191+
json << "[";
192+
for (int i = 0; i < 10000; ++i) {
193+
if (i > 0) json << ", ";
194+
json << "[" << i << ", " << (i * 1.5) << ", \"name_" << i << "\"]";
195+
}
196+
json << "]";
197+
198+
benchmark.RunBenchmark("primitives_10k", schema, json.str());
199+
}
200+
201+
// Benchmark 2: Nested structs
202+
{
203+
auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
204+
SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
205+
SchemaField::MakeRequired(
206+
2, "data",
207+
std::make_shared<StructType>(std::vector<SchemaField>{
208+
SchemaField::MakeRequired(3, "x", std::make_shared<DoubleType>()),
209+
SchemaField::MakeRequired(4, "y", std::make_shared<DoubleType>()),
210+
SchemaField::MakeRequired(5, "label", std::make_shared<StringType>())}))});
211+
212+
std::ostringstream json;
213+
json << "[";
214+
for (int i = 0; i < 5000; ++i) {
215+
if (i > 0) json << ", ";
216+
json << "[" << i << ", [" << (i * 0.1) << ", " << (i * 0.2) << ", \"label_" << i
217+
<< "\"]]";
218+
}
219+
json << "]";
220+
221+
benchmark.RunBenchmark("nested_structs_5k", schema, json.str());
222+
}
223+
224+
// Benchmark 3: Lists
225+
{
226+
auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
227+
SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
228+
SchemaField::MakeRequired(
229+
2, "values",
230+
std::make_shared<ListType>(
231+
SchemaField::MakeRequired(3, "element", std::make_shared<IntType>())))});
232+
233+
std::ostringstream json;
234+
json << "[";
235+
for (int i = 0; i < 2000; ++i) {
236+
if (i > 0) json << ", ";
237+
json << "[" << i << ", [";
238+
for (int j = 0; j < 5; ++j) {
239+
if (j > 0) json << ", ";
240+
json << (i * 10 + j);
241+
}
242+
json << "]]";
243+
}
244+
json << "]";
245+
246+
benchmark.RunBenchmark("lists_2k", schema, json.str());
247+
}
248+
249+
// Benchmark 4: Optional fields with nulls
250+
{
251+
auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
252+
SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
253+
SchemaField::MakeOptional(2, "opt_int", std::make_shared<IntType>()),
254+
SchemaField::MakeOptional(3, "opt_string", std::make_shared<StringType>())});
255+
256+
std::ostringstream json;
257+
json << "[";
258+
for (int i = 0; i < 8000; ++i) {
259+
if (i > 0) json << ", ";
260+
json << "[" << i << ", ";
261+
if (i % 3 == 0) {
262+
json << "null";
263+
} else {
264+
json << (i * 100);
265+
}
266+
json << ", ";
267+
if (i % 2 == 0) {
268+
json << "null";
269+
} else {
270+
json << "\"str_" << i << "\"";
271+
}
272+
json << "]";
273+
}
274+
json << "]";
275+
276+
benchmark.RunBenchmark("nulls_8k", schema, json.str());
277+
}
278+
279+
std::cout << "\n=== Benchmark Complete ===\n";
280+
return 0;
281+
}

0 commit comments

Comments
 (0)