Skip to content

Commit 9458618

Browse files
author
Myron Scott
authored
Merge pull request #1281 from chrisdennis/async
Async client implementation
2 parents 2727e73 + 9f9d888 commit 9458618

File tree

67 files changed

+568
-1520
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+568
-1520
lines changed

build-data/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
<parent>
3131
<groupId>org.terracotta.internal</groupId>
3232
<artifactId>build-parent</artifactId>
33-
<version>5.9-SNAPSHOT</version>
33+
<version>5.10-SNAPSHOT</version>
3434
<relativePath>../build-parent</relativePath>
3535
</parent>
3636

build-parent/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
<parent>
2626
<groupId>org.terracotta.internal</groupId>
2727
<artifactId>terracotta-core-root</artifactId>
28-
<version>5.9-SNAPSHOT</version>
28+
<version>5.10-SNAPSHOT</version>
2929
<relativePath>..</relativePath>
3030
</parent>
3131

client-runtime/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<artifactId>terracotta-core-root</artifactId>
77
<groupId>org.terracotta.internal</groupId>
8-
<version>5.9-SNAPSHOT</version>
8+
<version>5.10-SNAPSHOT</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111

common-spi/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
<parent>
2525
<groupId>org.terracotta.internal</groupId>
2626
<artifactId>build-parent</artifactId>
27-
<version>5.9-SNAPSHOT</version>
27+
<version>5.10-SNAPSHOT</version>
2828
<relativePath>../build-parent</relativePath>
2929
</parent>
3030
<modelVersion>4.0.0</modelVersion>

common/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ Apache Ivy version: 2.2.0 20100923230623
3030
<parent>
3131
<groupId>org.terracotta.internal</groupId>
3232
<artifactId>build-parent</artifactId>
33-
<version>5.9-SNAPSHOT</version>
33+
<version>5.10-SNAPSHOT</version>
3434
<relativePath>../build-parent</relativePath>
3535
</parent>
3636

