Skip to content

Commit e6dd19b

Browse files
author
Myron Scott
committed
Rework transport states to make sure server transports are associated with at most one connection. The connection can either be established at the formation of the server comms stack or associated with an already formed reconnection server stack during the reconnect window
1 parent 02ff96e commit e6dd19b

File tree

6 files changed

+43
-92
lines changed

6 files changed

+43
-92
lines changed

common/src/main/java/com/tc/net/protocol/transport/ClientMessageTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,7 @@ private void handshakeConnection() throws TCTimeoutException, MaxConnectionsExce
490490
handleHandshakeError(result);
491491
initConnectionID(result.synAck.getConnectionId());
492492
sendAck();
493-
log("Handshake is complete");
493+
logger.debug("Handshake is complete");
494494
}
495495

496496
private String getMaxConnectionsExceededMessage(int maxConnections) {

common/src/main/java/com/tc/net/protocol/transport/MessageTransportBase.java

Lines changed: 21 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -190,13 +190,13 @@ public final void send(TCNetworkMessage message) throws IOException {
190190
}
191191
}
192192

193-
// Do not override this method. Not a final method, as a test class is deriving it
194193
@Override
195194
public void sendToConnection(TCNetworkMessage message) throws IOException {
196195
if (message == null) throw new AssertionError("Attempt to send a null message.");
197196
if (!status.isEnd()) {
198197
connection.putMessage(message);
199198
} else {
199+
message.complete();
200200
throw new IOException("Couldn't send message status: " + status);
201201
}
202202
}
@@ -211,51 +211,31 @@ public boolean isConnected() {
211211
}
212212

