Skip to content

Commit 4cdd775

Browse files
xinlian12annie-mac
andauthored
fixChildLeaseCreationAfterSplit (#46075)
* fix child lease create with null continuation token after split --------- Co-authored-by: annie-mac <[email protected]>
1 parent 7adf7f6 commit 4cdd775

File tree

8 files changed

+476
-441
lines changed

8 files changed

+476
-441
lines changed

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java

Lines changed: 213 additions & 184 deletions
Large diffs are not rendered by default.

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java

Lines changed: 213 additions & 223 deletions
Large diffs are not rendered by default.

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#### Breaking Changes
88

99
#### Bugs Fixed
10+
* Fixed an issue where child partition lease is getting created with null continuation token when change feed processor restart after split - See [PR 46075](https://github.com/Azure/azure-sdk-for-java/pull/46075)
1011

1112
#### Other Changes
1213

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/LeaseStoreManagerImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,10 @@ public Mono<Lease> createLeaseIfNotExist(FeedRangeEpkImpl feedRange, String cont
195195
return Mono.error(ex);
196196
})
197197
.map(documentResourceResponse -> {
198+
logger.info(
199+
"Successfully created lease document for {} with continuation token {}",
200+
leaseToken,
201+
continuationToken);
198202
if (documentResourceResponse == null) {
199203
return null;
200204
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/BootstrapperImpl.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,9 @@
33
package com.azure.cosmos.implementation.changefeed.pkversion;
44

55
import com.azure.cosmos.implementation.CosmosSchedulers;
6-
import com.azure.cosmos.implementation.Strings;
76
import com.azure.cosmos.implementation.changefeed.Bootstrapper;
87
import com.azure.cosmos.implementation.changefeed.LeaseStore;
98
import com.azure.cosmos.implementation.changefeed.LeaseStoreManager;
10-
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode;
11-
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
129
import com.azure.cosmos.implementation.changefeed.common.LeaseVersion;
1310
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
1411
import org.slf4j.Logger;

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/LeaseStoreManagerImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,10 @@ public Mono<Lease> createLeaseIfNotExist(String leaseToken, String continuationT
191191
return Mono.error(ex);
192192
})
193193
.map(documentResourceResponse -> {
194+
logger.info(
195+
"Successfully created lease document for {} with continuation token {}.",
196+
leaseToken,
197+
continuationToken);
194198
if (documentResourceResponse == null) {
195199
return null;
196200
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionSynchronizer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
*/
1313
public interface PartitionSynchronizer {
1414
/**
15-
* Creates missing leases.
15+
* Creates missing leases during startup.
1616
*
1717
* @return a deferred computation of this operation.
1818
*/

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionSynchronizerImpl.java

Lines changed: 40 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import com.azure.cosmos.CosmosAsyncContainer;
66
import com.azure.cosmos.implementation.PartitionKeyRange;
7-
import com.azure.cosmos.implementation.Resource;
87
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
98
import com.azure.cosmos.implementation.changefeed.Lease;
109
import com.azure.cosmos.implementation.changefeed.LeaseContainer;
@@ -17,8 +16,11 @@
1716
import reactor.core.publisher.Flux;
1817
import reactor.core.publisher.Mono;
1918

20-
import java.util.HashSet;
21-
import java.util.Set;
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.concurrent.ConcurrentHashMap;
23+
import java.util.stream.Collectors;
2224

2325
import static com.azure.cosmos.BridgeInternal.extractContainerSelfLink;
2426

@@ -55,15 +57,19 @@ public PartitionSynchronizerImpl(
5557

5658
@Override
5759
public Mono<Void> createMissingLeases() {
60+
Map<String, List<String>> leaseTokenMap = new ConcurrentHashMap<>();
61+
5862
return this.enumPartitionKeyRanges()
59-
.map(Resource::getId)
63+
.map(partitionKeyRange -> {
64+
leaseTokenMap.put(partitionKeyRange.getId(), partitionKeyRange.getParents());
65+
return partitionKeyRange.getId();
66+
})
6067
.collectList()
6168
.flatMap( partitionKeyRangeIds -> {
6269
logger.info(
6370
"Checking whether leases for any partition is missing - partitions - {}",
6471
String.join(", ", partitionKeyRangeIds));
65-
Set<String> leaseTokens = new HashSet<>(partitionKeyRangeIds);
66-
return this.createLeases(leaseTokens).then();
72+
return this.createLeases(leaseTokenMap).then();
6773
})
6874
.onErrorResume( throwable -> {
6975
logger.error("Failed to create missing leases.", throwable);
@@ -143,41 +149,45 @@ private Flux<PartitionKeyRange> enumPartitionKeyRanges() {
143149
}
144150

145151
/**
146-
* Creates leases if they do not exist. This might happen on initial start or if some lease was unexpectedly lost.
152+
* Creates leases if they do not exist for the partition or partition's parent partitions.
153+
* This might happen on initial start or if some lease was unexpectedly lost.
147154
* <p>
148155
* Leases are created without the continuation token. It means partitions will be read according to
149156
* 'From Beginning' or 'From current time'.
150157
* Same applies also to split partitions. We do not search for parent lease and take continuation token since this
151158
* might end up of reprocessing all the events since the split.
152159
*
153-
* @param leaseTokens a hash set of all the lease tokens.
160+
* @param leaseTokenMap a map of all the lease tokens and their mapping parent lease tokens.
154161
* @return a deferred computation of this call.
155162
*/
156-
private Flux<Lease> createLeases(Set<String> leaseTokens)
163+
private Flux<Lease> createLeases(Map<String, List<String>> leaseTokenMap)
157164
{
158-
Set<String> addedLeaseTokens = new HashSet<>(leaseTokens);
159-
165+
List<String> leaseTokensToBeAdded = new ArrayList<>();
160166
return this.leaseContainer.getAllLeases()
167+
.map(lease -> lease.getLeaseToken())
168+
.collectList()
169+
.flatMapMany(existingLeaseTokens -> {
170+
// only create lease documents if there is no existing lease document matching the partition or its parent partitions
171+
leaseTokensToBeAdded.addAll(
172+
leaseTokenMap.entrySet().stream()
173+
.filter(entry -> !existingLeaseTokens.contains(entry.getKey()))
174+
.filter(entry -> entry.getValue() == null ||
175+
entry.getValue().isEmpty() ||
176+
entry.getValue().stream().noneMatch(existingLeaseTokens::contains))
177+
.map(Map.Entry::getKey)
178+
.collect(Collectors.toList())
179+
);
180+
181+
logger.info("Missing lease documents for partitions: [{}]", String.join(", ", leaseTokensToBeAdded));
182+
return Flux.fromIterable(leaseTokensToBeAdded);
183+
})
184+
.flatMap(leaseTokenToBeAdded -> {
185+
logger.debug("Adding a new lease document for partition {}", leaseTokenToBeAdded);
186+
return this.leaseManager.createLeaseIfNotExist(leaseTokenToBeAdded, null);
187+
}, this.degreeOfParallelism)
161188
.map(lease -> {
162-
if (lease != null) {
163-
logger.debug("Found an existing lease document for partition {}", lease.getLeaseToken());
164-
// Get leases after getting ranges, to make sure that no other hosts checked in continuation for
165-
// split partition after we got leases.
166-
addedLeaseTokens.remove(lease.getLeaseToken());
167-
}
168-
189+
logger.info("Added new lease document for partition {}", lease.getLeaseToken());
169190
return lease;
170-
})
171-
.thenMany(Flux.fromIterable(addedLeaseTokens)
172-
.flatMap( addedRangeId -> {
173-
logger.debug("Adding a new lease document for partition {}", addedRangeId);
174-
175-
return this.leaseManager.createLeaseIfNotExist(addedRangeId, null);
176-
}, this.degreeOfParallelism)
177-
.map( lease -> {
178-
logger.info("Added new lease document for partition {}", lease.getLeaseToken());
179-
return lease;
180-
})
181-
);
191+
});
182192
}
183193
}

0 commit comments

Comments
 (0)