Skip to content

Commit eee9240

Browse files
author
Myron Scott
committed
Reject any reconnect attempts outside the reconnect window. Unless something very strange happens to the underlying network comms, this should not happen. If it does, any reconnect handshake will be rejected when outside the reconnect window. The server has no idea how to handle reconnects extra processing outside the reconnect window.
1 parent e6dd19b commit eee9240

File tree

13 files changed

+178
-13
lines changed

13 files changed

+178
-13
lines changed

common/src/main/java/com/tc/net/protocol/transport/ServerStackProvider.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
public class ServerStackProvider implements NetworkStackProvider, MessageTransportListener, ProtocolAdaptorFactory {
5555
private static final Logger logger = LoggerFactory.getLogger(ServerStackProvider.class);
5656

57-
private final Map<ClientID, ServerNetworkStackHarness> harnesses = new ConcurrentHashMap<ClientID, ServerNetworkStackHarness>();
57+
private final Map<ClientID, ServerNetworkStackHarness> harnesses = new ConcurrentHashMap<>();
5858
private final NetworkStackHarnessFactory harnessFactory;
5959
private final ServerMessageChannelFactory channelFactory;
6060
private final TransportHandshakeMessageFactory handshakeMessageFactory;
@@ -63,7 +63,7 @@ public class ServerStackProvider implements NetworkStackProvider, MessageTranspo
6363
private final WireProtocolAdaptorFactory wireProtocolAdaptorFactory;
6464
private final WireProtocolMessageSink wireProtoMsgsink;
6565
private final MessageTransportFactory messageTransportFactory;
66-
private final List<MessageTransportListener> transportListeners = new ArrayList<MessageTransportListener>();
66+
private final List<MessageTransportListener> transportListeners = new ArrayList<>();
6767
private final ReentrantLock licenseLock;
6868
private final RedirectAddressProvider activeProvider;
6969
private final Predicate<MessageTransport> validateTransport;
@@ -213,7 +213,7 @@ public void notifyTransportClosed(MessageTransport transport) {
213213
if (!connectionId.isJvmIDNull()) {
214214
boolean removed = this.connectionPolicy.clientDisconnected(connectionId);
215215
if (removed) {
216-
logger.warn("connectionid not removed be transport disconnect");
216+
logger.warn("connectionid not removed by transport disconnect");
217217
}
218218
}
219219
}

common/src/test/java/com/tc/net/protocol/transport/ServerMessageTransportTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.tc.net.core.TCConnection;
2222
import com.tc.net.core.event.TCConnectionEvent;
2323
import com.tc.net.core.event.TCConnectionEventListener;
24+
import com.tc.net.protocol.IllegalReconnectException;
2425
import com.tc.util.Assert;
2526
import java.util.ArrayList;
2627
import java.util.List;
@@ -124,4 +125,47 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
124125
verify(checker, times(2)).notifyTransportDisconnected(eq(transport), eq(false));
125126
verify(checker, times(1)).notifyTransportDisconnected(eq(transport), eq(true));
126127
}
128+
129+
@Test
130+
public void testDoubleAttachFails() throws Exception {
131+
ConnectionID id = new ConnectionID("JVM", 1);
132+
TCConnection connection = mock(TCConnection.class);
133+
134+
MessageTransportListener checker = mock(MessageTransportListener.class);
135+
136+
final List<TCConnectionEventListener> listeners = new ArrayList<TCConnectionEventListener>();
137+
doAnswer(new Answer<Void>() {
138+
@Override
139+
public Void answer(InvocationOnMock invocation) throws Throwable {
140+
listeners.add((TCConnectionEventListener)invocation.getArguments()[0]);
141+
return null;
142+
}
143+
}).when(connection).addListener(ArgumentMatchers.any(TCConnectionEventListener.class));
144+
when(connection.isConnected()).thenReturn(Boolean.TRUE);
145+
TransportHandshakeErrorHandler errHdr = mock(TransportHandshakeErrorHandler.class);
146+
TransportHandshakeMessageFactory factory = mock(TransportHandshakeMessageFactory.class);
147+
ServerMessageTransport transport = new ServerMessageTransport(errHdr, factory);
148+
transport.initConnectionID(id);
149+
transport.addTransportListener(checker);
150+
151+
Assert.assertFalse(transport.status.isConnected());
152+
153+
transport.attachNewConnection(connection);
154+
Assert.assertTrue(transport.status.isConnected());
155+
156+
try {
157+
transport.attachNewConnection(connection);
158+
Assert.fail();
159+
} catch (IllegalReconnectException illegal) {
160+
// expected
161+
}
162+
transport.close();
163+
Assert.assertFalse(transport.status.isConnected());
164+
try {
165+
transport.attachNewConnection(connection);
166+
Assert.fail();
167+
} catch (IllegalReconnectException illegal) {
168+
// expected
169+
}
170+
}
127171
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
*
3+
* The contents of this file are subject to the Terracotta Public License Version
4+
* 2.0 (the "License"); You may not use this file except in compliance with the
5+
* License. You may obtain a copy of the License at
6+
*
7+
* http://terracotta.org/legal/terracotta-public-license.
8+
*
9+
* Software distributed under the License is distributed on an "AS IS" basis,
10+
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
11+
* the specific language governing rights and limitations under the License.
12+
*
13+
* The Covered Software is Terracotta Core.
14+
*
15+
* The Initial Developer of the Covered Software is
16+
* Terracotta, Inc., a Software AG company
17+
*
18+
*/
19+
package org.terracotta.functional;
20+
21+
22+
import java.io.StringReader;
23+
import java.net.InetSocketAddress;
24+
import java.util.Properties;
25+
import junit.framework.Assert;
26+
import org.junit.Rule;
27+
import org.junit.Test;
28+
import org.terracotta.connection.Connection;
29+
import org.terracotta.connection.Diagnostics;
30+
import org.terracotta.connection.DiagnosticsFactory;
31+
import org.terracotta.connection.entity.EntityRef;
32+
import org.terracotta.entity.map.ConcurrentClusteredMap;
33+
import org.terracotta.entity.map.MapConfig;
34+
import org.terracotta.exception.ConnectionClosedException;
35+
import org.terracotta.testing.rules.BasicExternalClusterBuilder;
36+
import org.terracotta.testing.rules.Cluster;
37+
38+
/**
39+
*
40+
*/
41+
public class ReconnectRejectIT {
42+
43+
@Rule
44+
public final Cluster CLUSTER = BasicExternalClusterBuilder.newCluster(1).withClientReconnectWindowTime(30)
45+
.build();
46+
47+
@Test
48+
public void testClusterHostPorts() throws Exception {
49+
Connection connection = CLUSTER.newConnection();
50+
String hp = CLUSTER.getClusterHostPorts()[0];
51+
String[] shp = hp.split(":");
52+
int port = Integer.parseInt(shp[1]);
53+
String id = null;
54+
try (Diagnostics d = DiagnosticsFactory.connect(InetSocketAddress.createUnresolved(shp[0], port), new Properties())) {
55+
Properties clients = new Properties();
56+
clients.load(new StringReader(d.invoke("Server", "getConnectedClients")));
57+
id = clients.getProperty("clients.0.id");
58+
}
59+
try (Diagnostics d = DiagnosticsFactory.connect(InetSocketAddress.createUnresolved(shp[0], port), new Properties())) {
60+
System.out.println(d.invokeWithArg("Server", "disconnectClient", id));
61+
}
62+
try {
63+
EntityRef<ConcurrentClusteredMap, MapConfig, Void> ref = connection.getEntityRef(ConcurrentClusteredMap.class, 1L, "ROOT");
64+
ref.create(new MapConfig(1, "ROOT"));
65+
ConcurrentClusteredMap map = ref.fetchEntity(null);
66+
Assert.fail();
67+
} catch (ConnectionClosedException e) {
68+
//expected
69+
// There should be TRANSPORT_RECONNECTION_REJECTED_EVENT in the client logs
70+
e.printStackTrace();
71+
}
72+
73+
}
74+
}

