Skip to content

Commit cb43188

Browse files
authored
Fix BsonTimestamp integer overflow bug
KAFKA-220
1 parent d2a304f commit cb43188

File tree

4 files changed

+41
-9
lines changed

4 files changed

+41
-9
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
### Bug Fixes
1616
- [KAFKA-218](https://jira.mongodb.org/browse/KAFKA-218) Fixed bug in LazyBsonDocument#clone ignoring any changes made once unwrapped.
17+
- [KAFKA-220](https://jira.mongodb.org/browse/KAFKA-220) Fixed bug with timestamp integer overflow.
1718

1819
## 1.5.1
1920

src/main/java/com/mongodb/kafka/connect/source/schema/BsonValueToSchemaAndValue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ private SchemaAndValue numberToSchemaAndValue(final Schema schema, final BsonVal
138138
value = bsonNumber.doubleValue();
139139
}
140140
} else if (bsonValue.isTimestamp()) {
141-
value = bsonValue.asTimestamp().getTime() * 1000; // normalize to millis
141+
value = bsonValue.asTimestamp().getTime() * 1000L; // normalize to millis
142142
} else if (bsonValue.isDateTime()) {
143143
value = bsonValue.asDateTime().getValue();
144144
} else {

src/test/java/com/mongodb/kafka/connect/source/producer/SchemaAndValueProducerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ void testInferSchemaAndValueProducer() {
255255
"{\"$regularExpression\": {\"pattern\": \"^test.*regex.*xyz$\", \"options\": \"i\"}}")
256256
.put("string", "the fox ...")
257257
.put("symbol", "ruby stuff")
258-
.put("timestamp", new Date(477217984))
258+
.put("timestamp", new Date(305419896000L))
259259
.put("undefined", "{\"$undefined\": true}"));
260260

261261
SchemaAndValueProducer valueProducer =

src/test/java/com/mongodb/kafka/connect/source/schema/BsonValueToSchemaAndValueTest.java

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.bson.BsonDouble;
5454
import org.bson.BsonInt32;
5555
import org.bson.BsonInt64;
56+
import org.bson.BsonTimestamp;
5657
import org.bson.RawBsonDocument;
5758
import org.bson.types.Decimal128;
5859

@@ -115,6 +116,7 @@ void testNumberSupport() {
115116
BsonInt32 bsonInt32 = new BsonInt32(42);
116117
BsonInt64 bsonInt64 = new BsonInt64(2020L);
117118
BsonDouble bsonDouble = new BsonDouble(20.20);
119+
BsonTimestamp bsonTimestamp = new BsonTimestamp(1234567890, 1);
118120

119121
assertAll(
120122
"Testing int8 support",
@@ -129,7 +131,11 @@ void testNumberSupport() {
129131
() ->
130132
assertSchemaAndValueEquals(
131133
new SchemaAndValue(Schema.INT8_SCHEMA, (byte) 20.20),
132-
CONVERTER.toSchemaAndValue(Schema.INT8_SCHEMA, bsonDouble)));
134+
CONVERTER.toSchemaAndValue(Schema.INT8_SCHEMA, bsonDouble)),
135+
() ->
136+
assertSchemaAndValueEquals(
137+
new SchemaAndValue(Schema.INT8_SCHEMA, (byte) 1234567890000L),
138+
CONVERTER.toSchemaAndValue(Schema.INT8_SCHEMA, bsonTimestamp)));
133139

134140
assertAll(
135141
"Testing int16 support",
@@ -144,7 +150,11 @@ void testNumberSupport() {
144150
() ->
145151
assertSchemaAndValueEquals(
146152
new SchemaAndValue(Schema.INT16_SCHEMA, (short) 20.20),
147-
CONVERTER.toSchemaAndValue(Schema.INT16_SCHEMA, bsonDouble)));
153+
CONVERTER.toSchemaAndValue(Schema.INT16_SCHEMA, bsonDouble)),
154+
() ->
155+
assertSchemaAndValueEquals(
156+
new SchemaAndValue(Schema.INT16_SCHEMA, (short) 1234567890000L),
157+
CONVERTER.toSchemaAndValue(Schema.INT16_SCHEMA, bsonTimestamp)));
148158

