Skip to content

Commit 25ab506

Browse files
author
Myron Scott
authored
Merge pull request #1332 from myronkscott/fix_reconnect2
fixes to prevent client reconnection outside the server's reconnect window
2 parents 02ff96e + eee9240 commit 25ab506

File tree

18 files changed

+221
-105
lines changed

18 files changed

+221
-105
lines changed

common/src/main/java/com/tc/net/protocol/transport/ClientMessageTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,7 @@ private void handshakeConnection() throws TCTimeoutException, MaxConnectionsExce
490490
handleHandshakeError(result);
491491
initConnectionID(result.synAck.getConnectionId());
492492
sendAck();
493-
log("Handshake is complete");
493+
logger.debug("Handshake is complete");
494494
}
495495

496496
private String getMaxConnectionsExceededMessage(int maxConnections) {

common/src/main/java/com/tc/net/protocol/transport/MessageTransportBase.java

Lines changed: 21 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -190,13 +190,13 @@ public final void send(TCNetworkMessage message) throws IOException {
190190
}
191191
}
192192

193-
// Do not override this method. Not a final method, as a test class is deriving it
194193
@Override
195194
public void sendToConnection(TCNetworkMessage message) throws IOException {
196195
if (message == null) throw new AssertionError("Attempt to send a null message.");
197196
if (!status.isEnd()) {
198197
connection.putMessage(message);
199198
} else {
199+
message.complete();
200200
throw new IOException("Couldn't send message status: " + status);
201201
}
202202
}
@@ -211,51 +211,31 @@ public boolean isConnected() {
211211
}
212212