213213
@Override
214-
public final void attachNewConnection(TCConnection newConnection) throws IllegalReconnectException {
215-
getConnectionAttacher().attachNewConnection(this.connectionCloseEvent.getAndSet(null), this.connection,
214+
public void attachNewConnection(TCConnection newConnection) throws IllegalReconnectException {
215+
attachNewConnection(this.connectionCloseEvent.getAndSet(null), this.connection,
216216
newConnection);
217217
}
218218

219-
protected ConnectionAttacher getConnectionAttacher() {
220-
return new DefaultConnectionAttacher(this, getLogger());
221-
}
222-
223-
protected interface ConnectionAttacher {
224-
public void attachNewConnection(TCConnectionEvent closeEvent, TCConnection oldConnection, TCConnection newConnection);
225-
}
226-
227-
private static final class DefaultConnectionAttacher implements ConnectionAttacher {
228-
229-
private final MessageTransportBase transport;
230-
private final Logger logger;
231-
232-
private DefaultConnectionAttacher(MessageTransportBase transport, Logger logger) {
233-
this.transport = transport;
234-
this.logger = logger;
235-
}
236-
237-
@Override
238-
public void attachNewConnection(TCConnectionEvent closeEvent, TCConnection oldConnection, TCConnection newConnection) {
239-
Assert.assertNotNull(oldConnection);
240-
if (closeEvent == null || closeEvent.getSource() != oldConnection) {
241-
// We either didn't receive a close event or we received a close event
242-
// from a connection that isn't our current connection.
243-
if (transport.isConnected()) {
244-
// DEV-1689 : Don't bother for connections which actually didn't make up to Transport Establishment.
245-
this.transport.status.reset();
246-
this.transport.fireTransportDisconnectedEvent();
247-
this.transport.getConnection().asynchClose();
248-
} else {
249-
logger.warn("Old connection " + oldConnection + "might not have been Transport Established ");
250-
}
251-
}
252-
// remove the transport as a listener for the old connection
253-
if (oldConnection != null && oldConnection != transport.getConnection()) {
254-
oldConnection.removeListener(transport);
219+
private void attachNewConnection(TCConnectionEvent closeEvent, TCConnection oldConnection, TCConnection newConnection) {
220+
Assert.assertNotNull(oldConnection);
221+
if (closeEvent == null || closeEvent.getSource() != oldConnection) {
222+
// We either didn't receive a close event or we received a close event
223+
// from a connection that isn't our current connection.
224+
if (isConnected()) {
225+
// DEV-1689 : Don't bother for connections which actually didn't make up to Transport Establishment.
226+
status.reset();
227+
fireTransportDisconnectedEvent();
228+
getConnection().asynchClose();
229+
} else {
230+
logger.warn("Old connection " + oldConnection + "might not have been Transport Established ");
255231
}
256-
// set the new connection to the current connection.
257-
transport.wireNewConnection(newConnection);
258232
}
233+
// remove the transport as a listener for the old connection
234+
if (oldConnection != null && oldConnection != getConnection()) {
235+
oldConnection.removeListener(this);
236+
}
237+
// set the new connection to the current connection.
238+
wireNewConnection(newConnection);
259239
}
260240

261241
/*********************************************************************************************************************
@@ -410,15 +390,6 @@ public TCByteBufferOutputStream createOutput() {
410390
}
411391
}
412392

413-
void log(String msg) {
414-
if (!getProductID().isInternal()) {
415-
getLogger().info(msg);
416-
} else {
417-
getLogger().debug(msg);
418-
}
419-
420-
}
421-
422393
@Override
423394
public Map<String, ?> getStateMap() {
424395
Map<String, Object> map = new LinkedHashMap<>();

common/src/main/java/com/tc/net/protocol/transport/MessageTransportState.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,15 @@ public boolean isAlive() {
2828
return false;
2929
}
3030
},
31-
32-
STATE_START_OPEN("START_OPEN"),
33-
31+
3432
STATE_CONNECTED("CONNECTED"),
3533

36-
STATE_RESTART("RESTART"),
34+
STATE_RESTART("RESTART") {
35+
@Override
36+
public boolean isAlive() {
37+
return false;
38+
}
39+
},
3740

3841
/**
3942
* XXX: Move to client state machine SYN message sent, waiting for reply

common/src/main/java/com/tc/net/protocol/transport/MessageTransportStatus.java

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,23 +23,19 @@
2323
import com.tc.util.Assert;
2424

2525
class MessageTransportStatus {
26-
private final MessageTransportState initial;
2726
private MessageTransportState state;
2827
private final Logger logger;
29-
private volatile boolean isEstablished = false;
3028

3129
MessageTransportStatus(MessageTransportState initialState, Logger logger) {
32-
this.initial = initialState;
3330
this.state = initialState;
3431
this.logger = logger;
3532
}
3633

3734
void reset() {
38-
stateChange(initial);
35+
stateChange(MessageTransportState.STATE_START);
3936
}
4037

4138
private synchronized void stateChange(MessageTransportState newState) {
42-
isEstablished = false;
4339
if (logger.isDebugEnabled()) {
4440
logger.debug("Changing from " + state.toString() + " to " + newState.toString());
4541
}
@@ -78,19 +74,12 @@ void end() {
7874
}
7975

8076
private synchronized boolean checkState(MessageTransportState check) {
81-
if (check == MessageTransportState.STATE_ESTABLISHED && state == MessageTransportState.STATE_ESTABLISHED) {
82-
isEstablished = true;
83-
}
8477
return this.state.equals(check);
8578
}
8679

8780
boolean isStart() {
8881
return checkState(MessageTransportState.STATE_START);
8982
}
90-
91-
boolean isStartOpen() {
92-
return checkState(MessageTransportState.STATE_START_OPEN);
93-
}
9483

9584
boolean isRestart() {
9685
return checkState(MessageTransportState.STATE_RESTART);
@@ -101,11 +90,7 @@ boolean isSynSent() {
10190
}
10291

10392
boolean isEstablished() {
104-
if (isEstablished) {
105-
return true;
106-
} else {
107-
return checkState(MessageTransportState.STATE_ESTABLISHED);
108-
}
93+
return checkState(MessageTransportState.STATE_ESTABLISHED);
10994
}
11095

11196
boolean isDisconnected() {

common/src/main/java/com/tc/net/protocol/transport/ServerMessageTransport.java

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.slf4j.LoggerFactory;
2323

2424
import com.tc.net.core.TCConnection;
25-
import com.tc.net.core.event.TCConnectionEvent;
25+
import com.tc.net.protocol.IllegalReconnectException;
2626
import com.tc.net.protocol.NetworkStackID;
2727
import com.tc.util.Assert;
2828

@@ -44,16 +44,20 @@ public ServerMessageTransport(TransportHandshakeErrorHandler handshakeErrorHandl
4444
public ServerMessageTransport(TCConnection conn,
4545
TransportHandshakeErrorHandler handshakeErrorHandler,
4646
TransportHandshakeMessageFactory messageFactory) {
47-
super(MessageTransportState.STATE_START_OPEN, handshakeErrorHandler, messageFactory, smtLogger);
47+
super(MessageTransportState.STATE_START, handshakeErrorHandler, messageFactory, smtLogger);
4848
Assert.assertNotNull(conn);
4949
wireNewConnection(conn);
5050
}
5151

5252
@Override
53-
protected ConnectionAttacher getConnectionAttacher() {
54-
if (this.status.isRestart()) {
55-
return new RestartConnectionAttacher();
56-
} else return super.getConnectionAttacher();
53+
public void attachNewConnection(TCConnection newConnection) throws IllegalReconnectException {
54+
if (!this.status.isRestart()) {
55+
// servers transports can only restart once
56+
throw new IllegalReconnectException();
57+
}
58+
Assert.assertNull(getConnection());
59+
wireNewConnection(newConnection);
60+
logger.debug("reconnect connection attach {} {}", this.getConnectionID().getClientID(), getConnection());
5761
}
5862

5963
@Override
@@ -129,16 +133,6 @@ private boolean verifyAck(WireProtocolMessage message) {
129133
return message instanceof TransportHandshakeMessage && ((TransportHandshakeMessage) message).isAck();
130134
}
131135

132-
private final class RestartConnectionAttacher implements ConnectionAttacher {
133-
134-
@Override
135-
public void attachNewConnection(TCConnectionEvent closeEvent, TCConnection oldConnection, TCConnection newConnection) {
136-
wireNewConnection(newConnection);
137-
log("Attaching new connection to transport: " + newConnection);
138-
}
139-
140-
}
141-
142136
@Override
143137
public String toString() {
144138
return "ServerMessageTransport{connection=" + getConnection() + '}';

common/src/test/java/com/tc/net/protocol/transport/ServerMessageTransportTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,18 +78,16 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
7878
return null;
7979
}
8080
}).when(connection).addListener(ArgumentMatchers.any(TCConnectionEventListener.class));
81+
when(connection.isConnected()).thenReturn(Boolean.TRUE);
8182
TransportHandshakeErrorHandler errHdr = mock(TransportHandshakeErrorHandler.class);
8283
TransportHandshakeMessageFactory factory = mock(TransportHandshakeMessageFactory.class);
8384
ServerMessageTransport transport = new ServerMessageTransport(connection, errHdr, factory);
8485
transport.initConnectionID(id);
8586
transport.addTransportListener(checker);
8687

87-
Assert.assertTrue(transport.status.isStartOpen());
88+
Assert.assertTrue(transport.status.isConnected());
8889

8990
TCConnectionEvent event = new TCConnectionEvent(connection);
90-
transport.connectEvent(event);
91-
92-
Assert.assertTrue(transport.status.isConnected());
9391

9492
for (TCConnectionEventListener trigger : listeners) {
9593
trigger.closeEvent(event);

0 commit comments

Comments
 (0)