Skip to content

Commit e7da504

Browse files
author
Martin Traverse
committed
Thoughts
1 parent b028352 commit e7da504

File tree

2 files changed

+200
-0
lines changed

2 files changed

+200
-0
lines changed
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.arrow.adapter.avro;
19+
20+
import org.apache.arrow.adapter.avro.producers.*;
21+
import org.apache.arrow.adapter.avro.producers.logical.*;
22+
import org.apache.arrow.util.Preconditions;
23+
import org.apache.arrow.vector.*;
24+
import org.apache.arrow.vector.types.pojo.Field;
25+
import org.apache.avro.LogicalType;
26+
import org.apache.avro.LogicalTypes;
27+
import org.apache.avro.Schema;
28+
import org.apache.avro.SchemaBuilder;
29+
30+
import java.util.ArrayList;
31+
import java.util.List;
32+
import java.util.Set;
33+
34+
public class ArrowToAvroUtils {
35+
36+
static Schema convertSchema(org.apache.arrow.vector.types.pojo.Schema arrowSchema) {
37+
38+
SchemaBuilder.FieldAssembler<?> fieldAssembler = SchemaBuilder.record("").fields();
39+
40+
for (Field arrowField : arrowSchema.getFields()) {
41+
fieldAssembler = convertFieldSchema(fieldAssembler, arrowField);
42+
}
43+
44+
return fieldAssembler.requiredBytes().endRecord();
45+
}
46+
47+
static SchemaBuilder.FieldAssembler<?> convertFieldSchema(
48+
SchemaBuilder.FieldAssembler<?> builder,
49+
Field arrowField) {
50+
51+
return builder.
52+
}
53+
54+
static CompositeAvroProducer createCompositeProducer(Schema schema) {
55+
56+
Preconditions.checkNotNull(schema, "Avro schema object can't be null");
57+
58+
List<Producer<?>> producers = new ArrayList<>();
59+
final Set<String> skipFieldNames = Set.of(); // TODO: Is this needed?
60+
61+
Schema.Type type = schema.getType();
62+
if (type == Schema.Type.RECORD) {
63+
for (Schema.Field field : schema.getFields()) {
64+
if (skipFieldNames.contains(field.name())) {
65+
// producers.add(createSkipConsumer(field.schema())); // TODO
66+
} else {
67+
Producer<?> producer = createProducer(field.schema(), field.name(), config);
68+
producers.add(producer);
69+
}
70+
}
71+
} else {
72+
Producer<?> producer = createProducer(schema, "", config);
73+
producers.add(producer);
74+
}
75+
76+
return new CompositeAvroProducer(producers);
77+
}
78+
79+
80+
private static BaseAvroProducer<?> createProducer(Schema schema, FieldVector vector) {
81+
82+
Preconditions.checkNotNull(schema, "Avro schema object can't be null");
83+
84+
final Schema.Type type = schema.getType();
85+
final LogicalType logicalType = schema.getLogicalType();
86+
87+
switch (type) {
88+
case UNION:
89+
return createUnionProducer(schema, vector);
90+
case ARRAY:
91+
return createArrayProducer(schema, vector);
92+
case MAP:
93+
return createMapProducer(schema, vector);
94+
case RECORD:
95+
return createStructProducer(schema, vector);
96+
case ENUM:
97+
return new AvroEnumProducer((VarCharVector) vector);
98+
case STRING:
99+
return new AvroStringProducer((VarCharVector) vector);
100+
case FIXED:
101+
if (logicalType instanceof LogicalTypes.Decimal) {
102+
return new AvroDecimalProducer.FixedDecimalProducer((DecimalVector) vector, schema.getFixedSize());
103+
} else {
104+
return new AvroFixedProducer((FixedSizeBinaryVector) vector, schema.getFixedSize());
105+
}
106+
case INT:
107+
if (logicalType instanceof LogicalTypes.Date) {
108+
return new AvroDateProducer((DateDayVector) vector);
109+
} else if (logicalType instanceof LogicalTypes.TimeMillis) {
110+
return new AvroTimeMillisProducer((TimeMilliVector) vector);
111+
} else {
112+
return new AvroIntProducer((IntVector) vector);
113+
}
114+
case BOOLEAN:
115+
return new AvroBooleanProducer((BitVector) vector);
116+
case LONG:
117+
if (logicalType instanceof LogicalTypes.TimeMicros) {
118+
return new AvroTimeMicroProducer((TimeMicroVector) vector);
119+
} else if (logicalType instanceof LogicalTypes.TimestampMillis) {
120+
return new AvroTimestampMillisProducer((TimeStampMilliVector) vector);
121+
} else if (logicalType instanceof LogicalTypes.TimestampMicros) {
122+
return new AvroTimestampMicroProducer((TimeStampMicroVector) vector);
123+
} else {
124+
return new AvroLongProducer((BigIntVector) vector);
125+
}
126+
case FLOAT:
127+
return new AvroFloatProducer((Float4Vector) vector);
128+
case DOUBLE:
129+
return new AvroDoubleProducer((Float8Vector) vector);
130+
case BYTES:
131+
if (logicalType instanceof LogicalTypes.Decimal) {
132+
return new AvroDecimalProducer.BytesDecimalProducer((DecimalVector) vector);
133+
} else {
134+
return new AvroBytesProducer((VarBinaryVector) vector);
135+
}
136+
case NULL:
137+
return new AvroNullProducer((NullVector) vector);
138+
default:
139+
throw new UnsupportedOperationException("Unsupported vector type: " + type.getName());
140+
}
141+
}
142+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.arrow.adapter.avro.producers;
19+
20+
import java.io.IOException;
21+
import org.apache.arrow.vector.complex.ListVector;
22+
import org.apache.avro.io.Encoder;
23+
24+
/**
25+
* Producer which produces array type values to an Avro encoder.
26+
* Writes the data from a {@link ListVector}.
27+
*/
28+
public class AvroArrayProducer extends BaseAvroProducer<ListVector> {
29+
30+
private final BaseAvroProducer<?> delegate;
31+
32+
/** Instantiate an ArrayProducer. */
33+
public AvroArrayProducer(ListVector vector, BaseAvroProducer<?> delegate) {
34+
super(vector);
35+
this.delegate = delegate;
36+
}
37+
38+
@Override
39+
public void produce(Encoder encoder) throws IOException {
40+
41+
encoder.writeArrayStart();
42+
encoder.setItemCount(vector.getValueCount());
43+
44+
for (int i = 0; i < vector.getValueCount(); i++) {
45+
encoder.startItem();
46+
delegate.produce(encoder);
47+
}
48+
49+
encoder.writeArrayEnd();
50+
currentIndex++;
51+
}
52+
53+
@Override
54+
public boolean resetValueVector(ListVector vector) {
55+
this.delegate.resetValueVector(vector.getDataVector());
56+
return super.resetValueVector(vector);
57+
}
58+
}

0 commit comments

Comments
 (0)