diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImplTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImplTests.java index 7a468e699a2f..c86d37a4a3f7 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImplTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImplTests.java @@ -29,8 +29,12 @@ import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; +import java.util.List; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; @@ -54,7 +58,7 @@ public void shouldCheckpointFor304WhenContinuationTokenChanges( boolean shouldDoCheckpoint) { ChangeFeedObserver observerMock = Mockito.mock(ChangeFeedObserver.class); ChangeFeedContextClient changeFeedContextClientMock = Mockito.mock(ChangeFeedContextClient.class); - + // Setup initial state with continuation token ChangeFeedStateV1 initialChangeFeedState = this.getChangeFeedStateWithContinuationTokens(1); @@ -117,6 +121,62 @@ public void shouldCheckpointFor304WhenContinuationTokenChanges( } } + @Test(groups = "unit") + public void processedTimeSetAfterProcessing() { + // Arrange + ChangeFeedObserver observerMock = Mockito.mock(ChangeFeedObserver.class); + Mockito.when(observerMock.processChanges(Mockito.any(), Mockito.anyList())).thenReturn(Mono.empty()); + + ChangeFeedContextClient changeFeedContextClientMock = Mockito.mock(ChangeFeedContextClient.class); + CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class); + + ChangeFeedStateV1 startState = getChangeFeedStateWithContinuationTokens(1); + ProcessorSettings settings = new ProcessorSettings(startState, containerMock); + settings.withMaxItemCount(10); + + Lease leaseMock = Mockito.mock(ServiceItemLeaseV1.class); + Mockito.when(leaseMock.getContinuationToken()).thenReturn(startState.toString()); + + PartitionCheckpointer checkpointerMock = Mockito.mock(PartitionCheckpointerImpl.class); + + // Create a feed response with one mocked result + @SuppressWarnings("unchecked") FeedResponse feedResponseMock = Mockito.mock(FeedResponse.class); + List results = new ArrayList<>(); + results.add(Mockito.mock(ChangeFeedProcessorItem.class)); + AtomicInteger counter = new AtomicInteger(0); + Mockito.when(feedResponseMock.getResults()).thenAnswer(invocation -> { + Thread.sleep(500); + return counter.getAndIncrement() < 10 ? results : new ArrayList<>(); + }); + ChangeFeedState changeFeedState = this.getChangeFeedStateWithContinuationTokens(1); + Mockito.when(feedResponseMock.getContinuationToken()).thenReturn(changeFeedState.toString()); + + // The processor will continuously fetch, but we will cancel shortly after first batch + Mockito.doReturn(Flux.just(feedResponseMock)) + .when(changeFeedContextClientMock) + .createDocumentChangeFeedQuery(Mockito.any(), Mockito.any(), Mockito.any()); + + PartitionProcessorImpl processor = new PartitionProcessorImpl<>( + observerMock, + changeFeedContextClientMock, + settings, + checkpointerMock, + leaseMock, + ChangeFeedProcessorItem.class, + ChangeFeedMode.INCREMENTAL, + null); + Instant initialTime = processor.getLastProcessedTime(); + + CancellationTokenSource cts = new CancellationTokenSource(); + Mono runMono = processor.run(cts.getToken()); + + StepVerifier.create(runMono) + .thenAwait(Duration.ofMillis(800)) + .then(cts::cancel) + .verifyComplete(); + + assertThat(processor.getLastProcessedTime()).isAfter(initialTime); + } @Test(groups = "unit") public void partitionSplitHappenOnFirstRequest() { @@ -184,7 +244,7 @@ private ChangeFeedStateV1 getChangeFeedStateWithContinuationTokens(int tokenCoun continuationBuilder.append("],") .append("\"PKRangeId\":\"").append(pkRangeId).append("\"}"); - + String continuationJson = continuationBuilder.toString(); FeedRangePartitionKeyRangeImpl feedRange = new FeedRangePartitionKeyRangeImpl(pkRangeId); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionSupervisorImplTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionSupervisorImplTests.java new file mode 100644 index 000000000000..da820c383f7a --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionSupervisorImplTests.java @@ -0,0 +1,145 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.implementation.changefeed.epkversion; + +import com.azure.cosmos.implementation.changefeed.CancellationToken; +import com.azure.cosmos.implementation.changefeed.CancellationTokenSource; +import com.azure.cosmos.implementation.changefeed.ChangeFeedObserver; +import com.azure.cosmos.implementation.changefeed.Lease; +import com.azure.cosmos.implementation.changefeed.LeaseRenewer; +import com.fasterxml.jackson.databind.JsonNode; +import org.mockito.Mockito; +import org.testng.annotations.Test; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +import java.lang.reflect.Method; +import java.time.Duration; +import java.time.Instant; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for the PartitionSupervisorImpl class + */ +public class PartitionSupervisorImplTests { + + private Lease leaseMock; + private PartitionProcessor processorMock; + private LeaseRenewer renewerMock; + private ChangeFeedObserver observerMock; // added field + + @SuppressWarnings({"unchecked", "rawtypes"}) + public void setup() { + leaseMock = Mockito.mock(Lease.class); + Mockito.when(leaseMock.getLeaseToken()).thenReturn("-FF"); + observerMock = Mockito.mock(ChangeFeedObserver.class); // assign to field + processorMock = Mockito.mock(PartitionProcessor.class); + renewerMock = Mockito.mock(LeaseRenewer.class); + + Mockito.doNothing().when(observerMock).open(Mockito.any()); + Mockito.when(processorMock.run(Mockito.any())).thenReturn(Mono.empty()); + Mockito.when(renewerMock.run(Mockito.any())).thenReturn(Mono.empty()); + Mockito.when(renewerMock.getLeaseRenewInterval()).thenReturn(Duration.ofSeconds(1)); + } + + private PartitionSupervisorImpl createSupervisor() { + return new PartitionSupervisorImpl( + leaseMock, + observerMock, + processorMock, + renewerMock, + Schedulers.immediate()); + } + + private boolean invokeShouldContinue(PartitionSupervisorImpl sup, CancellationToken token) throws Exception { + Method m = PartitionSupervisorImpl.class.getDeclaredMethod("shouldContinue", CancellationToken.class); + m.setAccessible(true); + return (boolean) m.invoke(sup, token); + } + + @Test(groups = "unit") + public void shouldContinue_NoVerificationWindow_NotCancelled_NoErrors() throws Exception { + this.setup(); + PartitionSupervisorImpl sup = createSupervisor(); + CancellationTokenSource cts = new CancellationTokenSource(); + Mockito.when(processorMock.getResultException()).thenReturn(null); + Mockito.when(renewerMock.getResultException()).thenReturn(null); + Mockito.when(processorMock.getLastProcessedTime()).thenReturn(Instant.now()); + + boolean result = invokeShouldContinue(sup, cts.getToken()); + assertThat(result).isTrue(); + } + + @Test(groups = "unit") + public void shouldContinue_VerificationWindow_ProcessedBatchesTrue() throws Exception { + this.setup(); + PartitionSupervisorImpl sup = createSupervisor(); + CancellationTokenSource cts = new CancellationTokenSource(); + Duration renewInterval = renewerMock.getLeaseRenewInterval(); + // Force verification window elapsed: interval * 25 + 1s + Mockito.when(processorMock.getLastProcessedTime()).thenReturn(Instant.now().minus(renewInterval + .multipliedBy(23)).minusSeconds(1)); + Mockito.when(processorMock.getResultException()).thenReturn(null); + Mockito.when(renewerMock.getResultException()).thenReturn(null); + + + boolean result = invokeShouldContinue(sup, cts.getToken()); + + assertThat(result).isTrue(); + } + + @Test(groups = "unit") + public void shouldContinue_VerificationWindow_ProcessedBatchesFalse() throws Exception { + this.setup(); + PartitionSupervisorImpl sup = createSupervisor(); + CancellationTokenSource cts = new CancellationTokenSource(); + Duration renewInterval = renewerMock.getLeaseRenewInterval(); + Mockito.when(processorMock.getLastProcessedTime()).thenReturn(Instant.now().minus(renewInterval.multipliedBy(25)).minusSeconds(1)); + Mockito.when(processorMock.getResultException()).thenReturn(null); + Mockito.when(renewerMock.getResultException()).thenReturn(null); + + boolean result = invokeShouldContinue(sup, cts.getToken()); + assertThat(result).isFalse(); // should stop due to no processed batches + } + + @Test(groups = "unit") + public void shouldContinue_ProcessorError_Stops() throws Exception { + this.setup(); + PartitionSupervisorImpl sup = createSupervisor(); + CancellationTokenSource cts = new CancellationTokenSource(); + Mockito.when(processorMock.getLastProcessedTime()).thenReturn(Instant.now()); + Mockito.when(processorMock.getResultException()).thenReturn(new RuntimeException("failure")); + Mockito.when(renewerMock.getResultException()).thenReturn(null); + + boolean result = invokeShouldContinue(sup, cts.getToken()); + assertThat(result).isFalse(); + } + + @Test(groups = "unit") + public void shouldContinue_RenewerError_Stops() throws Exception { + this.setup(); + PartitionSupervisorImpl sup = createSupervisor(); + CancellationTokenSource cts = new CancellationTokenSource(); + Mockito.when(processorMock.getLastProcessedTime()).thenReturn(Instant.now()); + Mockito.when(processorMock.getResultException()).thenReturn(null); + Mockito.when(renewerMock.getResultException()).thenReturn(new RuntimeException("lease error")); + + boolean result = invokeShouldContinue(sup, cts.getToken()); + assertThat(result).isFalse(); + } + + @Test(groups = "unit") + public void shouldContinue_ShutdownRequested_Stops() throws Exception { + this.setup(); + PartitionSupervisorImpl sup = createSupervisor(); + CancellationTokenSource cts = new CancellationTokenSource(); + cts.cancel(); + Mockito.when(processorMock.getLastProcessedTime()).thenReturn(Instant.now()); + Mockito.when(processorMock.getResultException()).thenReturn(null); + Mockito.when(renewerMock.getResultException()).thenReturn(null); + + boolean result = invokeShouldContinue(sup, cts.getToken()); + assertThat(result).isFalse(); + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImplTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImplTests.java index ef740ec3b2fc..0658b000184f 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImplTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImplTests.java @@ -16,10 +16,12 @@ import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStartFromInternal; import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState; import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1; +import com.azure.cosmos.implementation.changefeed.epkversion.ServiceItemLeaseV1; import com.azure.cosmos.implementation.changefeed.exceptions.FeedRangeGoneException; import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils; import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation; import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl; +import com.azure.cosmos.models.ChangeFeedProcessorItem; import com.azure.cosmos.models.FeedResponse; import com.fasterxml.jackson.databind.JsonNode; import org.mockito.Mockito; @@ -29,8 +31,12 @@ import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; +import java.util.List; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; @@ -117,6 +123,60 @@ public void shouldCheckpointFor304WhenContinuationTokenChanges( } } + @Test(groups = "unit") + public void processedTimeSetAfterProcessing() { + ChangeFeedObserver observerMock = Mockito.mock(ChangeFeedObserver.class); + Mockito.when(observerMock.processChanges(Mockito.any(), Mockito.anyList())).thenReturn(Mono.empty()); + + ChangeFeedContextClient changeFeedContextClientMock = Mockito.mock(ChangeFeedContextClient.class); + CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class); + + ChangeFeedStateV1 startState = getChangeFeedStateWithContinuationTokens(1); + ProcessorSettings settings = new ProcessorSettings(startState, containerMock); + settings.withMaxItemCount(10); + + Lease leaseMock = Mockito.mock(ServiceItemLeaseV1.class); + Mockito.when(leaseMock.getContinuationToken()).thenReturn(startState.toString()); + + PartitionCheckpointer checkpointerMock = Mockito.mock(PartitionCheckpointerImpl.class); + + // Create a feed response with one mocked result + @SuppressWarnings("unchecked") FeedResponse feedResponseMock = Mockito.mock(FeedResponse.class); + List results = new ArrayList<>(); + results.add(Mockito.mock(ChangeFeedProcessorItem.class)); + AtomicInteger counter = new AtomicInteger(0); + Mockito.when(feedResponseMock.getResults()).thenAnswer(invocation -> { + Thread.sleep(500); + return counter.getAndIncrement() < 10 ? results : new ArrayList<>(); + }); + ChangeFeedState changeFeedState = this.getChangeFeedStateWithContinuationTokens(1); + Mockito.when(feedResponseMock.getContinuationToken()).thenReturn(changeFeedState.toString()); + + // The processor will continuously fetch, but we will cancel shortly after first batch + Mockito.doReturn(Flux.just(feedResponseMock)) + .when(changeFeedContextClientMock) + .createDocumentChangeFeedQuery(Mockito.any(), Mockito.any(), Mockito.any()); + + PartitionProcessorImpl processor = new PartitionProcessorImpl( + observerMock, + changeFeedContextClientMock, + settings, + checkpointerMock, + leaseMock, + null); + Instant initialTime = processor.getLastProcessedTime(); + + CancellationTokenSource cts = new CancellationTokenSource(); + Mono runMono = processor.run(cts.getToken()); + + StepVerifier.create(runMono) + .thenAwait(Duration.ofMillis(800)) + .then(cts::cancel) + .verifyComplete(); + + assertThat(processor.getLastProcessedTime()).isAfter(initialTime); + } + @Test public void partitionSplitHappenOnFirstRequest() { @SuppressWarnings("unchecked") ChangeFeedObserver observerMock = diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionSupervisorImplTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionSupervisorImplTests.java new file mode 100644 index 000000000000..966b30928120 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionSupervisorImplTests.java @@ -0,0 +1,146 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.implementation.changefeed.pkversion; + +import com.azure.cosmos.implementation.changefeed.CancellationToken; +import com.azure.cosmos.implementation.changefeed.CancellationTokenSource; +import com.azure.cosmos.implementation.changefeed.ChangeFeedObserver; +import com.azure.cosmos.implementation.changefeed.Lease; +import com.azure.cosmos.implementation.changefeed.LeaseRenewer; +import com.fasterxml.jackson.databind.JsonNode; +import org.mockito.Mockito; +import org.testng.annotations.Test; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +import java.lang.reflect.Method; +import java.time.Duration; +import java.time.Instant; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for the PartitionSupervisorImpl class + */ +public class PartitionSupervisorImplTests { + + private Lease leaseMock; + private PartitionProcessor processorMock; + private LeaseRenewer renewerMock; + private ChangeFeedObserver observerMock; // added field + + @SuppressWarnings({"unchecked", "rawtypes"}) + public void setup() { + leaseMock = Mockito.mock(Lease.class); + Mockito.when(leaseMock.getLeaseToken()).thenReturn("1"); + observerMock = Mockito.mock(ChangeFeedObserver.class); // assign to field + processorMock = Mockito.mock(PartitionProcessor.class); + renewerMock = Mockito.mock(LeaseRenewer.class); + + Mockito.doNothing().when(observerMock).open(Mockito.any()); + Mockito.when(processorMock.run(Mockito.any())).thenReturn(Mono.empty()); + Mockito.when(renewerMock.run(Mockito.any())).thenReturn(Mono.empty()); + Mockito.when(renewerMock.getLeaseRenewInterval()).thenReturn(Duration.ofSeconds(1)); + } + + private PartitionSupervisorImpl createSupervisor() { + return new PartitionSupervisorImpl( + leaseMock, + observerMock, + processorMock, + renewerMock, + Schedulers.immediate()); + } + + private boolean invokeShouldContinue(PartitionSupervisorImpl sup, CancellationToken token) throws Exception { + Method m = PartitionSupervisorImpl.class.getDeclaredMethod("shouldContinue", CancellationToken.class); + m.setAccessible(true); + return (boolean) m.invoke(sup, token); + } + + @Test(groups = "unit") + public void shouldContinue_NoVerificationWindow_NotCancelled_NoErrors() throws Exception { + this.setup(); + PartitionSupervisorImpl sup = createSupervisor(); + CancellationTokenSource cts = new CancellationTokenSource(); + Mockito.when(processorMock.getResultException()).thenReturn(null); + Mockito.when(renewerMock.getResultException()).thenReturn(null); + Mockito.when(processorMock.getLastProcessedTime()).thenReturn(Instant.now()); + + boolean result = invokeShouldContinue(sup, cts.getToken()); + assertThat(result).isTrue(); + } + + @Test(groups = "unit") + public void shouldContinue_VerificationWindow_ProcessedBatchesTrue() throws Exception { + this.setup(); + PartitionSupervisorImpl sup = createSupervisor(); + CancellationTokenSource cts = new CancellationTokenSource(); + Duration renewInterval = renewerMock.getLeaseRenewInterval(); + // Force verification window elapsed: interval * 25 + 1s + Mockito.when(processorMock.getLastProcessedTime()).thenReturn(Instant.now().minus(renewInterval + .multipliedBy(24)).minusSeconds(1)); + Mockito.when(processorMock.getResultException()).thenReturn(null); + Mockito.when(renewerMock.getResultException()).thenReturn(null); + + boolean result = invokeShouldContinue(sup, cts.getToken()); + + assertThat(result).isTrue(); + } + + @Test(groups = "unit") + public void shouldContinue_VerificationWindow_ProcessedBatchesFalse() throws Exception { + this.setup(); + PartitionSupervisorImpl sup = createSupervisor(); + CancellationTokenSource cts = new CancellationTokenSource(); + Duration renewInterval = renewerMock.getLeaseRenewInterval(); + Mockito.when(processorMock.getLastProcessedTime()).thenReturn(Instant.now().minus(renewInterval + .multipliedBy(25)).minusSeconds(1)); + Mockito.when(processorMock.getResultException()).thenReturn(null); + Mockito.when(renewerMock.getResultException()).thenReturn(null); + + + boolean result = invokeShouldContinue(sup, cts.getToken()); + assertThat(result).isFalse(); // should stop due to no processed batches + } + + @Test(groups = "unit") + public void shouldContinue_ProcessorError_Stops() throws Exception { + this.setup(); + PartitionSupervisorImpl sup = createSupervisor(); + CancellationTokenSource cts = new CancellationTokenSource(); + Mockito.when(processorMock.getLastProcessedTime()).thenReturn(Instant.now()); + Mockito.when(processorMock.getResultException()).thenReturn(new RuntimeException("failure")); + Mockito.when(renewerMock.getResultException()).thenReturn(null); + + boolean result = invokeShouldContinue(sup, cts.getToken()); + assertThat(result).isFalse(); + } + + @Test(groups = "unit") + public void shouldContinue_RenewerError_Stops() throws Exception { + this.setup(); + PartitionSupervisorImpl sup = createSupervisor(); + CancellationTokenSource cts = new CancellationTokenSource(); + Mockito.when(processorMock.getLastProcessedTime()).thenReturn(Instant.now()); + Mockito.when(processorMock.getResultException()).thenReturn(null); + Mockito.when(renewerMock.getResultException()).thenReturn(new RuntimeException("lease error")); + + boolean result = invokeShouldContinue(sup, cts.getToken()); + assertThat(result).isFalse(); + } + + @Test(groups = "unit") + public void shouldContinue_ShutdownRequested_Stops() throws Exception { + this.setup(); + PartitionSupervisorImpl sup = createSupervisor(); + CancellationTokenSource cts = new CancellationTokenSource(); + cts.cancel(); + Mockito.when(processorMock.getLastProcessedTime()).thenReturn(Instant.now()); + Mockito.when(processorMock.getResultException()).thenReturn(null); + Mockito.when(renewerMock.getResultException()).thenReturn(null); + + boolean result = invokeShouldContinue(sup, cts.getToken()); + assertThat(result).isFalse(); + } +} diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 2f14d11c80a4..e77a3a792e21 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -9,6 +9,7 @@ #### Bugs Fixed * Fixed an issue where Per-Partition Circuit Breaker was hitting `NullPointerException` in e2e timeout scenarios. - [PR 46968](https://github.com/Azure/azure-sdk-for-java/pull/46968/files) +* Allow lease in `ChangeFeedProcessor` to be rebalanced if changes for associated partition have not been processed for a while. - [PR 25](https://github.com/jeet1995/azure-sdk-for-java/pull/25) #### Other Changes * Changed to use `PartitionKeyRangeCache` to get partition key range during startup and split handling. - [46700](https://github.com/Azure/azure-sdk-for-java/pull/46700) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseRenewer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseRenewer.java index 0d3097c1a453..79ead884a130 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseRenewer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseRenewer.java @@ -4,6 +4,8 @@ import reactor.core.publisher.Mono; +import java.time.Duration; + /** * Interface for the lease renewer. */ @@ -20,4 +22,9 @@ public interface LeaseRenewer { * @return the inner exception if any, otherwise null. */ RuntimeException getResultException(); + + /** + * @return the interval at which the lease will be renewed. + */ + Duration getLeaseRenewInterval(); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/LeaseRenewerImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/LeaseRenewerImpl.java index 2ce4e9a959fb..980c245c65a5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/LeaseRenewerImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/LeaseRenewerImpl.java @@ -77,6 +77,11 @@ public RuntimeException getResultException() { return this.resultException; } + @Override + public Duration getLeaseRenewInterval() { + return this.leaseRenewInterval; + } + private Mono renew(CancellationToken cancellationToken) { if (cancellationToken.isCancellationRequested()) { return Mono.empty(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessor.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessor.java index ca100f877325..cb3a887f6bc4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessor.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessor.java @@ -8,6 +8,8 @@ import com.azure.cosmos.implementation.changefeed.ChangeFeedObserverContext; import reactor.core.publisher.Mono; +import java.time.Instant; + /** * Provides an API to run continious processing on a single partition of some resource. *

@@ -30,4 +32,9 @@ public interface PartitionProcessor { * @return the inner exception if any, otherwise null. */ RuntimeException getResultException(); + + /** + * @return the last time a record was processed from the partition. + */ + Instant getLastProcessedTime(); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java index 53b3c0f7af4e..3a581ec91bfb 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java @@ -59,6 +59,7 @@ class PartitionProcessorImpl implements PartitionProcessor { private volatile boolean hasMoreResults; private volatile boolean hasServerContinuationTokenChange; private final FeedRangeThroughputControlConfigManager feedRangeThroughputControlConfigManager; + private Instant lastProcessedTime; public PartitionProcessorImpl(ChangeFeedObserver observer, ChangeFeedContextClient documentClient, @@ -82,6 +83,7 @@ public PartitionProcessorImpl(ChangeFeedObserver observer, settings.getMaxItemCount(), this.changeFeedMode); this.feedRangeThroughputControlConfigManager = feedRangeThroughputControlConfigManager; + this.lastProcessedTime = Instant.now(); } @Override @@ -144,6 +146,7 @@ public Mono run(CancellationToken cancellationToken) { this.lastServerContinuationToken = continuationToken; this.hasMoreResults = !ModelBridgeInternal.noChanges(documentFeedResponse); + lastProcessedTime = Instant.now(); if (documentFeedResponse.getResults() != null && documentFeedResponse.getResults().size() > 0) { logger.info("Lease with token {}: processing {} feeds with owner {}.", this.lease.getLeaseToken(), documentFeedResponse.getResults().size(), this.lease.getOwner()); @@ -323,6 +326,11 @@ public RuntimeException getResultException() { return this.resultException; } + @Override + public Instant getLastProcessedTime() { + return this.lastProcessedTime; + } + private Mono dispatchChanges( FeedResponse response, ChangeFeedState continuationState) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionSupervisorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionSupervisorImpl.java index 9e656fc9d6f8..86ae023518b3 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionSupervisorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionSupervisorImpl.java @@ -16,26 +16,37 @@ import com.azure.cosmos.implementation.changefeed.exceptions.LeaseLostException; import com.azure.cosmos.implementation.changefeed.exceptions.ObserverException; import com.azure.cosmos.implementation.changefeed.exceptions.TaskCancelledException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import java.time.Duration; +import java.time.Instant; /** * Implementation for {@link PartitionSupervisor}. */ class PartitionSupervisorImpl implements PartitionSupervisor { + private static final Logger logger = LoggerFactory.getLogger(PartitionSupervisorImpl.class); private final Lease lease; private final ChangeFeedObserver observer; private final PartitionProcessor processor; private final LeaseRenewer renewer; private final CancellationTokenSource childShutdownCts; + /** + * The verification factor determines the size of the verification window for lease renewal operations. + * It is set to 25 times the renewal interval to provide a sufficiently large window for verifying + * lease validity and handling transient failures. + */ + private static final int VERIFICATION_FACTOR = 25; private volatile RuntimeException resultException; private final Scheduler scheduler; - public PartitionSupervisorImpl(Lease lease, ChangeFeedObserver observer, PartitionProcessor processor, LeaseRenewer renewer, Scheduler scheduler) { + public PartitionSupervisorImpl(Lease lease, ChangeFeedObserver observer, PartitionProcessor processor, + LeaseRenewer renewer, Scheduler scheduler) { this.lease = lease; this.observer = observer; this.processor = processor; @@ -60,11 +71,22 @@ public Mono run(CancellationToken shutdownToken) { return Mono.just(this) .delayElement(Duration.ofMillis(100), CosmosSchedulers.COSMOS_PARALLEL) - .repeat( () -> !shutdownToken.isCancellationRequested() && this.processor.getResultException() == null && this.renewer.getResultException() == null) + .repeat( () -> shouldContinue(shutdownToken)) .last() .flatMap( value -> this.afterRun(context, shutdownToken)); } + private boolean shouldContinue(CancellationToken shutdownToken) { + Duration timeSinceLastProcessedChanges = Duration.between(processor.getLastProcessedTime(), Instant.now()); + // if cfp has seen successful processing, we do a renew, + // otherwise we do not to allow lease stealing + if (timeSinceLastProcessedChanges.getSeconds() > this.renewer.getLeaseRenewInterval().getSeconds() * VERIFICATION_FACTOR) { + logger.info("Lease with token {}: skipping renew as no batches processed.", this.lease.getLeaseToken()); + return false; + } + return !shutdownToken.isCancellationRequested() && this.processor.getResultException() == null && this.renewer.getResultException() == null; + } + private Mono afterRun(ChangeFeedObserverContext context, CancellationToken shutdownToken) { ChangeFeedObserverCloseReason closeReason = ChangeFeedObserverCloseReason.UNKNOWN; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/LeaseRenewerImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/LeaseRenewerImpl.java index 551157089ce5..1da30f366fcd 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/LeaseRenewerImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/LeaseRenewerImpl.java @@ -77,6 +77,11 @@ public RuntimeException getResultException() { return this.resultException; } + @Override + public Duration getLeaseRenewInterval() { + return this.leaseRenewInterval; + } + private Mono renew(CancellationToken cancellationToken) { if (cancellationToken.isCancellationRequested()) { return Mono.empty(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessor.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessor.java index 7179dbff60cb..eed36e892b90 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessor.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessor.java @@ -8,6 +8,8 @@ import com.azure.cosmos.implementation.changefeed.ChangeFeedObserverContext; import reactor.core.publisher.Mono; +import java.time.Instant; + /** * Provides an API to run continious processing on a single partition of some resource. *

@@ -30,4 +32,9 @@ public interface PartitionProcessor { * @return the inner exception if any, otherwise null. */ RuntimeException getResultException(); + + /** + * @return the last time a record was processed from the partition. + */ + Instant getLastProcessedTime(); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java index b1be70baf4f0..b699c8ccee2c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java @@ -59,6 +59,7 @@ class PartitionProcessorImpl implements PartitionProcessor { private volatile boolean hasMoreResults; private volatile boolean hasServerContinuationTokenChange; private final FeedRangeThroughputControlConfigManager feedRangeThroughputControlConfigManager; + private Instant lastProcessedTime; public PartitionProcessorImpl(ChangeFeedObserver observer, ChangeFeedContextClient documentClient, @@ -83,6 +84,7 @@ public PartitionProcessorImpl(ChangeFeedObserver observer, HttpConstants.HttpHeaders.SDK_SUPPORTED_CAPABILITIES, String.valueOf(HttpConstants.SDKSupportedCapabilities.SUPPORTED_CAPABILITIES_NONE)); this.feedRangeThroughputControlConfigManager = feedRangeThroughputControlConfigManager; + this.lastProcessedTime = Instant.now(); } @Override @@ -150,6 +152,7 @@ public Mono run(CancellationToken cancellationToken) { this.lastServerContinuationToken = currentServerContinuationToken; this.hasMoreResults = !ModelBridgeInternal.noChanges(documentFeedResponse); + this.lastProcessedTime = Instant.now(); if (documentFeedResponse.getResults() != null && documentFeedResponse.getResults().size() > 0) { logger.info("Partition {}: processing {} feeds with owner {}.", this.lease.getLeaseToken(), documentFeedResponse.getResults().size(), this.lease.getOwner()); return this.dispatchChanges(documentFeedResponse, continuationState) @@ -318,6 +321,11 @@ public RuntimeException getResultException() { return this.resultException; } + @Override + public Instant getLastProcessedTime() { + return this.lastProcessedTime; + } + private Mono dispatchChanges( FeedResponse response, ChangeFeedState continuationState) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionSupervisorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionSupervisorImpl.java index 335046324648..f82e026558c4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionSupervisorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionSupervisorImpl.java @@ -17,26 +17,37 @@ import com.azure.cosmos.implementation.changefeed.exceptions.ObserverException; import com.azure.cosmos.implementation.changefeed.exceptions.TaskCancelledException; import com.fasterxml.jackson.databind.JsonNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import java.time.Duration; +import java.time.Instant; /** * Implementation for {@link PartitionSupervisor}. */ class PartitionSupervisorImpl implements PartitionSupervisor { + private static final Logger logger = LoggerFactory.getLogger(PartitionSupervisorImpl.class); private final Lease lease; private final ChangeFeedObserver observer; private final PartitionProcessor processor; private final LeaseRenewer renewer; private final CancellationTokenSource childShutdownCts; + /** + * The verification factor determines the size of the verification window for lease renewal operations. + * It is set to 25 times the renewal interval to provide a sufficiently large window for verifying + * lease validity and handling transient failures. + */ + private static final int VERIFICATION_FACTOR = 25; private volatile RuntimeException resultException; private final Scheduler scheduler; - public PartitionSupervisorImpl(Lease lease, ChangeFeedObserver observer, PartitionProcessor processor, LeaseRenewer renewer, Scheduler scheduler) { + public PartitionSupervisorImpl(Lease lease, ChangeFeedObserver observer, PartitionProcessor processor, + LeaseRenewer renewer, Scheduler scheduler) { this.lease = lease; this.observer = observer; this.processor = processor; @@ -61,11 +72,22 @@ public Mono run(CancellationToken shutdownToken) { return Mono.just(this) .delayElement(Duration.ofMillis(100), CosmosSchedulers.COSMOS_PARALLEL) - .repeat( () -> !shutdownToken.isCancellationRequested() && this.processor.getResultException() == null && this.renewer.getResultException() == null) + .repeat( () -> shouldContinue(shutdownToken)) .last() .flatMap( value -> this.afterRun(context, shutdownToken)); } + private boolean shouldContinue(CancellationToken shutdownToken) { + Duration timeSinceLastProcessedChanges = Duration.between(processor.getLastProcessedTime(), Instant.now()); + // if cfp has seen successful processing, we do a renew, + // otherwise we do not to allow lease stealing + if (timeSinceLastProcessedChanges.getSeconds() > this.renewer.getLeaseRenewInterval().getSeconds() * VERIFICATION_FACTOR) { + logger.info("Lease with token {}: skipping renew as no batches processed.", this.lease.getLeaseToken()); + return false; + } + return !shutdownToken.isCancellationRequested() && this.processor.getResultException() == null && this.renewer.getResultException() == null; + } + private Mono afterRun(ChangeFeedObserverContext context, CancellationToken shutdownToken) { ChangeFeedObserverCloseReason closeReason = ChangeFeedObserverCloseReason.UNKNOWN;