Skip to content

Commit cf712e0

Browse files
authored
Handle Mongo dates outside of RFC3339 (#3527)
Go is doing strict validation of datetimes and leaving us with `[qrep] failed to pull records: failed to convert record: error marshalling document: Time.MarshalJSON: year outside of range [0,9999]'` On destination side it's a string anyway, and CH has toDateTimeOrNull to filter those out if needed
1 parent 8e31f94 commit cf712e0

File tree

3 files changed

+71
-54
lines changed

3 files changed

+71
-54
lines changed

flow/connectors/mongo/codec.go

Lines changed: 26 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"math"
55
"reflect"
66
"strconv"
7+
"time"
78
"unsafe"
89

910
jsoniter "github.com/json-iterator/go"
@@ -40,26 +41,8 @@ func (codec *BsonDCodec) Encode(ptr unsafe.Pointer, stream *jsoniter.Stream) {
4041
if i > 0 {
4142
stream.WriteMore()
4243
}
43-
4444
stream.WriteObjectField(elem.Key)
45-
46-
switch v := elem.Value.(type) {
47-
case float64:
48-
if math.IsNaN(v) {
49-
stream.WriteString("NaN")
50-
} else if math.IsInf(v, 1) {
51-
stream.WriteString("+Inf")
52-
} else if math.IsInf(v, -1) {
53-
stream.WriteString("-Inf")
54-
} else {
55-
writeFloat64WithExplicitDecimal(v, stream)
56-
}
57-
default:
58-
// Delegate to json-iterator's encoding system for all other types.
59-
// This maintains extension behavior: if elem.Value is another bson.D,
60-
// it will recursively use this custom encoder for proper NaN/Inf handling.
61-
stream.WriteVal(elem.Value)
62-
}
45+
encodeCustom(elem.Value, stream)
6346
}
6447
stream.WriteObjectEnd()
6548
}
@@ -83,23 +66,7 @@ func (codec *BsonACodec) Encode(ptr unsafe.Pointer, stream *jsoniter.Stream) {
8366
if i > 0 {
8467
stream.WriteMore()
8568
}
86-
switch v := elem.(type) {
87-
case float64:
88-
if math.IsNaN(v) {
89-
stream.WriteString("NaN")
90-
} else if math.IsInf(v, 1) {
91-
stream.WriteString("+Inf")
92-
} else if math.IsInf(v, -1) {
93-
stream.WriteString("-Inf")
94-
} else {
95-
writeFloat64WithExplicitDecimal(v, stream)
96-
}
97-
default:
98-
// Delegate to json-iterator's encoding system for all other types.
99-
// This maintains extension behavior: if elem.Value is another bson.A,
100-
// it will recursively use this custom encoder for proper NaN/Inf handling.
101-
stream.WriteVal(elem)
102-
}
69+
encodeCustom(elem, stream)
10370
}
10471
stream.WriteArrayEnd()
10572
}
@@ -113,6 +80,29 @@ func (codec *BsonACodec) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
11380
// Not used
11481
}
11582

