Skip to content

Commit f6b540c

Browse files
authored
Ensure server discovery and monitoring events are published in order (#860)
* Ensure events are ordered according to how they are applied to driver state * Publish all events to application listeners one at a time, on a separate thread * Document and ensure that cluster event listeners are not required to be thread safe JAVA-4449
1 parent 3691ae6 commit f6b540c

21 files changed

+698
-92
lines changed

driver-core/src/main/com/mongodb/event/ClusterListener.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,14 @@
2020

2121
/**
2222
* A listener for cluster-related events.
23-
*
23+
* <p>
24+
* It does not have to be thread-safe. All events received by {@link ClusterListener}, {@link ServerListener},
25+
* {@link ServerMonitorListener} are totally ordered (and the event order implies the happens-before order), provided that the listeners
26+
* are not shared by different {@code MongoClient}s. This means that even if you have a single class implementing all of
27+
* {@link ClusterListener}, {@link ServerListener}, {@link ServerMonitorListener}, it does not have to be thread-safe.
28+
* </p>
29+
* @see ServerListener
30+
* @see ServerMonitorListener
2431
* @since 3.3
2532
*/
2633
public interface ClusterListener extends EventListener {

driver-core/src/main/com/mongodb/event/ServerListener.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,11 @@
2020

2121
/**
2222
* A listener for server-related events
23-
*
23+
* <p>
24+
* It does not have to be thread-safe, see {@link ClusterListener} for the details regarding the order of events.
25+
* </p>
26+
* @see ClusterListener
27+
* @see ServerMonitorListener
2428
* @since 3.3
2529
*/
2630
public interface ServerListener extends EventListener {

driver-core/src/main/com/mongodb/event/ServerMonitorListener.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@
2222

2323
/**
2424
* A listener for server monitor-related events
25-
*
25+
* <p>
26+
* It does not have to be thread-safe, see {@link ClusterListener} for the details regarding the order of events.
27+
* </p>
28+
* @see ClusterListener
29+
* @see ServerListener
2630
* @since 3.3
2731
*/
2832
public interface ServerMonitorListener extends EventListener {

driver-core/src/main/com/mongodb/internal/connection/AbstractMultiServerCluster.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ protected void initialize(final Collection<ServerAddress> serverAddresses) {
9898
addServer(serverAddress);
9999
}
100100
newDescription = updateDescription();
101+
fireChangeEvent(newDescription, currentDescription);
101102
}
102-
fireChangeEvent(newDescription, currentDescription);
103103
}
104104

105105
@Override
@@ -221,9 +221,9 @@ private void onChange(final ServerDescriptionChangedEvent event) {
221221
oldClusterDescription = getCurrentDescription();
222222
newClusterDescription = updateDescription();
223223
}
224-
}
225-
if (shouldUpdateDescription) {
226-
fireChangeEvent(newClusterDescription, oldClusterDescription);
224+
if (shouldUpdateDescription) {
225+
fireChangeEvent(newClusterDescription, oldClusterDescription);
226+
}
227227
}
228228
}
229229

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.internal.connection;
18+
19+
import com.mongodb.annotations.ThreadSafe;
20+
import com.mongodb.connection.ClusterId;
21+
import com.mongodb.event.ClusterClosedEvent;
22+
import com.mongodb.event.ClusterDescriptionChangedEvent;
23+
import com.mongodb.event.ClusterListener;
24+
import com.mongodb.event.ClusterOpeningEvent;
25+
import com.mongodb.event.ServerClosedEvent;
26+
import com.mongodb.event.ServerDescriptionChangedEvent;
27+
import com.mongodb.event.ServerHeartbeatFailedEvent;
28+
import com.mongodb.event.ServerHeartbeatStartedEvent;
29+
import com.mongodb.event.ServerHeartbeatSucceededEvent;
30+
import com.mongodb.event.ServerListener;
31+
import com.mongodb.event.ServerMonitorListener;
32+
import com.mongodb.event.ServerOpeningEvent;
33+
import com.mongodb.internal.VisibleForTesting;
34+
35+
import java.util.concurrent.BlockingQueue;
36+
import java.util.concurrent.LinkedBlockingQueue;
37+
import java.util.function.Supplier;
38+
39+
import static com.mongodb.assertions.Assertions.notNull;
40+
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
41+
42+
/**
43+
* An implementation of a listener for all cluster-related events. Its purpose is the following:
44+
*
45+
* 1. To ensure that cluster-related events are delivered one at a time, with happens-before semantics
46+
* 2. To ensure that application-provided event listener methods do not execute within critical sections of the driver
47+
*
48+
* This is done by adding all events to an unbounded blocking queue, and then publishing them from a dedicated thread by taking
49+
* them off the queue one at a time.
50+
*
51+
* There is an assumption that the last event that should be published is the {@link ClusterClosedEvent}. Once that event is published,
52+
* the publishing thread is allowed to die.
53+
*/
54+
@ThreadSafe
55+
final class AsynchronousClusterEventListener implements ClusterListener, ServerListener, ServerMonitorListener {
56+
private final BlockingQueue<Supplier<Boolean>> eventPublishers = new LinkedBlockingQueue<>();
57+
private final ClusterListener clusterListener;
58+
private final ServerListener serverListener;
59+
private final ServerMonitorListener serverMonitorListener;
60+
61+
private final Thread publishingThread;
62+
63+
@FunctionalInterface
64+
private interface VoidFunction<T> {
65+
void apply(T t);
66+
}
67+
68+
static AsynchronousClusterEventListener startNew(final ClusterId clusterId, final ClusterListener clusterListener,
69+
final ServerListener serverListener, final ServerMonitorListener serverMonitorListener) {
70+
AsynchronousClusterEventListener result = new AsynchronousClusterEventListener(clusterId, clusterListener, serverListener,
71+
serverMonitorListener);
72+
result.publishingThread.start();
73+
return result;
74+
}
75+
76+
private AsynchronousClusterEventListener(final ClusterId clusterId, final ClusterListener clusterListener,
77+
final ServerListener serverListener, final ServerMonitorListener serverMonitorListener) {
78+
this.clusterListener = notNull("clusterListener", clusterListener);
79+
this.serverListener = notNull("serverListener", serverListener);
80+
this.serverMonitorListener = notNull("serverMonitorListener", serverMonitorListener);
81+
publishingThread = new Thread(this::publishEvents, "cluster-event-publisher-" + clusterId.getValue());
82+
publishingThread.setDaemon(true);
83+
}
84+
85+
@VisibleForTesting(otherwise = PRIVATE)
86+
Thread getPublishingThread() {
87+
return publishingThread;
88+
}
89+
90+
@Override
91+
public void clusterOpening(final ClusterOpeningEvent event) {
92+
addClusterEventInvocation(clusterListener -> clusterListener.clusterOpening(event), false);
93+
}
94+
95+
@Override
96+
public void clusterClosed(final ClusterClosedEvent event) {
97+
addClusterEventInvocation(clusterListener -> clusterListener.clusterClosed(event), true);
98+
}
99+
100+
@Override
101+
public void clusterDescriptionChanged(final ClusterDescriptionChangedEvent event) {
102+
addClusterEventInvocation(clusterListener -> clusterListener.clusterDescriptionChanged(event), false);
103+
}
104+
105+
@Override
106+
public void serverOpening(final ServerOpeningEvent event) {
107+
addServerEventInvocation(serverListener -> serverListener.serverOpening(event));
108+
}
109+
110+
@Override
111+
public void serverClosed(final ServerClosedEvent event) {
112+
addServerEventInvocation(serverListener -> serverListener.serverClosed(event));
113+
}
114+
115+
@Override
116+
public void serverDescriptionChanged(final ServerDescriptionChangedEvent event) {
117+
addServerEventInvocation(serverListener -> serverListener.serverDescriptionChanged(event));
118+
}
119+
120+
@Override
121+
public void serverHearbeatStarted(final ServerHeartbeatStartedEvent event) {
122+
addServerMonitorEventInvocation(serverMonitorListener -> serverMonitorListener.serverHearbeatStarted(event));
123+
}
124+
125+
@Override
126+
public void serverHeartbeatSucceeded(final ServerHeartbeatSucceededEvent event) {
127+
addServerMonitorEventInvocation(serverMonitorListener -> serverMonitorListener.serverHeartbeatSucceeded(event));
128+
}
129+
130+
@Override
131+
public void serverHeartbeatFailed(final ServerHeartbeatFailedEvent event) {
132+
addServerMonitorEventInvocation(serverMonitorListener -> serverMonitorListener.serverHeartbeatFailed(event));
133+
}
134+
135+
private void addClusterEventInvocation(final VoidFunction<ClusterListener> eventPublisher, final boolean isLastEvent) {
136+
addEvent(() -> {
137+
eventPublisher.apply(clusterListener);
138+
return isLastEvent;
139+
});
140+
}
141+
142+
private void addServerEventInvocation(final VoidFunction<ServerListener> eventPublisher) {
143+
addEvent(() -> {
144+
eventPublisher.apply(serverListener);
145+
return false;
146+
});
147+
}
148+
149+
private void addServerMonitorEventInvocation(final VoidFunction<ServerMonitorListener> eventPublisher) {
150+
addEvent(() -> {
151+
eventPublisher.apply(serverMonitorListener);
152+
return false;
153+
});
154+
}
155+
156+
private void addEvent(final Supplier<Boolean> supplier) {
157+
// protect against rogue publishers
158+
if (!publishingThread.isAlive()) {
159+
return;
160+
}
161+
eventPublishers.add(supplier);
162+
}
163+
164+
private void publishEvents() {
165+
while (true) {
166+
try {
167+
Supplier<Boolean> eventPublisher = eventPublishers.take();
168+
boolean isLastEvent = eventPublisher.get();
169+
if (isLastEvent) {
170+
break;
171+
}
172+
} catch (RuntimeException | InterruptedException e) {
173+
// ignore exceptions thrown from listeners, also ignore interrupts that user code may cause
174+
}
175+
}
176+
}
177+
}

driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,7 @@
5757
import static com.mongodb.connection.ServerDescription.MIN_DRIVER_WIRE_VERSION;
5858
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
5959
import static com.mongodb.internal.connection.EventHelper.wouldDescriptionsGenerateEquivalentEvents;
60-
import static com.mongodb.internal.event.EventListenerHelper.createServerListener;
61-
import static com.mongodb.internal.event.EventListenerHelper.getClusterListener;
60+
import static com.mongodb.internal.event.EventListenerHelper.singleClusterListener;
6261
import static java.lang.String.format;
6362
import static java.util.Arrays.asList;
6463
import static java.util.Comparator.comparingInt;
@@ -85,7 +84,7 @@ abstract class BaseCluster implements Cluster {
8584
this.clusterId = notNull("clusterId", clusterId);
8685
this.settings = notNull("settings", settings);
8786
this.serverFactory = notNull("serverFactory", serverFactory);
88-
this.clusterListener = getClusterListener(settings);
87+
this.clusterListener = singleClusterListener(settings);
8988
clusterListener.clusterOpening(new ClusterOpeningEvent(clusterId));
9089
description = new ClusterDescription(settings.getMode(), ClusterType.UNKNOWN, Collections.<ServerDescription>emptyList(),
9190
settings, serverFactory.getSettings());
@@ -250,6 +249,10 @@ protected synchronized void updateDescription(final ClusterDescription newDescri
250249
updatePhase();
251250
}
252251

252+
/**
253+
* Subclasses must ensure that this method is called in a way that events are delivered in a predictable order.
254+
* Typically, this means calling it while holding a lock that includes both updates to the cluster state and firing the event.
255+
*/
253256
protected void fireChangeEvent(final ClusterDescription newDescription, final ClusterDescription previousDescription) {
254257
if (!wouldDescriptionsGenerateEquivalentEvents(newDescription, previousDescription)) {
255258
clusterListener.clusterDescriptionChanged(
@@ -384,8 +387,7 @@ private ServerSelector getCompositeServerSelector(final ServerSelector serverSel
384387

385388
protected ClusterableServer createServer(final ServerAddress serverAddress,
386389
final ServerDescriptionChangedListener serverDescriptionChangedListener) {
387-
return serverFactory.create(serverAddress, serverDescriptionChangedListener, createServerListener(serverFactory.getSettings()),
388-
clusterClock);
390+
return serverFactory.create(serverAddress, serverDescriptionChangedListener, clusterClock);
389391
}
390392

391393
private void throwIfIncompatible(final ClusterDescription curDescription) {

driver-core/src/main/com/mongodb/internal/connection/ClusterableServerFactory.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,10 @@
1818

1919
import com.mongodb.ServerAddress;
2020
import com.mongodb.connection.ServerSettings;
21-
import com.mongodb.event.ServerListener;
2221

2322
public interface ClusterableServerFactory {
2423
ClusterableServer create(ServerAddress serverAddress, ServerDescriptionChangedListener serverDescriptionChangedListener,
25-
ServerListener serverListener, ClusterClock clusterClock);
24+
ClusterClock clusterClock);
2625

2726
ServerSettings getSettings();
2827
}

driver-core/src/main/com/mongodb/internal/connection/DefaultClusterFactory.java

Lines changed: 62 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,22 @@
2626
import com.mongodb.connection.ConnectionPoolSettings;
2727
import com.mongodb.connection.ServerSettings;
2828
import com.mongodb.connection.StreamFactory;
29+
import com.mongodb.event.ClusterListener;
2930
import com.mongodb.event.CommandListener;
31+
import com.mongodb.event.ServerListener;
32+
import com.mongodb.event.ServerMonitorListener;
3033
import com.mongodb.lang.Nullable;
3134

3235
import java.util.List;
3336

37+
import static com.mongodb.internal.event.EventListenerHelper.NO_OP_CLUSTER_LISTENER;
38+
import static com.mongodb.internal.event.EventListenerHelper.NO_OP_SERVER_LISTENER;
39+
import static com.mongodb.internal.event.EventListenerHelper.NO_OP_SERVER_MONITOR_LISTENER;
40+
import static com.mongodb.internal.event.EventListenerHelper.clusterListenerMulticaster;
41+
import static com.mongodb.internal.event.EventListenerHelper.serverListenerMulticaster;
42+
import static com.mongodb.internal.event.EventListenerHelper.serverMonitorListenerMulticaster;
43+
import static java.util.Collections.singletonList;
44+
3445
/**
3546
* The default factory for cluster implementations.
3647
*
@@ -41,8 +52,8 @@ public final class DefaultClusterFactory {
4152
/**
4253
* Creates a cluster with the given settings. The cluster mode will be based on the mode from the settings.
4354
*
44-
* @param clusterSettings the cluster settings
45-
* @param serverSettings the server settings
55+
* @param originalClusterSettings the cluster settings
56+
* @param originalServerSettings the server settings
4657
* @param connectionPoolSettings the connection pool settings
4758
* @param internalConnectionPoolSettings the internal connection pool settings
4859
* @param streamFactory the stream factory
@@ -57,7 +68,7 @@ public final class DefaultClusterFactory {
5768
*
5869
* @since 3.6
5970
*/
60-
public Cluster createCluster(final ClusterSettings clusterSettings, final ServerSettings serverSettings,
71+
public Cluster createCluster(final ClusterSettings originalClusterSettings, final ServerSettings originalServerSettings,
6172
final ConnectionPoolSettings connectionPoolSettings,
6273
final InternalConnectionPoolSettings internalConnectionPoolSettings,
6374
final StreamFactory streamFactory, final StreamFactory heartbeatStreamFactory,
@@ -67,6 +78,30 @@ public Cluster createCluster(final ClusterSettings clusterSettings, final Server
6778
final List<MongoCompressor> compressorList, final @Nullable ServerApi serverApi) {
6879

6980
ClusterId clusterId = new ClusterId();
81+
ClusterSettings clusterSettings;
82+
ServerSettings serverSettings;
83+
84+
if (noClusterEventListeners(originalClusterSettings, originalServerSettings)) {
85+
clusterSettings = ClusterSettings.builder(originalClusterSettings)
86+
.clusterListenerList(singletonList(NO_OP_CLUSTER_LISTENER))
87+
.build();
88+
serverSettings = ServerSettings.builder(originalServerSettings)
89+
.serverListenerList(singletonList(NO_OP_SERVER_LISTENER))
90+
.serverMonitorListenerList(singletonList(NO_OP_SERVER_MONITOR_LISTENER))
91+
.build();
92+
} else {
93+
AsynchronousClusterEventListener clusterEventListener =
94+
AsynchronousClusterEventListener.startNew(clusterId, getClusterListener(originalClusterSettings),
95+
getServerListener(originalServerSettings), getServerMonitorListener(originalServerSettings));
96+
97+
clusterSettings = ClusterSettings.builder(originalClusterSettings)
98+
.clusterListenerList(singletonList(clusterEventListener))
99+
.build();
100+
serverSettings = ServerSettings.builder(originalServerSettings)
101+
.serverListenerList(singletonList(clusterEventListener))
102+
.serverMonitorListenerList(singletonList(clusterEventListener))
103+
.build();
104+
}
70105

71106
DnsSrvRecordMonitorFactory dnsSrvRecordMonitorFactory = new DefaultDnsSrvRecordMonitorFactory(clusterId, serverSettings);
72107

@@ -96,4 +131,28 @@ public Cluster createCluster(final ClusterSettings clusterSettings, final Server
96131
}
97132
}
98133
}
134+
135+
private boolean noClusterEventListeners(final ClusterSettings clusterSettings, final ServerSettings serverSettings) {
136+
return clusterSettings.getClusterListeners().isEmpty()
137+
&& serverSettings.getServerListeners().isEmpty()
138+
&& serverSettings.getServerMonitorListeners().isEmpty();
139+
}
140+
141+
private static ClusterListener getClusterListener(final ClusterSettings clusterSettings) {
142+
return clusterSettings.getClusterListeners().size() == 0
143+
? NO_OP_CLUSTER_LISTENER
144+
: clusterListenerMulticaster(clusterSettings.getClusterListeners());
145+
}
146+
147+
private static ServerListener getServerListener(final ServerSettings serverSettings) {
148+
return serverSettings.getServerListeners().size() == 0
149+
? NO_OP_SERVER_LISTENER
150+
: serverListenerMulticaster(serverSettings.getServerListeners());
151+
}
152+
153+
private static ServerMonitorListener getServerMonitorListener(final ServerSettings serverSettings) {
154+
return serverSettings.getServerMonitorListeners().size() == 0
155+
? NO_OP_SERVER_MONITOR_LISTENER
156+
: serverMonitorListenerMulticaster(serverSettings.getServerMonitorListeners());
157+
}
99158
}

0 commit comments

Comments
 (0)