Skip to content

Commit 1b37ff8

Browse files
authored
Close mutation sender after completing import (#141)
1 parent cdeda9c commit 1b37ff8

File tree

2 files changed

+11
-0
lines changed

2 files changed

+11
-0
lines changed

backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/importer/PulsarImporter.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,13 @@ public ExitStatus importTable() {
210210
LOGGER.warn("Error while closing CVS connector", e);
211211
}
212212
}
213+
if (mutationSender != null) {
214+
try {
215+
mutationSender.close();
216+
} catch (Exception e) {
217+
LOGGER.warn("Error while closing Pulsar mutation sender", e);
218+
}
219+
}
213220
printSummary(recordsCount);
214221
}
215222
}

backfill-cli/src/test/java/com/datastax/oss/cdc/backfill/PulsarImporterTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ public void testImportPartitionKeyOnly() {
128128
// then
129129
assertEquals(ExitStatus.STATUS_OK, status);
130130
Mockito.verify(sender, Mockito.times(2)).sendMutationAsync(abstractMutationCaptor.capture());
131+
Mockito.verify(sender, Mockito.times(1)).close();
131132
List<AbstractMutation<TableMetadata>> pkValues = abstractMutationCaptor.getAllValues();
132133
assertEquals(2, pkValues.size());
133134
List<Object> allPkValues = pkValues.stream().flatMap(v-> Arrays.stream(v.getPkValues())).collect(Collectors.toList());
@@ -203,6 +204,7 @@ public void testImportPartitionAndClusteringKeys() {
203204
// then
204205
assertEquals(ExitStatus.STATUS_OK, status);
205206
Mockito.verify(sender, Mockito.times(2)).sendMutationAsync(abstractMutationCaptor.capture());
207+
Mockito.verify(sender, Mockito.times(1)).close();
206208
List<AbstractMutation<TableMetadata>> pkValues = abstractMutationCaptor.getAllValues();
207209
assertEquals(2, pkValues.size());
208210
List<Object>[] allPkValues = pkValues.stream().map(v-> v.getPkValues()).map(Arrays::asList).toArray(List[]::new);
@@ -285,6 +287,7 @@ public void testImportInflightMessagesBound() throws URISyntaxException, IOExcep
285287
// verify that no more interactions with sender because no new records should've been sent.
286288
assertTrue(importFuture.isDone());
287289
assertThat(importFuture.get(), is(ExitStatus.STATUS_OK));
290+
Mockito.verify(sender, Mockito.times(1)).close();
288291
Mockito.verifyNoMoreInteractions(sender);
289292
}
290293

@@ -330,6 +333,7 @@ public void testImportFailsFast() throws URISyntaxException, IOException, Execut
330333

331334
// then
332335
assertTrue(importFuture.isDone());
336+
Mockito.verify(sender, Mockito.times(1)).close();
333337
assertThat(importFuture.get(), is(ExitStatus.STATUS_ABORTED_FATAL_ERROR));
334338
}
335339

0 commit comments

Comments
 (0)