Skip to content

Commit 358f133

Browse files
jstewart148jyemin
authored andcommitted
Detect if transactions are not supported by the server
JAVA-2928
1 parent 4febb80 commit 358f133

File tree

5 files changed

+182
-0
lines changed

5 files changed

+182
-0
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.async.client;
18+
19+
import com.mongodb.ClientSessionOptions;
20+
import com.mongodb.MongoClientException;
21+
import com.mongodb.async.FutureResultCallback;
22+
import org.bson.Document;
23+
import org.junit.Before;
24+
import org.junit.Test;
25+
26+
import static com.mongodb.ClusterFixture.isSharded;
27+
import static com.mongodb.ClusterFixture.serverVersionLessThan;
28+
import static org.junit.Assume.assumeTrue;
29+
30+
public class TransactionFailureTest extends DatabaseTestCase {
31+
public TransactionFailureTest() {
32+
}
33+
34+
@Before
35+
public void setUp() {
36+
assumeTrue(canRunTests());
37+
super.setUp();
38+
}
39+
40+
@Test(expected = MongoClientException.class)
41+
public void testTransactionFails() throws InterruptedException {
42+
final ClientSession clientSession = createSession();
43+
44+
try {
45+
clientSession.startTransaction();
46+
47+
FutureResultCallback<Void> futureResultCallback = new FutureResultCallback<Void>();
48+
collection.insertOne(clientSession, Document.parse("{_id: 1, a: 1}"), futureResultCallback);
49+
futureResultCallback.get();
50+
} finally {
51+
clientSession.close();
52+
}
53+
}
54+
55+
private ClientSession createSession() {
56+
final ClientSessionOptions options = ClientSessionOptions.builder().build();
57+
return new MongoOperation<ClientSession>() {
58+
@Override
59+
public void execute() {
60+
client.startSession(options, getCallback());
61+
}
62+
}.get();
63+
}
64+
65+
66+
private boolean canRunTests() {
67+
return serverVersionLessThan("4.0")
68+
|| (serverVersionLessThan("4.1.0") && isSharded());
69+
}
70+
}

driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.mongodb.internal.connection;
1818

19+
import com.mongodb.MongoClientException;
1920
import com.mongodb.MongoNamespace;
2021
import com.mongodb.ReadPreference;
2122
import com.mongodb.connection.ByteBufferBsonOutput;
@@ -49,6 +50,8 @@
4950
import static com.mongodb.connection.ServerType.SHARD_ROUTER;
5051
import static com.mongodb.internal.connection.BsonWriterHelper.writePayload;
5152
import static com.mongodb.internal.connection.ReadConcernHelper.getReadConcernDocument;
53+
import static com.mongodb.internal.operation.ServerVersionHelper.FOUR_DOT_TWO_WIRE_VERSION;
54+
import static com.mongodb.internal.operation.ServerVersionHelper.FOUR_DOT_ZERO_WIRE_VERSION;
5255
import static com.mongodb.internal.operation.ServerVersionHelper.THREE_DOT_SIX_WIRE_VERSION;
5356

