Skip to content

Commit b58648d

Browse files
xinlian12annie-macCopilot
authored
usePkRangeCacheToEnumeratePartitionKeyCacheInPkCFP (#46700)
* wireup the pkRange cache for cfp setartup Co-authored-by: Copilot <[email protected]> --------- Co-authored-by: annie-mac <[email protected]> Co-authored-by: Copilot <[email protected]>
1 parent 36f1349 commit b58648d

File tree

3 files changed

+135
-13
lines changed

3 files changed

+135
-13
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.cosmos.implementation.changefeed.pkversion;
5+
6+
import com.azure.cosmos.CosmosAsyncContainer;
7+
import com.azure.cosmos.implementation.PartitionKeyRange;
8+
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
9+
import com.azure.cosmos.implementation.changefeed.LeaseContainer;
10+
import com.azure.cosmos.implementation.changefeed.LeaseManager;
11+
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
12+
import org.assertj.core.api.Assertions;
13+
import org.mockito.ArgumentCaptor;
14+
import org.mockito.Mockito;
15+
import org.testng.annotations.Test;
16+
import reactor.core.publisher.Flux;
17+
import reactor.core.publisher.Mono;
18+
import reactor.test.StepVerifier;
19+
20+
import java.util.ArrayList;
21+
import java.util.Arrays;
22+
import java.util.List;
23+
import java.util.UUID;
24+
25+
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
26+
import static org.mockito.ArgumentMatchers.any;
27+
import static org.mockito.Mockito.times;
28+
import static org.mockito.Mockito.verify;
29+
import static org.mockito.Mockito.when;
30+
31+
public class PartitionSynchronizerImplTests {
32+
private final String COLLECTION_RESOURCE_ID = "collection1";
33+
34+
@Test
35+
public void createAllLeases() {
36+
ChangeFeedContextClient feedContextClientMock = Mockito.mock(ChangeFeedContextClient.class);
37+
CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class);
38+
LeaseContainer leaseContainerMock = Mockito.mock(LeaseContainer.class);
39+
LeaseManager leaseManagerMock = Mockito.mock(LeaseManager.class);
40+
41+
PartitionSynchronizerImpl partitionSynchronizer = new PartitionSynchronizerImpl(
42+
feedContextClientMock,
43+
containerMock,
44+
leaseContainerMock,
45+
leaseManagerMock,
46+
1,
47+
1,
48+
COLLECTION_RESOURCE_ID);
49+
50+
List<PartitionKeyRange> overlappingRanges = new ArrayList<>();
51+
overlappingRanges.add(new PartitionKeyRange("1", "AA", "BB"));
52+
overlappingRanges.add(new PartitionKeyRange("2", "BB", "CC"));
53+
54+
ServiceItemLease childLease1 = new ServiceItemLease()
55+
.withLeaseToken("1");
56+
childLease1.setId("TestLease-" + UUID.randomUUID());
57+
58+
ServiceItemLease childLease2 = new ServiceItemLease()
59+
.withLeaseToken("2");
60+
childLease2.setId("TestLease-" + UUID.randomUUID());
61+
62+
when(feedContextClientMock.getOverlappingRanges(PartitionKeyInternalHelper.FullRange, true))
63+
.thenReturn(Mono.just(overlappingRanges));
64+
when(leaseContainerMock.getAllLeases()).thenReturn(Flux.empty());
65+
when(leaseManagerMock.createLeaseIfNotExist(any(String.class), any()))
66+
.thenReturn(Mono.just(childLease1))
67+
.thenReturn(Mono.just(childLease2));
68+
69+
StepVerifier.create(partitionSynchronizer.createMissingLeases())
70+
.verifyComplete();
71+
72+
ArgumentCaptor<String> leaseTokenCaptor = ArgumentCaptor.forClass(String.class);
73+
verify(leaseManagerMock, times(2)).createLeaseIfNotExist(leaseTokenCaptor.capture(), any());
74+
List<String> capturedLeaseTokens = leaseTokenCaptor.getAllValues();
75+
Assertions.assertThat(capturedLeaseTokens.size()).isEqualTo(2);
76+
Assertions.assertThat(capturedLeaseTokens.get(0)).isEqualTo(overlappingRanges.get(0).getId());
77+
Assertions.assertThat(capturedLeaseTokens.get(1)).isEqualTo(overlappingRanges.get(1).getId());
78+
}
79+
80+
@Test
81+
public void createMissingLeases() {
82+
ChangeFeedContextClient feedContextClientMock = Mockito.mock(ChangeFeedContextClient.class);
83+
CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class);
84+
LeaseContainer leaseContainerMock = Mockito.mock(LeaseContainer.class);
85+
LeaseManager leaseManagerMock = Mockito.mock(LeaseManager.class);
86+
87+
PartitionSynchronizerImpl partitionSynchronizer = new PartitionSynchronizerImpl(
88+
feedContextClientMock,
89+
containerMock,
90+
leaseContainerMock,
91+
leaseManagerMock,
92+
1,
93+
1,
94+
COLLECTION_RESOURCE_ID);
95+
96+
List<PartitionKeyRange> overlappingRanges = new ArrayList<>();
97+
overlappingRanges.add(new PartitionKeyRange("1", "AA", "BB"));
98+
overlappingRanges.add(new PartitionKeyRange("2", "BB", "DD"));
99+
overlappingRanges.add(new PartitionKeyRange("3", "DD", "EE"));
100+
101+
ServiceItemLease childLease1 = new ServiceItemLease()
102+
.withLeaseToken("1");
103+
childLease1.setId("TestLease-" + UUID.randomUUID());
104+
105+
ServiceItemLease childLease2 = new ServiceItemLease()
106+
.withLeaseToken("2");
107+
childLease2.setId("TestLease-" + UUID.randomUUID());
108+
109+
ServiceItemLease childLease3 = new ServiceItemLease()
110+
.withLeaseToken("3");
111+
childLease3.setId("TestLease-" + UUID.randomUUID());
112+
113+
when(feedContextClientMock.getOverlappingRanges(PartitionKeyInternalHelper.FullRange, true))
114+
.thenReturn(Mono.just(overlappingRanges));
115+
when(leaseContainerMock.getAllLeases()).thenReturn(Flux.fromIterable(Arrays.asList(childLease1, childLease2)));
116+
when(leaseManagerMock.createLeaseIfNotExist(any(String.class), any()))
117+
.thenReturn(Mono.just(childLease3));
118+
119+
StepVerifier.create(partitionSynchronizer.createMissingLeases())
120+
.verifyComplete();
121+
122+
ArgumentCaptor<String> leaseTokenCaptor = ArgumentCaptor.forClass(String.class);
123+
verify(leaseManagerMock, times(1)).createLeaseIfNotExist(leaseTokenCaptor.capture(), any());
124+
assertThat(leaseTokenCaptor.getAllValues().size()).isEqualTo(1);
125+
assertThat(leaseTokenCaptor.getAllValues().get(0)).isEqualTo(overlappingRanges.get(2).getId());
126+
}
127+
}

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#### Bugs Fixed
1010