management/src/main/java/com/tc/management/beans/TCServerInfoMBean.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ public interface TCServerInfoMBean extends TerracottaMBean, RuntimeStatisticCons
102102

103103
void setPipelineMonitoring(boolean monitor);
104104

105+
boolean disconnectClient(String id);
106+
105107
String getClusterState(boolean shortForm);
106108

107109
String getConnectedClients() throws IOException;

tc-client/src/main/java/com/tc/object/DistributedObjectClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ private synchronized ClientMessageChannel internalStart(int socketTimeout) {
240240

241241
final ProductInfo pInfo = ProductInfo.getInstance(getClass().getClassLoader());
242242

243-
ClientHandshakeMessageFactory chmf = (u, n, c, r)->{
243+
ClientHandshakeMessageFactory chmf = (u, n, c, r, reconnect)->{
244244
ClientMessageChannel cmc = getClientMessageChannel();
245245
if (cmc != null) {
246246
final ClientHandshakeMessage rv = (ClientHandshakeMessage)cmc.createMessage(TCMessageType.CLIENT_HANDSHAKE_MESSAGE);
@@ -249,6 +249,7 @@ private synchronized ClientMessageChannel internalStart(int socketTimeout) {
249249
rv.setClientPID(getPID());
250250
rv.setUUID(u);
251251
rv.setName(n);
252+
rv.setReconnect(reconnect);
252253
return rv;
253254
} else {
254255
return null;

tc-client/src/main/java/com/tc/object/handshakemanager/ClientHandshakeManagerImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ private enum State {
5959

6060
private State state;
6161
private volatile boolean disconnected;
62+
private volatile boolean wasConnected;
6263
private volatile boolean isShutdown = false;
6364

6465
public ClientHandshakeManagerImpl(Logger logger, ClientHandshakeMessageFactory chmf,
@@ -73,6 +74,7 @@ public ClientHandshakeManagerImpl(Logger logger, ClientHandshakeMessageFactory c
7374
this.callBacks = entities;
7475
this.state = State.PAUSED;
7576
this.disconnected = true;
77+
this.wasConnected = false;
7678
pauseCallbacks();
7779
}
7880

@@ -106,7 +108,7 @@ private void initiateHandshake() {
106108
ClientHandshakeMessage handshakeMessage;
107109

108110
changeToStarting();
109-
handshakeMessage = this.chmf.newClientHandshakeMessage(this.uuid, this.name, this.clientVersion, this.clientRevision);
111+
handshakeMessage = this.chmf.newClientHandshakeMessage(this.uuid, this.name, this.clientVersion, this.clientRevision, this.wasConnected);
110112
if (handshakeMessage != null) {
111113
notifyCallbackOnHandshake(handshakeMessage);
112114

@@ -242,5 +244,6 @@ private void changeToRunning() {
242244
state = State.RUNNING;
243245

244246
this.disconnected = false;
247+
this.wasConnected = true;
245248
}
246249
}

tc-client/src/test/java/com/tc/object/ClientEntityManagerTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -579,6 +579,7 @@ public void result(byte[] response) {
579579
result.get(1, TimeUnit.SECONDS);
580580
Assert.fail();
581581
} catch (TimeoutException to) {
582+
assertTrue(!result.isDone());
582583
assertThat(System.currentTimeMillis() - start, Matchers.greaterThanOrEqualTo(1000L));
583584
// expected
584585
}
@@ -587,6 +588,7 @@ public void result(byte[] response) {
587588
result.get(2, TimeUnit.SECONDS);
588589
Assert.fail();
589590
} catch (TimeoutException to) {
591+
assertTrue(!result.isDone());
590592
assertThat(System.currentTimeMillis() - start, Matchers.greaterThanOrEqualTo(2000L));
591593
// expected
592594
}

tc-messaging/src/main/java/com/tc/object/msg/ClientHandshakeMessage.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@
2626

2727
public interface ClientHandshakeMessage extends TCAction {
2828

29+
void setReconnect(boolean isReconnect);
30+
31+
boolean isReconnect();
32+
2933
void setUUID(String uuid);
3034

3135
String getUUID();

tc-messaging/src/main/java/com/tc/object/msg/ClientHandshakeMessageFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,6 @@
2020

2121
public interface ClientHandshakeMessageFactory {
2222

23-
public ClientHandshakeMessage newClientHandshakeMessage(String uuid, String name, String clientVersion, String clientRevision);
23+
public ClientHandshakeMessage newClientHandshakeMessage(String uuid, String name, String clientVersion, String clientRevision, boolean reconnect);
2424

2525
}

tc-messaging/src/main/java/com/tc/object/msg/ClientHandshakeMessageImpl.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939

4040

4141
public class ClientHandshakeMessageImpl extends DSOMessageBase implements ClientHandshakeMessage {
42-
private static final byte UNUSED_1 = 1;
42+
private static final byte RECONNECT = 1;
4343
private static final byte CLIENT_VERSION = 2;
4444
private static final byte UNUSED_2 = 3;
4545
private static final byte LOCAL_TIME_MILLS = 4;
@@ -58,6 +58,7 @@ public class ClientHandshakeMessageImpl extends DSOMessageBase implements Client
5858
private String clientRevision = "";
5959
private String clientAddress = "";
6060
private int pid = -1;
61+
private boolean reconnect = false;
6162
private final Set<ClientEntityReferenceContext> reconnectReferences = new HashSet<ClientEntityReferenceContext>();
6263
private final Set<ResendVoltronEntityMessage> resendMessages = new TreeSet<ResendVoltronEntityMessage>(new Comparator<ResendVoltronEntityMessage>() {
6364
@Override
@@ -80,6 +81,16 @@ public ClientHandshakeMessageImpl(SessionID sessionID, MessageMonitor monitor, M
8081
clientAddress = TCSocketAddress.getStringForm(channel.getLocalAddress());
8182
}
8283

84+
@Override
85+
public void setReconnect(boolean isReconnect) {
86+
this.reconnect = isReconnect;
87+
}
88+
89+
@Override
90+
public boolean isReconnect() {
91+
return this.reconnect;
92+
}
93+
8394
@Override
8495
public String getClientVersion() {
8596
return this.clientVersion;
@@ -142,7 +153,7 @@ public String getClientAddress() {
142153

143154
@Override
144155
protected void dehydrateValues() {
145-
putNVPair(UNUSED_1, false); // unused but keep for compatibility
156+
putNVPair(RECONNECT, reconnect); // unused but keep for compatibility
146157
putNVPair(UNUSED_2, false); // unused but keep for compatibility
147158
putNVPair(CLIENT_UUID, this.uuid);
148159
putNVPair(CLIENT_NAME, this.name);
@@ -162,8 +173,8 @@ protected void dehydrateValues() {
162173
@Override
163174
protected boolean hydrateValue(byte name) throws IOException {
164175
switch (name) {
165-
case UNUSED_1:
166-
getBooleanValue(); // unused but keep for compatibility
176+
case RECONNECT:
177+
this.reconnect = getBooleanValue();
167178
return true;
168179
case UNUSED_2:
169180
getBooleanValue(); // unused but keep for compatibility

0 commit comments

Comments
 (0)