5457
/**
@@ -242,6 +245,7 @@ private List<BsonElement> getExtraElements(final SessionContext sessionContext)
242245
}
243246
boolean firstMessageInTransaction = sessionContext.notifyMessageSent();
244247
if (sessionContext.hasActiveTransaction()) {
248+
checkServerVersionForTransactionSupport();
245249
extraElements.add(new BsonElement("txnNumber", new BsonInt64(sessionContext.getTransactionNumber())));
246250
if (firstMessageInTransaction) {
247251
extraElements.add(new BsonElement("startTransaction", BsonBoolean.TRUE));
@@ -259,6 +263,15 @@ private List<BsonElement> getExtraElements(final SessionContext sessionContext)
259263
return extraElements;
260264
}
261265

266+
private void checkServerVersionForTransactionSupport() {
267+
int wireVersion = getSettings().getMaxWireVersion();
268+
if (wireVersion < FOUR_DOT_ZERO_WIRE_VERSION
269+
|| (wireVersion < FOUR_DOT_TWO_WIRE_VERSION && getSettings().getServerType() == SHARD_ROUTER)) {
270+
throw new MongoClientException("Transactions are not supported by the MongoDB cluster to which this client is connected.");
271+
}
272+
}
273+
274+
262275
private void addReadConcernDocument(final List<BsonElement> extraElements, final SessionContext sessionContext) {
263276
BsonDocument readConcernDocument = getReadConcernDocument(sessionContext);
264277
if (!readConcernDocument.isEmpty()) {

driver-core/src/main/com/mongodb/internal/operation/ServerVersionHelper.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ public final class ServerVersionHelper {
2929
public static final int THREE_DOT_FOUR_WIRE_VERSION = 5;
3030
public static final int THREE_DOT_SIX_WIRE_VERSION = 6;
3131
public static final int FOUR_DOT_ZERO_WIRE_VERSION = 7;
32+
public static final int FOUR_DOT_TWO_WIRE_VERSION = 8;
3233

3334
public static boolean serverIsAtLeastVersionThreeDotZero(final ConnectionDescription description) {
3435
return description.getMaxWireVersion() >= THREE_DOT_ZERO_WIRE_VERSION;
@@ -70,6 +71,10 @@ public static boolean serverIsLessThanVersionFourDotZero(final ConnectionDescrip
7071
return description.getMaxWireVersion() < FOUR_DOT_ZERO_WIRE_VERSION;
7172
}
7273

74+
public static boolean serverIsLessThanVersionFourDotTwo(final ConnectionDescription description) {
75+
return description.getMaxWireVersion() < FOUR_DOT_TWO_WIRE_VERSION;
76+
}
77+
7378
private ServerVersionHelper() {
7479
}
7580
}

driver-core/src/test/unit/com/mongodb/internal/connection/CommandMessageSpecification.groovy

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.mongodb.internal.connection
1818

19+
import com.mongodb.MongoClientException
1920
import com.mongodb.MongoNamespace
2021
import com.mongodb.ReadConcern
2122
import com.mongodb.ReadPreference
@@ -45,6 +46,7 @@ import spock.lang.Specification
4546
import java.nio.ByteBuffer
4647

4748
import static com.mongodb.connection.SplittablePayload.Type.INSERT
49+
import static com.mongodb.internal.operation.ServerVersionHelper.FOUR_DOT_ZERO_WIRE_VERSION
4850
import static com.mongodb.internal.operation.ServerVersionHelper.THREE_DOT_FOUR_WIRE_VERSION
4951
import static com.mongodb.internal.operation.ServerVersionHelper.THREE_DOT_SIX_WIRE_VERSION
5052

@@ -387,6 +389,45 @@ class CommandMessageSpecification extends Specification {
387389
thrown(BsonMaximumSizeExceededException)
388390
}
389391

392+
def 'should throw if wire version does not support transactions'() {
393+
given:
394+
def messageSettings = MessageSettings.builder().maxWireVersion(THREE_DOT_SIX_WIRE_VERSION).build()
395+
def payload = new SplittablePayload(INSERT, [new BsonDocument('a', new BsonInt32(1))])
396+
def message = new CommandMessage(namespace, command, fieldNameValidator, ReadPreference.primary(), messageSettings,
397+
false, payload, fieldNameValidator, ClusterConnectionMode.MULTIPLE)
398+
def output = new BasicOutputBuffer()
399+
def sessionContext = Stub(SessionContext) {
400+
getReadConcern() >> ReadConcern.DEFAULT
401+
hasActiveTransaction() >> true
402+
}
403+
404+
when:
405+
message.encode(output, sessionContext)
406+
407+
then:
408+
thrown(MongoClientException)
409+
}
410+
411+
def 'should throw if wire version and sharded cluster does not support transactions'() {
412+
given:
413+
def messageSettings = MessageSettings.builder().serverType(ServerType.SHARD_ROUTER)
414+
.maxWireVersion(FOUR_DOT_ZERO_WIRE_VERSION).build()
415+
def payload = new SplittablePayload(INSERT, [new BsonDocument('a', new BsonInt32(1))])
416+
def message = new CommandMessage(namespace, command, fieldNameValidator, ReadPreference.primary(), messageSettings,
417+
false, payload, fieldNameValidator, ClusterConnectionMode.MULTIPLE)
418+
def output = new BasicOutputBuffer()
419+
def sessionContext = Stub(SessionContext) {
420+
getReadConcern() >> ReadConcern.DEFAULT
421+
hasActiveTransaction() >> true
422+
}
423+
424+
when:
425+
message.encode(output, sessionContext)
426+
427+
then:
428+
thrown(MongoClientException)
429+
}
430+
390431
private static BsonDocument getCommandDocument(ByteBufNIO byteBuf, ReplyHeader replyHeader) {
391432
new ReplyMessage<BsonDocument>(new ResponseBuffers(replyHeader, byteBuf), new BsonDocumentCodec(), 0).documents.get(0)
392433
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.client;
18+
19+
import com.mongodb.MongoClientException;
20+
import org.bson.Document;
21+
import org.junit.Before;
22+
import org.junit.Test;
23+
24+
import static com.mongodb.ClusterFixture.isSharded;
25+
import static com.mongodb.ClusterFixture.serverVersionLessThan;
26+
import static org.junit.Assume.assumeTrue;
27+
28+
public class TransactionFailureTest extends DatabaseTestCase {
29+
public TransactionFailureTest() {
30+
}
31+
32+
@Before
33+
public void setUp() {
34+
assumeTrue(canRunTests());
35+
super.setUp();
36+
}
37+
38+
@Test(expected = MongoClientException.class)
39+
public void testTransactionFails() {
40+
ClientSession clientSession = client.startSession();
41+
try {
42+
clientSession.startTransaction();
43+
collection.insertOne(clientSession, Document.parse("{_id: 1, a: 1}"));
44+
} finally {
45+
clientSession.close();
46+
}
47+
}
48+
49+
private boolean canRunTests() {
50+
return serverVersionLessThan("4.0")
51+
|| (serverVersionLessThan("4.1.0") && isSharded());
52+
}
53+
}

0 commit comments

Comments
 (0)