83+
func encodeCustom(value any, stream *jsoniter.Stream) {
84+
switch v := value.(type) {
85+
case float64:
86+
if math.IsNaN(v) {
87+
stream.WriteString("NaN")
88+
} else if math.IsInf(v, 1) {
89+
stream.WriteString("+Inf")
90+
} else if math.IsInf(v, -1) {
91+
stream.WriteString("-Inf")
92+
} else {
93+
writeFloat64WithExplicitDecimal(v, stream)
94+
}
95+
case bson.DateTime:
96+
value := v.Time().UTC().Format(time.RFC3339Nano)
97+
stream.WriteString(value)
98+
default:
99+
// Delegate to json-iterator's encoding system for all other types.
100+
// This maintains extension behavior: if the value is another bson.A/D,
101+
// it will recursively use this custom encoder for proper NaN/Inf/date handling.
102+
stream.WriteVal(value)
103+
}
104+
}
105+
116106
func CreateExtendedJSONMarshaler() jsoniter.API {
117107
config := jsoniter.ConfigCompatibleWithStandardLibrary
118108
config.RegisterExtension(&BsonExtension{})

flow/connectors/mongo/codec_test.go

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,28 @@ func TestMarshalDocument(t *testing.T) {
179179
expected: `{"a":"-Inf"}`,
180180
},
181181

182+
// DateTime
183+
{
184+
desc: "bson.DateTime",
185+
input: bson.D{{Key: "date", Value: bson.DateTime(1672531200000)}}, // 2023-01-01 00:00:00 UTC
186+
expected: `{"date":"2023-01-01T00:00:00Z"}`,
187+
},
188+
{
189+
desc: "bson.DateTime out of RFC3339 range",
190+
input: bson.D{{Key: "date", Value: bson.DateTime(253433923200000)}}, // 10001-01-01 00:00:00 UTC
191+
expected: `{"date":"10001-01-01T00:00:00Z"}`,
192+
},
193+
{
194+
desc: "bson.DateTime max",
195+
input: bson.D{{Key: "date", Value: bson.DateTime(math.MaxInt64)}},
196+
expected: `{"date":"292278994-08-17T07:12:55.807Z"}`,
197+
},
198+
{
199+
desc: "bson.DateTime max",
200+
input: bson.D{{Key: "date", Value: bson.DateTime(math.MinInt64)}},
201+
expected: `{"date":"-292275055-05-16T16:47:04.192Z"}`,
202+
},
203+
182204
// Null values
183205
{
184206
desc: "nil",
@@ -222,21 +244,21 @@ func TestMarshalDocument(t *testing.T) {
222244
expected: `{"a":[1,"str",true]}`,
223245
},
224246
{
225-
desc: "bson.A special float",
226-
input: bson.D{{Key: "a", Value: bson.A{math.NaN(), math.Inf(1), math.Inf(-1)}}},
227-
expected: `{"a":["NaN","+Inf","-Inf"]}`,
247+
desc: "bson.A special values",
248+
input: bson.D{{Key: "a", Value: bson.A{math.NaN(), math.Inf(1), math.Inf(-1), bson.DateTime(math.MaxInt64)}}},
249+
expected: `{"a":["NaN","+Inf","-Inf","292278994-08-17T07:12:55.807Z"]}`,
228250
},
229251
{
230-
desc: "bson.A nested special float",
252+
desc: "bson.A nested special values",
231253
input: bson.D{{Key: "nested", Value: bson.A{
232254
bson.D{{Key: "inner1", Value: bson.A{
233255
bson.D{{Key: "inner_inner", Value: bson.A{
234-
math.NaN(), math.Inf(1), math.Inf(-1),
256+
math.NaN(), math.Inf(1), math.Inf(-1), bson.DateTime(math.MaxInt64),
235257
}}},
236258
}}},
237259
bson.D{{Key: "inner2", Value: 1.23}},
238260
}}},
239-
expected: `{"nested":[{"inner1":[{"inner_inner":["NaN","+Inf","-Inf"]}]},{"inner2":1.23}]}`,
261+
expected: `{"nested":[{"inner1":[{"inner_inner":["NaN","+Inf","-Inf","292278994-08-17T07:12:55.807Z"]}]},{"inner2":1.23}]}`,
240262
},
241263
{
242264
desc: "complex nested bson.A",
@@ -273,11 +295,6 @@ func TestMarshalDocument(t *testing.T) {
273295
}(),
274296
expected: `{"id":"507f1f77bcf86cd799439011"}`,
275297
},
276-
{
277-
desc: "bson.DateTime",
278-
input: bson.D{{Key: "date", Value: bson.DateTime(1672531200000)}}, // 2023-01-01 00:00:00 UTC
279-
expected: `{"date":"2023-01-01T00:00:00Z"}`,
280-
},
281298
{
282299
desc: "bson.JavaScript",
283300
input: bson.D{{Key: "js", Value: bson.JavaScript("function() { return 42; }")}},

flow/e2e/mongo_test.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -611,9 +611,16 @@ func (s MongoClickhouseSuite) Test_Json_Types() {
611611
{Key: "nan", Value: math.NaN()},
612612
{Key: "pos_inf", Value: math.Inf(1)},
613613
{Key: "neg_inf", Value: math.Inf(-1)},
614-
// Arrays with special floats
614+
615+
// Datetime
616+
{Key: "date_time", Value: bson.DateTime(1672531200000)}, // 2023-01-01 00:00:00 UTC
617+
{Key: "date_time_outside_rfc3339", Value: bson.DateTime(253433923200000)}, // 10001-01-01 00:00:00 UTC
618+
{Key: "date_time_max", Value: bson.DateTime(math.MaxInt64)},
619+
{Key: "date_time_min", Value: bson.DateTime(math.MinInt64)},
620+
621+
// Arrays with special values
615622
{Key: "array_mixed", Value: bson.A{1, "str", true}},
616-
{Key: "array_special_floats", Value: bson.A{math.NaN(), math.Inf(1), math.Inf(-1)}},
623+
{Key: "array_special_values", Value: bson.A{math.NaN(), math.Inf(1), math.Inf(-1), bson.DateTime(math.MaxInt64)}},
617624

618625
// Complex nested documents
619626
{Key: "nested_doc", Value: bson.D{
@@ -630,7 +637,7 @@ func (s MongoClickhouseSuite) Test_Json_Types() {
630637
{Key: "nested_array", Value: bson.A{
631638
bson.D{{Key: "inner1", Value: bson.A{
632639
bson.D{{Key: "inner_inner", Value: bson.A{
633-
math.NaN(), math.Inf(1), math.Inf(-1),
640+
math.NaN(), math.Inf(1), math.Inf(-1), bson.DateTime(math.MaxInt64),
634641
}}},
635642
}}},
636643
bson.D{{Key: "inner2", Value: 1.23}},
@@ -651,7 +658,6 @@ func (s MongoClickhouseSuite) Test_Json_Types() {
651658

652659
// Other bson types
653660
{Key: "object_id", Value: oid},
654-
{Key: "date_time", Value: bson.DateTime(1672531200000)}, // 2023-01-01 00:00:00 UTC
655661
{Key: "symbol", Value: bson.Symbol("test_symbol")},
656662
{Key: "binary", Value: bson.Binary{Subtype: 0x02, Data: []byte("hello world")}},
657663
{Key: "binary_empty", Value: bson.Binary{Subtype: 0x00, Data: []byte{}}},
@@ -720,16 +726,20 @@ func (s MongoClickhouseSuite) Test_Json_Types() {
720726
require.Contains(t, row, `"nan":"NaN"`)
721727
require.Contains(t, row, `"pos_inf":"+Inf"`)
722728
require.Contains(t, row, `"neg_inf":"-Inf"`)
729+
require.Contains(t, row, `"date_time":"2023-01-01T00:00:00Z"`)
730+
require.Contains(t, row, `"date_time_outside_rfc3339":"10001-01-01T00:00:00Z"`)
731+
require.Contains(t, row, `"date_time_max":"292278994-08-17T07:12:55.807Z"`)
732+
require.Contains(t, row, `"date_time_min":"-292275055-05-16T16:47:04.192Z"`)
723733
// mixed array promoted common type
724734
require.Contains(t, row, `"array_mixed":[1,"str",true]`)
725-
require.Contains(t, row, `"array_special_floats":["NaN","+Inf","-Inf"]`)
735+
require.Contains(t, row, `"array_special_values":["NaN","+Inf","-Inf","292278994-08-17T07:12:55.807Z"]`)
726736
require.Contains(t, row, `"nested_doc":{"inner1":"str","inner2":1,"inner3":true,"inner4":{"a":"NaN","b":["hello","world"]}}`)
727-
require.Contains(t, row, `"nested_array":[{"inner1":[{"inner_inner":["NaN","+Inf","-Inf"]}]},{"inner2":1.23}]`)
737+
require.Contains(t, row,
738+
`"nested_array":[{"inner1":[{"inner_inner":["NaN","+Inf","-Inf","292278994-08-17T07:12:55.807Z"]}]},{"inner2":1.23}]`)
728739
require.Contains(t, row, `"nested_array_2":[{"NaN":"NaN"},{"binary":{"Data":"dGVzdA==","Subtype":0}},`+
729740
`{"nested_arr":[[1],[2],[3]]},{"nested_doc":{"str":"hello world"}},{"timestamp":{"I":1,"T":1672531200}}]`)
730741

731742
require.Contains(t, row, `"object_id":"507f1f77bcf86cd799439011"`)
732-
require.Contains(t, row, `"date_time":"2023-01-01T00:00:00Z"`)
733743
require.Contains(t, row, `"symbol":"test_symbol"`)
734744
// binary data should be base64 encoded
735745
require.Contains(t, row, `"binary":{"Data":"aGVsbG8gd29ybGQ=","Subtype":2}`)

0 commit comments

Comments
 (0)