Skip to content

Commit f4df234

Browse files
committed
Added LazyBsonDocument and made SinkDocument key/values lazily decoded.
KAFKA-45
1 parent ec0310f commit f4df234

File tree

3 files changed

+171
-12
lines changed

3 files changed

+171
-12
lines changed
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.kafka.connect.sink.converter;
18+
19+
import java.io.InvalidObjectException;
20+
import java.io.ObjectInputStream;
21+
import java.util.Collection;
22+
import java.util.Map;
23+
import java.util.Set;
24+
import java.util.function.Supplier;
25+
26+
import org.apache.kafka.connect.errors.DataException;
27+
28+
import org.bson.BsonDocument;
29+
import org.bson.BsonValue;
30+
31+
public class LazyBsonDocument extends BsonDocument {
32+
private static final long serialVersionUID = 1L;
33+
private final Supplier<BsonDocument> supplier;
34+
private BsonDocument unwrapped;
35+
36+
/**
37+
* Construct a new instance with the given suppler of the document.
38+
*
39+
* @param supplier the supplier of the document
40+
*/
41+
public LazyBsonDocument(final Supplier<BsonDocument> supplier) {
42+
if (supplier == null) {
43+
throw new IllegalArgumentException("Supplier can not be null");
44+
}
45+
this.supplier = supplier;
46+
}
47+
48+
@Override
49+
public int size() {
50+
return getUnwrapped().size();
51+
}
52+
53+
@Override
54+
public boolean isEmpty() {
55+
return getUnwrapped().isEmpty();
56+
}
57+
58+
@Override
59+
public boolean containsKey(final Object key) {
60+
return getUnwrapped().containsKey(key);
61+
}
62+
63+
@Override
64+
public boolean containsValue(final Object value) {
65+
return getUnwrapped().containsValue(value);
66+
}
67+
68+
@Override
69+
public BsonValue get(final Object key) {
70+
return getUnwrapped().get(key);
71+
}
72+
73+
@Override
74+
public BsonValue put(final String key, final BsonValue value) {
75+
return getUnwrapped().put(key, value);
76+
}
77+
78+
@Override
79+
public BsonValue remove(final Object key) {
80+
return getUnwrapped().remove(key);
81+
}
82+
83+
@Override
84+
public void putAll(final Map<? extends String, ? extends BsonValue> m) {
85+
getUnwrapped().putAll(m);
86+
}
87+
88+
@Override
89+
public void clear() {
90+
getUnwrapped().clear();
91+
}
92+
93+
@Override
94+
public Set<String> keySet() {
95+
return getUnwrapped().keySet();
96+
}
97+
98+
@Override
99+
public Collection<BsonValue> values() {
100+
return getUnwrapped().values();
101+
}
102+
103+
@Override
104+
public Set<Entry<String, BsonValue>> entrySet() {
105+
return getUnwrapped().entrySet();
106+
}
107+
108+
@Override
109+
public boolean equals(final Object o) {
110+
return getUnwrapped().equals(o);
111+
}
112+
113+
@Override
114+
public int hashCode() {
115+
return getUnwrapped().hashCode();
116+
}
117+
118+
@Override
119+
public String toString() {
120+
return getUnwrapped().toString();
121+
}
122+
123+
@Override
124+
public BsonDocument clone() {
125+
return getUnwrapped().clone();
126+
}
127+
128+
private BsonDocument getUnwrapped() {
129+
if (unwrapped == null) {
130+
try {
131+
unwrapped = supplier.get();
132+
} catch (Exception e) {
133+
throw new DataException("Unexpected data conversion exception.", e);
134+
}
135+
}
136+
return unwrapped;
137+
}
138+
139+
// see https://docs.oracle.com/javase/6/docs/platform/serialization/spec/output.html
140+
private Object writeReplace() {
141+
return getUnwrapped();
142+
}
143+
144+
// see https://docs.oracle.com/javase/6/docs/platform/serialization/spec/input.html
145+
private void readObject(final ObjectInputStream stream) throws InvalidObjectException {
146+
throw new InvalidObjectException("Proxy required");
147+
}
148+
149+
}