213213
@Override
214-
public final void attachNewConnection(TCConnection newConnection) throws IllegalReconnectException {
215-
getConnectionAttacher().attachNewConnection(this.connectionCloseEvent.getAndSet(null), this.connection,
214+
public void attachNewConnection(TCConnection newConnection) throws IllegalReconnectException {
215+
attachNewConnection(this.connectionCloseEvent.getAndSet(null), this.connection,
216216
newConnection);
217217
}
218218

219-
protected ConnectionAttacher getConnectionAttacher() {
220-
return new DefaultConnectionAttacher(this, getLogger());
221-
}
222-
223-
protected interface ConnectionAttacher {
224-
public void attachNewConnection(TCConnectionEvent closeEvent, TCConnection oldConnection, TCConnection newConnection);
225-
}
226-
227-
private static final class DefaultConnectionAttacher implements ConnectionAttacher {
228-
229-
private final MessageTransportBase transport;
230-
private final Logger logger;
231-
232-
private DefaultConnectionAttacher(MessageTransportBase transport, Logger logger) {
233-
this.transport = transport;
234-
this.logger = logger;
235-
}
236-
237-
@Override
238-
public void attachNewConnection(TCConnectionEvent closeEvent, TCConnection oldConnection, TCConnection newConnection) {
239-
Assert.assertNotNull(oldConnection);
240-
if (closeEvent == null || closeEvent.getSource() != oldConnection) {
241-
// We either didn't receive a close event or we received a close event
242-
// from a connection that isn't our current connection.
243-
if (transport.isConnected()) {
244-
// DEV-1689 : Don't bother for connections which actually didn't make up to Transport Establishment.
245-
this.transport.status.reset();
246-
this.transport.fireTransportDisconnectedEvent();
247-
this.transport.getConnection().asynchClose();
248-
} else {
249-
logger.warn("Old connection " + oldConnection + "might not have been Transport Established ");
250-
}
251-
}
252-
// remove the transport as a listener for the old connection
253-
if (oldConnection != null && oldConnection != transport.getConnection()) {
254-
oldConnection.removeListener(transport);
219+
private void attachNewConnection(TCConnectionEvent closeEvent, TCConnection oldConnection, TCConnection newConnection) {
220+
Assert.assertNotNull(oldConnection);
221+
if (closeEvent == null || closeEvent.getSource() != oldConnection) {
222+
// We either didn't receive a close event or we received a close event
223+
// from a connection that isn't our current connection.
224+
if (isConnected()) {
225+
// DEV-1689 : Don't bother for connections which actually didn't make up to Transport Establishment.
226+
status.reset();
227+
fireTransportDisconnectedEvent();
228+
getConnection().asynchClose();
229+
} else {
230+
logger.warn("Old connection " + oldConnection + "might not have been Transport Established ");
255231
}
256-
// set the new connection to the current connection.
257-
transport.wireNewConnection(newConnection);
258232
}
233+
// remove the transport as a listener for the old connection
234+
if (oldConnection != null && oldConnection != getConnection()) {
235+
oldConnection.removeListener(this);
236+
}
237+
// set the new connection to the current connection.
238+
wireNewConnection(newConnection);
259239
}
260240

261241
/*********************************************************************************************************************
@@ -410,15 +390,6 @@ public TCByteBufferOutputStream createOutput() {
410390
}
411391
}
412392

413-
void log(String msg) {
414-
if (!getProductID().isInternal()) {
415-
getLogger().info(msg);
416-
} else {
417-
getLogger().debug(msg);
418-
}
419-
420-
}
421-
422393
@Override
423394
public Map<String, ?> getStateMap() {
424395
Map<String, Object> map = new LinkedHashMap<>();

common/src/main/java/com/tc/net/protocol/transport/MessageTransportState.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,15 @@ public boolean isAlive() {
2828
return false;
2929
}
3030
},
31-
32-
STATE_START_OPEN("START_OPEN"),
33-
31+
3432
STATE_CONNECTED("CONNECTED"),
3533

36-
STATE_RESTART("RESTART"),
34+
STATE_RESTART("RESTART") {
35+
@Override
36+
public boolean isAlive() {
37+
return false;
38+
}
39+
},
3740

3841
/**
3942
* XXX: Move to client state machine SYN message sent, waiting for reply

common/src/main/java/com/tc/net/protocol/transport/MessageTransportStatus.java

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,23 +23,19 @@
2323
import com.tc.util.Assert;
2424

2525
class MessageTransportStatus {
26-
private final MessageTransportState initial;
2726
private MessageTransportState state;
2827
private final Logger logger;
29-
private volatile boolean isEstablished = false;
3028

3129
MessageTransportStatus(MessageTransportState initialState, Logger logger) {
32-
this.initial = initialState;
3330
this.state = initialState;
3431
this.logger = logger;
3532
}
3633

3734
void reset() {
38-
stateChange(initial);
35+
stateChange(MessageTransportState.STATE_START);
3936
}
4037

4138
private synchronized void stateChange(MessageTransportState newState) {
42-
isEstablished = false;
4339
if (logger.isDebugEnabled()) {
4440
logger.debug("Changing from " + state.toString() + " to " + newState.toString());
4541
}
@@ -78,19 +74,12 @@ void end() {
7874
}
7975

8076
private synchronized boolean checkState(MessageTransportState check) {
81-
if (check == MessageTransportState.STATE_ESTABLISHED && state == MessageTransportState.STATE_ESTABLISHED) {
82-
isEstablished = true;
83-
}
8477
return this.state.equals(check);
8578
}
8679

8780
boolean isStart() {
8881
return checkState(MessageTransportState.STATE_START);
8982
}
90-
91-
boolean isStartOpen() {
92-
return checkState(MessageTransportState.STATE_START_OPEN);
93-
}
9483

9584
boolean isRestart() {
9685
return checkState(MessageTransportState.STATE_RESTART);
@@ -101,11 +90,7 @@ boolean isSynSent() {
10190
}
10291

10392
boolean isEstablished() {
104-
if (isEstablished) {
105-
return true;
106-
} else {
107-
return checkState(MessageTransportState.STATE_ESTABLISHED);
108-
}
93+
return checkState(MessageTransportState.STATE_ESTABLISHED);
10994
}
11095

11196
boolean isDisconnected() {

common/src/main/java/com/tc/net/protocol/transport/ServerMessageTransport.java

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.slf4j.LoggerFactory;
2323

2424
import com.tc.net.core.TCConnection;
25-
import com.tc.net.core.event.TCConnectionEvent;
25+
import com.tc.net.protocol.IllegalReconnectException;
2626
import com.tc.net.protocol.NetworkStackID;
2727
import com.tc.util.Assert;
2828

@@ -44,16 +44,20 @@ public ServerMessageTransport(TransportHandshakeErrorHandler handshakeErrorHandl
4444
public ServerMessageTransport(TCConnection conn,
4545
TransportHandshakeErrorHandler handshakeErrorHandler,
4646
TransportHandshakeMessageFactory messageFactory) {
47-
super(MessageTransportState.STATE_START_OPEN, handshakeErrorHandler, messageFactory, smtLogger);
47+
super(MessageTransportState.STATE_START, handshakeErrorHandler, messageFactory, smtLogger);
4848
Assert.assertNotNull(conn);
4949
wireNewConnection(conn);
5050
}
5151

5252
@Override
53-
protected ConnectionAttacher getConnectionAttacher() {
54-
if (this.status.isRestart()) {
55-
return new RestartConnectionAttacher();
56-
} else return super.getConnectionAttacher();
53+
public void attachNewConnection(TCConnection newConnection) throws IllegalReconnectException {
54+
if (!this.status.isRestart()) {
55+
// servers transports can only restart once
56+
throw new IllegalReconnectException();
57+
}
58+
Assert.assertNull(getConnection());
59+
wireNewConnection(newConnection);
60+
logger.debug("reconnect connection attach {} {}", this.getConnectionID().getClientID(), getConnection());
5761
}
5862

5963
@Override
@@ -129,16 +133,6 @@ private boolean verifyAck(WireProtocolMessage message) {
129133
return message instanceof TransportHandshakeMessage && ((TransportHandshakeMessage) message).isAck();
130134
}
131135

132-
private final class RestartConnectionAttacher implements ConnectionAttacher {
133-
134-
@Override
135-
public void attachNewConnection(TCConnectionEvent closeEvent, TCConnection oldConnection, TCConnection newConnection) {
136-
wireNewConnection(newConnection);
137-
log("Attaching new connection to transport: " + newConnection);
138-
}
139-
140-
}
141-
142136
@Override
143137
public String toString() {
144138
return "ServerMessageTransport{connection=" + getConnection() + '}';

common/src/main/java/com/tc/net/protocol/transport/ServerStackProvider.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
public class ServerStackProvider implements NetworkStackProvider, MessageTransportListener, ProtocolAdaptorFactory {
5555
private static final Logger logger = LoggerFactory.getLogger(ServerStackProvider.class);
5656

57-
private final Map<ClientID, ServerNetworkStackHarness> harnesses = new ConcurrentHashMap<ClientID, ServerNetworkStackHarness>();
57+
private final Map<ClientID, ServerNetworkStackHarness> harnesses = new ConcurrentHashMap<>();
5858
private final NetworkStackHarnessFactory harnessFactory;
5959
private final ServerMessageChannelFactory channelFactory;
6060
private final TransportHandshakeMessageFactory handshakeMessageFactory;
@@ -63,7 +63,7 @@ public class ServerStackProvider implements NetworkStackProvider, MessageTranspo
6363
private final WireProtocolAdaptorFactory wireProtocolAdaptorFactory;
6464
private final WireProtocolMessageSink wireProtoMsgsink;
6565
private final MessageTransportFactory messageTransportFactory;
66-
private final List<MessageTransportListener> transportListeners = new ArrayList<MessageTransportListener>();
66+
private final List<MessageTransportListener> transportListeners = new ArrayList<>();
6767
private final ReentrantLock licenseLock;
6868
private final RedirectAddressProvider activeProvider;
6969
private final Predicate<MessageTransport> validateTransport;
@@ -213,7 +213,7 @@ public void notifyTransportClosed(MessageTransport transport) {
213213
if (!connectionId.isJvmIDNull()) {
214214
boolean removed = this.connectionPolicy.clientDisconnected(connectionId);
215215
if (removed) {
216-
logger.warn("connectionid not removed be transport disconnect");
216+
logger.warn("connectionid not removed by transport disconnect");
217217
}
218218
}
219219
}

common/src/test/java/com/tc/net/protocol/transport/ServerMessageTransportTest.java

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.tc.net.core.TCConnection;
2222
import com.tc.net.core.event.TCConnectionEvent;
2323
import com.tc.net.core.event.TCConnectionEventListener;
24+
import com.tc.net.protocol.IllegalReconnectException;
2425
import com.tc.util.Assert;
2526
import java.util.ArrayList;
2627
import java.util.List;
@@ -78,18 +79,16 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
7879
return null;
7980
}
8081
}).when(connection).addListener(ArgumentMatchers.any(TCConnectionEventListener.class));
82+
when(connection.isConnected()).thenReturn(Boolean.TRUE);
8183
TransportHandshakeErrorHandler errHdr = mock(TransportHandshakeErrorHandler.class);
8284
TransportHandshakeMessageFactory factory = mock(TransportHandshakeMessageFactory.class);
8385
ServerMessageTransport transport = new ServerMessageTransport(connection, errHdr, factory);
8486
transport.initConnectionID(id);
8587
transport.addTransportListener(checker);
8688

87-
Assert.assertTrue(transport.status.isStartOpen());
89+
Assert.assertTrue(transport.status.isConnected());
8890

8991
TCConnectionEvent event = new TCConnectionEvent(connection);
90-
transport.connectEvent(event);
91-
92-
Assert.assertTrue(transport.status.isConnected());
9392

9493
for (TCConnectionEventListener trigger : listeners) {
9594
trigger.closeEvent(event);
@@ -126,4 +125,47 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
126125
verify(checker, times(2)).notifyTransportDisconnected(eq(transport), eq(false));
127126
verify(checker, times(1)).notifyTransportDisconnected(eq(transport), eq(true));
128127
}
128+
129+
@Test
130+
public void testDoubleAttachFails() throws Exception {
131+
ConnectionID id = new ConnectionID("JVM", 1);
132+
TCConnection connection = mock(TCConnection.class);
133+
134+
MessageTransportListener checker = mock(MessageTransportListener.class);
135+
136+
final List<TCConnectionEventListener> listeners = new ArrayList<TCConnectionEventListener>();
137+
doAnswer(new Answer<Void>() {
138+
@Override
139+
public Void answer(InvocationOnMock invocation) throws Throwable {
140+
listeners.add((TCConnectionEventListener)invocation.getArguments()[0]);
141+
return null;
142+
}
143+
}).when(connection).addListener(ArgumentMatchers.any(TCConnectionEventListener.class));
144+
when(connection.isConnected()).thenReturn(Boolean.TRUE);
145+
TransportHandshakeErrorHandler errHdr = mock(TransportHandshakeErrorHandler.class);
146+
TransportHandshakeMessageFactory factory = mock(TransportHandshakeMessageFactory.class);
147+
ServerMessageTransport transport = new ServerMessageTransport(errHdr, factory);
148+
transport.initConnectionID(id);
149+
transport.addTransportListener(checker);
150+
151+
Assert.assertFalse(transport.status.isConnected());
152+
153+
transport.attachNewConnection(connection);
154+
Assert.assertTrue(transport.status.isConnected());
155+
156+
try {
157+
transport.attachNewConnection(connection);
158+
Assert.fail();
159+
} catch (IllegalReconnectException illegal) {
160+
// expected
161+
}
162+
transport.close();
163+
Assert.assertFalse(transport.status.isConnected());
164+
try {
165+
transport.attachNewConnection(connection);
166+
Assert.fail();
167+
} catch (IllegalReconnectException illegal) {
168+
// expected
169+
}
170+
}
129171
}

0 commit comments

Comments
 (0)