53
53
import java .util .Deque ;
54
54
import java .util .LinkedList ;
55
55
import java .util .List ;
56
+ import java .util .Queue ;
57
+ import java .util .concurrent .BlockingQueue ;
56
58
import java .util .concurrent .ConcurrentHashMap ;
57
- import java .util .concurrent .Executor ;
58
59
import java .util .concurrent .ExecutorService ;
59
60
import java .util .concurrent .Executors ;
61
+ import java .util .concurrent .LinkedBlockingQueue ;
60
62
import java .util .concurrent .ScheduledExecutorService ;
61
63
import java .util .concurrent .TimeUnit ;
62
64
import java .util .concurrent .atomic .AtomicBoolean ;
66
68
import java .util .concurrent .locks .Lock ;
67
69
import java .util .concurrent .locks .ReentrantLock ;
68
70
import java .util .concurrent .locks .StampedLock ;
71
+ import java .util .function .Consumer ;
72
+ import java .util .function .Supplier ;
69
73
70
74
import static com .mongodb .assertions .Assertions .assertFalse ;
71
75
import static com .mongodb .assertions .Assertions .assertNotNull ;
@@ -91,7 +95,7 @@ class DefaultConnectionPool implements ConnectionPool {
91
95
private final ConnectionPoolSettings settings ;
92
96
private final AtomicInteger generation = new AtomicInteger (0 );
93
97
private final ScheduledExecutorService sizeMaintenanceTimer ;
94
- private final Workers workers ;
98
+ private final AsyncWorkManager asyncWorkManager ;
95
99
private final Runnable maintenanceTask ;
96
100
private final ConnectionPoolListener connectionPoolListener ;
97
101
private final ServerId serverId ;
@@ -113,7 +117,7 @@ class DefaultConnectionPool implements ConnectionPool {
113
117
sizeMaintenanceTimer = createMaintenanceTimer ();
114
118
connectionPoolCreated (connectionPoolListener , serverId , settings );
115
119
openConcurrencyLimiter = new OpenConcurrencyLimiter (MAX_CONNECTING );
116
- workers = new Workers ();
120
+ asyncWorkManager = new AsyncWorkManager ();
117
121
connectionGenerationSupplier = new ConnectionGenerationSupplier () {
118
122
@ Override
119
123
public int getGeneration () {
@@ -172,35 +176,32 @@ public void getAsync(final SingleResultCallback<InternalConnection> callback) {
172
176
errHandlingCallback .onResult (null , checkOutFailed (failure ));
173
177
}
174
178
};
175
- PooledConnection immediateConnection = null ;
176
-
177
- try {
178
- immediateConnection = getPooledConnection (Timeout .immediate ());
179
- } catch (MongoTimeoutException e ) {
180
- // fall through
181
- } catch (RuntimeException e ) {
182
- eventSendingCallback .onResult (null , e );
183
- return ;
184
- }
185
-
186
- if (immediateConnection != null ) {
187
- openAsync (immediateConnection , timeout , eventSendingCallback );
188
- } else {
189
- workers .getter ().execute (() -> {
190
- if (timeout .expired ()) {
191
- eventSendingCallback .onResult (null , createTimeoutException (timeout ));
192
- return ;
193
- }
179
+ asyncWorkManager .enqueue (new Task (timeout , t -> {
180
+ if (t != null ) {
181
+ eventSendingCallback .onResult (null , t );
182
+ } else {
194
183
PooledConnection connection ;
195
184
try {
196
185
connection = getPooledConnection (timeout );
197
186
} catch (RuntimeException e ) {
198
187
eventSendingCallback .onResult (null , e );
199
188
return ;
200
189
}
201
- openAsync (connection , timeout , eventSendingCallback );
202
- });
203
- }
190
+ if (connection .opened ()) {
191
+ if (LOGGER .isTraceEnabled ()) {
192
+ LOGGER .trace (format ("Pooled connection %s to server %s is already open" ,
193
+ getId (connection ), serverId ));
194
+ }
195
+ eventSendingCallback .onResult (connection , null );
196
+ } else {
197
+ if (LOGGER .isTraceEnabled ()) {
198
+ LOGGER .trace (format ("Pooled connection %s to server %s is not yet open" ,
199
+ getId (connection ), serverId ));
200
+ }
201
+ openConcurrencyLimiter .openAsyncOrGetAvailable (connection , timeout , eventSendingCallback );
202
+ }
203
+ }
204
+ }));
204
205
}
205
206
206
207
/**
@@ -223,23 +224,6 @@ private Throwable checkOutFailed(final Throwable t) {
223
224
return result ;
224
225
}
225
226
226
- private void openAsync (final PooledConnection pooledConnection , final Timeout timeout ,
227
- final SingleResultCallback <InternalConnection > callback ) {
228
- if (pooledConnection .opened ()) {
229
- if (LOGGER .isTraceEnabled ()) {
230
- LOGGER .trace (format ("Pooled connection %s to server %s is already open" ,
231
- getId (pooledConnection ), serverId ));
232
- }
233
- callback .onResult (pooledConnection , null );
234
- } else {
235
- if (LOGGER .isTraceEnabled ()) {
236
- LOGGER .trace (format ("Pooled connection %s to server %s is not yet open" ,
237
- getId (pooledConnection ), serverId ));
238
- }
239
- workers .opener ().execute (() -> openConcurrencyLimiter .openAsyncOrGetAvailable (pooledConnection , timeout , callback ));
240
- }
241
- }
242
-
243
227
@ Override
244
228
public void invalidate () {
245
229
LOGGER .debug ("Invalidating the connection pool" );
@@ -264,7 +248,7 @@ public void close() {
264
248
if (sizeMaintenanceTimer != null ) {
265
249
sizeMaintenanceTimer .shutdownNow ();
266
250
}
267
- workers .close ();
251
+ asyncWorkManager .close ();
268
252
closed = true ;
269
253
connectionPoolListener .connectionPoolClosed (new ConnectionPoolClosedEvent (serverId ));
270
254
}
@@ -1170,55 +1154,160 @@ int getNumPinnedToTransaction() {
1170
1154
}
1171
1155
}
1172
1156
1157
+ /**
1158
+ * This class maintains threads needed to perform {@link #getAsync(SingleResultCallback)}.
1159
+ */
1173
1160
@ ThreadSafe
1174
- private static class Workers implements AutoCloseable {
1175
- private volatile ExecutorService getter ;
1176
- private volatile ExecutorService opener ;
1161
+ private static class AsyncWorkManager implements AutoCloseable {
1162
+ private volatile State state ;
1163
+ private volatile BlockingQueue < Task > tasks ;
1177
1164
private final Lock lock ;
1165
+ @ Nullable
1166
+ private ExecutorService worker ;
1178
1167
1179
- Workers () {
1168
+ AsyncWorkManager () {
1169
+ state = State .NEW ;
1170
+ tasks = new LinkedBlockingQueue <>();
1180
1171
lock = new StampedLock ().asWriteLock ();
1181
1172
}
1182
1173
1183
- Executor getter () {
1184
- if (getter == null ) {
1185
- lock .lock ();
1186
- try {
1187
- if (getter == null ) {
1188
- getter = Executors .newSingleThreadExecutor (new DaemonThreadFactory ("AsyncGetter" ));
1189
- }
1190
- } finally {
1191
- lock .unlock ();
1174
+ void enqueue (final Task task ) {
1175
+ lock .lock ();
1176
+ try {
1177
+ if (initUnlessClosed ()) {
1178
+ tasks .add (task );
1179
+ return ;
1192
1180
}
1181
+ } finally {
1182
+ lock .unlock ();
1193
1183
}
1194
- return getter ;
1184
+ task . failAsClosed () ;
1195
1185
}
1196
1186
1197
- Executor opener () {
1198
- if (opener == null ) {
1199
- lock .lock ();
1200
- try {
1201
- if (opener == null ) {
1202
- opener = Executors .newSingleThreadExecutor (new DaemonThreadFactory ("AsyncOpener" ));
1203
- }
1204
- } finally {
1205
- lock .unlock ();
1206
- }
1187
+ /**
1188
+ * Invocations of this method must be guarded by {@link #lock}.
1189
+ *
1190
+ * @return {@code false} iff the {@link #state} is {@link State#CLOSED}.
1191
+ */
1192
+ private boolean initUnlessClosed () {
1193
+ boolean result = true ;
1194
+ if (state == State .NEW ) {
1195
+ worker = Executors .newSingleThreadExecutor (new DaemonThreadFactory ("AsyncGetter" ));
1196
+ worker .submit (() -> runAndLogUncaught (this ::workerRun ));
1197
+ state = State .INITIALIZED ;
1198
+ } else if (state == State .CLOSED ) {
1199
+ result = false ;
1207
1200
}
1208
- return opener ;
1201
+ return result ;
1209
1202
}
1210
1203
1204
+ /**
1205
+ * {@linkplain Thread#interrupt() Interrupts} all workers and causes queued tasks to
1206
+ * {@linkplain Task#failAsClosed() fail} asynchronously.
1207
+ */
1211
1208
@ Override
1209
+ @ SuppressWarnings ("try" )
1212
1210
public void close () {
1211
+ lock .lock ();
1213
1212
try {
1214
- if (getter != null ) {
1215
- getter .shutdownNow ();
1213
+ if (state != State .CLOSED ) {
1214
+ state = State .CLOSED ;
1215
+ if (worker != null ) {
1216
+ worker .shutdownNow (); // at this point we interrupt `worker`s thread
1217
+ }
1216
1218
}
1217
1219
} finally {
1218
- if (opener != null ) {
1219
- opener .shutdownNow ();
1220
+ lock .unlock ();
1221
+ }
1222
+ }
1223
+
1224
+ private void workerRun () {
1225
+ try {
1226
+ while (state != State .CLOSED ) {
1227
+ try {
1228
+ Task task = tasks .take ();
1229
+ if (task .timeout ().expired ()) {
1230
+ task .failAsTimedOut ();
1231
+ } else {
1232
+ task .execute ();
1233
+ }
1234
+ } catch (RuntimeException e ) {
1235
+ LOGGER .error (null , e );
1236
+ }
1220
1237
}
1238
+ } catch (InterruptedException closed ) {
1239
+ // fail the rest of the tasks and stop
1221
1240
}
1241
+ failAllTasksAfterClosing ();
1242
+ }
1243
+
1244
+ private void failAllTasksAfterClosing () {
1245
+ Queue <Task > localGets ;
1246
+ lock .lock ();
1247
+ try {
1248
+ assertTrue (state == State .CLOSED );
1249
+ // at this point it is guaranteed that no thread enqueues a task
1250
+ localGets = tasks ;
1251
+ if (!tasks .isEmpty ()) {
1252
+ tasks = new LinkedBlockingQueue <>();
1253
+ }
1254
+ } finally {
1255
+ lock .unlock ();
1256
+ }
1257
+ localGets .forEach (Task ::failAsClosed );
1258
+ localGets .clear ();
1259
+ }
1260
+
1261
+ private void runAndLogUncaught (final Runnable runnable ) {
1262
+ try {
1263
+ runnable .run ();
1264
+ } catch (Throwable t ) {
1265
+ LOGGER .error ("The pool is not going to work correctly from now on. You may want to recreate the MongoClient" , t );
1266
+ throw t ;
1267
+ }
1268
+ }
1269
+
1270
+ private enum State {
1271
+ NEW ,
1272
+ INITIALIZED ,
1273
+ CLOSED
1274
+ }
1275
+ }
1276
+
1277
+ /**
1278
+ * An action that is allowed to be completed (failed or executed) at most once, and a timeout associated with it.
1279
+ */
1280
+ @ NotThreadSafe
1281
+ final class Task {
1282
+ private final Timeout timeout ;
1283
+ private final Consumer <RuntimeException > action ;
1284
+ private boolean completed ;
1285
+
1286
+ Task (final Timeout timeout , final Consumer <RuntimeException > action ) {
1287
+ this .timeout = timeout ;
1288
+ this .action = action ;
1289
+ }
1290
+
1291
+ void execute () {
1292
+ doComplete (() -> null );
1293
+ }
1294
+
1295
+ void failAsClosed () {
1296
+ doComplete (ConcurrentPool ::poolClosedException );
1297
+ }
1298
+
1299
+ void failAsTimedOut () {
1300
+ doComplete (() -> createTimeoutException (timeout ));
1301
+ }
1302
+
1303
+ private void doComplete (final Supplier <RuntimeException > failureSupplier ) {
1304
+ assertFalse (completed );
1305
+ completed = true ;
1306
+ action .accept (failureSupplier .get ());
1307
+ }
1308
+
1309
+ Timeout timeout () {
1310
+ return timeout ;
1222
1311
}
1223
1312
}
1224
1313
}
0 commit comments