Skip to content

Add Client Metadata Update Support. #1708

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 48 commits into from
Jul 2, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
9753f28
Add Client Metadata update support.
vbabanin May 6, 2025
889d5c8
Add prose tests.
vbabanin May 8, 2025
4b3065c
Add prose tests.
vbabanin May 8, 2025
2a684bf
Add ClientMetadata entity.
vbabanin May 8, 2025
28c1844
Update tests.
vbabanin May 8, 2025
371ce0b
Merge branch 'main' into JAVA-5854
vbabanin May 8, 2025
972fe9d
Fix tests.
vbabanin May 8, 2025
bcf9cc8
Fix tests.
vbabanin May 8, 2025
179b262
Spotless fix.
vbabanin May 8, 2025
bc43ba0
Skip tests when auth is enabled.
vbabanin May 9, 2025
3f5fc91
Apply Scala spotless.
vbabanin May 9, 2025
61192d8
Update tests.
vbabanin May 10, 2025
e9f8dd6
Fix tests.
vbabanin May 10, 2025
95c1bb1
Add parametrized tests.
vbabanin May 19, 2025
dd63a49
Apply Kotlin spotless.
vbabanin May 19, 2025
89d67bb
Move update to separate variables.
vbabanin May 19, 2025
8a18d39
Merge branch 'main' into JAVA-5854
vbabanin May 20, 2025
f296d0c
Merge branch 'main' into JAVA-5854
vbabanin May 20, 2025
a8dc4fb
Add lock for updates.
vbabanin May 21, 2025
8ade58b
Clone document before passing it as an argument.
vbabanin May 21, 2025
71350fa
Rename to "appendMetadata".
vbabanin May 21, 2025
9890aa1
Add ReadWriteLock and rename method.
vbabanin May 27, 2025
28f7d88
Add parametrized tests.
vbabanin Jun 4, 2025
246a040
Add readLock.
vbabanin Jun 4, 2025
8a58294
Fix readLock issue.
vbabanin Jun 4, 2025
15ecef8
Merge branch 'main' into JAVA-5854
vbabanin Jun 4, 2025
5c7a6e3
Add Kotlin and Scala API.
vbabanin Jun 6, 2025
f466d08
Add unified tests.
vbabanin Jun 10, 2025
3132541
Merge branch 'main' into JAVA-5854
vbabanin Jun 10, 2025
d961447
Fix static checks.
vbabanin Jun 10, 2025
74d8558
Change specification submodule commit.
vbabanin Jun 10, 2025
31ef18e
Revert specifications submodule URL to upstream repository.
vbabanin Jun 17, 2025
38ba5e6
Update specifications submodule commit hash.
vbabanin Jun 17, 2025
edafc54
Merge branch 'main' into JAVA-5854
vbabanin Jun 17, 2025
6aa0a1e
Merge branch 'main' into JAVA-5854
vbabanin Jun 17, 2025
dab5451
Remove duplicate import of CLIENT_METADATA in MultiServerClusterSpeci…
vbabanin Jun 17, 2025
1550122
Merge branch 'main' into JAVA-5854
vbabanin Jun 19, 2025
b5c4c20
Remove ClientMetadataHelper.
vbabanin Jun 20, 2025
b6b4d67
Update native-image.properties.
vbabanin Jun 20, 2025
d43972d
Refactor tests to use JUnit assertions and remove unused dependencies.
vbabanin Jun 24, 2025
73bdbd8
Fix test.
vbabanin Jun 24, 2025
66a5913
Add internal driver information.
vbabanin Jun 25, 2025
26246f0
Fix tests.
vbabanin Jun 25, 2025
843d890
Merge branch 'main' into JAVA-5854
vbabanin Jun 25, 2025
850ebe7
Add metadata logging.
vbabanin Jun 26, 2025
d0cabe4
Merge branch 'main' into JAVA-5854
vbabanin Jun 30, 2025
cb40b6a
Add test assertion.
vbabanin Jun 30, 2025
c44200f
Fix GSSAPI test.
vbabanin Jun 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Listing code owners is required by DRIVERS-3098
* @mongodb/dbx-java
3 changes: 1 addition & 2 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
[submodule "specifications"]
path = driver-core/src/test/resources/specifications
url = https://github.com/vbabanin/specifications
branch = DRIVERS-2985
url = https://github.com/mongodb/specifications
36 changes: 36 additions & 0 deletions driver-core/src/main/com/mongodb/MongoStalePrimaryException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb;

