Skip to content

Commit 7e6bf97

Browse files
committed
Copying data - ensure the queue blocks if full
KAFKA-85
1 parent 5e68296 commit 7e6bf97

File tree

2 files changed

+56
-3
lines changed

2 files changed

+56
-3
lines changed

src/main/java/com/mongodb/kafka/connect/source/MongoCopyDataManager.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,13 +120,21 @@ private void copyDataFrom(final MongoNamespace namespace) {
120120
mongoClient.getDatabase(namespace.getDatabaseName())
121121
.getCollection(namespace.getCollectionName(), BsonDocument.class)
122122
.aggregate(createPipeline(namespace))
123-
.forEach((Consumer<? super BsonDocument>) queue::add);
123+
.forEach((Consumer<? super BsonDocument>) this::putToQueue);
124124
namespacesToCopy.decrementAndGet();
125125
} catch (Exception e) {
126126
errorException = e;
127127
}
128128
}
129129

130+
private void putToQueue(final BsonDocument bsonDocument) {
131+
try {
132+
queue.put(bsonDocument);
133+
} catch (InterruptedException e) {
134+
throw new RuntimeException(e);
135+
}
136+
}
137+
130138
private List<Bson> createPipeline(final MongoNamespace namespace) {
131139
List<Bson> pipeline = new ArrayList<>();
132140
pipeline.add(BsonDocument.parse("{$replaceRoot: {newRoot: {"

src/test/java/com/mongodb/kafka/connect/source/MongoCopyDataManagerTest.java

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717

1818

1919
import static com.mongodb.kafka.connect.source.MongoSourceConfig.COLLECTION_CONFIG;
20+
import static com.mongodb.kafka.connect.source.MongoSourceConfig.COPY_EXISTING_QUEUE_SIZE_CONFIG;
2021
import static com.mongodb.kafka.connect.source.SourceTestHelper.TEST_COLLECTION;
2122
import static com.mongodb.kafka.connect.source.SourceTestHelper.TEST_DATABASE;
2223
import static com.mongodb.kafka.connect.source.SourceTestHelper.createConfigMap;
2324
import static com.mongodb.kafka.connect.source.SourceTestHelper.createSourceConfig;
25+
import static java.lang.String.format;
2426
import static java.util.Arrays.asList;
2527
import static junit.framework.TestCase.assertTrue;
2628
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -31,11 +33,14 @@
3133
import static org.mockito.Mockito.when;
3234

3335
import java.util.ArrayList;
36+
import java.util.Arrays;
3437
import java.util.HashMap;
3538
import java.util.List;
3639
import java.util.Map;
3740
import java.util.Optional;
3841
import java.util.function.Consumer;
42+
import java.util.stream.Collectors;
43+
import java.util.stream.IntStream;
3944

4045
import org.junit.jupiter.api.DisplayName;
4146
import org.junit.jupiter.api.Test;
@@ -111,6 +116,42 @@ void testReturnsTheExpectedCollectionResults() {
111116
assertEquals(expected, results);
112117
}
113118

119+
@Test
120+
@DisplayName("test blocks adding docs to the queue")
121+
void testBlocksAddingResultsToTheQueue() {
122+
List<BsonDocument> docs = IntStream.range(0, 10).mapToObj(i ->
123+
BsonDocument.parse(format("{'_id': {'_id': %s, 'copy': true}}", i))
124+
).collect(Collectors.toList());
125+
126+
when(mongoClient.getDatabase(TEST_DATABASE)).thenReturn(mongoDatabase);
127+
when(mongoDatabase.getCollection(TEST_COLLECTION, BsonDocument.class)).thenReturn(mongoCollection);
128+
when(mongoCollection.aggregate(anyList())).thenReturn(aggregateIterable);
129+
doCallRealMethod().when(aggregateIterable).forEach(any(Consumer.class));
130+
when(aggregateIterable.iterator()).thenReturn(cursor);
131+
132+
Boolean[] hasNextResponses = new Boolean[docs.size()];
133+
Arrays.fill(hasNextResponses, true);
134+
hasNextResponses[hasNextResponses.length - 1] = false;
135+
136+
when(cursor.hasNext()).thenReturn(true, hasNextResponses);
137+
when(cursor.next()).thenReturn(docs.get(0), docs.subList(1, docs.size()).toArray(new BsonDocument[docs.size() - 1]));
138+
139+
List<Optional<BsonDocument>> results;
140+
try (MongoCopyDataManager copyExistingDataManager =
141+
new MongoCopyDataManager(createSourceConfig(COPY_EXISTING_QUEUE_SIZE_CONFIG, "1"), mongoClient)) {
142+
sleep();
143+
results = IntStream.range(0, 11).mapToObj(i -> {
144+
sleep(200);
145+
return copyExistingDataManager.poll();
146+
}
147+
).collect(Collectors.toList());
148+
}
149+
150+
List<Optional<BsonDocument>> expected = docs.stream().map(Optional::of).collect(Collectors.toList());
151+
expected.add(Optional.empty());
152+
assertEquals(expected, results);
153+
}
154+
114155
@Test
115156
@DisplayName("test returns the expected database results")
116157
void testReturnsTheExpectedDatabaseResults() {
@@ -231,11 +272,15 @@ void testReturnsTheExpectedClientResults() {
231272
assertEquals(results.get(results.size() - 1), Optional.empty());
232273
}
233274

234-
private void sleep() {
275+
private void sleep(final int millis) {
235276
try {
236-
Thread.sleep(500);
277+
Thread.sleep(millis);
237278
} catch (InterruptedException e) {
238279
// ignore
239280
}
240281
}
282+
283+
private void sleep() {
284+
sleep(500);
285+
}
241286
}

0 commit comments

Comments
 (0)