149159
assertAll(
150160
"Testing int32 support",
@@ -159,7 +169,11 @@ void testNumberSupport() {
159169
() ->
160170
assertSchemaAndValueEquals(
161171
new SchemaAndValue(Schema.INT32_SCHEMA, (int) 20.20),
162-
CONVERTER.toSchemaAndValue(Schema.INT32_SCHEMA, bsonDouble)));
172+
CONVERTER.toSchemaAndValue(Schema.INT32_SCHEMA, bsonDouble)),
173+
() ->
174+
assertSchemaAndValueEquals(
175+
new SchemaAndValue(Schema.INT32_SCHEMA, (int) 1234567890000L),
176+
CONVERTER.toSchemaAndValue(Schema.INT32_SCHEMA, bsonTimestamp)));
163177

164178
assertAll(
165179
"Testing int64 support",
@@ -174,7 +188,11 @@ void testNumberSupport() {
174188
() ->
175189
assertSchemaAndValueEquals(
176190
new SchemaAndValue(Schema.INT64_SCHEMA, (long) 20.20),
177-
CONVERTER.toSchemaAndValue(Schema.INT64_SCHEMA, bsonDouble)));
191+
CONVERTER.toSchemaAndValue(Schema.INT64_SCHEMA, bsonDouble)),
192+
() ->
193+
assertSchemaAndValueEquals(
194+
new SchemaAndValue(Schema.INT64_SCHEMA, 1234567890000L),
195+
CONVERTER.toSchemaAndValue(Schema.INT64_SCHEMA, bsonTimestamp)));
178196

179197
assertAll(
180198
"Testing float32 support",
@@ -189,7 +207,11 @@ void testNumberSupport() {
189207
() ->
190208
assertSchemaAndValueEquals(
191209
new SchemaAndValue(Schema.FLOAT32_SCHEMA, (float) 20.20),
192-
CONVERTER.toSchemaAndValue(Schema.FLOAT32_SCHEMA, bsonDouble)));
210+
CONVERTER.toSchemaAndValue(Schema.FLOAT32_SCHEMA, bsonDouble)),
211+
() ->
212+
assertSchemaAndValueEquals(
213+
new SchemaAndValue(Schema.FLOAT32_SCHEMA, (float) 1234567890000L),
214+
CONVERTER.toSchemaAndValue(Schema.FLOAT32_SCHEMA, bsonTimestamp)));
193215

194216
assertAll(
195217
"Testing float64 support",
@@ -204,7 +226,11 @@ void testNumberSupport() {
204226
() ->
205227
assertSchemaAndValueEquals(
206228
new SchemaAndValue(Schema.FLOAT64_SCHEMA, 20.20),
207-
CONVERTER.toSchemaAndValue(Schema.FLOAT64_SCHEMA, bsonDouble)));
229+
CONVERTER.toSchemaAndValue(Schema.FLOAT64_SCHEMA, bsonDouble)),
230+
() ->
231+
assertSchemaAndValueEquals(
232+
new SchemaAndValue(Schema.FLOAT64_SCHEMA, (double) 1234567890000L),
233+
CONVERTER.toSchemaAndValue(Schema.FLOAT64_SCHEMA, bsonTimestamp)));
208234

209235
assertAll(
210236
"Testing logical types",
@@ -219,7 +245,12 @@ void testNumberSupport() {
219245
() ->
220246
assertSchemaAndValueEquals(
221247
new SchemaAndValue(Timestamp.SCHEMA, Timestamp.toLogical(Timestamp.SCHEMA, 2020)),
222-
CONVERTER.toSchemaAndValue(Timestamp.SCHEMA, new BsonDateTime(2020L))));
248+
CONVERTER.toSchemaAndValue(Timestamp.SCHEMA, new BsonDateTime(2020L))),
249+
() ->
250+
assertSchemaAndValueEquals(
251+
new SchemaAndValue(
252+
Timestamp.SCHEMA, Timestamp.toLogical(Timestamp.SCHEMA, 1234567890000L)),
253+
CONVERTER.toSchemaAndValue(Timestamp.SCHEMA, bsonTimestamp)));
223254

224255
List<String> validKeys = asList("myInt", "myDouble", "myDate", "myDecimal");
225256
Set<String> invalidKeys =

0 commit comments

Comments
 (0)