Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

Expand All @@ -54,7 +58,7 @@ public void shouldCheckpointFor304WhenContinuationTokenChanges(
boolean shouldDoCheckpoint) {
ChangeFeedObserver<ChangeFeedProcessorItem> observerMock = Mockito.mock(ChangeFeedObserver.class);
ChangeFeedContextClient changeFeedContextClientMock = Mockito.mock(ChangeFeedContextClient.class);

// Setup initial state with continuation token
ChangeFeedStateV1 initialChangeFeedState = this.getChangeFeedStateWithContinuationTokens(1);

Expand Down Expand Up @@ -117,6 +121,62 @@ public void shouldCheckpointFor304WhenContinuationTokenChanges(
}
}

@Test(groups = "unit")
public void processedTimeSetAfterProcessing() {
// Arrange
ChangeFeedObserver<ChangeFeedProcessorItem> observerMock = Mockito.mock(ChangeFeedObserver.class);
Mockito.when(observerMock.processChanges(Mockito.any(), Mockito.anyList())).thenReturn(Mono.empty());

ChangeFeedContextClient changeFeedContextClientMock = Mockito.mock(ChangeFeedContextClient.class);
CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class);

ChangeFeedStateV1 startState = getChangeFeedStateWithContinuationTokens(1);
ProcessorSettings settings = new ProcessorSettings(startState, containerMock);
settings.withMaxItemCount(10);

Lease leaseMock = Mockito.mock(ServiceItemLeaseV1.class);
Mockito.when(leaseMock.getContinuationToken()).thenReturn(startState.toString());

PartitionCheckpointer checkpointerMock = Mockito.mock(PartitionCheckpointerImpl.class);

// Create a feed response with one mocked result
@SuppressWarnings("unchecked") FeedResponse<ChangeFeedProcessorItem> feedResponseMock = Mockito.mock(FeedResponse.class);
List<ChangeFeedProcessorItem> results = new ArrayList<>();
results.add(Mockito.mock(ChangeFeedProcessorItem.class));
AtomicInteger counter = new AtomicInteger(0);
Mockito.when(feedResponseMock.getResults()).thenAnswer(invocation -> {
Thread.sleep(500);
return counter.getAndIncrement() < 10 ? results : new ArrayList<>();
});
ChangeFeedState changeFeedState = this.getChangeFeedStateWithContinuationTokens(1);
Mockito.when(feedResponseMock.getContinuationToken()).thenReturn(changeFeedState.toString());

// The processor will continuously fetch, but we will cancel shortly after first batch
Mockito.doReturn(Flux.just(feedResponseMock))
.when(changeFeedContextClientMock)
.createDocumentChangeFeedQuery(Mockito.any(), Mockito.any(), Mockito.any());

PartitionProcessorImpl<ChangeFeedProcessorItem> processor = new PartitionProcessorImpl<>(
observerMock,
changeFeedContextClientMock,
settings,
checkpointerMock,
leaseMock,
ChangeFeedProcessorItem.class,
ChangeFeedMode.INCREMENTAL,
null);
Instant initialTime = processor.getLastProcessedTime();

CancellationTokenSource cts = new CancellationTokenSource();
Mono<Void> runMono = processor.run(cts.getToken());

StepVerifier.create(runMono)
.thenAwait(Duration.ofMillis(800))
.then(cts::cancel)
.verifyComplete();

assertThat(processor.getLastProcessedTime()).isAfter(initialTime);
}

