Skip to content

Commit 6d8a132

Browse files
committed
JAVA-1119: Added maxWriteBatchSize to ServerDescription
1 parent 97f19f3 commit 6d8a132

File tree

6 files changed

+115
-12
lines changed

6 files changed

+115
-12
lines changed

src/main/com/mongodb/DBCollectionImpl.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ BulkWriteResult executeBulkWriteOperation(final boolean ordered, final List<Writ
132132
DBPort port = db.getConnector().getPrimaryPort();
133133
try {
134134
BulkWriteBatchCombiner bulkWriteBatchCombiner = new BulkWriteBatchCombiner(port.getAddress(), writeConcern);
135-
for (Run run : getRunGenerator(ordered, writeRequests, writeConcern, encoder)) {
135+
for (Run run : getRunGenerator(ordered, writeRequests, writeConcern, encoder, port)) {
136136
try {
137137
BulkWriteResult result = run.execute(port);
138138
if (result.isAcknowledged()) {
@@ -506,11 +506,11 @@ private WriteResult insertWithWriteProtocol(final List<DBObject> list, final Wri
506506
}
507507

508508
private Iterable<Run> getRunGenerator(final boolean ordered, final List<WriteRequest> writeRequests,
509-
final WriteConcern writeConcern, final DBEncoder encoder) {
509+
final WriteConcern writeConcern, final DBEncoder encoder, final DBPort port) {
510510
if (ordered) {
511-
return new OrderedRunGenerator(writeRequests, writeConcern, encoder);
511+
return new OrderedRunGenerator(writeRequests, writeConcern, encoder, port);
512512
} else {
513-
return new UnorderedRunGenerator(writeRequests, writeConcern, encoder);
513+
return new UnorderedRunGenerator(writeRequests, writeConcern, encoder, port);
514514
}
515515
}
516516

@@ -529,15 +529,16 @@ private Logger getLogger() {
529529
return TRACE_LOGGER;
530530
}
531531

532-
private static final int MAX_COUNT = 1000;
533-
534532
private class OrderedRunGenerator implements Iterable<Run> {
535533
private final List<WriteRequest> writeRequests;
534+
private final DBPort port;
536535
private final WriteConcern writeConcern;
537536
private final DBEncoder encoder;
538537

539-
public OrderedRunGenerator(final List<WriteRequest> writeRequests, final WriteConcern writeConcern, final DBEncoder encoder) {
538+
public OrderedRunGenerator(final List<WriteRequest> writeRequests, final WriteConcern writeConcern, final DBEncoder encoder,
539+
final DBPort port) {
540540
this.writeRequests = writeRequests;
541+
this.port = port;
541542
this.writeConcern = writeConcern.continueOnError(false);
542543
this.encoder = encoder;
543544
}
@@ -566,7 +567,8 @@ public Run next() {
566567
private int getStartIndexOfNextRun() {
567568
WriteRequest.Type type = writeRequests.get(curIndex).getType();
568569
for (int i = curIndex; i < writeRequests.size(); i++) {
569-
if (i == MAX_COUNT || writeRequests.get(i).getType() != type) {
570+
if (i == db.getConnector().getServerDescription(port.getAddress()).getMaxWriteBatchSize()
571+
|| writeRequests.get(i).getType() != type) {
570572
return i;
571573
}
572574
}
@@ -584,12 +586,14 @@ public void remove() {
584586

585587
private class UnorderedRunGenerator implements Iterable<Run> {
586588
private final List<WriteRequest> writeRequests;
589+
private final DBPort port;
587590
private final WriteConcern writeConcern;
588591
private final DBEncoder encoder;
589592

590593
public UnorderedRunGenerator(final List<WriteRequest> writeRequests, final WriteConcern writeConcern,
591-
final DBEncoder encoder) {
594+
final DBEncoder encoder, final DBPort port) {
592595
this.writeRequests = writeRequests;
596+
this.port = port;
593597
this.writeConcern = writeConcern.continueOnError(true);
594598
this.encoder = encoder;
595599
}
@@ -622,7 +626,7 @@ public Run next() {
622626
}
623627
run.add(writeRequest, curIndex);
624628
curIndex++;
625-
if (run.size() > MAX_COUNT) {
629+
if (run.size() > db.getConnector().getServerDescription(port.getAddress()).getMaxWriteBatchSize()) {
626630
return runs.remove(run.type);
627631
}
628632
}

src/main/com/mongodb/ServerDescription.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ class ServerDescription {
4949
private static final int DEFAULT_MAX_DOCUMENT_SIZE = 0x1000000; // 16MB
5050
private static final int DEFAULT_MAX_MESSAGE_SIZE = 0x2000000; // 32MB
5151

52+
private static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 512;
53+
5254
private final ServerAddress address;
5355

5456
private final ServerType type;
@@ -57,6 +59,7 @@ class ServerDescription {
5759
private final Set<String> arbiters;
5860
private final String primary;
5961
private final int maxDocumentSize;
62+
private final int maxWriteBatchSize;
6063

6164
private final int maxMessageSize;
6265
private final Tags tags;
@@ -79,6 +82,7 @@ static class Builder {
7982
private String primary;
8083
private int maxDocumentSize = DEFAULT_MAX_DOCUMENT_SIZE;
8184
private int maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
85+
private int maxWriteBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;
8286
private Tags tags = Tags.freeze(new Tags());
8387
private String setName;
8488
private Integer setVersion;
@@ -130,6 +134,11 @@ public Builder maxMessageSize(final int maxMessageSize) {
130134
return this;
131135
}
132136

137+
public Builder maxWriteBatchSize(final int maxWriteBatchSize) {
138+
this.maxWriteBatchSize = maxWriteBatchSize;
139+
return this;
140+
}
141+
133142
public Builder tags(final Tags tags) {
134143
this.tags = tags == null ? Tags.freeze(new Tags()) : Tags.freeze(tags);
135144
return this;
@@ -220,6 +229,10 @@ public static int getDefaultMaxWireVersion() {
220229
return 0;
221230
}
222231

232+
public static int getDefaultMaxWriteBatchSize() {
233+
return DEFAULT_MAX_WRITE_BATCH_SIZE;
234+
}
235+
223236
public static Builder builder() {
224237
return new Builder();
225238
}
@@ -272,6 +285,10 @@ public int getMaxMessageSize() {
272285
return maxMessageSize;
273286
}
274287

288+
public int getMaxWriteBatchSize() {
289+
return maxWriteBatchSize;
290+
}
291+
275292
public Tags getTags() {
276293
return tags;
277294
}
@@ -285,8 +302,8 @@ public int getMaxWireVersion() {
285302
}
286303

287304
/**
288-
* Returns true if the server has the given tags. A server of either type @code{ServerType.StandAlone} or
289-
* @code{ServerType.ShardRouter} is considered to have all tags, so this method will always return true for instances of either of
305+
* Returns true if the server has the given tags. A server of either type {@code ServerType.StandAlone} or
306+
* {@code ServerType.ShardRouter} is considered to have all tags, so this method will always return true for instances of either of
290307
* those types.
291308
*
292309
* @param desiredTags the tags
@@ -365,6 +382,9 @@ public boolean equals(final Object o) {
365382
if (maxMessageSize != that.maxMessageSize) {
366383
return false;
367384
}
385+
if (maxWriteBatchSize != that.maxWriteBatchSize) {
386+
return false;
387+
}
368388
if (ok != that.ok) {
369389
return false;
370390
}
@@ -422,6 +442,7 @@ public int hashCode() {
422442
result = 31 * result + (primary != null ? primary.hashCode() : 0);
423443
result = 31 * result + maxDocumentSize;
424444
result = 31 * result + maxMessageSize;
445+
result = 31 * result + maxWriteBatchSize;
425446
result = 31 * result + tags.hashCode();
426447
result = 31 * result + (setName != null ? setName.hashCode() : 0);
427448
result = 31 * result + (setVersion != null ? setVersion.hashCode() : 0);
@@ -444,6 +465,7 @@ public String toString() {
444465
+ ", primary='" + primary + '\''
445466
+ ", maxDocumentSize=" + maxDocumentSize
446467
+ ", maxMessageSize=" + maxMessageSize
468+
+ ", maxWriteBatchSize=" + maxWriteBatchSize
447469
+ ", tags=" + tags
448470
+ ", setName='" + setName + '\''
449471
+ ", setVersion='" + setVersion + '\''
@@ -477,6 +499,7 @@ public String getShortDescription() {
477499
primary = builder.primary;
478500
maxDocumentSize = builder.maxDocumentSize;
479501
maxMessageSize = builder.maxMessageSize;
502+
maxWriteBatchSize = builder.maxWriteBatchSize;
480503
tags = builder.tags;
481504
setName = builder.setName;
482505
setVersion = builder.setVersion;

src/main/com/mongodb/ServerStateNotifier.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ private ServerDescription createDescription(final CommandResult commandResult, f
140140
.primary(commandResult.getString("primary"))
141141
.maxDocumentSize(commandResult.getInt("maxBsonObjectSize", ServerDescription.getDefaultMaxDocumentSize()))
142142
.maxMessageSize(commandResult.getInt("maxMessageSizeBytes", ServerDescription.getDefaultMaxMessageSize()))
143+
.maxWriteBatchSize(commandResult.getInt("maxWriteBatchSize",
144+
ServerDescription.getDefaultMaxWriteBatchSize()))
143145
.tags(getTagsFromDocument((DBObject) commandResult.get("tags")))
144146
.setName(commandResult.getString("setName"))
145147
.setVersion((Integer) commandResult.get("setVersion"))

src/test/com/mongodb/Fixture.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,16 @@ public static synchronized DB getDefaultDatabase() {
5252
return defaultDatabase;
5353
}
5454

55+
/**
56+
*
57+
* @param version must be a major version, e.g. 1.8, 2,0, 2.2
58+
* @return true if server is at least specified version
59+
*/
60+
public static boolean serverIsAtLeastVersion(double version) {
61+
String serverVersion = (String) getMongoClient().getDB("admin").command("serverStatus").get("version");
62+
return Double.parseDouble(serverVersion.substring(0, 3)) >= version;
63+
}
64+
5565
static class ShutdownHook extends Thread {
5666
@Override
5767
public void run() {

src/test/com/mongodb/ServerDescriptionTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public void testDefaults() throws UnknownHostException {
7171

7272
assertEquals(0x1000000, serverDescription.getMaxDocumentSize());
7373
assertEquals(0x2000000, serverDescription.getMaxMessageSize());
74+
assertEquals(512, serverDescription.getMaxWriteBatchSize());
7475

7576
assertNull(serverDescription.getPrimary());
7677
assertEquals(Collections.<String>emptySet(), serverDescription.getHosts());
@@ -94,6 +95,7 @@ public void testObjectOverrides() throws UnknownHostException {
9495
.setVersion(11)
9596
.maxDocumentSize(100)
9697
.maxMessageSize(200)
98+
.maxWriteBatchSize(1024)
9799
.averagePingTime(50000, java.util.concurrent.TimeUnit.NANOSECONDS)
98100
.primary("localhost:27017")
99101
.hosts(new HashSet<String>(Arrays.asList("localhost:27017",
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package com.mongodb
2+
3+
import static com.mongodb.Fixture.getMongoClient
4+
import static com.mongodb.Fixture.serverIsAtLeastVersion
5+
import static org.junit.Assume.assumeFalse
6+
import static org.junit.Assume.assumeTrue
7+
8+
class ServerStateNotifierSpecification extends FunctionalSpecification {
9+
ServerDescription newDescription
10+
ServerStateNotifier serverStateNotifier
11+
12+
def setup() {
13+
serverStateNotifier = new ServerStateNotifier(new ServerAddress(),
14+
new ChangeListener<ServerDescription>() {
15+
@Override
16+
void stateChanged(final ChangeEvent<ServerDescription> event) {
17+
newDescription = event.newValue
18+
}
19+
},
20+
SocketSettings.builder().build(), getMongoClient())
21+
}
22+
23+
def cleanup() {
24+
serverStateNotifier.close();
25+
}
26+
27+
def 'should set server version'() {
28+
given:
29+
CommandResult commandResult = database.command(new BasicDBObject('buildinfo', 1))
30+
def expectedVersion = new ServerVersion((commandResult.get('versionArray') as List<Integer>).subList(0, 3))
31+
32+
when:
33+
serverStateNotifier.run()
34+
35+
then:
36+
newDescription.version == expectedVersion
37+
}
38+
39+
def 'should set max wire batch size when provided by server'() {
40+
assumeTrue(serverIsAtLeastVersion(2.5))
41+
42+
given:
43+
CommandResult commandResult = database.command(new BasicDBObject('ismaster', 1))
44+
def expected = commandResult.get('maxWriteBatchSize')
45+
46+
when:
47+
serverStateNotifier.run()
48+
49+
then:
50+
newDescription.maxWriteBatchSize == expected
51+
}
52+
53+
def 'should set default max wire batch size when not provided by server'() {
54+
assumeFalse(serverIsAtLeastVersion(2.5))
55+
56+
when:
57+
serverStateNotifier.run()
58+
59+
then:
60+
newDescription.maxWriteBatchSize == ServerDescription.getDefaultMaxWriteBatchSize()
61+
}
62+
}

0 commit comments

Comments
 (0)