Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@


import com.mongodb.MongoException;
import com.mongodb.ReadPreference;
import com.mongodb.assertions.Assertions;
import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.async.AsyncAggregateResponseBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncConnectionSource;
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.connection.OperationContext;
import com.mongodb.lang.NonNull;
Expand Down Expand Up @@ -232,8 +235,9 @@ private void retryOperation(final AsyncBlock asyncBlock,
} else {
changeStreamOperation.setChangeStreamOptionsForResume(resumeToken,
assertNotNull(source).getServerDescription().getMaxWireVersion());
source.release();
changeStreamOperation.executeAsync(binding, operationContext, (asyncBatchCursor, t1) -> {
// We wrap the binding so that the selected AsyncConnectionSource is reused, preventing redundant server selection.
// Consequently, the same AsyncConnectionSource remains pinned to the resulting AsyncCommandCursor.
changeStreamOperation.executeAsync(new AsyncSourceAwareReadBinding(source, binding), operationContext, (asyncBatchCursor, t1) -> {
if (t1 != null) {
callback.onResult(null, t1);
} else {
Expand All @@ -242,6 +246,7 @@ private void retryOperation(final AsyncBlock asyncBlock,
operationContext);
} finally {
try {
source.release();
binding.release(); // release the new change stream batch cursor's reference to the binding
} finally {
resumeableOperation(asyncBlock, callback, operationContext, tryNext);
Expand All @@ -252,5 +257,51 @@ private void retryOperation(final AsyncBlock asyncBlock,
}
});
}

/**
* Does not retain wrapped {@link AsyncReadBinding} as it serves as a wrapper only.
*/
private static class AsyncSourceAwareReadBinding implements AsyncReadBinding {
private final AsyncConnectionSource source;
private final AsyncReadBinding binding;

AsyncSourceAwareReadBinding(final AsyncConnectionSource source, final AsyncReadBinding binding) {
this.source = source;
this.binding = binding;
}

@Override
public ReadPreference getReadPreference() {
return binding.getReadPreference();
}

@Override
public void getReadConnectionSource(final OperationContext operationContext, final SingleResultCallback<AsyncConnectionSource> callback) {
source.retain();
callback.onResult(source, null);
}

@Override
public void getReadConnectionSource(final int minWireVersion, final ReadPreference fallbackReadPreference,
final OperationContext operationContext,
final SingleResultCallback<AsyncConnectionSource> callback) {
throw Assertions.fail();
}

@Override
public AsyncReadBinding retain() {
return binding.retain();
}

@Override
public int release() {
return binding.release();
}

@Override
public int getCount() {
return binding.getCount();
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
import com.mongodb.MongoChangeStreamException;
import com.mongodb.MongoException;
import com.mongodb.MongoOperationTimeoutException;
import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress;
import com.mongodb.ServerCursor;
import com.mongodb.assertions.Assertions;
import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.binding.ConnectionSource;
import com.mongodb.internal.binding.ReadBinding;
import com.mongodb.internal.connection.OperationContext;
import com.mongodb.lang.Nullable;
Expand Down Expand Up @@ -251,9 +254,11 @@ private void resumeChangeStream(final OperationContext operationContext) {
wrapped.close(operationContextWithDefaultMaxTime);
withReadConnectionSource(binding, operationContext, (source, operationContextWithMinRtt) -> {
changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, source.getServerDescription().getMaxWireVersion());
// We wrap the binding so that the selected ConnectionSource is reused, preventing redundant server selection.
// Consequently, the same ConnectionSource remains pinned to the resulting CommandCursor.
wrapped = ((ChangeStreamBatchCursor<T>) changeStreamOperation.execute(new SourceAwareReadBinding(source, binding), operationContextWithDefaultMaxTime)).getWrapped();
return null;
});
wrapped = ((ChangeStreamBatchCursor<T>) changeStreamOperation.execute(binding, operationContextWithDefaultMaxTime)).getWrapped();
binding.release(); // release the new change stream batch cursor's reference to the binding
}

Expand All @@ -264,4 +269,48 @@ private boolean hasPreviousNextTimedOut() {
private static boolean isTimeoutException(final Throwable exception) {
return exception instanceof MongoOperationTimeoutException;
}

/**
* Does not retain wrapped {link @ReadBinding} as it serves as a wrapper only.
*/
private static class SourceAwareReadBinding implements ReadBinding {
private final ConnectionSource source;
private final ReadBinding binding;

SourceAwareReadBinding(final ConnectionSource source, final ReadBinding binding) {
this.source = source;
this.binding = binding;
}

@Override
public ReadPreference getReadPreference() {
return binding.getReadPreference();
}

@Override
public ConnectionSource getReadConnectionSource(final OperationContext ignored) {
source.retain();
return source;
}

@Override
public ConnectionSource getReadConnectionSource(final int minWireVersion, final ReadPreference fallbackReadPreference, final OperationContext ignored) {
throw Assertions.fail();
}

@Override
public int getCount() {
return binding.getCount();
}

@Override
public ReadBinding retain() {
return binding.retain();
}

@Override
public int release() {
return binding.release();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doNothing;
Expand Down Expand Up @@ -216,7 +215,7 @@ void shouldResumeOnlyOnceOnSubsequentCallsAfterTimeoutError() {
verify(newCursor).next(operationContextCaptor.capture()));
verify(changeStreamOperation).setChangeStreamOptionsForResume(resumeToken, maxWireVersion);
verify(changeStreamOperation, times(1)).getDecoder();
verify(changeStreamOperation, times(1)).execute(eq(readBinding), any());
verify(changeStreamOperation, times(1)).execute(any(ReadBinding.class), any());
verifyNoMoreInteractions(changeStreamOperation);
verify(newCursor, times(1)).next(any());
verify(newCursor, atLeastOnce()).getPostBatchResumeToken();
Expand Down Expand Up @@ -245,7 +244,7 @@ void shouldResumeOnlyOnceOnSubsequentCallsAfterTimeoutError() {
void shouldPropagateAnyErrorsOccurredInAggregateOperation() {
when(cursor.next(any())).thenThrow(new MongoOperationTimeoutException("timeout"));
MongoNotPrimaryException resumableError = new MongoNotPrimaryException(new BsonDocument(), new ServerAddress());
when(changeStreamOperation.execute(eq(readBinding), any())).thenThrow(resumableError);
when(changeStreamOperation.execute(any(ReadBinding.class), any())).thenThrow(resumableError);

ChangeStreamBatchCursor<Document> cursor = createChangeStreamCursor();
//when
Expand All @@ -272,12 +271,12 @@ void shouldResumeAfterTimeoutInAggregateOnNextCall() {
clearInvocations(this.cursor, newCursor, timeoutContext, changeStreamOperation, readBinding);

//second next operation times out on resume attempt when creating change stream
when(changeStreamOperation.execute(eq(readBinding), any())).thenThrow(
when(changeStreamOperation.execute(any(ReadBinding.class), any())).thenThrow(
new MongoOperationTimeoutException("timeout during resumption"));
assertThrows(MongoOperationTimeoutException.class, cursor::next);
clearInvocations(this.cursor, newCursor, timeoutContext, changeStreamOperation);
clearInvocations(this.cursor, newCursor, timeoutContext, changeStreamOperation, readBinding);

doReturn(newChangeStreamCursor).when(changeStreamOperation).execute(eq(readBinding), any());
doReturn(newChangeStreamCursor).when(changeStreamOperation).execute(any(ReadBinding.class), any());

//when third operation succeeds to resume and call next
sleep(TIMEOUT_CONSUMPTION_SLEEP_MS);
Expand Down Expand Up @@ -308,7 +307,7 @@ void shouldCloseChangeStreamWhenResumeOperationFailsDueToNonTimeoutError() {
clearInvocations(this.cursor, newCursor, timeoutContext, changeStreamOperation, readBinding);

//when second next operation errors on resume attempt when creating change stream
when(changeStreamOperation.execute(eq(readBinding), any())).thenThrow(
when(changeStreamOperation.execute(any(ReadBinding.class), any())).thenThrow(
new MongoNotPrimaryException(new BsonDocument(), new ServerAddress()));
assertThrows(MongoNotPrimaryException.class, cursor::next);

Expand Down Expand Up @@ -344,7 +343,11 @@ private void verifyNoResumeAttemptCalled() {
private void verifyResumeAttemptCalled() {
verify(cursor, times(1)).close(any());
verify(changeStreamOperation).setChangeStreamOptionsForResume(resumeToken, maxWireVersion);
verify(changeStreamOperation, times(1)).execute(eq(readBinding), any());
verify(changeStreamOperation, times(1)).execute(any(ReadBinding.class), any());
verifyNoMoreInteractions(cursor);
verify(changeStreamOperation, times(1)).execute(any(ReadBinding.class), any());
// Verify server selection is done once for the resume attempt.
verify(readBinding, times(1)).getReadConnectionSource(any());
verifyNoMoreInteractions(cursor);
}

Expand Down Expand Up @@ -394,10 +397,14 @@ void setUp() {
changeStreamOperation = mock(ChangeStreamOperation.class);
when(changeStreamOperation.getDecoder()).thenReturn(new DocumentCodec());
doNothing().when(changeStreamOperation).setChangeStreamOptionsForResume(resumeToken, maxWireVersion);
when(changeStreamOperation.execute(eq(readBinding), any())).thenReturn(newChangeStreamCursor);
when(changeStreamOperation.execute(any(ReadBinding.class), any())).thenAnswer(invocation -> {
ReadBinding binding = invocation.getArgument(0);
OperationContext operationContext = invocation.getArgument(1);
binding.getReadConnectionSource(operationContext);
return newChangeStreamCursor;
});
}


private void assertTimeoutWasRefreshedForOperation(final TimeoutContext timeoutContextUsedForOperation) {
assertNotNull(timeoutContextUsedForOperation.getTimeout(), "TimeoutMs was not set");
timeoutContextUsedForOperation.getTimeout().run(TimeUnit.MILLISECONDS, () -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.reactivestreams.client;

import com.mongodb.MongoClientSettings;
import com.mongodb.client.AbstractChangeSteamFunctionalTest;
import com.mongodb.client.MongoClient;
import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient;

public class ChangeStreamFunctionalTest extends AbstractChangeSteamFunctionalTest {
@Override
protected MongoClient createMongoClient(final MongoClientSettings mongoClientSettings) {
return new SyncMongoClient(mongoClientSettings);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.client;

import com.mongodb.ClusterFixture;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoNamespace;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.test.CollectionHelper;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.codecs.BsonDocumentCodec;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import java.time.Instant;
import java.util.concurrent.atomic.AtomicInteger;

import static com.mongodb.client.Fixture.getDefaultDatabaseName;
import static java.lang.String.format;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

/**
* The {@link ChangeStreamProseTest}, which is defined only for sync driver, should be migrated to this class.
* Once this done, this class should be renamed to ChangeStreamProseTest.
*/
public abstract class AbstractChangeSteamFunctionalTest {

private static final String FAIL_COMMAND_NAME = "failCommand";
private static final MongoNamespace NAMESPACE = new MongoNamespace(getDefaultDatabaseName(), "test");
private final CollectionHelper<BsonDocument> collectionHelper = new CollectionHelper<>(new BsonDocumentCodec(), NAMESPACE);

protected abstract MongoClient createMongoClient(MongoClientSettings mongoClientSettings);

@Test
public void shouldDoOneServerSelectionForResumeAttempt() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

//given
assumeTrue(ClusterFixture.isDiscoverableReplicaSet());
AtomicInteger serverSelectionCounter = new AtomicInteger();
BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0);
try (MongoClient mongoClient = createMongoClient(Fixture.getMongoClientSettingsBuilder()
.applyToClusterSettings(builder -> builder.serverSelector(clusterDescription -> {
serverSelectionCounter.incrementAndGet();
return clusterDescription.getServerDescriptions();
})).build())) {

MongoCollection<Document> collection = mongoClient
.getDatabase(NAMESPACE.getDatabaseName())
.getCollection(NAMESPACE.getCollectionName());

collectionHelper.runAdminCommand("{"
+ " configureFailPoint: \"" + FAIL_COMMAND_NAME + "\","
+ " mode: {"
+ " times: 1"
+ " },"
+ " data: {"
+ " failCommands: ['getMore'],"
+ " errorCode: 10107,"
+ " errorLabels: ['ResumableChangeStreamError']"
+ " }"
+ "}");
// We insert document here, because async cursor performs aggregate and getMore right after we call cursor()
collection.insertOne(Document.parse("{ x: 1 }"));
serverSelectionCounter.set(0);

try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = collection.watch()
.batchSize(0)
.startAtOperationTime(startTime)
.cursor()) {

//when
ChangeStreamDocument<Document> changeStreamDocument = cursor.next();
//then
assertNotNull(changeStreamDocument);
int actualCountOfServerSelections = serverSelectionCounter.get();
assertEquals(2, actualCountOfServerSelections,
format("Expected 2 server selections (initial aggregate command + resume attempt aggregate command), but there were %s",
actualCountOfServerSelections));
}
}
}

@AfterEach
public void tearDown() throws InterruptedException {
ClusterFixture.disableFailPoint(FAIL_COMMAND_NAME);
collectionHelper.drop();
}
}
Loading