Skip to content

Commit 4266ee6

Browse files
Add producer for map type
1 parent 660583d commit 4266ee6

File tree

2 files changed

+77
-0
lines changed

2 files changed

+77
-0
lines changed

adapter/avro/src/main/java/org/apache/arrow/adapter/avro/ArrowToAvroUtils.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.arrow.adapter.avro.producers.AvroFloatProducer;
2626
import org.apache.arrow.adapter.avro.producers.AvroIntProducer;
2727
import org.apache.arrow.adapter.avro.producers.AvroLongProducer;
28+
import org.apache.arrow.adapter.avro.producers.AvroMapProducer;
2829
import org.apache.arrow.adapter.avro.producers.AvroNullProducer;
2930
import org.apache.arrow.adapter.avro.producers.AvroNullableProducer;
3031
import org.apache.arrow.adapter.avro.producers.AvroStringProducer;
@@ -56,6 +57,7 @@
5657
import org.apache.arrow.vector.VarBinaryVector;
5758
import org.apache.arrow.vector.VarCharVector;
5859
import org.apache.arrow.vector.complex.ListVector;
60+
import org.apache.arrow.vector.complex.MapVector;
5961
import org.apache.arrow.vector.complex.StructVector;
6062
import org.apache.arrow.vector.types.Types;
6163

@@ -156,6 +158,17 @@ private static BaseAvroProducer<?> createProducer(FieldVector vector, boolean nu
156158
Producer<?> itemProducer = createProducer(itemVector, itemVector.getField().isNullable());
157159
return new AvroArraysProducer(listVector, itemProducer);
158160

161+
case MAP:
162+
163+
MapVector mapVector = (MapVector) vector;
164+
StructVector entryVector = (StructVector) mapVector.getDataVector();
165+
VarCharVector keyVector = (VarCharVector) entryVector.getChildrenFromFields().get(0);
166+
FieldVector valueVector = entryVector.getChildrenFromFields().get(1);
167+
Producer<?> keyProducer = new AvroStringProducer(keyVector);
168+
Producer<?> valueProducer = createProducer(valueVector, valueVector.getField().isNullable());
169+
Producer<?> entryProducer = new AvroStructProducer(entryVector, new Producer<?>[] {keyProducer, valueProducer});
170+
return new AvroMapProducer(mapVector, entryProducer);
171+
159172
// Not all Arrow types are supported for encoding (yet)!
160173

161174
default:
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.arrow.adapter.avro.producers;
19+
20+
import org.apache.arrow.vector.FieldVector;
21+
import org.apache.arrow.vector.complex.MapVector;
22+
import org.apache.avro.io.Encoder;
23+
24+
import java.io.IOException;
25+
26+
/**
27+
* Producer which produces map type values to avro encoder. Write the data to {@link MapVector}.
28+
*/
29+
public class AvroMapProducer extends BaseAvroProducer<MapVector> {
30+
31+
private final Producer<? extends FieldVector> delegate;
32+
33+
/** Instantiate a AvroMapProducer. */
34+
public AvroMapProducer(MapVector vector, Producer<? extends FieldVector> delegate) {
35+
super(vector);
36+
this.delegate = delegate;
37+
}
38+
39+
@Override
40+
public void produce(Encoder encoder) throws IOException {
41+
42+
int startOffset = vector.getOffsetBuffer().getInt(currentIndex * (long) Integer.BYTES);
43+
int endOffset = vector.getOffsetBuffer().getInt((currentIndex + 1) * (long) Integer.BYTES);
44+
int nEntries = endOffset - startOffset;
45+
46+
encoder.writeMapStart();
47+
encoder.setItemCount(nEntries);
48+
49+
for (int i = 0; i < nEntries; i++) {
50+
encoder.startItem();
51+
delegate.produce(encoder);
52+
}
53+
54+
encoder.writeMapEnd();
55+
currentIndex++;
56+
}
57+
58+
@Override
59+
@SuppressWarnings("unchecked")
60+
public boolean resetValueVector(MapVector vector) {
61+
((Producer<FieldVector>) delegate).resetValueVector(vector.getDataVector());
62+
return super.resetValueVector(vector);
63+
}
64+
}

0 commit comments

Comments
 (0)