/**
* Exception thrown when a replica set primary is identified as a stale primary during Server Discovery and Monitoring (SDAM).
* This occurs when a new primary is discovered, causing the previously known primary to be marked stale, typically during network
* partitions or elections.
*
* @since 5.6
*/
public class MongoStalePrimaryException extends MongoException {

/**
* Construct an instance.
*
* @param message the exception message.
*/
public MongoStalePrimaryException(final String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
/**
* A cluster closed event.
*
* <p>This event is synonymous with TopologyClosedEvent</p>
*
* @since 3.3
*/
public final class ClusterClosedEvent {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
/**
* An event signifying that the cluster description has changed.
*
* <p>This event is synonymous with TopologyDescriptionChangedEvent</p>
*
* @since 3.3
*/
public final class ClusterDescriptionChangedEvent {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
/**
* A cluster opening event.
*
* <p>This event is synonymous with TopologyOpeningEvent</p>
*
* @since 3.3
*/
public final class ClusterOpeningEvent {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.mongodb.internal.connection;

import com.mongodb.MongoException;
import com.mongodb.MongoStalePrimaryException;
import com.mongodb.ServerAddress;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.ClusterId;
Expand Down Expand Up @@ -269,7 +270,7 @@ private boolean handleReplicaSetMemberChanged(final ServerDescription newDescrip
}

if (isStalePrimary(newDescription)) {
invalidatePotentialPrimary(newDescription);
invalidatePotentialPrimary(newDescription, new MongoStalePrimaryException("Primary marked stale due to electionId/setVersion mismatch"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a test asserting this exception?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replied in #1708 (comment).

return false;
}

Expand Down Expand Up @@ -300,12 +301,13 @@ private boolean isStalePrimary(final ServerDescription description) {
}
}

private void invalidatePotentialPrimary(final ServerDescription newDescription) {
private void invalidatePotentialPrimary(final ServerDescription newDescription, final MongoStalePrimaryException cause) {
LOGGER.info(format("Invalidating potential primary %s whose (set version, election id) tuple of (%d, %s) "
+ "is less than one already seen of (%d, %s)",
newDescription.getAddress(), newDescription.getSetVersion(), newDescription.getElectionId(),
maxSetVersion, maxElectionId));
addressToServerTupleMap.get(newDescription.getAddress()).server.resetToConnecting();

addressToServerTupleMap.get(newDescription.getAddress()).server.resetToConnecting(cause);
}

/**
Expand Down Expand Up @@ -380,7 +382,7 @@ private void invalidateOldPrimaries(final ServerAddress newPrimary) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(format("Rediscovering type of existing primary %s", serverTuple.description.getAddress()));
}
serverTuple.server.invalidate();
serverTuple.server.invalidate(new MongoStalePrimaryException("Primary marked stale due to discovery of newer primary"));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.ClusterId;
import com.mongodb.connection.ClusterSettings;
import com.mongodb.connection.ClusterType;
import com.mongodb.connection.ServerDescription;
import com.mongodb.event.ClusterClosedEvent;
import com.mongodb.event.ClusterDescriptionChangedEvent;
Expand All @@ -50,7 +49,6 @@
import com.mongodb.selector.CompositeServerSelector;
import com.mongodb.selector.ServerSelector;

import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
Expand All @@ -64,6 +62,7 @@
import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.isTrue;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.connection.ClusterType.UNKNOWN;
import static com.mongodb.connection.ServerDescription.MAX_DRIVER_WIRE_VERSION;
import static com.mongodb.connection.ServerDescription.MIN_DRIVER_SERVER_VERSION;
import static com.mongodb.connection.ServerDescription.MIN_DRIVER_WIRE_VERSION;
Expand All @@ -72,6 +71,7 @@
import static com.mongodb.internal.connection.EventHelper.wouldDescriptionsGenerateEquivalentEvents;
import static com.mongodb.internal.event.EventListenerHelper.singleClusterListener;
import static com.mongodb.internal.logging.LogMessage.Component.SERVER_SELECTION;
import static com.mongodb.internal.logging.LogMessage.Component.TOPOLOGY;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.FAILURE;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.OPERATION;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.OPERATION_ID;
Expand All @@ -80,11 +80,16 @@
import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_HOST;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_PORT;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.TOPOLOGY_DESCRIPTION;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.TOPOLOGY_ID;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.TOPOLOGY_NEW_DESCRIPTION;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.TOPOLOGY_PREVIOUS_DESCRIPTION;
import static com.mongodb.internal.logging.LogMessage.Level.DEBUG;
import static com.mongodb.internal.logging.LogMessage.Level.INFO;
import static com.mongodb.internal.time.Timeout.ZeroSemantics.ZERO_DURATION_MEANS_EXPIRED;
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -115,8 +120,10 @@ abstract class BaseCluster implements Cluster {
this.settings = notNull("settings", settings);
this.serverFactory = notNull("serverFactory", serverFactory);
this.clusterListener = singleClusterListener(settings);
this.clusterListener.clusterOpening(new ClusterOpeningEvent(clusterId));
this.description = new ClusterDescription(settings.getMode(), ClusterType.UNKNOWN, Collections.emptyList(),
ClusterOpeningEvent clusterOpeningEvent = new ClusterOpeningEvent(clusterId);
this.clusterListener.clusterOpening(clusterOpeningEvent);
logTopologyOpening(clusterId, clusterOpeningEvent);
this.description = new ClusterDescription(settings.getMode(), UNKNOWN, emptyList(),
settings, serverFactory.getSettings());
this.clientMetadata = clientMetadata;
}
Expand Down Expand Up @@ -220,7 +227,11 @@ public void close() {
if (!isClosed()) {
isClosed = true;
phase.get().countDown();
clusterListener.clusterClosed(new ClusterClosedEvent(clusterId));
fireChangeEvent(new ClusterDescription(settings.getMode(), UNKNOWN, emptyList(), settings, serverFactory.getSettings()),
description);
ClusterClosedEvent clusterClosedEvent = new ClusterClosedEvent(clusterId);
clusterListener.clusterClosed(clusterClosedEvent);
logTopologyClosedEvent(clusterId, clusterClosedEvent);
stopWaitQueueHandler();
}
}
Expand All @@ -247,8 +258,9 @@ protected void updateDescription(final ClusterDescription newDescription) {
*/
protected void fireChangeEvent(final ClusterDescription newDescription, final ClusterDescription previousDescription) {
if (!wouldDescriptionsGenerateEquivalentEvents(newDescription, previousDescription)) {
clusterListener.clusterDescriptionChanged(
new ClusterDescriptionChangedEvent(getClusterId(), newDescription, previousDescription));
ClusterDescriptionChangedEvent changedEvent = new ClusterDescriptionChangedEvent(getClusterId(), newDescription, previousDescription);
clusterListener.clusterDescriptionChanged(changedEvent);
logTopologyDescriptionChanged(getClusterId(), changedEvent);
}
}

Expand Down Expand Up @@ -629,4 +641,43 @@ static void logServerSelectionSucceeded(
+ " Selector: {}, topology description: {}"));
}
}

static void logTopologyOpening(
final ClusterId clusterId,
final ClusterOpeningEvent clusterOpeningEvent) {
if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) {
STRUCTURED_LOGGER.log(new LogMessage(
TOPOLOGY, DEBUG, "Starting topology monitoring", clusterId,
singletonList(new Entry(TOPOLOGY_ID, clusterId)),
"Starting monitoring for topology with ID {}"));
}
}

static void logTopologyDescriptionChanged(
final ClusterId clusterId,
final ClusterDescriptionChangedEvent clusterDescriptionChangedEvent) {
if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) {
STRUCTURED_LOGGER.log(new LogMessage(
TOPOLOGY, DEBUG, "Topology description changed", clusterId,
asList(
new Entry(TOPOLOGY_ID, clusterId),
new Entry(TOPOLOGY_PREVIOUS_DESCRIPTION,
clusterDescriptionChangedEvent.getPreviousDescription().getShortDescription()),
new Entry(TOPOLOGY_NEW_DESCRIPTION,
clusterDescriptionChangedEvent.getNewDescription().getShortDescription())),
"Description changed for topology with ID {}. Previous description: {}. New description: {}"));
}
}

static void logTopologyClosedEvent(
final ClusterId clusterId,
final ClusterClosedEvent clusterClosedEvent) {
if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) {
STRUCTURED_LOGGER.log(new LogMessage(
TOPOLOGY, DEBUG, "Stopped topology monitoring", clusterId,
singletonList(new Entry(TOPOLOGY_ID, clusterId)),
"Stopped monitoring for topology with ID {}"));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.mongodb.internal.connection;

import com.mongodb.MongoException;

import java.util.List;

import static java.util.Arrays.asList;
Expand All @@ -30,13 +32,13 @@ interface ClusterableServer extends Server {
/**
* Reset server description to connecting state
*/
void resetToConnecting();
void resetToConnecting(MongoException cause);

/**
* Invalidate the description of this server. Implementation of this method should not block, but rather trigger an asynchronous
* attempt to connect with the server in order to determine its current status.
*/
void invalidate();
void invalidate(MongoException cause);

/**
* <p>Closes the server. Instances that have been closed will no longer be available for use.</p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ void doMaintenance() {
try {
sdamProvider.optional().ifPresent(sdam -> {
if (!silentlyComplete.test(actualException)) {
sdam.handleExceptionBeforeHandshake(SdamIssue.specific(actualException, sdam.context(newConnection)));
sdam.handleExceptionBeforeHandshake(SdamIssue.of(actualException, sdam.context(newConnection)));
}
});
} catch (Exception suppressed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ final class DefaultSdamServerDescriptionManager implements SdamServerDescription
}

@Override
public void update(final ServerDescription candidateDescription) {
public void monitorUpdate(final ServerDescription candidateDescription) {
cluster.withLock(() -> {
if (TopologyVersionHelper.newer(description.getTopologyVersion(), candidateDescription.getTopologyVersion())) {
return;
Expand All @@ -82,6 +82,18 @@ public void update(final ServerDescription candidateDescription) {
});
}

@Override
public void updateToUnknown(final ServerDescription candidateDescription) {
assertTrue(candidateDescription.getType() == UNKNOWN);
cluster.withLock(() -> {
if (TopologyVersionHelper.newer(description.getTopologyVersion(), candidateDescription.getTopologyVersion())) {
return;
}

updateDescription(candidateDescription);
});
}

@Override
public void handleExceptionBeforeHandshake(final SdamIssue sdamIssue) {
cluster.withLock(() -> handleException(sdamIssue, true));
Expand Down Expand Up @@ -128,7 +140,7 @@ private void handleException(final SdamIssue sdamIssue, final boolean beforeHand
updateDescription(sdamIssue.serverDescription());
connectionPool.invalidate(sdamIssue.exception().orElse(null));
serverMonitor.cancelCurrentCheck();
} else if (sdamIssue.relatedToWriteConcern() || !sdamIssue.specific()) {
} else if (sdamIssue.relatedToWriteConcern() || sdamIssue.relatedToStalePrimary()) {
updateDescription(sdamIssue.serverDescription());
serverMonitor.connect();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public Connection getConnection(final OperationContext operationContext) {
try {
operationEnd();
if (e instanceof MongoException) {
sdam.handleExceptionBeforeHandshake(SdamIssue.specific(e, exceptionContext));
sdam.handleExceptionBeforeHandshake(SdamIssue.of(e, exceptionContext));
}
} catch (Exception suppressed) {
e.addSuppressed(suppressed);
Expand All @@ -118,7 +118,7 @@ public void getConnectionAsync(final OperationContext operationContext, final Si
if (t != null) {
try {
operationEnd();
sdam.handleExceptionBeforeHandshake(SdamIssue.specific(t, exceptionContext));
sdam.handleExceptionBeforeHandshake(SdamIssue.of(t, exceptionContext));
} catch (Exception suppressed) {
t.addSuppressed(suppressed);
} finally {
Expand Down Expand Up @@ -150,14 +150,14 @@ private void operationEnd() {
}

@Override
public void resetToConnecting() {
sdam.update(unknownConnectingServerDescription(serverId, null));
public void resetToConnecting(final MongoException cause) {
sdam.updateToUnknown(unknownConnectingServerDescription(serverId, cause));
}

@Override
public void invalidate() {
public void invalidate(final MongoException cause) {
if (!isClosed()) {
sdam.handleExceptionAfterHandshake(SdamIssue.unspecified(sdam.context()));
sdam.handleExceptionAfterHandshake(SdamIssue.of(cause, sdam.context()));
}
}

Expand Down Expand Up @@ -208,7 +208,7 @@ public <T> T execute(final CommandProtocol<T> protocol, final InternalConnection
.execute(connection);
} catch (MongoException e) {
try {
sdam.handleExceptionAfterHandshake(SdamIssue.specific(e, sdam.context(connection)));
sdam.handleExceptionAfterHandshake(SdamIssue.of(e, sdam.context(connection)));
} catch (Exception suppressed) {
e.addSuppressed(suppressed);
}
Expand All @@ -231,7 +231,7 @@ public <T> void executeAsync(final CommandProtocol<T> protocol, final InternalCo
.executeAsync(connection, errorHandlingCallback((result, t) -> {
if (t != null) {
try {
sdam.handleExceptionAfterHandshake(SdamIssue.specific(t, sdam.context(connection)));
sdam.handleExceptionAfterHandshake(SdamIssue.of(t, sdam.context(connection)));
} catch (Exception suppressed) {
t.addSuppressed(suppressed);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public void run() {
}

logStateChange(previousServerDescription, currentServerDescription);
sdamProvider.get().update(currentServerDescription);
sdamProvider.get().monitorUpdate(currentServerDescription);

if ((shouldStreamResponses && currentServerDescription.getType() != UNKNOWN)
|| (connection != null && connection.hasMoreToCome())
Expand Down
Loading