Skip to content

Commit f207b12

Browse files
Add a union type
1 parent 65c9848 commit f207b12

File tree

2 files changed

+130
-3
lines changed

2 files changed

+130
-3
lines changed

adapter/avro/src/main/java/org/apache/arrow/adapter/avro/ArrowToAvroUtils.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.arrow.adapter.avro.producers.AvroNullableProducer;
3131
import org.apache.arrow.adapter.avro.producers.AvroStringProducer;
3232
import org.apache.arrow.adapter.avro.producers.AvroStructProducer;
33+
import org.apache.arrow.adapter.avro.producers.AvroUnionsProducer;
3334
import org.apache.arrow.adapter.avro.producers.BaseAvroProducer;
3435
import org.apache.arrow.adapter.avro.producers.CompositeAvroProducer;
3536
import org.apache.arrow.adapter.avro.producers.Producer;
@@ -59,6 +60,7 @@
5960
import org.apache.arrow.vector.complex.ListVector;
6061
import org.apache.arrow.vector.complex.MapVector;
6162
import org.apache.arrow.vector.complex.StructVector;
63+
import org.apache.arrow.vector.complex.UnionVector;
6264
import org.apache.arrow.vector.types.Types;
6365

6466
import java.util.ArrayList;
@@ -93,13 +95,15 @@ private static BaseAvroProducer<?> createProducer(FieldVector vector, boolean nu
9395

9496
Preconditions.checkNotNull(vector, "Arrow vector object can't be null");
9597

96-
if (nullable) {
98+
final Types.MinorType minorType = vector.getMinorType();
99+
100+
// Avro understands nullable types as a union of type | null
101+
// Most nullable fields in a VSR will not be unions, so provide a special wrapper
102+
if (nullable && minorType != Types.MinorType.UNION) {
97103
final BaseAvroProducer<?> innerProducer = createProducer(vector, false);
98104
return new AvroNullableProducer<>(innerProducer);
99105
}
100106

101-
final Types.MinorType minorType = vector.getMinorType();
102-
103107
switch (minorType) {
104108

105109
// Primitive types with direct mapping to Avro
@@ -169,6 +173,17 @@ private static BaseAvroProducer<?> createProducer(FieldVector vector, boolean nu
169173
Producer<?> entryProducer = new AvroStructProducer(entryVector, new Producer<?>[] {keyProducer, valueProducer});
170174
return new AvroMapProducer(mapVector, entryProducer);
171175

176+
case UNION:
177+
178+
UnionVector unionVector = (UnionVector) vector;
179+
List<FieldVector> unionChildVectors = unionVector.getChildrenFromFields();
180+
Producer<?>[] unionChildProducers = new Producer<?>[unionChildVectors.size()];
181+
for (int i = 0; i < unionChildVectors.size(); i++) {
182+
FieldVector unionChildVector = unionChildVectors.get(i);
183+
unionChildProducers[i] = createProducer(unionChildVector, /* nullable = */ false); // Do not nest union types
184+
}
185+
return new AvroUnionsProducer(unionVector, unionChildProducers);
186+
172187
// Not all Arrow types are supported for encoding (yet)!
173188

174189
default:
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.arrow.adapter.avro.producers;
19+
20+
import org.apache.arrow.vector.FieldVector;
21+
import org.apache.arrow.vector.complex.UnionVector;
22+
import org.apache.arrow.vector.types.Types;
23+
import org.apache.arrow.vector.types.UnionMode;
24+
import org.apache.avro.io.Encoder;
25+
26+
import java.io.IOException;
27+
import java.util.List;
28+
29+
/**
30+
* Producer which produces unions type values to avro encoder. Write the data to
31+
* {@link org.apache.arrow.vector.complex.UnionVector}.
32+
*/
33+
public class AvroUnionsProducer extends BaseAvroProducer<UnionVector> {
34+
35+
private final Producer<?>[] delegates;
36+
private final UnionMode unionMode;
37+
private final int nullTypeIndex;
38+
39+
/** Instantiate an AvroUnionsProducer. */
40+
public AvroUnionsProducer(UnionVector vector, Producer<?>[] delegates) {
41+
super(vector);
42+
this.delegates = delegates;
43+
this.unionMode = vector.getMinorType() == Types.MinorType.DENSEUNION ? UnionMode.Dense : UnionMode.Sparse;
44+
this.nullTypeIndex = findNullTypeIndex();
45+
}
46+
47+
private int findNullTypeIndex() {
48+
List<FieldVector> childVectors = vector.getChildrenFromFields();
49+
for (int i = 0; i < childVectors.size(); i++) {
50+
if (childVectors.get(i).getMinorType() == Types.MinorType.NULL) {
51+
return i;
52+
}
53+
}
54+
// For nullable unions with no explicit null type, a null type is appended to the schema
55+
return childVectors.size();
56+
}
57+
58+
@Override
59+
public void produce(Encoder encoder) throws IOException {
60+
61+
if (vector.isNull(currentIndex)) {
62+
encoder.writeInt(nullTypeIndex);
63+
encoder.writeNull();
64+
}
65+
else {
66+
67+
int typeIndex = vector.getTypeValue(currentIndex);
68+
int typeVectorIndex;
69+
70+
if (unionMode == UnionMode.Dense) {
71+
typeVectorIndex = vector.getOffsetBuffer().getInt(currentIndex * (long) Integer.BYTES);
72+
}
73+
else {
74+
typeVectorIndex = currentIndex;
75+
}
76+
77+
FieldVector typeVector = vector.getChildrenFromFields().get(typeIndex);
78+
79+
if (typeVector.isNull(typeVectorIndex)) {
80+
encoder.writeInt(nullTypeIndex);
81+
encoder.writeNull();
82+
}
83+
else {
84+
Producer<?> delegate = delegates[typeIndex];
85+
encoder.writeInt(typeIndex);
86+
delegate.setPosition(typeVectorIndex);
87+
delegate.produce(encoder);
88+
}
89+
}
90+
91+
currentIndex++;
92+
}
93+
94+
@Override
95+
@SuppressWarnings("unchecked")
96+
public boolean resetValueVector(UnionVector vector) {
97+
boolean result = true;
98+
for (int i = 0; i < delegates.length; i++) {
99+
Producer<FieldVector> delegate = (Producer<FieldVector>) delegates[i];
100+
result &= delegate.resetValueVector(vector.getChildrenFromFields().get(i));
101+
}
102+
return result & super.resetValueVector(vector);
103+
}
104+
105+
@Override
106+
public void close() throws Exception {
107+
for (Producer<?> delegate : delegates) {
108+
delegate.close();
109+
}
110+
super.close();
111+
}
112+
}

0 commit comments

Comments
 (0)