1111
#### Other Changes
12+
* Changed to use `PartitionKeyRangeCache` to get partition key range during startup and split handling. - [46700](https://github.com/Azure/azure-sdk-for-java/pull/46700)
1213

1314
### 4.74.0 (2025-09-05)
1415

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionSynchronizerImpl.java

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,19 @@
88
import com.azure.cosmos.implementation.changefeed.Lease;
99
import com.azure.cosmos.implementation.changefeed.LeaseContainer;
1010
import com.azure.cosmos.implementation.changefeed.LeaseManager;
11-
import com.azure.cosmos.models.CosmosQueryRequestOptions;
12-
import com.azure.cosmos.models.FeedResponse;
13-
import com.azure.cosmos.models.ModelBridgeInternal;
11+
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
1412
import org.slf4j.Logger;
1513
import org.slf4j.LoggerFactory;
1614
import reactor.core.publisher.Flux;
1715
import reactor.core.publisher.Mono;
1816

1917
import java.util.ArrayList;
18+
import java.util.Collections;
2019
import java.util.List;
2120
import java.util.Map;
2221
import java.util.concurrent.ConcurrentHashMap;
2322
import java.util.stream.Collectors;
2423

25-
import static com.azure.cosmos.BridgeInternal.extractContainerSelfLink;
26-
2724
/**
2825
* Implementation for the partition synchronizer.
2926
*/
@@ -61,7 +58,9 @@ public Mono<Void> createMissingLeases() {
6158

6259
return this.enumPartitionKeyRanges()
6360
.map(partitionKeyRange -> {
64-
leaseTokenMap.put(partitionKeyRange.getId(), partitionKeyRange.getParents());
61+
leaseTokenMap.put(
62+
partitionKeyRange.getId(),
63+
partitionKeyRange.getParents() == null ? Collections.emptyList() : partitionKeyRange.getParents());
6564
return partitionKeyRange.getId();
6665
})
6766
.collectList()
@@ -135,13 +134,8 @@ public Flux<Lease> splitPartition(Lease lease) {
135134
}
136135

137136
private Flux<PartitionKeyRange> enumPartitionKeyRanges() {
138-
String partitionKeyRangesPath = extractContainerSelfLink(this.collectionSelfLink);
139-
CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
140-
ModelBridgeInternal.setQueryRequestOptionsContinuationTokenAndMaxItemCount(cosmosQueryRequestOptions, null, this.maxBatchSize);
141-
142-
return this.documentClient.readPartitionKeyRangeFeed(partitionKeyRangesPath, cosmosQueryRequestOptions)
143-
.map(FeedResponse::getResults)
144-
.flatMap(Flux::fromIterable)
137+
return this.documentClient.getOverlappingRanges(PartitionKeyInternalHelper.FullRange, true)
138+
.flatMapMany(Flux::fromIterable)
145139
.onErrorResume(throwable -> {
146140
logger.error("Failed to retrieve physical partition information.", throwable);
147141
return Flux.empty();

0 commit comments

Comments
 (0)