Skip to content

Commit c386831

Browse files
authored
Block ChangeFeedMode interoperability for ChangeFeedProcessor. (Azure#43798)
* Attempt a fixing change feed mode switch. * Attempt a fixing change feed mode switch between Pk-Range based lease and Epk-Range and AllVersionAndDeletes based lease. * Test blocking change feed mode switch for the same lease prefix. * Test blocking change feed mode switch for the same lease prefix. * Fixing live tests pipeline. * Fixing live tests pipeline. * Fixing live tests pipeline. * Fixing live tests pipeline. * Update CHANGELOG.md * Fixing live tests pipeline. * Adding extra commit. * Added non-empty lease prefix to change feed mode switch tests.
1 parent 9daa88a commit c386831

File tree

13 files changed

+1022
-59
lines changed

13 files changed

+1022
-59
lines changed

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/BootstrapperImplTests.java

Lines changed: 100 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@
1010
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStartFromInternal;
1111
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
1212
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1;
13+
import com.azure.cosmos.implementation.changefeed.pkversion.ServiceItemLease;
1314
import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
1415
import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl;
16+
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
1517
import org.mockito.Mockito;
1618
import org.testng.Assert;
1719
import org.testng.annotations.DataProvider;
@@ -21,11 +23,12 @@
2123

2224
import java.time.Duration;
2325

26+
import static org.mockito.Mockito.atLeast;
2427
import static org.mockito.Mockito.times;
2528

2629
public class BootstrapperImplTests {
2730

28-
private static final String baseContinuationStringForFullRange = "{\"V\":1," +
31+
private static final String BASE_CONTINUATION_STRING_FOR_EPK_FULL_RANGE = "{\"V\":1," +
2932
"\"Rid\":\"%s\"," +
3033
"\"Continuation\":[" +
3134
"{\"token\":\"%s\",\"range\":{\"min\":\"\",\"max\":\"FF\"}}" +
@@ -34,9 +37,12 @@ public class BootstrapperImplTests {
3437

3538
@DataProvider(name = "leaseProvider")
3639
public Object[][] leaseProvider() {
40+
41+
String BASE_CONTINUATION_STRING_FOR_PK_FULL_RANGE = "\"100\"";
42+
3743
return new Object[][] {
3844
{
39-
createLeaseWithContinuation(
45+
createEpkRangeBasedLeaseWithContinuation(
4046
true,
4147
ChangeFeedMode.FULL_FIDELITY,
4248
ChangeFeedStartFromInternal.createFromNow(),
@@ -45,10 +51,11 @@ public Object[][] leaseProvider() {
4551
"0",
4652
"-FF",
4753
"0"),
54+
null,
4855
false
4956
},
5057
{
51-
createLeaseWithContinuation(
58+
createEpkRangeBasedLeaseWithContinuation(
5259
true,
5360
ChangeFeedMode.INCREMENTAL,
5461
ChangeFeedStartFromInternal.createFromNow(),
@@ -57,10 +64,11 @@ public Object[][] leaseProvider() {
5764
"0",
5865
"-FF",
5966
"0"),
67+
null,
6068
true
6169
},
6270
{
63-
createLeaseWithContinuation(
71+
createEpkRangeBasedLeaseWithContinuation(
6472
false,
6573
ChangeFeedMode.INCREMENTAL,
6674
ChangeFeedStartFromInternal.createFromNow(),
@@ -69,10 +77,11 @@ public Object[][] leaseProvider() {
6977
"0",
7078
"-FF",
7179
"0"),
80+
null,
7281
false
7382
},
7483
{
75-
createLeaseWithContinuation(
84+
createEpkRangeBasedLeaseWithContinuation(
7685
false,
7786
ChangeFeedMode.FULL_FIDELITY,
7887
ChangeFeedStartFromInternal.createFromNow(),
@@ -81,13 +90,51 @@ public Object[][] leaseProvider() {
8190
"0",
8291
"-FF",
8392
"0"),
93+
null,
94+
false
95+
},
96+
{
97+
createEpkRangeBasedLeaseWithContinuation(
98+
true,
99+
ChangeFeedMode.FULL_FIDELITY,
100+
ChangeFeedStartFromInternal.createFromNow(),
101+
"XyJKUI7=",
102+
"NO67Hq=",
103+
"0",
104+
"-FF",
105+
"0"),
106+
createPkRangeBasedLeaseWithContinuation(
107+
true,
108+
"XyJKUI7=",
109+
"NO67Hq=",
110+
"-FF",
111+
BASE_CONTINUATION_STRING_FOR_PK_FULL_RANGE),
112+
true
113+
},
114+
{
115+
null,
116+
createPkRangeBasedLeaseWithContinuation(
117+
true,
118+
"XyJKUI7=",
119+
"NO67Hq=",
120+
"-FF",
121+
BASE_CONTINUATION_STRING_FOR_PK_FULL_RANGE),
122+
true
123+
},
124+
{
125+
null,
126+
null,
84127
false
85128
}
86129
};
87130
}
88131

89132
@Test(groups = {"unit"}, dataProvider = "leaseProvider")
90-
public void tryInitializeStoreFromEpkVersionLeaseStoreWithExistingLeases(ServiceItemLeaseV1 lease, boolean expectIllegalStateException) {
133+
public void tryInitializeStoreFromEpkVersionLeaseStoreWithExistingLeases(
134+
ServiceItemLeaseV1 epkRangeBasedLease,
135+
ServiceItemLease pkRangeBasedLease,
136+
boolean expectIllegalStateException) {
137+
91138
Duration lockTime = Duration.ofSeconds(5);
92139
Duration expireTIme = Duration.ofSeconds(5);
93140

@@ -104,13 +151,30 @@ public void tryInitializeStoreFromEpkVersionLeaseStoreWithExistingLeases(Service
104151
Mockito.when(leaseStoreMock.releaseInitializationLock()).thenReturn(Mono.empty());
105152

106153
LeaseStoreManager epkRangeVersionLeaseStoreManagerMock = Mockito.mock(LeaseStoreManager.class);
107-
Mockito.when(epkRangeVersionLeaseStoreManagerMock.getTopLeases(Mockito.eq(1))).thenReturn(Flux.just(lease));
154+
LeaseStoreManager pkRangeVersionLeaseStoreManagerMock = Mockito.mock(LeaseStoreManager.class);
155+
156+
ChangeFeedProcessorOptions changeFeedProcessorOptionsMock = Mockito.mock(ChangeFeedProcessorOptions.class);
157+
158+
if (epkRangeBasedLease == null) {
159+
Mockito.when(epkRangeVersionLeaseStoreManagerMock.getTopLeases(Mockito.eq(1))).thenReturn(Flux.empty());
160+
} else {
161+
Mockito.when(epkRangeVersionLeaseStoreManagerMock.getTopLeases(Mockito.eq(1))).thenReturn(Flux.just(epkRangeBasedLease));
162+
}
163+
164+
if (pkRangeBasedLease == null) {
165+
Mockito.when(pkRangeVersionLeaseStoreManagerMock.getTopLeases(Mockito.eq(1))).thenReturn(Flux.empty());
166+
} else {
167+
Mockito.when(pkRangeVersionLeaseStoreManagerMock.getTopLeases(Mockito.eq(1))).thenReturn(Flux.just(pkRangeBasedLease));
168+
}
169+
108170
Bootstrapper bootstrapper = new BootstrapperImpl(
109171
partitionSynchronizerMock,
110172
leaseStoreMock,
111173
lockTime,
112174
expireTIme,
113175
epkRangeVersionLeaseStoreManagerMock,
176+
pkRangeVersionLeaseStoreManagerMock,
177+
changeFeedProcessorOptionsMock,
114178
ChangeFeedMode.FULL_FIDELITY);
115179

116180
if (expectIllegalStateException) {
@@ -119,12 +183,17 @@ public void tryInitializeStoreFromEpkVersionLeaseStoreWithExistingLeases(Service
119183
bootstrapper.initialize().block();
120184
}
121185

122-
Mockito.verify(epkRangeVersionLeaseStoreManagerMock, times(1)).getTopLeases(Mockito.eq(1));
186+
Mockito.verify(pkRangeVersionLeaseStoreManagerMock, times(1)).getTopLeases(Mockito.eq(1));
187+
188+
if (pkRangeBasedLease == null) {
189+
Mockito.verify(epkRangeVersionLeaseStoreManagerMock, times(1)).getTopLeases(Mockito.eq(1));
190+
}
191+
123192
Mockito.verify(partitionSynchronizerMock, times(1)).createMissingLeases();
124193
Mockito.verify(leaseStoreMock, times(2)).isInitialized();
125194
}
126195

127-
private static ServiceItemLeaseV1 createLeaseWithContinuation(
196+
private static ServiceItemLeaseV1 createEpkRangeBasedLeaseWithContinuation(
128197
boolean withContinuation,
129198
ChangeFeedMode changeFeedMode,
130199
ChangeFeedStartFromInternal startFromSettings,
@@ -141,8 +210,9 @@ private static ServiceItemLeaseV1 createLeaseWithContinuation(
141210

142211
if (withContinuation) {
143212
FeedRangePartitionKeyRangeImpl feedRangePartitionKeyRangeImpl = new FeedRangePartitionKeyRangeImpl(pkRangeId);
213+
144214
String continuationAsJsonString = String.format(
145-
baseContinuationStringForFullRange,
215+
BASE_CONTINUATION_STRING_FOR_EPK_FULL_RANGE,
146216
collectionRid,
147217
continuationToken,
148218
pkRangeId);
@@ -161,4 +231,24 @@ private static ServiceItemLeaseV1 createLeaseWithContinuation(
161231

162232
return lease;
163233
}
234+
235+
private static ServiceItemLease createPkRangeBasedLeaseWithContinuation(
236+
boolean withContinuation,
237+
String databaseRid,
238+
String collectionRid,
239+
String leaseToken,
240+
String continuationToken) {
241+
242+
ServiceItemLease lease = new ServiceItemLease();
243+
244+
lease.setId(String.format("%s_%s..%s", databaseRid, collectionRid, leaseToken));
245+
246+
lease = lease.withLeaseToken(leaseToken);
247+
248+
if (withContinuation) {
249+
lease = lease.withContinuationToken(continuationToken);
250+
}
251+
252+
return lease;
253+
}
164254
}

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/PkRangeIdVersionLeasesBootstrapperImplTests.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1;
1313
import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
1414
import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl;
15+
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
1516
import org.mockito.Mockito;
1617
import org.testng.Assert;
1718
import org.testng.annotations.DataProvider;
@@ -72,6 +73,11 @@ public void initializeStoreFromPkRangeIdVersionLeaseStore() {
7273
Mockito.when(partitionSynchronizerMock.createMissingLeases(Mockito.any())).thenReturn(Mono.empty());
7374

7475
LeaseStore leaseStoreMock = Mockito.mock(LeaseStore.class);
76+
77+
ChangeFeedProcessorOptions changeFeedProcessorOptions = new ChangeFeedProcessorOptions();
78+
79+
changeFeedProcessorOptions.setLeasePrefix("testLease");
80+
7581
Mockito
7682
.when(leaseStoreMock.isInitialized())
7783
.thenReturn(Mono.just(false))
@@ -94,6 +100,7 @@ public void initializeStoreFromPkRangeIdVersionLeaseStore() {
94100
expireTIme,
95101
pkRangeIdVersionLeaseStoreManagerMock,
96102
epkRangeVersionLeaseStoreManagerMock,
103+
changeFeedProcessorOptions,
97104
ChangeFeedMode.INCREMENTAL);
98105

99106
bootstrapper.initialize().block();
@@ -114,6 +121,8 @@ public void initializeStoreFromScratch() {
114121
Duration expireTIme = Duration.ofSeconds(5);
115122

116123
PartitionSynchronizer partitionSynchronizerMock = Mockito.mock(PartitionSynchronizer.class);
124+
ChangeFeedProcessorOptions changeFeedProcessorOptionsMock = Mockito.mock(ChangeFeedProcessorOptions.class);
125+
117126
Mockito.when(partitionSynchronizerMock.createMissingLeases()).thenReturn(Mono.empty());
118127

119128
LeaseStore leaseStoreMock = Mockito.mock(LeaseStore.class);
@@ -139,6 +148,7 @@ public void initializeStoreFromScratch() {
139148
expireTIme,
140149
pkRangeIdVersionLeaseStoreManagerMock,
141150
epkRangeVersionLeaseStoreManagerMock,
151+
changeFeedProcessorOptionsMock,
142152
ChangeFeedMode.INCREMENTAL);
143153

144154
bootstrapper.initialize().block();
@@ -156,6 +166,8 @@ public void tryInitializeStoreFromEpkVersionLeaseStoreWithExistingLeases(Service
156166
Duration expireTIme = Duration.ofSeconds(5);
157167

158168
PartitionSynchronizer partitionSynchronizerMock = Mockito.mock(PartitionSynchronizer.class);
169+
ChangeFeedProcessorOptions changeFeedProcessorOptions = new ChangeFeedProcessorOptions();
170+
159171
Mockito.when(partitionSynchronizerMock.createMissingLeases(Mockito.any())).thenReturn(Mono.empty());
160172

161173
LeaseStore leaseStoreMock = Mockito.mock(LeaseStore.class);
@@ -181,6 +193,7 @@ public void tryInitializeStoreFromEpkVersionLeaseStoreWithExistingLeases(Service
181193
expireTIme,
182194
pkRangeIdVersionLeaseStoreManagerMock,
183195
epkRangeVersionLeaseStoreManagerMock,
196+
changeFeedProcessorOptions,
184197
ChangeFeedMode.INCREMENTAL);
185198

186199
if (expectIllegalStateException) {

0 commit comments

Comments
 (0)