Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

Expand All @@ -54,7 +58,7 @@ public void shouldCheckpointFor304WhenContinuationTokenChanges(
boolean shouldDoCheckpoint) {
ChangeFeedObserver<ChangeFeedProcessorItem> observerMock = Mockito.mock(ChangeFeedObserver.class);
ChangeFeedContextClient changeFeedContextClientMock = Mockito.mock(ChangeFeedContextClient.class);

// Setup initial state with continuation token
ChangeFeedStateV1 initialChangeFeedState = this.getChangeFeedStateWithContinuationTokens(1);

Expand Down Expand Up @@ -117,6 +121,62 @@ public void shouldCheckpointFor304WhenContinuationTokenChanges(
}
}

@Test(groups = "unit")
public void processedTimeSetAfterProcessing() {
// Arrange
ChangeFeedObserver<ChangeFeedProcessorItem> observerMock = Mockito.mock(ChangeFeedObserver.class);
Mockito.when(observerMock.processChanges(Mockito.any(), Mockito.anyList())).thenReturn(Mono.empty());

ChangeFeedContextClient changeFeedContextClientMock = Mockito.mock(ChangeFeedContextClient.class);
CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class);

ChangeFeedStateV1 startState = getChangeFeedStateWithContinuationTokens(1);
ProcessorSettings settings = new ProcessorSettings(startState, containerMock);
settings.withMaxItemCount(10);

Lease leaseMock = Mockito.mock(ServiceItemLeaseV1.class);
Mockito.when(leaseMock.getContinuationToken()).thenReturn(startState.toString());

PartitionCheckpointer checkpointerMock = Mockito.mock(PartitionCheckpointerImpl.class);

// Create a feed response with one mocked result
@SuppressWarnings("unchecked") FeedResponse<ChangeFeedProcessorItem> feedResponseMock = Mockito.mock(FeedResponse.class);
List<ChangeFeedProcessorItem> results = new ArrayList<>();
results.add(Mockito.mock(ChangeFeedProcessorItem.class));
AtomicInteger counter = new AtomicInteger(0);
Mockito.when(feedResponseMock.getResults()).thenAnswer(invocation -> {
Thread.sleep(500);
return counter.getAndIncrement() < 10 ? results : new ArrayList<>();
});
ChangeFeedState changeFeedState = this.getChangeFeedStateWithContinuationTokens(1);
Mockito.when(feedResponseMock.getContinuationToken()).thenReturn(changeFeedState.toString());

// The processor will continuously fetch, but we will cancel shortly after first batch
Mockito.doReturn(Flux.just(feedResponseMock))
.when(changeFeedContextClientMock)
.createDocumentChangeFeedQuery(Mockito.any(), Mockito.any(), Mockito.any());

PartitionProcessorImpl<ChangeFeedProcessorItem> processor = new PartitionProcessorImpl<>(
observerMock,
changeFeedContextClientMock,
settings,
checkpointerMock,
leaseMock,
ChangeFeedProcessorItem.class,
ChangeFeedMode.INCREMENTAL,
null);
Instant initialTime = processor.getLastProcessedTime();

CancellationTokenSource cts = new CancellationTokenSource();
Mono<Void> runMono = processor.run(cts.getToken());

StepVerifier.create(runMono)
.thenAwait(Duration.ofMillis(800))
.then(cts::cancel)
.verifyComplete();

assertThat(processor.getLastProcessedTime()).isAfter(initialTime);
}

