Skip to content

Commit a5bf834

Browse files
authored
[Bug] CFP Skipping Changes (Azure#43788)
* Change feed missing changes * Debugging logs and cloning solution * Fixed prefetching issue * Add test for ff * Add test * Revert logs and add test to validate prefetch * unused imports * unused imports * Added Changelog * React to comments * Unused import * dummy commit to rerun pipeline * revert dummy commit to rerun pipeline
1 parent 507b1e2 commit a5bf834

File tree

8 files changed

+98
-12
lines changed

8 files changed

+98
-12
lines changed

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
1111
import com.azure.cosmos.implementation.RetryAnalyzer;
1212
import com.azure.cosmos.implementation.Utils;
13+
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode;
1314
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
1415
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1;
1516
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
@@ -119,6 +120,14 @@ public static Object[][] changeFeedQueryEndLSNDataProvider() {
119120
};
120121
}
121122

123+
@DataProvider(name = "changeFeedQueryPrefetchingDataProvider")
124+
public static Object[][] changeFeedQueryPrefetchingDataProvider() {
125+
return new Object[][]{
126+
{ChangeFeedMode.FULL_FIDELITY},
127+
{ ChangeFeedMode.INCREMENTAL},
128+
};
129+
}
130+
122131
@DataProvider(name = "changeFeedQueryEndLSNHangDataProvider")
123132
public static Object[][] changeFeedQueryEndLSNHangDataProvider() {
124133
return new Object[][]{
@@ -323,6 +332,63 @@ public void asyncChangeFeed_fromBeginning_incremental_forLogicalPartition() thro
323332
}
324333
}
325334