src/main/java/com/mongodb/kafka/connect/sink/converter/SinkConverter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,14 @@ public SinkDocument convert(final SinkRecord record) {
4141

4242
BsonDocument keyDoc = null;
4343
if (record.key() != null) {
44-
keyDoc = getRecordConverter(record.key(), record.keySchema()).convert(record.keySchema(), record.key());
44+
keyDoc = new LazyBsonDocument(() ->
45+
getRecordConverter(record.key(), record.keySchema()).convert(record.keySchema(), record.key()));
4546
}
4647

4748
BsonDocument valueDoc = null;
4849
if (record.value() != null) {
49-
valueDoc = getRecordConverter(record.value(), record.valueSchema()).convert(record.valueSchema(), record.value());
50+
valueDoc = new LazyBsonDocument(() ->
51+
getRecordConverter(record.value(), record.valueSchema()).convert(record.valueSchema(), record.value()));
5052
}
5153

5254
return new SinkDocument(keyDoc, valueDoc);

src/test/java/com/mongodb/kafka/connect/sink/converter/SinkConverterTest.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.junit.jupiter.api.Assertions.assertEquals;
2323
import static org.junit.jupiter.api.Assertions.assertNotNull;
2424
import static org.junit.jupiter.api.Assertions.assertThrows;
25+
import static org.junit.jupiter.api.Assertions.assertTrue;
2526
import static org.junit.jupiter.api.DynamicTest.dynamicTest;
2627

2728
import java.util.ArrayList;
@@ -128,17 +129,24 @@ void testEmptySinkRecordConversion() {
128129
@Test
129130
@DisplayName("test invalid sink record conversion")
130131
void testInvalidSinkRecordConversion() {
132+
SinkDocument convertedMissingSchema = sinkConverter.convert(
133+
new SinkRecord("topic", 1, null, new Object(), null, new Object(), 0L));
134+
assertAll("checks on lazy conversion results",
135+
() -> assertNotNull(convertedMissingSchema),
136+
() -> assertTrue(convertedMissingSchema.getKeyDoc().isPresent()),
137+
() -> assertTrue(convertedMissingSchema.getValueDoc().isPresent()),
138+
() -> assertThrows(DataException.class, () -> convertedMissingSchema.getKeyDoc().ifPresent(BsonDocument::isEmpty)),
139+
() -> assertThrows(DataException.class, () -> convertedMissingSchema.getValueDoc().ifPresent(BsonDocument::isEmpty))
140+
);
131141

132-
assertAll("checks on conversion result",
133-
() -> assertThrows(DataException.class, () -> sinkConverter.convert(
134-
new SinkRecord("topic", 1, null, new Object(), null, null, 0L)
135-
)),
136-
() -> assertThrows(DataException.class, () -> sinkConverter.convert(
137-
new SinkRecord("topic", 1, null, null, null, new Object(), 0L)
138-
)),
139-
() -> assertThrows(DataException.class, () -> sinkConverter.convert(
140-
new SinkRecord("topic", 1, null, new Object(), null, new Object(), 0L)
141-
))
142+
SinkDocument convertedWithSchema = sinkConverter.convert(
143+
new SinkRecord("topic", 1, Schema.STRING_SCHEMA, "a", Schema.INT32_SCHEMA, 1, 0L));
144+
assertAll("checks on lazy conversion results",
145+
() -> assertNotNull(convertedWithSchema),
146+
() -> assertTrue(convertedWithSchema.getKeyDoc().isPresent()),
147+
() -> assertTrue(convertedWithSchema.getValueDoc().isPresent()),
148+
() -> assertThrows(DataException.class, () -> convertedWithSchema.getKeyDoc().ifPresent(BsonDocument::isEmpty)),
149+
() -> assertThrows(DataException.class, () -> convertedWithSchema.getValueDoc().ifPresent(BsonDocument::isEmpty))
142150
);
143151
}
144152
}

0 commit comments

Comments
 (0)