Commit 28aaee8

Regression Testing, Bug Fixes, and Public API Tightening for arrow-avro (#8492)
# Which issue does this PR close?

- **Related to**: #4886 (“Add Avro Support”)

# Rationale for this change

**NOTE:** This PR contains over **2300 lines of test code**. The actual production code diff is **less than 800 LOC**.

Before we publish `arrow-avro`, we want to "minimize its public API surface" and ship a well‑tested, spec‑compliant implementation. In the process of adding intensive regression tests and canonical‑form checks, we found several correctness gaps around alias handling, union resolution, Unicode/name validation, list child nullability, “null” string handling, and a mis-wired `Writer` capacity setting. This PR tightens the API and fixes those issues to align with the Avro spec (aliases and defaults, union resolution, names and Unicode, etc.).

# What changes are included in this PR?

**Public API tightening**

- Restrict visibility of numerous schema/codec types and functions within `arrow-avro` so that only the intended entry points are public before the crate is published.

**Bug fixes discovered via regression testing (all fixed)**

1. **Alias bugs (aliases without defaults / restrictive namespaces)**
   - Enforce spec‑compliant alias resolution: aliases may be fully‑qualified or relative to the reader’s namespace, and alias‑based rewrites still require reader defaults when the writer field is absent. This follows Avro’s alias rules and record‑field default behavior.
2. **Special‑case union resolution (writer not a union, reader is)**
   - When the writer schema is **not** a union but the reader is, we no longer attempt to decode a union `type_id`; per spec, the reader must pick the first union branch that matches the writer’s schema.
3. **Valid Avro Unicode characters & name rules in Schema**
   - Distinguish between *Unicode strings* (which may contain any valid UTF‑8) and *identifiers* (names/enum symbols), which must match `[A-Za-z_][A-Za-z0-9_]*`. Tests were added to accept valid Unicode string content while enforcing the ASCII identifier regex.
4. **Nullable `ListArray` child item bug**
   - Correct mapping of Avro array item nullability to Arrow `ListArray`’s inner `"item"` field. (By convention the inner field is named `"item"` and nullability is explicit.) This aligns with Arrow’s builder/typing docs.
5. **“null” string vs. hard `null`**
   - Fix default/value handling to differentiate JSON `null` from the string literal `"null"` per the Avro defaults table.
6. **`Writer` capacity knob wired up**
   - Plumb the provided capacity through the writer implementation so that the requested preallocation capacity is respected. (See changes under `arrow-avro/src/writer/mod.rs`.)

# Are these changes tested?

Yes. This PR adds substantial regression coverage:

- Canonical‑form checks for schemas.
- Alias/namespace + default‑value resolution cases.
- Reader‑union vs. writer‑non‑union decoding paths.
- Unicode content vs. identifier name rules.
- `ListArray` inner field nullability behavior.
- Round‑trips exercising the `Writer` with the capacity knob set.

A new, comprehensive Avro fixture (`test/data/comprehensive_e2e.avro`) is included to drive end‑to‑end scenarios and edge cases.

# Are there any user-facing changes?

N/A
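To make fix 3 above concrete, here is a minimal, std-only sketch of the distinction between identifiers and string content. The function names `is_valid_avro_identifier` and `is_valid_avro_fullname` are hypothetical and are not part of the `arrow-avro` API; the sketch only assumes the `[A-Za-z_][A-Za-z0-9_]*` pattern quoted in the description and the Avro convention that a full name is a dot-separated sequence of such identifiers.

```rust
/// Returns true if `name` matches the ASCII pattern `[A-Za-z_][A-Za-z0-9_]*`.
/// Hypothetical helper for illustration; not the crate's actual validator.
fn is_valid_avro_identifier(name: &str) -> bool {
    let mut chars = name.chars();
    // The first character must be an ASCII letter or an underscore.
    match chars.next() {
        Some(c) if c.is_ascii_alphabetic() || c == '_' => {}
        _ => return false,
    }
    // Every remaining character must be an ASCII letter, digit, or underscore.
    chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
}

/// Returns true if every dot-separated segment of `fullname` is a valid identifier.
/// Hypothetical helper for illustration.
fn is_valid_avro_fullname(fullname: &str) -> bool {
    fullname.split('.').all(is_valid_avro_identifier)
}

fn main() {
    // Identifiers are restricted to the ASCII pattern.
    assert!(is_valid_avro_identifier("user_id"));
    assert!(!is_valid_avro_identifier("9lives"));
    assert!(!is_valid_avro_identifier("prénom"));
    assert!(is_valid_avro_fullname("org.example.User"));

    // String content (for example a `doc` attribute or a string default) is not
    // an identifier, so non-ASCII text is fine there; `&str` is always valid UTF-8.
    let doc = "Nom complet: prénom et nom";
    assert!(!doc.is_ascii());
    println!("identifier checks passed");
}
```

The point of keeping the two checks separate is that a schema can carry arbitrary Unicode in its string values while still rejecting a record, field, or enum symbol name that falls outside the ASCII pattern.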
1 parent 43c84fc commit 28aaee8

9 files changed: +3064 -214 lines changed

arrow-avro/src/codec.rs

Lines changed: 278 additions & 111 deletions
Large diffs are not rendered by default.

arrow-avro/src/reader/header.rs

Lines changed: 1 addition & 1 deletion
@@ -91,7 +91,7 @@ impl Header {
         }
     }
 
-    /// Returns the [`Schema`] if any
+    /// Returns the `Schema` if any
     pub(crate) fn schema(&self) -> Result<Option<Schema<'_>>, ArrowError> {
         self.get(SCHEMA_METADATA_KEY)
             .map(|x| {

arrow-avro/src/reader/mod.rs

Lines changed: 2274 additions & 3 deletions
Large diffs are not rendered by default.

arrow-avro/src/reader/record.rs

Lines changed: 56 additions & 21 deletions
@@ -40,6 +40,16 @@ use uuid::Uuid;
 
 const DEFAULT_CAPACITY: usize = 1024;
 
+/// Runtime plan for decoding reader-side `["null", T]` types.
+#[derive(Clone, Copy, Debug)]
+enum NullablePlan {
+    /// Writer actually wrote a union (branch tag present).
+    ReadTag,
+    /// Writer wrote a single (non-union) value resolved to the non-null branch
+    /// of the reader union; do NOT read a branch tag, but apply any promotion.
+    FromSingle { promotion: Promotion },
+}
+
 /// Macro to decode a decimal payload for a given width and integer type.
 macro_rules! decode_decimal {
     ($size:expr, $buf:expr, $builder:expr, $N:expr, $Int:ty) => {{
@@ -267,7 +277,7 @@ enum Decoder {
     Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
     Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
     Union(UnionDecoder),
-    Nullable(Nullability, NullBufferBuilder, Box<Decoder>),
+    Nullable(Nullability, NullBufferBuilder, Box<Decoder>, NullablePlan),
 }
 
 impl Decoder {
@@ -508,11 +518,23 @@ impl Decoder {
             }
         };
         Ok(match data_type.nullability() {
-            Some(nullability) => Self::Nullable(
-                nullability,
-                NullBufferBuilder::new(DEFAULT_CAPACITY),
-                Box::new(decoder),
-            ),
+            Some(nullability) => {
+                // Default to reading a union branch tag unless the resolution proves otherwise.
+                let mut plan = NullablePlan::ReadTag;
+                if let Some(ResolutionInfo::Union(info)) = data_type.resolution.as_ref() {
+                    if !info.writer_is_union && info.reader_is_union {
+                        if let Some(Some((_reader_idx, promo))) = info.writer_to_reader.first() {
+                            plan = NullablePlan::FromSingle { promotion: *promo };
+                        }
+                    }
+                }
+                Self::Nullable(
+                    nullability,
+                    NullBufferBuilder::new(DEFAULT_CAPACITY),
+                    Box::new(decoder),
+                    plan,
+                )
+            }
             None => decoder,
         })
     }
@@ -571,7 +593,7 @@ impl Decoder {
             Self::Enum(indices, _, _) => indices.push(0),
             Self::Duration(builder) => builder.append_null(),
             Self::Union(u) => u.append_null()?,
-            Self::Nullable(_, null_buffer, inner) => {
+            Self::Nullable(_, null_buffer, inner, _) => {
                 null_buffer.append(false);
                 inner.append_null();
             }
@@ -582,7 +604,7 @@ impl Decoder {
     /// Append a single default literal into the decoder's buffers
     fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
         match self {
-            Self::Nullable(_, nb, inner) => {
+            Self::Nullable(_, nb, inner, _) => {
                 if matches!(lit, AvroLiteral::Null) {
                     nb.append(false);
                     inner.append_null()
@@ -939,19 +961,27 @@ impl Decoder {
                 builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos));
             }
             Self::Union(u) => u.decode(buf)?,
-            Self::Nullable(order, nb, encoding) => {
-                let branch = buf.read_vlq()?;
-                let is_not_null = match *order {
-                    Nullability::NullFirst => branch != 0,
-                    Nullability::NullSecond => branch == 0,
-                };
-                if is_not_null {
-                    // It is important to decode before appending to null buffer in case of decode error
-                    encoding.decode(buf)?;
-                } else {
-                    encoding.append_null();
+            Self::Nullable(order, nb, encoding, plan) => {
+                match *plan {
+                    NullablePlan::FromSingle { promotion } => {
+                        encoding.decode_with_promotion(buf, promotion)?;
+                        nb.append(true);
+                    }
+                    NullablePlan::ReadTag => {
+                        let branch = buf.read_vlq()?;
+                        let is_not_null = match *order {
+                            Nullability::NullFirst => branch != 0,
+                            Nullability::NullSecond => branch == 0,
+                        };
+                        if is_not_null {
+                            // It is important to decode before appending to null buffer in case of decode error
+                            encoding.decode(buf)?;
+                        } else {
+                            encoding.append_null();
+                        }
+                        nb.append(is_not_null);
+                    }
                 }
-                nb.append(is_not_null);
             }
         }
         Ok(())
@@ -1018,7 +1048,7 @@ impl Decoder {
     /// Flush decoded records to an [`ArrayRef`]
     fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, ArrowError> {
         Ok(match self {
-            Self::Nullable(_, n, e) => e.flush(n.finish())?,
+            Self::Nullable(_, n, e, _) => e.flush(n.finish())?,
             Self::Null(size) => Arc::new(NullArray::new(std::mem::replace(size, 0))),
             Self::Boolean(b) => Arc::new(BooleanArray::new(b.finish(), nulls)),
             Self::Int32(values) => Arc::new(flush_primitive::<Int32Type>(values, nulls)),
@@ -2742,6 +2772,7 @@ mod tests {
             Nullability::NullSecond,
             NullBufferBuilder::new(DEFAULT_CAPACITY),
             Box::new(inner),
+            NullablePlan::ReadTag,
         );
         let mut data = Vec::new();
         data.extend_from_slice(&encode_avro_int(0));
@@ -2784,6 +2815,7 @@ mod tests {
             Nullability::NullSecond,
             NullBufferBuilder::new(DEFAULT_CAPACITY),
             Box::new(inner),
+            NullablePlan::ReadTag,
         );
         let row1 = [
             0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
@@ -3663,6 +3695,7 @@ mod tests {
             Nullability::NullFirst,
             NullBufferBuilder::new(DEFAULT_CAPACITY),
             Box::new(inner),
+            NullablePlan::ReadTag,
         );
         dec.append_default(&AvroLiteral::Null).unwrap();
         dec.append_default(&AvroLiteral::Int(11)).unwrap();
@@ -3916,6 +3949,7 @@ mod tests {
             Nullability::NullSecond,
             NullBufferBuilder::new(DEFAULT_CAPACITY),
             Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
+            NullablePlan::ReadTag,
         );
         let enc_b = Decoder::Nullable(
             Nullability::NullSecond,
@@ -3924,6 +3958,7 @@ mod tests {
                 OffsetBufferBuilder::new(DEFAULT_CAPACITY),
                 Vec::with_capacity(DEFAULT_CAPACITY),
             )),
+            NullablePlan::ReadTag,
         );
         encoders.push(enc_a);
         encoders.push(enc_b);
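
The `NullablePlan` change in `record.rs` can be illustrated with a small, std-only sketch. It is a simplified model rather than the crate's code: `Plan`, `NullOrder`, `choose_plan`, `read_long`, and `decode_nullable_long` are hypothetical names, promotion is omitted, and the only payload type is an Avro `long` (zig-zag varint). It shows why a reader-side `["null", T]` must not read a branch tag when the writer wrote a bare `T`.

```rust
/// Position of the null branch in a two-branch reader union (hypothetical model).
#[derive(Clone, Copy, Debug, PartialEq)]
enum NullOrder {
    NullFirst,
    NullSecond,
}

/// Simplified stand-in for the diff's `NullablePlan`; promotion is omitted.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Plan {
    ReadTag,
    FromSingle,
}

/// Default to `ReadTag`; switch only when the writer is not a union but the reader is.
fn choose_plan(writer_is_union: bool, reader_is_union: bool) -> Plan {
    if !writer_is_union && reader_is_union {
        Plan::FromSingle
    } else {
        Plan::ReadTag
    }
}

/// Read one zig-zag varint `long` from the front of `buf`, advancing the slice.
/// Returns `None` on truncated or over-long input.
fn read_long(buf: &mut &[u8]) -> Option<i64> {
    let mut value: u64 = 0;
    let mut shift: u32 = 0;
    loop {
        let data: &[u8] = *buf;
        let (&byte, rest) = data.split_first()?;
        *buf = rest;
        value |= u64::from(byte & 0x7F) << shift;
        if byte & 0x80 == 0 {
            break;
        }
        shift += 7;
        if shift > 63 {
            return None; // more than 10 bytes: malformed varint
        }
    }
    // Undo zig-zag encoding.
    Some(((value >> 1) as i64) ^ -((value & 1) as i64))
}

/// Outer `None` means a decode error; inner `None` means a null value.
fn decode_nullable_long(buf: &mut &[u8], order: NullOrder, plan: Plan) -> Option<Option<i64>> {
    match plan {
        // The writer wrote a bare value: there is no tag to consume.
        Plan::FromSingle => read_long(buf).map(Some),
        Plan::ReadTag => {
            let branch = read_long(buf)?;
            let is_not_null = match order {
                NullOrder::NullFirst => branch != 0,
                NullOrder::NullSecond => branch == 0,
            };
            if is_not_null {
                read_long(buf).map(Some)
            } else {
                Some(None)
            }
        }
    }
}

fn main() {
    // Writer ["null", "long"], value 7: branch tag 1 (0x02) followed by 7 (0x0E).
    let mut tagged: &[u8] = &[0x02, 0x0E];
    let plan = choose_plan(true, true);
    assert_eq!(decode_nullable_long(&mut tagged, NullOrder::NullFirst, plan), Some(Some(7)));

    // Writer "long", value 7: only the payload byte is present.
    let mut bare: &[u8] = &[0x0E];
    let plan = choose_plan(false, true);
    assert_eq!(decode_nullable_long(&mut bare, NullOrder::NullFirst, plan), Some(Some(7)));

    // Reading a tag from the bare encoding consumes the payload and then fails.
    let mut misread: &[u8] = &[0x0E];
    assert_eq!(decode_nullable_long(&mut misread, NullOrder::NullFirst, Plan::ReadTag), None);
    println!("nullable plan checks passed");
}
```

In this model, storing the plan alongside the decoder (as the diff does with the fourth `Nullable` field) means the tag-or-no-tag decision is made once at construction time instead of per value.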
