Skip to content

Commit fdf7043

Browse files
authored
Merge pull request #302 from passuied/feature/fix-decimal-encoding
Fixed decimal encoding in a backwards compatible way
2 parents d289aa2 + 38ca4e1 commit fdf7043

File tree

3 files changed

+351
-17
lines changed

3 files changed

+351
-17
lines changed

codec.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,14 @@ type CodecOption struct {
4949
// When true, the string literal "null" in textual Avro data will be coerced to Go's nil.
5050
// Primarily used to handle edge cases where some Avro implementations allow string representations of null.
5151
EnableStringNull bool
52+
53+
// EnableDecimalBinarySpecCompliantEncoding controls whether decimal values use
54+
// Avro 1.10.2 spec-compliant encoding. When true:
55+
// - Binary encoding uses two's-complement representation of the unscaled integer
56+
// - JSON textual encoding uses human-readable decimal strings like "40.20"
57+
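// When false (default), the legacy textual encoding is kept for backwards compatibility; only the textual (JSON) encoding changes with this flag, the binary encoding is identical in both modes.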
58+
// Default: false (legacy encoding for backwards compatibility)
59+
EnableDecimalBinarySpecCompliantEncoding bool
5260
}
5361

5462
// Codec supports decoding binary and text Avro data to Go native data types,
@@ -82,7 +90,8 @@ type codecBuilder struct {
8290
// DefaultCodecOption returns a CodecOption with recommended default settings.
8391
func DefaultCodecOption() *CodecOption {
8492
return &CodecOption{
85-
EnableStringNull: true,
93+
EnableStringNull: true,
94+
EnableDecimalBinarySpecCompliantEncoding: false,
8695
}
8796
}
8897

@@ -739,9 +748,9 @@ func buildCodecForTypeDescribedByString(st map[string]*Codec, enclosingNamespace
739748
case "record":
740749
return makeRecordCodec(st, enclosingNamespace, schemaMap, cb)
741750
case "bytes.decimal":
742-
return makeDecimalBytesCodec(st, enclosingNamespace, schemaMap)
751+
return makeDecimalBytesCodec(st, enclosingNamespace, schemaMap, cb)
743752
case "fixed.decimal":
744-
return makeDecimalFixedCodec(st, enclosingNamespace, schemaMap)
753+
return makeDecimalFixedCodec(st, enclosingNamespace, schemaMap, cb)
745754
case "string.validated-string":
746755
return makeValidatedStringCodec(st, enclosingNamespace, schemaMap)
747756
default:

logical_type.go

Lines changed: 70 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ func precisionAndScaleFromSchemaMap(schemaMap map[string]interface{}) (int, int,
258258

259259
var one = big.NewInt(1)
260260

261-
func makeDecimalBytesCodec(st map[string]*Codec, enclosingNamespace string, schemaMap map[string]interface{}) (*Codec, error) {
261+
func makeDecimalBytesCodec(st map[string]*Codec, enclosingNamespace string, schemaMap map[string]interface{}, cb *codecBuilder) (*Codec, error) {
262262
precision, scale, err := precisionAndScaleFromSchemaMap(schemaMap)
263263
if err != nil {
264264
return nil, err
@@ -275,14 +275,27 @@ func makeDecimalBytesCodec(st map[string]*Codec, enclosingNamespace string, sche
275275
decimalSearchType := fmt.Sprintf("bytes.decimal.%d.%d", precision, scale)
276276
st[decimalSearchType] = c
277277

278-
c.binaryFromNative = decimalBytesFromNative(bytesBinaryFromNative, toSignedBytes, precision, scale)
279-
c.textualFromNative = decimalBytesFromNative(bytesTextualFromNative, toSignedBytes, precision, scale)
280-
c.nativeFromBinary = nativeFromDecimalBytes(bytesNativeFromBinary, precision, scale)
281-
c.nativeFromTextual = nativeFromDecimalBytes(bytesNativeFromTextual, precision, scale)
278+
// Check if spec-compliant encoding is enabled
279+
specCompliant := cb != nil && cb.option != nil && cb.option.EnableDecimalBinarySpecCompliantEncoding
280+
281+
if specCompliant {
282+
// Spec-compliant encoding: two's complement binary, human-readable textual
283+
c.binaryFromNative = decimalBytesFromNative(bytesBinaryFromNative, toSignedBytes, precision, scale)
284+
c.textualFromNative = decimalTextualFromNative(scale)
285+
c.nativeFromBinary = nativeFromDecimalBytes(bytesNativeFromBinary, scale)
286+
c.nativeFromTextual = nativeFromDecimalTextual()
287+
} else {
288+
// Legacy encoding (default): same binary codec as the spec-compliant branch; textual keeps the bytes-based form for backwards compatibility
289+
c.binaryFromNative = decimalBytesFromNative(bytesBinaryFromNative, toSignedBytes, precision, scale)
290+
c.textualFromNative = decimalBytesFromNative(bytesTextualFromNative, toSignedBytes, precision, scale)
291+
c.nativeFromBinary = nativeFromDecimalBytes(bytesNativeFromBinary, scale)
292+
c.nativeFromTextual = nativeFromDecimalBytes(bytesNativeFromTextual, scale)
293+
}
282294
return c, nil
283295
}
284296

285-
func nativeFromDecimalBytes(fn toNativeFn, precision, scale int) toNativeFn {
297+
// nativeFromDecimalBytes decodes bytes to *big.Rat using two's-complement representation.
298+
func nativeFromDecimalBytes(fn toNativeFn, scale int) toNativeFn {
286299
return func(bytes []byte) (interface{}, []byte, error) {
287300
d, b, err := fn(bytes)
288301
if err != nil {
@@ -292,15 +305,16 @@ func nativeFromDecimalBytes(fn toNativeFn, precision, scale int) toNativeFn {
292305
if !ok {
293306
return nil, bytes, fmt.Errorf("cannot transform to native decimal, expected []byte, received %T", d)
294307
}
308+
309+
// Two's-complement decoding
295310
num := big.NewInt(0)
296311
fromSignedBytes(num, bs)
297312
denom := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(scale)), nil)
298-
r := new(big.Rat).SetFrac(num, denom)
299-
return r, b, nil
313+
return new(big.Rat).SetFrac(num, denom), b, nil
300314
}
301315
}
302316

303-
func decimalBytesFromNative(fromNativeFn fromNativeFn, toBytesFn toBytesFn, precision, scale int) fromNativeFn {
317+
func decimalBytesFromNative(fromNativeFn fromNativeFn, toBytesFn toBytesFn, _, scale int) fromNativeFn {
304318
return func(b []byte, d interface{}) ([]byte, error) {
305319
r, ok := d.(*big.Rat)
306320
if !ok {
@@ -320,7 +334,36 @@ func decimalBytesFromNative(fromNativeFn fromNativeFn, toBytesFn toBytesFn, prec
320334
}
321335
}
322336

323-
func makeDecimalFixedCodec(st map[string]*Codec, enclosingNamespace string, schemaMap map[string]interface{}) (*Codec, error) {
337+
// decimalTextualFromNative encodes a *big.Rat to a JSON string representation
338+
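// like "40.20" according to the Avro 1.10.2 spec; the value is formatted with exactly scale fractional digits (big.Rat.FloatString), rounding any excess.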
339+
func decimalTextualFromNative(scale int) fromNativeFn {
340+
return func(b []byte, d interface{}) ([]byte, error) {
341+
r, ok := d.(*big.Rat)
342+
if !ok {
343+
return nil, fmt.Errorf("cannot transform to textual decimal, expected *big.Rat, received %T", d)
344+
}
345+
// Format as decimal string with proper scale
346+
return stringTextualFromNative(b, r.FloatString(scale))
347+
}
348+
}
349+
350+
// nativeFromDecimalTextual decodes a JSON string like "40.20" to a *big.Rat
351+
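// according to the Avro 1.10.2 spec. Parsing is delegated to big.Rat.SetString, which also accepts fraction ("3/4") and exponent ("1e3") forms.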
352+
func nativeFromDecimalTextual() toNativeFn {
353+
return func(buf []byte) (interface{}, []byte, error) {
354+
s, remaining, err := stringNativeFromTextual(buf)
355+
if err != nil {
356+
return nil, nil, fmt.Errorf("cannot decode textual decimal: %s", err)
357+
}
358+
r := new(big.Rat)
359+
if _, ok := r.SetString(s.(string)); !ok {
360+
return nil, nil, fmt.Errorf("cannot parse decimal string: %q", s)
361+
}
362+
return r, remaining, nil
363+
}
364+
}
365+
366+
func makeDecimalFixedCodec(st map[string]*Codec, enclosingNamespace string, schemaMap map[string]interface{}, cb *codecBuilder) (*Codec, error) {
324367
precision, scale, err := precisionAndScaleFromSchemaMap(schemaMap)
325368
if err != nil {
326369
return nil, err
@@ -336,10 +379,23 @@ func makeDecimalFixedCodec(st map[string]*Codec, enclosingNamespace string, sche
336379
if err != nil {
337380
return nil, err
338381
}
339-
c.binaryFromNative = decimalBytesFromNative(c.binaryFromNative, toSignedFixedBytes(size), precision, scale)
340-
c.textualFromNative = decimalBytesFromNative(c.textualFromNative, toSignedFixedBytes(size), precision, scale)
341-
c.nativeFromBinary = nativeFromDecimalBytes(c.nativeFromBinary, precision, scale)
342-
c.nativeFromTextual = nativeFromDecimalBytes(c.nativeFromTextual, precision, scale)
382+
383+
// Check if spec-compliant encoding is enabled
384+
specCompliant := cb != nil && cb.option != nil && cb.option.EnableDecimalBinarySpecCompliantEncoding
385+
386+
if specCompliant {
387+
// Spec-compliant encoding: two's complement binary, human-readable textual
388+
c.binaryFromNative = decimalBytesFromNative(c.binaryFromNative, toSignedFixedBytes(size), precision, scale)
389+
c.textualFromNative = decimalTextualFromNative(scale)
390+
c.nativeFromBinary = nativeFromDecimalBytes(c.nativeFromBinary, scale)
391+
c.nativeFromTextual = nativeFromDecimalTextual()
392+
} else {
393+
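// Legacy encoding (default): same binary codec as the spec-compliant branch; textual keeps wrapping the underlying fixed textual codec for backwards compatibility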
394+
c.binaryFromNative = decimalBytesFromNative(c.binaryFromNative, toSignedFixedBytes(size), precision, scale)
395+
c.textualFromNative = decimalBytesFromNative(c.textualFromNative, toSignedFixedBytes(size), precision, scale)
396+
c.nativeFromBinary = nativeFromDecimalBytes(c.nativeFromBinary, scale)
397+
c.nativeFromTextual = nativeFromDecimalBytes(c.nativeFromTextual, scale)
398+
}
343399
return c, nil
344400
}
345401

0 commit comments

Comments
 (0)