@Test(groups = "unit")
public void partitionSplitHappenOnFirstRequest() {
Expand Down Expand Up @@ -184,7 +244,7 @@ private ChangeFeedStateV1 getChangeFeedStateWithContinuationTokens(int tokenCoun

continuationBuilder.append("],")
.append("\"PKRangeId\":\"").append(pkRangeId).append("\"}");

String continuationJson = continuationBuilder.toString();

FeedRangePartitionKeyRangeImpl feedRange = new FeedRangePartitionKeyRangeImpl(pkRangeId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.changefeed.epkversion;

import com.azure.cosmos.implementation.changefeed.CancellationToken;
import com.azure.cosmos.implementation.changefeed.CancellationTokenSource;
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserver;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.LeaseRenewer;
import com.fasterxml.jackson.databind.JsonNode;
import org.mockito.Mockito;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.lang.reflect.Method;
import java.time.Duration;
import java.time.Instant;

import static org.assertj.core.api.Assertions.assertThat;

/**
* Tests for the PartitionSupervisorImpl class
*/
public class PartitionSupervisorImplTests {

private Lease leaseMock;
private PartitionProcessor processorMock;
private LeaseRenewer renewerMock;
private ChangeFeedObserver<JsonNode> observerMock; // added field

@SuppressWarnings({"unchecked", "rawtypes"})
public void setup() {
leaseMock = Mockito.mock(Lease.class);
Mockito.when(leaseMock.getLeaseToken()).thenReturn("-FF");
observerMock = Mockito.mock(ChangeFeedObserver.class); // assign to field
processorMock = Mockito.mock(PartitionProcessor.class);
renewerMock = Mockito.mock(LeaseRenewer.class);

Mockito.doNothing().when(observerMock).open(Mockito.any());
Mockito.when(processorMock.run(Mockito.any())).thenReturn(Mono.empty());
Mockito.when(renewerMock.run(Mockito.any())).thenReturn(Mono.empty());
Mockito.when(renewerMock.getLeaseRenewInterval()).thenReturn(Duration.ofSeconds(1));
}

private PartitionSupervisorImpl createSupervisor() {
return new PartitionSupervisorImpl(
leaseMock,
observerMock,
processorMock,
renewerMock,
Schedulers.immediate());
}

private boolean invokeShouldContinue(PartitionSupervisorImpl sup, CancellationToken token) throws Exception {
Method m = PartitionSupervisorImpl.class.getDeclaredMethod("shouldContinue", CancellationToken.class);
m.setAccessible(true);
return (boolean) m.invoke(sup, token);
}

@Test(groups = "unit")
public void shouldContinue_NoVerificationWindow_NotCancelled_NoErrors() throws Exception {
this.setup();
PartitionSupervisorImpl sup = createSupervisor();
CancellationTokenSource cts = new CancellationTokenSource();
Mockito.when(processorMock.getResultException()).thenReturn(null);
Mockito.when(renewerMock.getResultException()).thenReturn(null);
Mockito.when(processorMock.getLastProcessedTime()).thenReturn(Instant.now());

boolean result = invokeShouldContinue(sup, cts.getToken());
assertThat(result).isTrue();
}

@Test(groups = "unit")
public void shouldContinue_VerificationWindow_ProcessedBatchesTrue() throws Exception {
this.setup();
PartitionSupervisorImpl sup = createSupervisor();
CancellationTokenSource cts = new CancellationTokenSource();
Duration renewInterval = renewerMock.getLeaseRenewInterval();
// Force verification window elapsed: interval * 25 + 1s
Mockito.when(processorMock.getLastProcessedTime()).thenReturn(Instant.now().minus(renewInterval
.multipliedBy(23)).minusSeconds(1));
Mockito.when(processorMock.getResultException()).thenReturn(null);
Mockito.when(renewerMock.getResultException()).thenReturn(null);


boolean result = invokeShouldContinue(sup, cts.getToken());

assertThat(result).isTrue();
}

@Test(groups = "unit")
public void shouldContinue_VerificationWindow_ProcessedBatchesFalse() throws Exception {
this.setup();
PartitionSupervisorImpl sup = createSupervisor();
CancellationTokenSource cts = new CancellationTokenSource();
Duration renewInterval = renewerMock.getLeaseRenewInterval();
Mockito.when(processorMock.getLastProcessedTime()).thenReturn(Instant.now().minus(renewInterval.multipliedBy(25)).minusSeconds(1));
Mockito.when(processorMock.getResultException()).thenReturn(null);
Mockito.when(renewerMock.getResultException()).thenReturn(null);

boolean result = invokeShouldContinue(sup, cts.getToken());
assertThat(result).isFalse(); // should stop due to no processed batches
}

@Test(groups = "unit")
public void shouldContinue_ProcessorError_Stops() throws Exception {
this.setup();
PartitionSupervisorImpl sup = createSupervisor();
CancellationTokenSource cts = new CancellationTokenSource();
Mockito.when(processorMock.getLastProcessedTime()).thenReturn(Instant.now());
Mockito.when(processorMock.getResultException()).thenReturn(new RuntimeException("failure"));
Mockito.when(renewerMock.getResultException()).thenReturn(null);

boolean result = invokeShouldContinue(sup, cts.getToken());
assertThat(result).isFalse();
}

@Test(groups = "unit")
public void shouldContinue_RenewerError_Stops() throws Exception {
this.setup();
PartitionSupervisorImpl sup = createSupervisor();
CancellationTokenSource cts = new CancellationTokenSource();
Mockito.when(processorMock.getLastProcessedTime()).thenReturn(Instant.now());
Mockito.when(processorMock.getResultException()).thenReturn(null);
Mockito.when(renewerMock.getResultException()).thenReturn(new RuntimeException("lease error"));

boolean result = invokeShouldContinue(sup, cts.getToken());
assertThat(result).isFalse();
}

@Test(groups = "unit")
public void shouldContinue_ShutdownRequested_Stops() throws Exception {
this.setup();
PartitionSupervisorImpl sup = createSupervisor();
CancellationTokenSource cts = new CancellationTokenSource();
cts.cancel();
Mockito.when(processorMock.getLastProcessedTime()).thenReturn(Instant.now());
Mockito.when(processorMock.getResultException()).thenReturn(null);
Mockito.when(renewerMock.getResultException()).thenReturn(null);

boolean result = invokeShouldContinue(sup, cts.getToken());
assertThat(result).isFalse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStartFromInternal;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1;
import com.azure.cosmos.implementation.changefeed.epkversion.ServiceItemLeaseV1;
import com.azure.cosmos.implementation.changefeed.exceptions.FeedRangeGoneException;
import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl;
import com.azure.cosmos.models.ChangeFeedProcessorItem;
import com.azure.cosmos.models.FeedResponse;
import com.fasterxml.jackson.databind.JsonNode;
import org.mockito.Mockito;
Expand All @@ -29,8 +31,12 @@
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

Expand Down Expand Up @@ -117,6 +123,60 @@ public void shouldCheckpointFor304WhenContinuationTokenChanges(
}
}

@Test(groups = "unit")
public void processedTimeSetAfterProcessing() {
ChangeFeedObserver<JsonNode> observerMock = Mockito.mock(ChangeFeedObserver.class);
Mockito.when(observerMock.processChanges(Mockito.any(), Mockito.anyList())).thenReturn(Mono.empty());

ChangeFeedContextClient changeFeedContextClientMock = Mockito.mock(ChangeFeedContextClient.class);
CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class);

ChangeFeedStateV1 startState = getChangeFeedStateWithContinuationTokens(1);
ProcessorSettings settings = new ProcessorSettings(startState, containerMock);
settings.withMaxItemCount(10);

Lease leaseMock = Mockito.mock(ServiceItemLeaseV1.class);
Mockito.when(leaseMock.getContinuationToken()).thenReturn(startState.toString());

PartitionCheckpointer checkpointerMock = Mockito.mock(PartitionCheckpointerImpl.class);

// Create a feed response with one mocked result
@SuppressWarnings("unchecked") FeedResponse<ChangeFeedProcessorItem> feedResponseMock = Mockito.mock(FeedResponse.class);
List<ChangeFeedProcessorItem> results = new ArrayList<>();
results.add(Mockito.mock(ChangeFeedProcessorItem.class));
AtomicInteger counter = new AtomicInteger(0);
Mockito.when(feedResponseMock.getResults()).thenAnswer(invocation -> {
Thread.sleep(500);
return counter.getAndIncrement() < 10 ? results : new ArrayList<>();
});
ChangeFeedState changeFeedState = this.getChangeFeedStateWithContinuationTokens(1);
Mockito.when(feedResponseMock.getContinuationToken()).thenReturn(changeFeedState.toString());

// The processor will continuously fetch, but we will cancel shortly after first batch
Mockito.doReturn(Flux.just(feedResponseMock))
.when(changeFeedContextClientMock)
.createDocumentChangeFeedQuery(Mockito.any(), Mockito.any(), Mockito.any());

PartitionProcessorImpl processor = new PartitionProcessorImpl(
observerMock,
changeFeedContextClientMock,
settings,
checkpointerMock,
leaseMock,
null);
Instant initialTime = processor.getLastProcessedTime();

CancellationTokenSource cts = new CancellationTokenSource();
Mono<Void> runMono = processor.run(cts.getToken());

StepVerifier.create(runMono)
.thenAwait(Duration.ofMillis(800))
.then(cts::cancel)
.verifyComplete();

assertThat(processor.getLastProcessedTime()).isAfter(initialTime);
}

@Test
public void partitionSplitHappenOnFirstRequest() {
@SuppressWarnings("unchecked") ChangeFeedObserver<JsonNode> observerMock =
Expand Down
Loading