
Commit a52ae3a

Authored by xinlian12, annie-mac, and Copilot
[Kafka][CosmosSourceConnector] Fix duplicate record processing (Azure#47308)
* fix duplicate record processing

Co-authored-by: annie-mac <[email protected]>
Co-authored-by: Copilot <[email protected]>
1 parent 81df241 commit a52ae3a
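
In short: when Connect pauses/resumes or rebalances the connector, each restarted task starts from a snapshot of its assigned config, and the change feed continuation in that snapshot can be stale, so already-delivered records get reprocessed. The fix re-reads the last committed continuation from Connect's offset storage when the task starts. Below is a minimal sketch of that rehydration pattern against the public Kafka Connect API; the partition-key names and the `resumeFrom` helper are illustrative, not the connector's actual types.

```java
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.OffsetStorageReader;

import java.util.Map;

// Sketch: re-hydrate a source task's position from committed offsets on start,
// instead of trusting the (possibly stale) continuation in the task config.
public abstract class RehydratingSourceTask extends SourceTask {

    @Override
    public void start(Map<String, String> props) {
        OffsetStorageReader reader = context.offsetStorageReader();

        // The partition map must match the sourcePartition attached to the
        // SourceRecords this task emitted earlier (key names hypothetical).
        Map<String, Object> committed = reader.offset(Map.of(
            "database", props.get("database"),
            "containerRid", props.get("containerRid"),
            "feedRange", props.get("feedRange")));

        if (committed != null) {
            // Prefer the committed continuation over the config snapshot.
            resumeFrom((String) committed.get("continuation"));
        }
        // else: fall back to whatever starting point the config prescribes.
    }

    // Hypothetical hook; CosmosSourceTask does this via setContinuationState(...).
    protected abstract void resumeFrom(String continuation);
}
```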

File tree

5 files changed: +279 −27 lines changed


sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -7,6 +7,7 @@
 #### Breaking Changes
 
 #### Bugs Fixed
+* Fixed duplicate processing in `CosmosSourceConnector` when a task rebalance (pause/resume) happens - See [PR 47308](https://github.com/Azure/azure-sdk-for-java/pull/47308)
 
 #### Other Changes
 
sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceTask.java

Lines changed: 64 additions & 4 deletions
@@ -47,6 +47,8 @@ public class CosmosSourceTask extends SourceTask {
     private CosmosSourceTaskConfig taskConfig;
     private CosmosClientCacheItem cosmosClientItem;
     private CosmosClientCacheItem throughputControlCosmosClientItem;
+    private MetadataKafkaStorageManager kafkaOffsetStorageReader;
+
     private final Queue<ITaskUnit> taskUnitsQueue = new LinkedList<>();
 
     private long lastLogTimeMs = System.currentTimeMillis();
@@ -76,7 +78,13 @@ public void start(Map<String, String> map) {
             this.taskConfig.getFeedRangeTaskUnits().size(),
             this.taskConfig.getTaskId());
 
-        this.taskUnitsQueue.addAll(this.taskConfig.getFeedRangeTaskUnits());
+        this.kafkaOffsetStorageReader = new MetadataKafkaStorageManager(this.context.offsetStorageReader());
+
+        for (FeedRangeTaskUnit taskUnit : this.taskConfig.getFeedRangeTaskUnits()) {
+            this.applyContinuationStateFromOffsetIfExists(taskUnit);
+            this.taskUnitsQueue.add(taskUnit);
+        }
+
         LOGGER.info("Creating the cosmos client");
 
         this.cosmosClientItem =
@@ -92,6 +100,55 @@ public void start(Map<String, String> map) {
         }
     }
 
+    private void applyContinuationStateFromOffsetIfExists(FeedRangeTaskUnit feedRangeTaskUnit) {
+        // Populate the changeFeedState if an offset exists for the feed range.
+
+        // At the task level, we try to find the EXACT offset matching the feed range.
+        // The task config used here is only a snapshot of the assigned config; when a task-level rebalance
+        // happens, the new task keeps using the snapshot, which could contain a stale feed range or change feed state.
+
+        KafkaCosmosChangeFeedState changeFeedState =
+            this.getContinuationStateFromOffset(
+                feedRangeTaskUnit.getDatabaseName(),
+                feedRangeTaskUnit.getContainerRid(),
+                feedRangeTaskUnit.getFeedRange());
+
+        if (changeFeedState != null) {
+            LOGGER.debug(
+                "Found continuation state from offset {} for feed range {} in task {}.",
+                changeFeedState,
+                feedRangeTaskUnit.getFeedRange(),
+                this.taskConfig.getTaskId());
+            feedRangeTaskUnit.setContinuationState(changeFeedState);
+        } else {
+            LOGGER.debug(
+                "Cannot find continuation state from offset for feed range {} in task {}.",
+                feedRangeTaskUnit.getFeedRange(),
+                this.taskConfig.getTaskId());
+        }
+    }
+
+    private KafkaCosmosChangeFeedState getContinuationStateFromOffset(
+        String databaseName,
+        String containerRid,
+        FeedRange feedRange) {
+
+        FeedRangeContinuationTopicOffset feedRangeContinuationTopicOffset =
+            this.kafkaOffsetStorageReader.getFeedRangeContinuationOffset(
+                databaseName,
+                containerRid,
+                feedRange);
+
+        if (feedRangeContinuationTopicOffset == null) {
+            return null;
+        }
+
+        return new KafkaCosmosChangeFeedState(
+            feedRangeContinuationTopicOffset.getResponseContinuation(),
+            feedRange,
+            feedRangeContinuationTopicOffset.getItemLsn());
+    }
+
     private CosmosClientCacheItem getThroughputControlCosmosClientItem() {
         if (this.taskConfig.getThroughputControlConfig().isThroughputControlEnabled()
             && this.taskConfig.getThroughputControlConfig().getThroughputControlAccountConfig() != null) {
@@ -147,7 +204,7 @@ public List<SourceRecord> poll() {
                 return loggingContext;
             });
 
-            logFeedRangeCounts();
+            logFeedRangeCounts(false);
         }
 
         return results;
@@ -160,10 +217,10 @@ public List<SourceRecord> poll() {
         }
     }
 
-    private void logFeedRangeCounts() {
+    private void logFeedRangeCounts(boolean flushLogs) {
         long currentTimeInMs = System.currentTimeMillis();
         long durationInMs = currentTimeInMs - lastLogTimeMs;
-        if (durationInMs >= CosmosSourceTaskConfig.LOG_INTERVAL_MS) {
+        if (flushLogs || durationInMs >= CosmosSourceTaskConfig.LOG_INTERVAL_MS) {
             // Log accumulated counts for all feed ranges
             for (Map.Entry<String, FeedRangeLoggingContext> entry : feedRangeCounts.entrySet()) {
                 LOGGER.info(
@@ -370,6 +427,8 @@ private Mono<Boolean> handleFeedRangeGone(FeedRangeTaskUnit feedRangeTaskUnit) {
                     pkRange,
                     getChildRangeChangeFeedState(feedRangeTaskUnit.getContinuationState(), pkRange),
                     feedRangeTaskUnit.getTopic());
+                applyContinuationStateFromOffsetIfExists(childTaskUnit);
+
                 this.taskUnitsQueue.add(childTaskUnit);
             }
 
@@ -476,6 +535,7 @@ private void cleanup() {
     @Override
     public void stop() {
         LOGGER.info("Stopping CosmosSourceTask");
+        this.logFeedRangeCounts(true);
         this.cleanup();
     }
 
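
For context on what `getFeedRangeContinuationOffset` reads: Kafka Connect persists, per `sourcePartition` map, the last committed `sourceOffset` map that a task attached to its emitted records, and hands it back through `offsetStorageReader()`. A sketch of the producing side of that round trip follows; key names and values are hypothetical, as the connector's real offset schema lives in `FeedRangeContinuationTopicOffset`.

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.Map;

public final class OffsetRoundTripSketch {

    // The sourceOffset map committed with this record is exactly what
    // offsetStorageReader().offset(sourcePartition) returns after a restart.
    static SourceRecord emit(String topic, String key, String value) {
        Map<String, String> sourcePartition = Map.of(
            "database", "testDb",        // hypothetical key names
            "containerRid", "testRid",
            "feedRange", "full-range");
        Map<String, String> sourceOffset = Map.of(
            "responseContinuation", "\"etag-123\"",
            "itemLsn", "42");
        return new SourceRecord(
            sourcePartition, sourceOffset, topic,
            Schema.STRING_SCHEMA, key,
            Schema.STRING_SCHEMA, value);
    }
}
```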

sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosSourceConnectorITest.java

Lines changed: 157 additions & 22 deletions
@@ -34,6 +34,7 @@
 import reactor.core.publisher.Mono;
 
 import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -201,20 +202,16 @@ public void readFromSingleContainer(boolean useMasterKey, CosmosMetadataStorageT
         List<ConsumerRecord<String, JsonNode>> metadataRecords = new ArrayList<>();
         List<ConsumerRecord<String, JsonNode>> itemRecords = new ArrayList<>();
         int expectedMetadataRecordsCount = metadataStorageType == CosmosMetadataStorageType.COSMOS ? 0 : 2;
-        int expectedItemRecords = createdItems.size();
+        int expectedItemRecordsCount = createdItems.size();
 
-        Unreliables.retryUntilTrue(30, TimeUnit.SECONDS, () -> {
-            kafkaConsumer.poll(Duration.ofMillis(1000))
-                .iterator()
-                .forEachRemaining(consumerRecord -> {
-                    if (consumerRecord.topic().equals(topicName)) {
-                        itemRecords.add(consumerRecord);
-                    } else if (consumerRecord.topic().equals(sourceConfig.getMetadataConfig().getStorageName())) {
-                        metadataRecords.add(consumerRecord);
-                    }
-                });
-            return metadataRecords.size() >= expectedMetadataRecordsCount && itemRecords.size() >= expectedItemRecords;
-        });
+        pollChangesForSingleTopic(
+            kafkaConsumer,
+            topicName,
+            sourceConfig.getMetadataConfig().getStorageName(),
+            itemRecords,
+            metadataRecords,
+            expectedItemRecordsCount,
+            expectedMetadataRecordsCount);
 
         assertThat(metadataRecords.size()).isEqualTo(expectedMetadataRecordsCount);
         if (metadataStorageType == CosmosMetadataStorageType.KAFKA) {
@@ -270,15 +267,7 @@ public void readFromSingleContainer(boolean useMasterKey, CosmosMetadataStorageT
             assertThat(feedRangesMetadataTopicOffsetOffset.getFeedRanges().size()).isEqualTo(1);
         }
 
-        // validate the item records
-        assertThat(itemRecords.size()).isEqualTo(createdItems.size());
-        List<String> receivedItems =
-            itemRecords.stream().map(consumerRecord -> {
-                JsonNode jsonNode = consumerRecord.value();
-                return jsonNode.get("payload").get("id").asText();
-            }).collect(Collectors.toList());
-
-        assertThat(receivedItems.containsAll(createdItems)).isTrue();
+        validateFeedRangeItemRecords(itemRecords, createdItems);
 
         } finally {
             if (client != null) {
@@ -680,4 +669,150 @@ public void readFromAllContainer(boolean useMasterKey, CosmosMetadataStorageType
             }
         }
     }
+
+    @Test(groups = { "kafka-integration" }, timeOut = 2 * TIMEOUT)
+    public void readFromSingleContainer_pause_and_resume() {
+        logger.info("Pause and resume connector for single container");
+        String topicName = singlePartitionContainerName + "-" + UUID.randomUUID();
+
+        Map<String, String> sourceConnectorConfig = new HashMap<>();
+        sourceConnectorConfig.put("connector.class", "com.azure.cosmos.kafka.connect.CosmosSourceConnector");
+        sourceConnectorConfig.put("azure.cosmos.account.endpoint", KafkaCosmosTestConfigurations.HOST);
+        sourceConnectorConfig.put("azure.cosmos.application.name", "Test");
+        sourceConnectorConfig.put("azure.cosmos.source.database.name", databaseName);
+        sourceConnectorConfig.put("azure.cosmos.source.containers.includeAll", "false");
+        sourceConnectorConfig.put("azure.cosmos.source.containers.includedList", singlePartitionContainerName);
+        sourceConnectorConfig.put("azure.cosmos.source.containers.topicMap", topicName + "#" + singlePartitionContainerName);
+        sourceConnectorConfig.put("azure.cosmos.account.key", KafkaCosmosTestConfigurations.MASTER_KEY);
+
+        // Create the topic ahead of time
+        kafkaCosmosConnectContainer.createTopic(topicName, 1);
+
+        CosmosSourceConfig sourceConfig = new CosmosSourceConfig(sourceConnectorConfig);
+        CosmosAsyncContainer container = client.getDatabase(databaseName).getContainer(singlePartitionContainerName);
+
+        String connectorName = "simpleTest-" + UUID.randomUUID();
+
+        try {
+            // create a few items in the container
+            logger.info("creating items in container {}", singlePartitionContainerName);
+            List<String> createdItems = new ArrayList<>();
+            for (int i = 0; i < 10; i++) {
+                TestItem testItem = TestItem.createNewItem();
+                container.createItem(testItem).block();
+                createdItems.add(testItem.getId());
+            }
+
+            kafkaCosmosConnectContainer.registerConnector(connectorName, sourceConnectorConfig);
+
+            logger.info("Getting consumer and subscribing to topic {}", singlePartitionContainerName);
+
+            Properties consumerProperties = kafkaCosmosConnectContainer.getConsumerProperties();
+            consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+            consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
+            KafkaConsumer<String, JsonNode> kafkaConsumer = new KafkaConsumer<>(consumerProperties);
+
+            kafkaConsumer.subscribe(
+                Arrays.asList(
                    topicName,
+                    sourceConfig.getMetadataConfig().getStorageName()));
+
+            List<ConsumerRecord<String, JsonNode>> metadataRecords = new ArrayList<>();
+            List<ConsumerRecord<String, JsonNode>> itemRecords = new ArrayList<>();
+            int expectedMetadataRecordsCount = 2;
+            int expectedItemRecordsCount = createdItems.size();
+
+            pollChangesForSingleTopic(
+                kafkaConsumer,
+                topicName,
+                sourceConfig.getMetadataConfig().getStorageName(),
+                itemRecords,
+                metadataRecords,
+                expectedItemRecordsCount,
+                expectedMetadataRecordsCount);
+
+            assertThat(metadataRecords.size()).isEqualTo(expectedMetadataRecordsCount);
+            validateFeedRangeItemRecords(itemRecords, createdItems);
+
+            // now pause the connector
+            kafkaCosmosConnectContainer.pauseConnector(connectorName);
+
+            // create a few more items
+            createdItems.clear();
+            metadataRecords.clear();
+            itemRecords.clear();
+
+            for (int i = 0; i < 5; i++) {
+                TestItem testItem = TestItem.createNewItem();
+                container.createItem(testItem).block();
+                createdItems.add(testItem.getId());
+            }
+
+            // resume the connector
+            kafkaCosmosConnectContainer.resumeConnector(connectorName);
+            // poll again, a bit longer this time, to make sure no duplicate records are returned
+            Instant startPollTime = Instant.now();
+            while (Duration.between(startPollTime, Instant.now()).toMillis() < 60 * 1000) {
+                kafkaConsumer.poll(Duration.ofMillis(1000))
+                    .iterator()
+                    .forEachRemaining(consumerRecord -> {
+                        if (consumerRecord.topic().equals(topicName)) {
+                            itemRecords.add(consumerRecord);
+                        } else if (consumerRecord.topic().equals(sourceConfig.getMetadataConfig().getStorageName())) {
+                            metadataRecords.add(consumerRecord);
+                        }
+                    });
+            }
+
+            assertThat(metadataRecords.size()).isEqualTo(expectedMetadataRecordsCount);
+            validateFeedRangeItemRecords(itemRecords, createdItems);
+        } finally {
+            if (client != null) {
+                logger.info("cleaning container {}", singlePartitionContainerName);
+                cleanUpContainer(client, databaseName, singlePartitionContainerName);
+            }
+
+            // IMPORTANT: remove the connector after use
+            if (kafkaCosmosConnectContainer != null) {
+                kafkaCosmosConnectContainer.deleteConnector(connectorName);
+            }
+        }
+    }
+
+    private void pollChangesForSingleTopic(
+        KafkaConsumer<String, JsonNode> kafkaConsumer,
+        String topicName,
+        String storageName,
+        List<ConsumerRecord<String, JsonNode>> itemRecords,
+        List<ConsumerRecord<String, JsonNode>> metadataRecords,
+        int expectedItemRecords,
+        int expectedMetadataRecordsCount) {

+        Unreliables.retryUntilTrue(30, TimeUnit.SECONDS, () -> {
+            kafkaConsumer.poll(Duration.ofMillis(1000))
+                .iterator()
+                .forEachRemaining(consumerRecord -> {
+                    if (consumerRecord.topic().equals(topicName)) {
+                        itemRecords.add(consumerRecord);
+                    } else if (consumerRecord.topic().equals(storageName)) {
+                        metadataRecords.add(consumerRecord);
+                    }
+                });
+            return metadataRecords.size() >= expectedMetadataRecordsCount && itemRecords.size() >= expectedItemRecords;
+        });
+    }
+
+    private void validateFeedRangeItemRecords(
+        List<ConsumerRecord<String, JsonNode>> itemRecords,
+        List<String> expectedItems) {
+        // validate the item records
+        assertThat(itemRecords.size()).isEqualTo(expectedItems.size());
+        List<String> receivedItems =
+            itemRecords.stream().map(consumerRecord -> {
+                JsonNode jsonNode = consumerRecord.value();
+                return jsonNode.get("payload").get("id").asText();
+            }).collect(Collectors.toList());
+
+        assertThat(receivedItems.containsAll(expectedItems)).isTrue();
+    }
 }

sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/KafkaCosmosConnectContainer.java

Lines changed: 29 additions & 1 deletion
@@ -172,7 +172,7 @@ public void deleteConnector(String name) {
         KafkaConnectClient kafkaConnectClient = new KafkaConnectClient(new Configuration(getTarget()));
         try {
             kafkaConnectClient.deleteConnector(name);
-            logger.info("Deleting container {} succeeded.", name);
+            logger.info("Deleting connector {} succeeded.", name);
         } catch (Exception exception) {
             if (exception instanceof ResourceNotFoundException) {
                 logger.info("Connector {} not found");
@@ -182,6 +182,34 @@ public void deleteConnector(String name) {
         }
     }
 
+    public void pauseConnector(String name) {
+        KafkaConnectClient kafkaConnectClient = new KafkaConnectClient(new Configuration(getTarget()));
+        try {
+            kafkaConnectClient.pauseConnector(name);
+            logger.info("Pause connector {} succeeded.", name);
+        } catch (Exception exception) {
+            if (exception instanceof ResourceNotFoundException) {
+                logger.info("Connector {} not found", name);
+            }
+
+            logger.warn("Failed to pause connector {}", name);
+        }
+    }
+
+    public void resumeConnector(String name) {
+        KafkaConnectClient kafkaConnectClient = new KafkaConnectClient(new Configuration(getTarget()));
+        try {
+            kafkaConnectClient.resumeConnector(name);
+            logger.info("Resume connector {} succeeded.", name);
+        } catch (Exception exception) {
+            if (exception instanceof ResourceNotFoundException) {
+                logger.info("Connector {} not found", name);
+            }
+
+            logger.warn("Failed to resume connector {}", name);
+        }
+    }
+
     public ConnectorStatus getConnectorStatus(String name) {
         KafkaConnectClient kafkaConnectClient = new KafkaConnectClient(new Configuration(getTarget()));
         return kafkaConnectClient.getConnectorStatus(name);
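
The new `pauseConnector`/`resumeConnector` helpers go through the sourcelab `KafkaConnectClient`, which wraps the standard Kafka Connect REST endpoints (`PUT /connectors/{name}/pause` and `PUT /connectors/{name}/resume`, both answering 202 Accepted). A dependency-free sketch of the same calls follows; the worker URL is a placeholder:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Pause or resume a connector directly via the Kafka Connect REST API.
public final class ConnectorLifecycle {
    private static final HttpClient HTTP = HttpClient.newHttpClient();

    static void setPaused(String workerUrl, String connector, boolean paused)
        throws java.io.IOException, InterruptedException {
        String action = paused ? "pause" : "resume";
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(workerUrl + "/connectors/" + connector + "/" + action))
            .PUT(HttpRequest.BodyPublishers.noBody())
            .build();
        HttpResponse<Void> response =
            HTTP.send(request, HttpResponse.BodyHandlers.discarding());
        if (response.statusCode() != 202) { // Connect returns 202 Accepted
            throw new IllegalStateException("Unexpected status: " + response.statusCode());
        }
    }
}
```

For example, `ConnectorLifecycle.setPaused("http://localhost:8083", connectorName, true)` mirrors what `kafkaCosmosConnectContainer.pauseConnector(connectorName)` does in the integration test above.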
