Skip to content

Commit 3e323b5

Browse files
committed
Added implicit support for Sessions in the Async Driver
JAVA-2621
1 parent 2015b14 commit 3e323b5

File tree

113 files changed

+865
-264
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

113 files changed

+865
-264
lines changed
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
/*
2+
* Copyright 2017 MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.async.client;
18+
19+
import com.mongodb.session.ClientSession;
20+
import com.mongodb.ClientSessionOptions;
21+
import com.mongodb.ReadPreference;
22+
import com.mongodb.session.ServerSession;
23+
import com.mongodb.async.SingleResultCallback;
24+
import com.mongodb.binding.AsyncClusterBinding;
25+
import com.mongodb.binding.AsyncReadBinding;
26+
import com.mongodb.binding.AsyncReadWriteBinding;
27+
import com.mongodb.binding.AsyncWriteBinding;
28+
import com.mongodb.connection.ClusterConnectionMode;
29+
import com.mongodb.connection.ClusterDescription;
30+
import com.mongodb.connection.Server;
31+
import com.mongodb.connection.ServerDescription;
32+
import com.mongodb.diagnostics.logging.Logger;
33+
import com.mongodb.diagnostics.logging.Loggers;
34+
import com.mongodb.internal.session.ClientSessionImpl;
35+
import com.mongodb.operation.AsyncOperationExecutor;
36+
import com.mongodb.operation.AsyncReadOperation;
37+
import com.mongodb.operation.AsyncWriteOperation;
38+
import com.mongodb.selector.ServerSelector;
39+
40+
import java.util.List;
41+
42+
import static com.mongodb.assertions.Assertions.isTrue;
43+
import static com.mongodb.assertions.Assertions.notNull;
44+
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
45+
46+
class AsyncOperationExecutorImpl implements AsyncOperationExecutor {
47+
private static final Logger LOGGER = Loggers.getLogger("client");
48+
private final MongoClientImpl mongoClient;
49+
50+
AsyncOperationExecutorImpl(final MongoClientImpl mongoClient) {
51+
this.mongoClient = mongoClient;
52+
}
53+
54+
@Override
55+
public <T> void execute(final AsyncReadOperation<T> operation, final ReadPreference readPreference,
56+
final SingleResultCallback<T> callback) {
57+
execute(operation, readPreference, null, callback);
58+
}
59+
60+
@Override
61+
public <T> void execute(final AsyncReadOperation<T> operation, final ReadPreference readPreference, final ClientSession session,
62+
final SingleResultCallback<T> callback) {
63+
notNull("operation", operation);
64+
notNull("readPreference", readPreference);
65+
notNull("callback", callback);
66+
final SingleResultCallback<T> errHandlingCallback = errorHandlingCallback(callback, LOGGER);
67+
getClientSession(session, new SingleResultCallback<ClientSession>(){
68+
@Override
69+
public void onResult(final ClientSession clientSession, final Throwable t) {
70+
if (t != null) {
71+
errHandlingCallback.onResult(null, t);
72+
} else {
73+
final AsyncReadBinding binding = getReadWriteBinding(readPreference, clientSession,
74+
session == null && clientSession != null);
75+
operation.executeAsync(binding, new SingleResultCallback<T>() {
76+
@Override
77+
public void onResult(final T result, final Throwable t) {
78+
try {
79+
errHandlingCallback.onResult(result, t);
80+
} finally {
81+
binding.release();
82+
}
83+
}
84+
});
85+
}
86+
}
87+
});
88+
}
89+
90+
@Override
91+
public <T> void execute(final AsyncWriteOperation<T> operation, final SingleResultCallback<T> callback) {
92+
execute(operation, null, callback);
93+
}
94+
95+
@Override
96+
public <T> void execute(final AsyncWriteOperation<T> operation, final ClientSession session,
97+
final SingleResultCallback<T> callback) {
98+
notNull("operation", operation);
99+
notNull("callback", callback);
100+
final SingleResultCallback<T> errHandlingCallback = errorHandlingCallback(callback, LOGGER);
101+
getClientSession(session, new SingleResultCallback<ClientSession>() {
102+
@Override
103+
public void onResult(final ClientSession clientSession, final Throwable t) {
104+
if (t != null) {
105+
errHandlingCallback.onResult(null, t);
106+
} else {
107+
final AsyncWriteBinding binding = getReadWriteBinding(ReadPreference.primary(), clientSession,
108+
session == null && clientSession != null);
109+
operation.executeAsync(binding, new SingleResultCallback<T>() {
110+
@Override
111+
public void onResult(final T result, final Throwable t) {
112+
try {
113+
errHandlingCallback.onResult(result, t);
114+
} finally {
115+
binding.release();
116+
}
117+
}
118+
});
119+
}
120+
}
121+
});
122+
}
123+
124+
private void getClientSession(final ClientSession clientSessionFromOperation,
125+
final SingleResultCallback<ClientSession> callback) {
126+
if (clientSessionFromOperation != null) {
127+
isTrue("ClientSession from same MongoClient",
128+
clientSessionFromOperation.getOriginator() == mongoClient);
129+
callback.onResult(clientSessionFromOperation, null);
130+
} else {
131+
createClientSession(ClientSessionOptions.builder().causallyConsistent(false).build(), callback);
132+
}
133+
}
134+
135+
private AsyncReadWriteBinding getReadWriteBinding(final ReadPreference readPreference, final ClientSession session,
136+
final boolean ownsSession) {
137+
notNull("readPreference", readPreference);
138+
AsyncReadWriteBinding readWriteBinding = new AsyncClusterBinding(mongoClient.getCluster(), readPreference);
139+
if (session != null) {
140+
readWriteBinding = new ClientSessionBinding(session, ownsSession, readWriteBinding);
141+
}
142+
return readWriteBinding;
143+
}
144+
145+
@SuppressWarnings("deprecation")
146+
private void createClientSession(final ClientSessionOptions options, final SingleResultCallback<ClientSession> callback) {
147+
if (mongoClient.getSettings().getCredentialList().size() > 1) {
148+
callback.onResult(null, null);
149+
} else {
150+
ClusterDescription clusterDescription = mongoClient.getCluster().getDescription();
151+
if (!getServerDescriptionListToConsiderForSessionSupport(clusterDescription).isEmpty()
152+
&& clusterDescription.getLogicalSessionTimeoutMinutes() != null) {
153+
getServerSessionCallback(options, callback).onResult(mongoClient.getServerSessionPool().get(), null);
154+
} else {
155+
mongoClient.getCluster().selectServerAsync(new ServerSelector() {
156+
@Override
157+
public List<ServerDescription> select(final ClusterDescription clusterDescription) {
158+
return getServerDescriptionListToConsiderForSessionSupport(clusterDescription);
159+
}
160+
}, new SingleResultCallback<Server>() {
161+
@Override
162+
public void onResult(final Server server, final Throwable t) {
163+
if (t != null) {
164+
callback.onResult(null, null);
165+
} else if (server.getDescription().getLogicalSessionTimeoutMinutes() == null) {
166+
callback.onResult(null, null);
167+
} else {
168+
getServerSessionCallback(options, callback).onResult(mongoClient.getServerSessionPool().get(), null);
169+
}
170+
}
171+
});
172+
}
173+
}
174+
}
175+
176+
@SuppressWarnings("deprecation")
177+
private List<ServerDescription> getServerDescriptionListToConsiderForSessionSupport(final ClusterDescription clusterDescription) {
178+
if (clusterDescription.getConnectionMode() == ClusterConnectionMode.SINGLE) {
179+
return clusterDescription.getAny();
180+
} else {
181+
return clusterDescription.getAnyPrimaryOrSecondary();
182+
}
183+
}
184+
185+
private SingleResultCallback<ServerSession> getServerSessionCallback(final ClientSessionOptions options,
186+
final SingleResultCallback<ClientSession> callback) {
187+
return new SingleResultCallback<ServerSession>() {
188+
@Override
189+
public void onResult(final ServerSession serverSession, final Throwable t) {
190+
if (t != null) {
191+
callback.onResult(null, t);
192+
} else {
193+
callback.onResult(new ClientSessionImpl(mongoClient.getServerSessionPool(), serverSession,
194+
System.identityHashCode(mongoClient), options), null);
195+
}
196+
}
197+
};
198+
}
199+
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Copyright 2017 MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.async.client;
18+
19+
import com.mongodb.session.ClientSession;
20+
import com.mongodb.ReadPreference;
21+
import com.mongodb.async.SingleResultCallback;
22+
import com.mongodb.binding.AsyncConnectionSource;
23+
import com.mongodb.binding.AsyncReadWriteBinding;
24+
import com.mongodb.connection.AsyncConnection;
25+
import com.mongodb.connection.ServerDescription;
26+
import com.mongodb.session.SessionContext;
27+
import com.mongodb.internal.session.ClientSessionContext;
28+
29+
import static org.bson.assertions.Assertions.notNull;
30+
31+
class ClientSessionBinding implements AsyncReadWriteBinding {
32+
private final AsyncReadWriteBinding wrapped;
33+
private final ClientSession session;
34+
private final boolean ownsSession;
35+
private final ClientSessionContext sessionContext;
36+
37+
ClientSessionBinding(final ClientSession session, final boolean ownsSession, final AsyncReadWriteBinding wrapped) {
38+
this.wrapped = notNull("wrapped", wrapped);
39+
this.ownsSession = ownsSession;
40+
this.session = notNull("session", session);
41+
this.sessionContext = new ClientSessionContext(session);
42+
}
43+
44+
@Override
45+
public ReadPreference getReadPreference() {
46+
return wrapped.getReadPreference();
47+
}
48+
49+
@Override
50+
public void getWriteConnectionSource(final SingleResultCallback<AsyncConnectionSource> callback) {
51+
wrapped.getWriteConnectionSource(new SingleResultCallback<AsyncConnectionSource>() {
52+
@Override
53+
public void onResult(final AsyncConnectionSource result, final Throwable t) {
54+
if (t != null) {
55+
callback.onResult(null, t);
56+
} else {
57+
callback.onResult(new SessionBindingAsyncConnectionSource(result), null);
58+
}
59+
}
60+
});
61+
}
62+
63+
@Override
64+
public SessionContext getSessionContext() {
65+
return sessionContext;
66+
}
67+
68+
@Override
69+
public void getReadConnectionSource(final SingleResultCallback<AsyncConnectionSource> callback) {
70+
wrapped.getReadConnectionSource(new SingleResultCallback<AsyncConnectionSource>() {
71+
@Override
72+
public void onResult(final AsyncConnectionSource result, final Throwable t) {
73+
if (t != null) {
74+
callback.onResult(null, t);
75+
} else {
76+
callback.onResult(new SessionBindingAsyncConnectionSource(result), null);
77+
}
78+
}
79+
});
80+
}
81+
82+
@Override
83+
public int getCount() {
84+
return wrapped.getCount();
85+
}
86+
87+
@Override
88+
public AsyncReadWriteBinding retain() {
89+
wrapped.retain();
90+
return this;
91+
}
92+
93+
@Override
94+
public void release() {
95+
wrapped.release();
96+
closeSessionIfCountIsZero();
97+
}
98+
99+
private void closeSessionIfCountIsZero() {
100+
if (getCount() == 0 && ownsSession) {
101+
session.close();
102+
}
103+
}
104+
105+
private class SessionBindingAsyncConnectionSource implements AsyncConnectionSource {
106+
private AsyncConnectionSource wrapped;
107+
108+
SessionBindingAsyncConnectionSource(final AsyncConnectionSource wrapped) {
109+
this.wrapped = wrapped;
110+
}
111+
112+
@Override
113+
public ServerDescription getServerDescription() {
114+
return wrapped.getServerDescription();
115+
}
116+
117+
@Override
118+
public SessionContext getSessionContext() {
119+
return sessionContext;
120+
}
121+
122+
@Override
123+
public void getConnection(final SingleResultCallback<AsyncConnection> callback) {
124+
wrapped.getConnection(callback);
125+
}
126+
127+
@Override
128+
public AsyncConnectionSource retain() {
129+
wrapped = wrapped.retain();
130+
return this;
131+
}
132+
133+
@Override
134+
public int getCount() {
135+
return wrapped.getCount();
136+
}
137+
138+
@Override
139+
public void release() {
140+
wrapped.release();
141+
closeSessionIfCountIsZero();
142+
}
143+
}
144+
}

0 commit comments

Comments
 (0)