Skip to content

Commit 389faf3

Browse files
First draft of AvroFileWriter
1 parent 45295a5 commit 389faf3

File tree

2 files changed

+287
-0
lines changed

2 files changed

+287
-0
lines changed
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.arrow.adapter.avro;
19+
20+
import org.apache.arrow.adapter.avro.producers.CompositeAvroProducer;
21+
import org.apache.arrow.memory.ArrowBuf;
22+
import org.apache.arrow.memory.BufferAllocator;
23+
import org.apache.arrow.vector.VectorSchemaRoot;
24+
import org.apache.arrow.vector.compression.CompressionCodec;
25+
import org.apache.arrow.vector.compression.CompressionUtil;
26+
import org.apache.arrow.vector.dictionary.DictionaryProvider;
27+
import org.apache.avro.Schema;
28+
import org.apache.avro.file.DataFileConstants;
29+
import org.apache.avro.io.BinaryEncoder;
30+
import org.apache.avro.io.Encoder;
31+
import org.apache.avro.io.EncoderFactory;
32+
33+
import java.io.IOException;
34+
import java.io.OutputStream;
35+
import java.nio.channels.Channels;
36+
import java.nio.channels.WritableByteChannel;
37+
import java.nio.charset.StandardCharsets;
38+
import java.util.HashMap;
39+
import java.util.Map;
40+
import java.util.Random;
41+
42+
43+
class AvroFileWriter {
44+
45+
// Use magic from Avro's own constants
46+
private static final byte[] AVRO_MAGIC = DataFileConstants.MAGIC;
47+
48+
private static final String codecName = "zstandard";
49+
private static final CompressionUtil.CodecType codecType = CompressionUtil.CodecType.ZSTD;
50+
51+
private final OutputStream stream;
52+
private final Encoder encoder;
53+
54+
private final BufferAllocator allocator;
55+
private final BufferOutputStream batchBuffer;
56+
private BinaryEncoder batchEncoder;
57+
private VectorSchemaRoot batch;
58+
59+
private final Schema avroSchema;
60+
private final byte[] syncMarker;
61+
62+
private final CompositeAvroProducer recordProducer;
63+
private final CompressionCodec compressionCodec;
64+
65+
66+
public AvroFileWriter(
67+
OutputStream stream,
68+
VectorSchemaRoot firstBatch,
69+
DictionaryProvider dictionaries)
70+
throws IOException {
71+
72+
EncoderFactory encoderFactory = EncoderFactory.get();
73+
74+
this.stream = stream;
75+
this.encoder = encoderFactory.binaryEncoder(stream, null);
76+
77+
this.allocator = firstBatch.getVector(0).getAllocator();
78+
this.batchBuffer = new BufferOutputStream(allocator);
79+
this.batchEncoder = encoderFactory.binaryEncoder(stream, null);
80+
this.batch = firstBatch;
81+
82+
try {
83+
84+
this.avroSchema = ArrowToAvroUtils.createAvroSchema(
85+
firstBatch.getSchema().getFields(),
86+
dictionaries);
87+
88+
this.recordProducer = ArrowToAvroUtils.createCompositeProducer(
89+
firstBatch.getFieldVectors(),
90+
dictionaries);
91+
92+
this.compressionCodec = CompressionCodec.Factory.INSTANCE.createCodec(codecType);
93+
94+
// Generate a random sync marker
95+
var random = new Random();
96+
this.syncMarker = new byte[16];
97+
random.nextBytes(this.syncMarker);
98+
}
99+
catch (Throwable e) {
100+
// Do not leak the batch buffer if there are problems during setup
101+
batchBuffer.close();
102+
throw e;
103+
}
104+
}
105+
106+
// Sets up a defaulr binary encoder for the channel
107+
public AvroFileWriter(
108+
WritableByteChannel channel,
109+
VectorSchemaRoot firstBatch,
110+
DictionaryProvider dictionaries)
111+
throws IOException {
112+
113+
this(Channels.newOutputStream(channel), firstBatch, dictionaries);
114+
}
115+
116+
// Write the Avro header (throws if already written)
117+
public void writeHeader() throws IOException {
118+
119+
// Prepare the metadata map
120+
Map<String, byte[]> metadata = new HashMap<>();
121+
metadata.put("avro.schema", avroSchema.toString().getBytes(StandardCharsets.UTF_8));
122+
metadata.put("avro.codec", codecName.getBytes(StandardCharsets.UTF_8));
123+
124+
// Avro magic
125+
encoder.writeFixed(AVRO_MAGIC);
126+
127+
// Write the metadata map
128+
encoder.writeMapStart(); // write metadata
129+
encoder.setItemCount(metadata.size());
130+
for (Map.Entry<String, byte[]> entry : metadata.entrySet()) {
131+
encoder.startItem();
132+
encoder.writeString(entry.getKey());
133+
encoder.writeBytes(entry.getValue());
134+
}
135+
encoder.writeMapEnd();
136+
137+
// Sync marker denotes end of the header
138+
encoder.writeFixed(this.syncMarker);
139+
encoder.flush();
140+
}
141+
142+
// Write the contents of the VSR as an Avro data block
143+
// Writes header if not yet written
144+
// Expects new data to be in the batch (i.e. VSR can be recycled)
145+
public void writeBatch() throws IOException {
146+
147+
// Reset batch buffer and encoder
148+
batchBuffer.reset();
149+
batchEncoder = EncoderFactory.get().directBinaryEncoder(batchBuffer, batchEncoder);
150+
151+
// Reset producers
152+
recordProducer.getProducers().forEach(producer -> producer.setPosition(0));
153+
154+
// Produce a batch
155+
for (int row = 0; row < batch.getRowCount(); row++) {
156+
recordProducer.produce(batchEncoder);
157+
}
158+
159+
batchEncoder.flush();
160+
161+
// Raw buffer is a view onto the stream backing buffer - do not release
162+
ArrowBuf rawBuffer = batchBuffer.getBuffer();
163+
164+
// Compressed buffer is newly allocated and needs to be released
165+
try (ArrowBuf compressedBuffer = compressionCodec.compress(allocator, rawBuffer)) {
166+
167+
// Write Avro block to the main encoder
168+
encoder.writeLong(batch.getRowCount());
169+
encoder.writeBytes(compressedBuffer.nioBuffer());
170+
encoder.writeFixed(syncMarker);
171+
}
172+
}
173+
174+
// Reset vectors in all the producders
175+
// Supports a stream of VSRs if source VSR is not recycled
176+
void resetBatch(VectorSchemaRoot batch) {
177+
recordProducer.resetProducerVectors(batch);
178+
this.batch = batch;
179+
}
180+
181+
public void flush() throws IOException {
182+
encoder.flush();
183+
}
184+
185+
// Closes encoder and / or channel
186+
// Does not close VSR or dictionary vectors
187+
public void close() throws IOException {
188+
encoder.flush();
189+
stream.close();
190+
batchBuffer.close();
191+
}
192+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.arrow.adapter.avro;
19+
20+
import org.apache.arrow.memory.ArrowBuf;
21+
import org.apache.arrow.memory.BufferAllocator;
22+
import org.apache.arrow.memory.util.MemoryUtil;
23+
24+
import java.io.IOException;
25+
import java.io.OutputStream;
26+
27+
public class BufferOutputStream extends OutputStream {
28+
29+
private static final int INITIAL_BUFFER_SIZE = 4096;
30+
31+
private ArrowBuf buffer;
32+
33+
public BufferOutputStream(BufferAllocator allocator) {
34+
buffer = allocator.buffer(INITIAL_BUFFER_SIZE);
35+
}
36+
37+
public ArrowBuf getBuffer() {
38+
return buffer.slice(0, buffer.writerIndex());
39+
}
40+
41+
@Override
42+
public void write(int b) throws IOException {
43+
ensureCapacity(buffer.capacity() + 1);
44+
buffer.writeByte(b);
45+
}
46+
47+
@Override
48+
public void write(byte[] b) throws IOException {
49+
ensureCapacity(buffer.capacity() + b.length);
50+
buffer.writeBytes(b);
51+
}
52+
53+
@Override
54+
public void write(byte[] b, int off, int len) throws IOException {
55+
ensureCapacity(buffer.capacity() + len);
56+
buffer.writeBytes(b, off, len);
57+
}
58+
59+
@Override
60+
public void flush() {
61+
// no-op
62+
}
63+
64+
public void reset() {
65+
buffer.clear();
66+
}
67+
68+
@Override
69+
public void close() throws IOException {
70+
buffer.close();
71+
}
72+
73+
private void ensureCapacity(long capacity) {
74+
75+
if (capacity > buffer.capacity()) {
76+
77+
long newCapacity = Math.max(capacity, buffer.capacity() * 2);
78+
79+
BufferAllocator allocator = buffer.getReferenceManager().getAllocator();
80+
ArrowBuf newBuffer = allocator.buffer(newCapacity);
81+
82+
try {
83+
84+
MemoryUtil.copyMemory(buffer.memoryAddress(), newBuffer.memoryAddress(), buffer.writerIndex());
85+
86+
buffer.close();
87+
buffer = newBuffer;
88+
}
89+
catch (Throwable t) {
90+
newBuffer.close();
91+
throw t;
92+
}
93+
}
94+
}
95+
}

0 commit comments

Comments
 (0)