335+
@Test(groups = { "emulator" }, dataProvider = "changeFeedQueryPrefetchingDataProvider", timeOut = TIMEOUT)
336+
public void asyncChangeFeedPrefetching(ChangeFeedMode changeFeedMode) throws Exception {
337+
this.createContainer(
338+
(cp) -> {
339+
if (changeFeedMode.equals(ChangeFeedMode.INCREMENTAL)) {
340+
return cp.setChangeFeedPolicy(ChangeFeedPolicy.createLatestVersionPolicy());
341+
}
342+
return cp.setChangeFeedPolicy(ChangeFeedPolicy.createAllVersionsAndDeletesPolicy(Duration.ofMinutes(10)));
343+
}
344+
);
345+
CosmosChangeFeedRequestOptions options;
346+
if (changeFeedMode.equals(ChangeFeedMode.FULL_FIDELITY)) {
347+
options = CosmosChangeFeedRequestOptions
348+
.createForProcessingFromNow(FeedRange.forFullRange())
349+
.setMaxItemCount(10).allVersionsAndDeletes();
350+
} else {
351+
options = CosmosChangeFeedRequestOptions
352+
.createForProcessingFromBeginning(FeedRange.forFullRange()).setMaxItemCount(10);
353+
}
354+
AtomicInteger count = new AtomicInteger(0);
355+
insertDocuments(5, 20);
356+
AtomicReference<String> continuation = new AtomicReference<>("");
357+
createdContainer.asyncContainer.queryChangeFeed(options, ObjectNode.class).handle((r) -> {
358+
count.incrementAndGet();
359+
continuation.set(r.getContinuationToken());
360+
}
361+
).byPage().subscribe();
362+
363+
CosmosChangeFeedRequestOptions optionsFF = null;
364+
if (changeFeedMode.equals(ChangeFeedMode.FULL_FIDELITY)) {
365+
insertDocuments(5, 20);
366+
count.set(0);
367+
optionsFF = CosmosChangeFeedRequestOptions
368+
.createForProcessingFromContinuation(continuation.get())
369+
.setMaxItemCount(10).allVersionsAndDeletes();
370+
createdContainer.asyncContainer.queryChangeFeed(optionsFF, ObjectNode.class).handle((r) -> {
371+
count.incrementAndGet();
372+
continuation.set(r.getContinuationToken());
373+
}
374+
).byPage().subscribe();
375+
}
376+
Thread.sleep(3000);
377+
assertThat(count.get()).isGreaterThan(2);
378+
379+
if (changeFeedMode.equals(ChangeFeedMode.FULL_FIDELITY)) {
380+
// full fidelity is only from now so need to insert more documents
381+
insertDocuments(5, 20);
382+
}
383+
count.set(0);
384+
// should only get two pages
385+
createdContainer.asyncContainer.queryChangeFeed(changeFeedMode.equals(ChangeFeedMode.FULL_FIDELITY)? optionsFF
386+
: options, ObjectNode.class).handle((r) -> count.incrementAndGet())
387+
.byPage().take(2, true).subscribe();
388+
Thread.sleep(3000);
389+
assertThat(count.get()).isEqualTo(2);
390+
}
391+
326392
@Test(groups = { "emulator" }, timeOut = TIMEOUT)
327393
public void asyncChangeFeed_fromBeginning_incremental_forEPK() throws Exception {
328394
this.createContainer(

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#### Breaking Changes
88

99
#### Bugs Fixed
10+
* Fixes an issue in change feed processor where records are skipped and excessive requests are prefetched. - See [PR 43788](https://github.com/Azure/azure-sdk-for-java/pull/43788)
1011

1112
#### Other Changes
1213
* Added temporary internal-only option to enable thin client mode with system property COSMOS.THINCLIENT_ENABLED, setting the thin client endpoint with system property COSMOS.THINCLIENT_ENDPOINT, and default thin client endpoint with system property COSMOS.DEFAULT_THINCLIENT_ENDPOINT while the thin-client transport is still under development. This transport mode is not yet supported or ready to be used by external customers. Please don't use these configs in any production scenario yet. - [PR 43188](https://github.com/Azure/azure-sdk-for-java/pull/43188)

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode;
1212
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStartFromInternal;
1313
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
14+
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1;
1415
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
1516
import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple;
1617
import com.azure.cosmos.models.CosmosRequestOptions;
@@ -52,7 +53,11 @@ public final class CosmosChangeFeedRequestOptionsImpl implements OverridableRequ
5253
private Long endLSN;
5354

5455
public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toBeCloned) {
55-
this.continuationState = toBeCloned.continuationState;
56+
if (toBeCloned.continuationState != null) {
57+
this.continuationState = new ChangeFeedStateV1((ChangeFeedStateV1) toBeCloned.continuationState);
58+
} else {
59+
this.continuationState = null;
60+
}
5661
this.feedRangeInternal = toBeCloned.feedRangeInternal;
5762
this.properties = toBeCloned.properties;
5863
this.maxItemCount = toBeCloned.maxItemCount;
@@ -93,7 +98,12 @@ public CosmosChangeFeedRequestOptionsImpl(
9398
this.maxPrefetchPageCount = DEFAULT_MAX_PREFETCH_PAGE_COUNT;
9499
this.feedRangeInternal = feedRange;
95100
this.startFromInternal = startFromInternal;
96-
this.continuationState = continuationState;
101+
if (continuationState != null) {
102+
this.continuationState = new ChangeFeedStateV1((ChangeFeedStateV1) continuationState);
103+
} else {
104+
this.continuationState = null;
105+
}
106+
97107

98108
if (mode != ChangeFeedMode.INCREMENTAL && mode != ChangeFeedMode.FULL_FIDELITY) {
99109
throw new IllegalArgumentException(

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,8 @@
77
import com.azure.cosmos.CosmosAsyncDatabase;
88
import com.azure.cosmos.CosmosBridgeInternal;
99
import com.azure.cosmos.implementation.AsyncDocumentClient;
10-
import com.azure.cosmos.implementation.ChangeFeedOperationState;
11-
import com.azure.cosmos.implementation.Document;
1210
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
13-
import com.azure.cosmos.implementation.OperationType;
1411
import com.azure.cosmos.implementation.PartitionKeyRange;
15-
import com.azure.cosmos.implementation.ResourceType;
1612
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
1713
import com.azure.cosmos.implementation.routing.Range;
1814
import com.azure.cosmos.models.CosmosBulkOperationResponse;
@@ -42,7 +38,6 @@
4238
import java.net.URI;
4339
import java.util.ArrayList;
4440
import java.util.List;
45-
import java.util.stream.Collectors;
4641

4742
import static com.azure.cosmos.CosmosBridgeInternal.getContextClient;
4843
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
@@ -151,7 +146,7 @@ public <T> Flux<FeedResponse<T>> createDocumentChangeFeedQuery(CosmosAsyncConta
151146
}
152147
return collectionLink
153148
.queryChangeFeed(changeFeedRequestOptions, klass)
154-
.byPage()
149+
.byPage().take(1, true)
155150
.publishOn(this.scheduler);
156151
}
157152

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedStateV1.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
1212
import com.azure.cosmos.implementation.query.CompositeContinuationToken;
1313

14+
import java.util.ArrayList;
15+
import java.util.List;
1416
import java.util.Objects;
1517

1618
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
@@ -38,6 +40,20 @@ public ChangeFeedStateV1(
3840
this.mode = mode;
3941
}
4042

43+
public ChangeFeedStateV1(ChangeFeedStateV1 toBeCloned) {
44+
this.containerRid = toBeCloned.containerRid;
45+
this.feedRange = toBeCloned.feedRange;
46+
this.startFromSettings = toBeCloned.startFromSettings;
47+
if (toBeCloned.continuation != null) {
48+
List<CompositeContinuationToken> compositeContinuationTokens = new ArrayList<>();
49+
compositeContinuationTokens.addAll(toBeCloned.continuation.getCompositeContinuationTokens());
50+
this.continuation = FeedRangeContinuation.create(toBeCloned.continuation.getContainerRid(), toBeCloned.continuation.getFeedRange(), compositeContinuationTokens);
51+
} else {
52+
this.continuation = null;
53+
}
54+
this.mode = toBeCloned.mode;
55+
}
56+
4157
@Override
4258
public FeedRangeContinuation getContinuation() {
4359
return this.continuation;

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/ChangeFeedProcessorImplBase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,6 @@ public Mono<List<ChangeFeedProcessorState>> getCurrentState() {
232232

233233
return this.feedContextClient
234234
.createDocumentChangeFeedQuery(this.feedContextClient.getContainerClient(), options, ChangeFeedProcessorItem.class, false)
235-
.take(1)
236235
.map(feedResponse -> {
237236
ChangeFeedProcessorState changeFeedProcessorState = new ChangeFeedProcessorState()
238237
.setHostName(lease.getOwner())

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,7 @@ public Mono<Void> run(CancellationToken cancellationToken) {
119119
return this.documentClient.createDocumentChangeFeedQuery(
120120
this.settings.getCollectionSelfLink(),
121121
this.options,
122-
itemType)
123-
.limitRequest(1);
122+
itemType);
124123
})
125124
.flatMap(documentFeedResponse -> {
126125
if (cancellationToken.isCancellationRequested()) return Flux.error(new TaskCancelledException());

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public Mono<Void> run(CancellationToken cancellationToken) {
121121
return this.documentClient.createDocumentChangeFeedQuery(
122122
this.settings.getCollectionSelfLink(),
123123
this.options,
124-
JsonNode.class).limitRequest(1);
124+
JsonNode.class);
125125
})
126126
.flatMap(documentFeedResponse -> {
127127
if (cancellationToken.isCancellationRequested()) return Flux.error(new TaskCancelledException());

0 commit comments

Comments
 (0)