@Test(groups = "unit")
public void partitionSplitHappenOnFirstRequest() {
Expand Down Expand Up @@ -184,7 +244,7 @@ private ChangeFeedStateV1 getChangeFeedStateWithContinuationTokens(int tokenCoun

continuationBuilder.append("],")
.append("\"PKRangeId\":\"").append(pkRangeId).append("\"}");

String continuationJson = continuationBuilder.toString();

FeedRangePartitionKeyRangeImpl feedRange = new FeedRangePartitionKeyRangeImpl(pkRangeId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.changefeed.epkversion;

import com.azure.cosmos.implementation.changefeed.CancellationToken;
import com.azure.cosmos.implementation.changefeed.CancellationTokenSource;
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserver;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.LeaseRenewer;
import com.fasterxml.jackson.databind.JsonNode;
import org.mockito.Mockito;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.lang.reflect.Method;
import java.time.Duration;
import java.time.Instant;

import static org.assertj.core.api.Assertions.assertThat;

/**
* Tests for the PartitionSupervisorImpl class
*/
public class PartitionSupervisorImplTests {

private Lease leaseMock;
private PartitionProcessor processorMock;
private LeaseRenewer renewerMock;
private ChangeFeedObserver<JsonNode> observerMock; // added field

@SuppressWarnings({"unchecked", "rawtypes"})
public void setup() {
leaseMock = Mockito.mock(Lease.class);
Mockito.when(leaseMock.getLeaseToken()).thenReturn("-FF");
observerMock = Mockito.mock(ChangeFeedObserver.class); // assign to field
processorMock = Mockito.mock(PartitionProcessor.class);
renewerMock = Mockito.mock(LeaseRenewer.class);

Mockito.doNothing().when(observerMock).open(Mockito.any());
Mockito.when(processorMock.run(Mockito.any())).thenReturn(Mono.empty());
Mockito.when(renewerMock.run(Mockito.any())).thenReturn(Mono.empty());
Mockito.when(renewerMock.getLeaseRenewInterval()).thenReturn(Duration.ofSeconds(1));
}

private PartitionSupervisorImpl createSupervisor() {
return new PartitionSupervisorImpl(
leaseMock,
observerMock,
processorMock,
renewerMock,
Schedulers.immediate());
}

private boolean invokeShouldContinue(PartitionSupervisorImpl sup, CancellationToken token) throws Exception {
Method m = PartitionSupervisorImpl.class.getDeclaredMethod("shouldContinue", CancellationToken.class);
m.setAccessible(true);
return (boolean) m.invoke(sup, token);
}

@Test(groups = "unit")
public void shouldContinue_NoVerificationWindow_NotCancelled_NoErrors() throws Exception {
this.setup();
PartitionSupervisorImpl sup = createSupervisor();
CancellationTokenSource cts = new CancellationTokenSource();
Mockito.when(processorMock.getResultException()).thenReturn(null);
Mockito.when(renewerMock.getResultException()).thenReturn(null);
Mockito.when(processorMock.getLastProcessedTime()).thenReturn(Instant.now());

boolean result = invokeShouldContinue(sup, cts.getToken());
assertThat(result).isTrue();
}

@Test(groups = "unit")
public void shouldContinue_VerificationWindow_ProcessedBatchesTrue() throws Exception {
this.setup();
PartitionSupervisorImpl sup = createSupervisor();
CancellationTokenSource cts = new CancellationTokenSource();
Duration renewInterval = renewerMock.getLeaseRenewInterval();
// Force verification window elapsed: interval * 25 + 1s
Mockito.when(processorMock.getLastProcessedTime()).thenReturn(Instant.now().minus(renewInterval
.multipliedBy(23)).minusSeconds(1));
Mockito.when(processorMock.getResultException()).thenReturn(null);
Mockito.when(renewerMock.getResultException()).thenReturn(null);


boolean result = invokeShouldContinue(sup, cts.getToken());

assertThat(result).isTrue();
}

@Test(groups = "unit")
public void shouldContinue_VerificationWindow_ProcessedBatchesFalse() throws Exception {
this.setup();
PartitionSupervisorImpl sup = createSupervisor();
CancellationTokenSource cts = new CancellationTokenSource();
Duration renewInterval = renewerMock.getLeaseRenewInterval();
Mockito.when(processorMock.getLastProcessedTime()).thenReturn(Instant.now().minus(renewInterval.multipliedBy(25)).minusSeconds(1));
Mockito.when(processorMock.getResultException()).thenReturn(null);
Mockito.when(renewerMock.getResultException()).thenReturn(null);

boolean result = invokeShouldContinue(sup, cts.getToken());
assertThat(result).isFalse(); // should stop due to no processed batches
}

@Test(groups = "unit")
public void shouldContinue_ProcessorError_Stops() throws Exception {
this.setup();
PartitionSupervisorImpl sup = createSupervisor();
CancellationTokenSource cts = new CancellationTokenSource();
Mockito.when(processorMock.getLastProcessedTime()).thenReturn(Instant.now());
Mockito.when(processorMock.getResultException()).thenReturn(new RuntimeException("failure"));
Mockito.when(renewerMock.getResultException()).thenReturn(null);

boolean result = invokeShouldContinue(sup, cts.getToken());
assertThat(result).isFalse();
}

@Test(groups = "unit")
public void shouldContinue_RenewerError_Stops() throws Exception {
this.setup();
PartitionSupervisorImpl sup = createSupervisor();
CancellationTokenSource cts = new CancellationTokenSource();
Mockito.when(processorMock.getLastProcessedTime()).thenReturn(Instant.now());
Mockito.when(processorMock.getResultException()).thenReturn(null);
Mockito.when(renewerMock.getResultException()).thenReturn(new RuntimeException("lease error"));

boolean result = invokeShouldContinue(sup, cts.getToken());
assertThat(result).isFalse();
}

@Test(groups = "unit")
public void shouldContinue_ShutdownRequested_Stops() throws Exception {
this.setup();
PartitionSupervisorImpl sup = createSupervisor();
CancellationTokenSource cts = new CancellationTokenSource();
cts.cancel();
Mockito.when(processorMock.getLastProcessedTime()).thenReturn(Instant.now());
Mockito.when(processorMock.getResultException()).thenReturn(null);
Mockito.when(renewerMock.getResultException()).thenReturn(null);

boolean result = invokeShouldContinue(sup, cts.getToken());
assertThat(result).isFalse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStartFromInternal;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1;
import com.azure.cosmos.implementation.changefeed.epkversion.ServiceItemLeaseV1;
import com.azure.cosmos.implementation.changefeed.exceptions.FeedRangeGoneException;
import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl;
import com.azure.cosmos.models.ChangeFeedProcessorItem;
import com.azure.cosmos.models.FeedResponse;
import com.fasterxml.jackson.databind.JsonNode;
import org.mockito.Mockito;
Expand All @@ -29,8 +31,12 @@
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

Expand Down Expand Up @@ -117,6 +123,60 @@ public void shouldCheckpointFor304WhenContinuationTokenChanges(
}
}

@Test(groups = "unit")
public void processedTimeSetAfterProcessing() {
ChangeFeedObserver<JsonNode> observerMock = Mockito.mock(ChangeFeedObserver.class);
Mockito.when(observerMock.processChanges(Mockito.any(), Mockito.anyList())).thenReturn(Mono.empty());

ChangeFeedContextClient changeFeedContextClientMock = Mockito.mock(ChangeFeedContextClient.class);
CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class);

ChangeFeedStateV1 startState = getChangeFeedStateWithContinuationTokens(1);
ProcessorSettings settings = new ProcessorSettings(startState, containerMock);
settings.withMaxItemCount(10);

Lease leaseMock = Mockito.mock(ServiceItemLeaseV1.class);
Mockito.when(leaseMock.getContinuationToken()).thenReturn(startState.toString());

PartitionCheckpointer checkpointerMock = Mockito.mock(PartitionCheckpointerImpl.class);

// Create a feed response with one mocked result
@SuppressWarnings("unchecked") FeedResponse<ChangeFeedProcessorItem> feedResponseMock = Mockito.mock(FeedResponse.class);
List<ChangeFeedProcessorItem> results = new ArrayList<>();
results.add(Mockito.mock(ChangeFeedProcessorItem.class));
AtomicInteger counter = new AtomicInteger(0);
Mockito.when(feedResponseMock.getResults()).thenAnswer(invocation -> {
Thread.sleep(500);
return counter.getAndIncrement() < 10 ? results : new ArrayList<>();
});
ChangeFeedState changeFeedState = this.getChangeFeedStateWithContinuationTokens(1);
Mockito.when(feedResponseMock.getContinuationToken()).thenReturn(changeFeedState.toString());

// The processor will continuously fetch, but we will cancel shortly after first batch
Mockito.doReturn(Flux.just(feedResponseMock))
.when(changeFeedContextClientMock)
.createDocumentChangeFeedQuery(Mockito.any(), Mockito.any(), Mockito.any());

PartitionProcessorImpl processor = new PartitionProcessorImpl(
observerMock,
changeFeedContextClientMock,
settings,
checkpointerMock,
leaseMock,
null);
Instant initialTime = processor.getLastProcessedTime();

CancellationTokenSource cts = new CancellationTokenSource();
Mono<Void> runMono = processor.run(cts.getToken());

StepVerifier.create(runMono)
.thenAwait(Duration.ofMillis(800))
.then(cts::cancel)
.verifyComplete();

assertThat(processor.getLastProcessedTime()).isAfter(initialTime);
}

@Test
public void partitionSplitHappenOnFirstRequest() {
@SuppressWarnings("unchecked") ChangeFeedObserver<JsonNode> observerMock =
Expand Down
Loading
Loading