Skip to content

Commit ff7150a

Browse files
Update writer to match reader implementation
1 parent 53efc2d commit ff7150a

File tree

1 file changed

+44
-46
lines changed

1 file changed

+44
-46
lines changed

adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroFileWriter.java

Lines changed: 44 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,19 @@
1818
package org.apache.arrow.adapter.avro;
1919

2020
import org.apache.arrow.adapter.avro.producers.CompositeAvroProducer;
21-
import org.apache.arrow.memory.ArrowBuf;
22-
import org.apache.arrow.memory.BufferAllocator;
2321
import org.apache.arrow.vector.VectorSchemaRoot;
24-
import org.apache.arrow.vector.compression.CompressionCodec;
25-
import org.apache.arrow.vector.compression.CompressionUtil;
2622
import org.apache.arrow.vector.dictionary.DictionaryProvider;
2723
import org.apache.avro.Schema;
24+
import org.apache.avro.file.Codec;
2825
import org.apache.avro.file.DataFileConstants;
2926
import org.apache.avro.io.BinaryEncoder;
3027
import org.apache.avro.io.Encoder;
3128
import org.apache.avro.io.EncoderFactory;
3229

30+
import java.io.ByteArrayOutputStream;
3331
import java.io.IOException;
3432
import java.io.OutputStream;
35-
import java.nio.channels.Channels;
36-
import java.nio.channels.WritableByteChannel;
33+
import java.nio.ByteBuffer;
3734
import java.nio.charset.StandardCharsets;
3835
import java.util.HashMap;
3936
import java.util.Map;
@@ -44,23 +41,20 @@ class AvroFileWriter {
4441

4542
// Use magic from Avro's own constants
4643
private static final byte[] AVRO_MAGIC = DataFileConstants.MAGIC;
47-
48-
private static final String codecName = "zstandard";
49-
private static final CompressionUtil.CodecType codecType = CompressionUtil.CodecType.ZSTD;
44+
private static final int SYNC_MARKER_SIZE = 16;
5045

5146
private final OutputStream stream;
5247
private final Encoder encoder;
5348

54-
private final BufferAllocator allocator;
55-
private final BufferOutputStream batchBuffer;
49+
private final BufferOutputStream batchStream;
5650
private BinaryEncoder batchEncoder;
5751
private VectorSchemaRoot batch;
5852

5953
private final Schema avroSchema;
6054
private final byte[] syncMarker;
6155

6256
private final CompositeAvroProducer recordProducer;
63-
private final CompressionCodec compressionCodec;
57+
private final Codec avroCodec;
6458

6559

6660
public AvroFileWriter(
@@ -69,13 +63,22 @@ public AvroFileWriter(
6963
DictionaryProvider dictionaries)
7064
throws IOException {
7165

66+
this(stream, firstBatch, dictionaries, null);
67+
}
68+
69+
public AvroFileWriter(
70+
OutputStream stream,
71+
VectorSchemaRoot firstBatch,
72+
DictionaryProvider dictionaries,
73+
String codecName)
74+
throws IOException {
75+
7276
EncoderFactory encoderFactory = EncoderFactory.get();
7377

7478
this.stream = stream;
7579
this.encoder = encoderFactory.binaryEncoder(stream, null);
7680

77-
this.allocator = firstBatch.getVector(0).getAllocator();
78-
this.batchBuffer = new BufferOutputStream(allocator);
81+
this.batchStream = new BufferOutputStream();
7982
this.batchEncoder = encoderFactory.binaryEncoder(stream, null);
8083
this.batch = firstBatch;
8184

@@ -89,43 +92,34 @@ public AvroFileWriter(
8992
firstBatch.getFieldVectors(),
9093
dictionaries);
9194

92-
this.compressionCodec = CompressionCodec.Factory.INSTANCE.createCodec(codecType);
93-
9495
// Generate a random sync marker
9596
var random = new Random();
96-
this.syncMarker = new byte[16];
97+
this.syncMarker = new byte[SYNC_MARKER_SIZE];
9798
random.nextBytes(this.syncMarker);
99+
100+
// Look up the compression codec
101+
this.avroCodec = AvroCompression.getAvroCodec(codecName);
98102
}
99103
catch (Throwable e) {
100104
// Do not leak the batch buffer if there are problems during setup
101-
batchBuffer.close();
105+
batchStream.close();
102106
throw e;
103107
}
104108
}
105109

106-
// Sets up a defaulr binary encoder for the channel
107-
public AvroFileWriter(
108-
WritableByteChannel channel,
109-
VectorSchemaRoot firstBatch,
110-
DictionaryProvider dictionaries)
111-
throws IOException {
112-
113-
this(Channels.newOutputStream(channel), firstBatch, dictionaries);
114-
}
115-
116110
// Write the Avro header (throws if already written)
117111
public void writeHeader() throws IOException {
118112

119113
// Prepare the metadata map
120114
Map<String, byte[]> metadata = new HashMap<>();
121115
metadata.put("avro.schema", avroSchema.toString().getBytes(StandardCharsets.UTF_8));
122-
metadata.put("avro.codec", codecName.getBytes(StandardCharsets.UTF_8));
116+
metadata.put("avro.codec", avroCodec.getName().getBytes(StandardCharsets.UTF_8));
123117

124118
// Avro magic
125119
encoder.writeFixed(AVRO_MAGIC);
126120

127121
// Write the metadata map
128-
encoder.writeMapStart(); // write metadata
122+
encoder.writeMapStart();
129123
encoder.setItemCount(metadata.size());
130124
for (Map.Entry<String, byte[]> entry : metadata.entrySet()) {
131125
encoder.startItem();
@@ -144,34 +138,31 @@ public void writeHeader() throws IOException {
144138
// Expects new data to be in the batch (i.e. VSR can be recycled)
145139
public void writeBatch() throws IOException {
146140

147-
// Reset batch buffer and encoder
148-
batchBuffer.reset();
149-
batchEncoder = EncoderFactory.get().directBinaryEncoder(batchBuffer, batchEncoder);
141+
// Prepare batch stream and encoder
142+
batchStream.reset();
143+
batchEncoder = EncoderFactory.get().directBinaryEncoder(batchStream, batchEncoder);
150144

151145
// Reset producers
152146
recordProducer.getProducers().forEach(producer -> producer.setPosition(0));
153147

154-
// Produce a batch
148+
// Produce a batch, writing to the batch stream (buffer)
155149
for (int row = 0; row < batch.getRowCount(); row++) {
156150
recordProducer.produce(batchEncoder);
157151
}
158152

159153
batchEncoder.flush();
160154

161-
// Raw buffer is a view onto the stream backing buffer - do not release
162-
ArrowBuf rawBuffer = batchBuffer.getBuffer();
163-
164-
// Compressed buffer is newly allocated and needs to be released
165-
try (ArrowBuf compressedBuffer = compressionCodec.compress(allocator, rawBuffer)) {
155+
// Compress the batch buffer using Avro's codecs
156+
ByteBuffer batchBuffer = ByteBuffer.wrap(batchStream.internalBuffer());
157+
ByteBuffer batchCompressed = avroCodec.compress(batchBuffer);
166158

167-
// Write Avro block to the main encoder
168-
encoder.writeLong(batch.getRowCount());
169-
encoder.writeBytes(compressedBuffer.nioBuffer());
170-
encoder.writeFixed(syncMarker);
171-
}
159+
// Write Avro block to the main encoder
160+
encoder.writeLong(batch.getRowCount());
161+
encoder.writeBytes(batchCompressed);
162+
encoder.writeFixed(syncMarker);
172163
}
173164

174-
// Reset vectors in all the producders
165+
// Reset vectors in all the producers
175166
// Supports a stream of VSRs if source VSR is not recycled
176167
void resetBatch(VectorSchemaRoot batch) {
177168
recordProducer.resetProducerVectors(batch);
@@ -187,6 +178,13 @@ public void flush() throws IOException {
187178
public void close() throws IOException {
188179
encoder.flush();
189180
stream.close();
190-
batchBuffer.close();
181+
batchStream.close();
182+
}
183+
184+
private static final class BufferOutputStream extends ByteArrayOutputStream {
185+
186+
byte[] internalBuffer() {
187+
return buf;
188+
}
191189
}
192190
}

0 commit comments

Comments (0)