common/src/main/java/com/tc/net/core/TCConnectionImpl.java

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -417,27 +417,29 @@ private void buildWriteContextsFromMessages() {
417417
int batchSize = 0;
418418
int batchMsgCount = 0;
419419
for (final TCNetworkMessage element : messagesToWrite) {
420-
if (element instanceof WireProtocolMessage) {
421-
// we don't want to group already constructed Transport Handshake WireProtocolMessages
422-
final WireProtocolMessage ms = finalizeWireProtocolMessage((WireProtocolMessage) element, 1);
423-
this.writeContexts.add(new WriteContext(ms));
424-
} else if (WireProtocolHeader.PROTOCOL_UNKNOWN == WireProtocolHeader.getProtocolForMessageClass(element)) {
425-
// GenericNetwork messages are used for testing
426-
this.writeContexts.add(new WriteContext(element));
427-
} else if (MSG_GROUPING_ENABLED) {
428-
int realMessageSize = element.getTotalLength();
429-
if (!canBatch(realMessageSize, batchSize, batchMsgCount)) {
430-
// We can't add this to the current batch so seal the current batch as a write context and create a new one.
431-
this.writeContexts.add(new WriteContext(buildWireProtocolMessageGroup(currentBatch)));
432-
batchSize = 0;
433-
batchMsgCount = 0;
434-
currentBatch = new ArrayList<TCNetworkMessage>();
420+
if (element.commit()) {
421+
if (element instanceof WireProtocolMessage) {
422+
// we don't want to group already constructed Transport Handshake WireProtocolMessages
423+
final WireProtocolMessage ms = finalizeWireProtocolMessage((WireProtocolMessage) element, 1);
424+
this.writeContexts.add(new WriteContext(ms));
425+
} else if (WireProtocolHeader.PROTOCOL_UNKNOWN == WireProtocolHeader.getProtocolForMessageClass(element)) {
426+
// GenericNetwork messages are used for testing
427+
this.writeContexts.add(new WriteContext(element));
428+
} else if (MSG_GROUPING_ENABLED) {
429+
int realMessageSize = element.getTotalLength();
430+
if (!canBatch(realMessageSize, batchSize, batchMsgCount)) {
431+
// We can't add this to the current batch so seal the current batch as a write context and create a new one.
432+
this.writeContexts.add(new WriteContext(buildWireProtocolMessageGroup(currentBatch)));
433+
batchSize = 0;
434+
batchMsgCount = 0;
435+
currentBatch = new ArrayList<TCNetworkMessage>();
436+
}
437+
batchSize += realMessageSize;
438+
batchMsgCount++;
439+
currentBatch.add(element);
440+
} else {
441+
this.writeContexts.add(new WriteContext(buildWireProtocolMessage(element)));
435442
}
436-
batchSize += realMessageSize;
437-
batchMsgCount++;
438-
currentBatch.add(element);
439-
} else {
440-
this.writeContexts.add(new WriteContext(buildWireProtocolMessage(element)));
441443
}
442444
}
443445

common/src/main/java/com/tc/net/protocol/transport/ClientMessageTransport.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -305,9 +305,7 @@ private void handleSynAck(WireProtocolMessage message) {
305305
handleHandshakeError(new TransportHandshakeErrorContext(synAck.getErrorContext() + " " + message,
306306
synAck.getErrorType()));
307307
}
308-
}
309-
310-
if (!getConnectionID().isNewConnection() && getConnectionID().isValid()) {
308+
} else if (!getConnectionID().isNewConnection() && getConnectionID().isValid()) {
311309
// This is a reconnect
312310
Assert.eval(getConnectionID().equals(synAck.getConnectionId()));
313311
}

common/src/main/java/com/tc/object/net/DSOChannelManagerImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public void makeChannelActive(ClientID clientID) {
135135
synchronized (activeChannels) {
136136
activeChannels.put(clientID, channel);
137137
ackMsg.initialize(getAllActiveClientIDs(), clientID, serverVersion);
138-
if (!ackMsg.send()) {
138+
if (ackMsg.send() == null) {
139139
logger.warn("Not sending handshake message to disconnected client: " + clientID);
140140
}
141141
}
@@ -151,7 +151,7 @@ public void makeChannelRefuse(ClientID clientID, String message) {
151151
ClientHandshakeRefusedMessage handshakeRefuseMsg = newClientHandshakeRefusedMessage(clientID);
152152
synchronized (activeChannels) {
153153
handshakeRefuseMsg.initialize(message);
154-
if (!handshakeRefuseMsg.send()) {
154+
if (handshakeRefuseMsg.send() == null) {
155155
logger.warn("Not sending handshake rejected message to disconnected client: " + clientID);
156156
}
157157
}

common/src/main/java/com/tc/properties/TCPropertiesConsts.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,6 @@ public interface TCPropertiesConsts {
115115
* </code>
116116
********************************************************************************************************************/
117117

118-
public static final String CLIENT_MAX_PENDING_REQUESTS = "client.requests.pending.max";
119-
120118
public static final String TC_TRANSPORT_HANDSHAKE_TIMEOUT = "tc.transport.handshake.timeout";
121119
public static final String TC_CONFIG_SOURCEGET_TIMEOUT = "tc.config.getFromSource.timeout";
122120
public static final String TC_CONFIG_TOTAL_TIMEOUT = "tc.config.total.timeout";
@@ -306,7 +304,6 @@ public interface TCPropertiesConsts {
306304
L2_SEDA_STAGE_STALL_WARNING,
307305
L2_SEDA_STAGE_ALWAYS_HYDRATE,
308306
L2_NHA_TCGROUPCOMM_RECONNECT_L2PROXY_TO_PORT,
309-
CLIENT_MAX_PENDING_REQUESTS,
310307
};
311308

312309
}

common/src/test/java/com/tc/net/core/NoReconnectThreadTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020

2121

2222
import com.tc.net.ServerID;
23-
import com.tc.net.TCSocketAddress;
24-
import com.tc.net.basic.BasicConnectionManager;
2523
import com.tc.net.protocol.NetworkStackHarnessFactory;
2624
import com.tc.net.protocol.PlainNetworkStackHarnessFactory;
2725
import com.tc.net.protocol.tcm.ChannelEvent;
@@ -83,7 +81,7 @@ private NetworkStackHarnessFactory getNetworkStackHarnessFactory() {
8381
}
8482

8583
private ClientMessageChannel createClientMsgCh() {
86-
BasicConnectionManager connMgr = new BasicConnectionManager("", new ClearTextBufferManagerFactory());
84+
TCConnectionManager connMgr = new TCConnectionManagerImpl("TestCommMgr-Client", 0, new ClearTextBufferManagerFactory());
8785
clientConnectionMgrs.add(connMgr);
8886
CommunicationsManager clientComms = new CommunicationsManagerImpl(new NullMessageMonitor(),
8987
getNetworkStackHarnessFactory(),

0 commit comments

Comments
 (0)