Skip to content

Commit 81cb6fe

Browse files
committed
Added EmbeddedConnectionPool to reduce the cost of new database clients
JAVA-2807
1 parent c9ea8f7 commit 81cb6fe

File tree

9 files changed

+465
-272
lines changed

9 files changed

+465
-272
lines changed

driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
/**
5555
* A command message that uses OP_MSG or OP_QUERY to send the command.
5656
*/
57-
final class CommandMessage extends RequestMessage {
57+
public final class CommandMessage extends RequestMessage {
5858
private final MongoNamespace namespace;
5959
private final BsonDocument command;
6060
private final FieldNameValidator commandFieldNameValidator;

driver-core/src/main/com/mongodb/internal/connection/ResponseBuffers.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
import java.io.Closeable;
2424

25-
class ResponseBuffers implements Closeable {
25+
public class ResponseBuffers implements Closeable {
2626
private final ReplyHeader replyHeader;
2727
private final ByteBuf bodyByteBuffer;
2828
private final int bodyByteBufferStartPosition;

driver-embedded/src/main/com/mongodb/embedded/client/EmbeddedConnection.java

Lines changed: 0 additions & 64 deletions
This file was deleted.

driver-embedded/src/main/com/mongodb/embedded/client/EmbeddedDatabase.java

Lines changed: 0 additions & 67 deletions
This file was deleted.
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.embedded.client;
18+
19+
import com.mongodb.MongoCompressor;
20+
import com.mongodb.ServerAddress;
21+
import com.mongodb.async.SingleResultCallback;
22+
import com.mongodb.connection.AsyncCompletionHandler;
23+
import com.mongodb.connection.ClusterId;
24+
import com.mongodb.connection.ConnectionDescription;
25+
import com.mongodb.connection.ServerId;
26+
import com.mongodb.connection.Stream;
27+
import com.mongodb.connection.StreamFactory;
28+
import com.mongodb.event.CommandListener;
29+
import com.mongodb.internal.connection.Authenticator;
30+
import com.mongodb.internal.connection.CommandMessage;
31+
import com.mongodb.internal.connection.InternalConnection;
32+
import com.mongodb.internal.connection.InternalStreamConnection;
33+
import com.mongodb.internal.connection.InternalStreamConnectionInitializer;
34+
import com.mongodb.internal.connection.ResponseBuffers;
35+
import com.mongodb.session.SessionContext;
36+
import com.sun.jna.Pointer;
37+
import com.sun.jna.ptr.IntByReference;
38+
import com.sun.jna.ptr.PointerByReference;
39+
import org.bson.BsonDocument;
40+
import org.bson.ByteBuf;
41+
import org.bson.ByteBufNIO;
42+
import org.bson.codecs.Decoder;
43+
44+
import java.nio.ByteBuffer;
45+
import java.util.ArrayList;
46+
import java.util.Collections;
47+
import java.util.List;
48+
49+
class EmbeddedInternalConnection implements InternalConnection {
50+
private final InternalConnection wrapped;
51+
private volatile Pointer clientPointer;
52+
53+
EmbeddedInternalConnection(final Pointer databasePointer, final CommandListener commandListener,
54+
final BsonDocument clientMetadataDocument) {
55+
this.clientPointer = MongoDBCAPIHelper.db_client_new(databasePointer);
56+
this.wrapped = new InternalStreamConnection(new ServerId(new ClusterId(), new ServerAddress()),
57+
new StreamFactory() {
58+
@Override
59+
public Stream create(final ServerAddress serverAddress) {
60+
return new EmbeddedInternalStream();
61+
}
62+
}, Collections.<MongoCompressor>emptyList(), commandListener,
63+
new InternalStreamConnectionInitializer(Collections.<Authenticator>emptyList(), clientMetadataDocument,
64+
Collections.<MongoCompressor>emptyList()));
65+
}
66+
67+
@Override
68+
public ConnectionDescription getDescription() {
69+
return wrapped.getDescription();
70+
}
71+
72+
@Override
73+
public void open() {
74+
wrapped.open();
75+
}
76+
77+
@Override
78+
public void openAsync(final SingleResultCallback<Void> callback) {
79+
wrapped.openAsync(callback);
80+
}
81+
82+
@Override
83+
public void close() {
84+
if (!wrapped.isClosed()) {
85+
wrapped.close();
86+
MongoDBCAPIHelper.db_client_destroy(clientPointer);
87+
clientPointer = null;
88+
}
89+
}
90+
91+
@Override
92+
public boolean opened() {
93+
return wrapped.opened();
94+
}
95+
96+
@Override
97+
public boolean isClosed() {
98+
return wrapped.isClosed();
99+
}
100+
101+
@Override
102+
public <T> T sendAndReceive(final CommandMessage message, final Decoder<T> decoder, final SessionContext sessionContext) {
103+
return wrapped.sendAndReceive(message, decoder, sessionContext);
104+
}
105+
106+
@Override
107+
public <T> void sendAndReceiveAsync(final CommandMessage message, final Decoder<T> decoder, final SessionContext sessionContext,
108+
final SingleResultCallback<T> callback) {
109+
wrapped.sendAndReceiveAsync(message, decoder, sessionContext, callback);
110+
}
111+
112+
@Override
113+
public void sendMessage(final List<ByteBuf> byteBuffers, final int lastRequestId) {
114+
wrapped.sendMessage(byteBuffers, lastRequestId);
115+
}
116+
117+
@Override
118+
public ResponseBuffers receiveMessage(final int responseTo) {
119+
return wrapped.receiveMessage(responseTo);
120+
}
121+
122+
@Override
123+
public void sendMessageAsync(final List<ByteBuf> byteBuffers, final int lastRequestId, final SingleResultCallback<Void> callback) {
124+
wrapped.sendMessageAsync(byteBuffers, lastRequestId, callback);
125+
}
126+
127+
@Override
128+
public void receiveMessageAsync(final int responseTo, final SingleResultCallback<ResponseBuffers> callback) {
129+
wrapped.receiveMessageAsync(responseTo, callback);
130+
}
131+
132+
@Override
133+
public ByteBuf getBuffer(final int size) {
134+
return wrapped.getBuffer(size);
135+
}
136+
137+
class EmbeddedInternalStream implements Stream {
138+
private volatile boolean isClosed;
139+
private volatile ByteBuffer curResponse;
140+
141+
@Override
142+
public void open() {
143+
// nothing to do here
144+
}
145+
146+
@Override
147+
public void openAsync(final AsyncCompletionHandler<Void> handler) {
148+
// nothing to do here
149+
handler.completed(null);
150+
}
151+
152+
@Override
153+
public void write(final List<ByteBuf> buffers) {
154+
byte[] message = createCompleteMessage(buffers);
155+
156+
PointerByReference outputBufferReference = new PointerByReference();
157+
IntByReference outputSize = new IntByReference();
158+
MongoDBCAPIHelper.db_client_wire_protocol_rpc(clientPointer, message, message.length, outputBufferReference, outputSize);
159+
curResponse = outputBufferReference.getValue().getByteBuffer(0, outputSize.getValue());
160+
}
161+
162+
@Override
163+
public ByteBuf read(final int numBytes) {
164+
ByteBuffer slice = curResponse.slice();
165+
slice.limit(numBytes);
166+
curResponse.position(curResponse.position() + numBytes);
167+
168+
return new ByteBufNIO(slice);
169+
}
170+
171+
@Override
172+
public void writeAsync(final List<ByteBuf> buffers, final AsyncCompletionHandler<Void> handler) {
173+
throw new UnsupportedOperationException(getClass() + " does not support asynchronous operations.");
174+
}
175+
176+
@Override
177+
public void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf> handler) {
178+
throw new UnsupportedOperationException(getClass() + " does not support asynchronous operations.");
179+
}
180+
181+
@Override
182+
public ServerAddress getAddress() {
183+
return wrapped.getDescription().getServerAddress();
184+
}
185+
186+
@Override
187+
public void close() {
188+
isClosed = true;
189+
}
190+
191+
@Override
192+
public boolean isClosed() {
193+
return isClosed;
194+
}
195+
196+
@Override
197+
public ByteBuf getBuffer(final int size) {
198+
return new ByteBufNIO(ByteBuffer.wrap(new byte[size]));
199+
}
200+
201+
private byte[] createCompleteMessage(final List<ByteBuf> byteBufList) {
202+
List<ByteBuffer> buffers = asByteBufferList(byteBufList);
203+
int totalLength = 0;
204+
for (ByteBuffer cur : buffers) {
205+
totalLength += cur.remaining();
206+
}
207+
byte[] completeMessage = new byte[totalLength];
208+
209+
int offset = 0;
210+
for (ByteBuffer cur : buffers) {
211+
int remaining = cur.remaining();
212+
cur.get(completeMessage, offset, cur.remaining());
213+
offset += remaining;
214+
}
215+
return completeMessage;
216+
}
217+
218+
private List<ByteBuffer> asByteBufferList(final List<ByteBuf> byteBufList) {
219+
List<ByteBuffer> retVal = new ArrayList<ByteBuffer>(byteBufList.size());
220+
for (ByteBuf cur: byteBufList) {
221+
retVal.add(cur.asNIO());
222+
}
223+
return retVal;
224+
}
225+
}
226+
}

0 commit comments

Comments
 (0)