Skip to content

Commit 90c72f4

Browse files
Revise logic for decimal producers
1 parent 3b7fa50 commit 90c72f4

File tree

2 files changed

+44
-6
lines changed

2 files changed

+44
-6
lines changed

adapter/avro/src/main/java/org/apache/arrow/adapter/avro/producers/logical/AvroDecimal256Producer.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,32 @@
1616
*/
1717
package org.apache.arrow.adapter.avro.producers.logical;
1818

19-
import org.apache.arrow.adapter.avro.producers.AvroFixedSizeBinaryProducer;
19+
import org.apache.arrow.adapter.avro.producers.BaseAvroProducer;
2020
import org.apache.arrow.vector.Decimal256Vector;
21+
import org.apache.avro.io.Encoder;
22+
23+
import java.io.IOException;
24+
import java.math.BigDecimal;
2125

2226
/**
2327
* Producer that produces decimal values from a {@link Decimal256Vector}, writes data to an Avro
2428
* encoder.
2529
*/
26-
public class AvroDecimal256Producer extends AvroFixedSizeBinaryProducer {
30+
public class AvroDecimal256Producer extends BaseAvroProducer<Decimal256Vector> {
31+
32+
// Logic is the same as for DecimalVector (128 bit)
2733

28-
// Decimal stored as fixed width bytes, matches Avro decimal encoding
34+
byte[] encodedBytes = new byte[Decimal256Vector.TYPE_WIDTH];
2935

3036
/** Instantiate an AvroDecimalProducer. */
3137
public AvroDecimal256Producer(Decimal256Vector vector) {
3238
super(vector);
3339
}
40+
41+
@Override
42+
public void produce(Encoder encoder) throws IOException {
43+
BigDecimal value = vector.getObject(currentIndex++);
44+
AvroDecimalProducer.encodeDecimal(value, encodedBytes);
45+
encoder.writeFixed(encodedBytes);
46+
}
3447
}

adapter/avro/src/main/java/org/apache/arrow/adapter/avro/producers/logical/AvroDecimalProducer.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,44 @@
1616
*/
1717
package org.apache.arrow.adapter.avro.producers.logical;
1818

19-
import org.apache.arrow.adapter.avro.producers.AvroFixedSizeBinaryProducer;
19+
import org.apache.arrow.adapter.avro.producers.BaseAvroProducer;
2020
import org.apache.arrow.vector.DecimalVector;
21+
import org.apache.arrow.vector.util.DecimalUtility;
22+
import org.apache.avro.io.Encoder;
23+
24+
import java.io.IOException;
25+
import java.math.BigDecimal;
2126

2227
/**
2328
* Producer that produces decimal values from a {@link DecimalVector}, writes data to an Avro
2429
* encoder.
2530
*/
26-
public class AvroDecimalProducer extends AvroFixedSizeBinaryProducer {
31+
public class AvroDecimalProducer extends BaseAvroProducer<DecimalVector> {
32+
33+
// Arrow stores decimals with native endianness, but Avro requires big endian
34+
// Writing the Arrow representation as fixed bytes fails on little-end machines
35+
// Instead, we replicate the big endian logic explicitly here
36+
// See DecimalUtility.writeByteArrayToArrowBufHelper
2737

28-
// Decimal stored as fixed width bytes, matches Avro decimal encoding
38+
byte[] encodedBytes = new byte[DecimalVector.TYPE_WIDTH];
2939

3040
/** Instantiate an AvroDecimalProducer. */
3141
public AvroDecimalProducer(DecimalVector vector) {
3242
super(vector);
3343
}
44+
45+
@Override
46+
public void produce(Encoder encoder) throws IOException {
47+
// Use getObject() to go back to a BigDecimal then re-encode
48+
BigDecimal value = vector.getObject(currentIndex++);
49+
encodeDecimal(value, encodedBytes);
50+
encoder.writeFixed(encodedBytes);
51+
}
52+
53+
static void encodeDecimal(BigDecimal value, byte[] encodedBytes) {
54+
byte[] valueBytes = value.unscaledValue().toByteArray();
55+
byte[] padding = valueBytes[0] < 0 ? DecimalUtility.minus_one : DecimalUtility.zeroes;
56+
System.arraycopy(padding, 0, encodedBytes, 0, encodedBytes.length - valueBytes.length);
57+
System.arraycopy(valueBytes, 0, encodedBytes, encodedBytes.length - valueBytes.length, valueBytes.length);
58+
}
3459
}

0 commit comments

Comments
 (0)