Skip to content

Commit 73c46f2

Browse files
author
Myron Scott
committed
simplify health-checker callback ports to only use ports known by the client
1 parent e54c3f6 commit 73c46f2

18 files changed

+36
-130
lines changed

common/src/main/java/com/tc/net/protocol/tcm/CommunicationsManagerImpl.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ public class CommunicationsManagerImpl implements CommunicationsManager {
9999

100100
private final ConnectionHealthChecker connectionHealthChecker;
101101
private ServerID serverID = ServerID.NULL_ID;
102-
private int callbackPort = TransportHandshakeMessage.NO_CALLBACK_PORT;
103102
private final TransportHandshakeErrorHandler handshakeErrHandler;
104103

105104
/**
@@ -276,7 +275,7 @@ public ClientMessageChannel createClientChannel(ProductID productId,
276275
connectionHealthChecker,
277276
connectionManager,
278277
timeout,
279-
callbackPort, handshakeErrHandler,
278+
handshakeErrHandler,
280279
reconnectionRejectedHandler
281280
);
282281
NetworkStackHarness stackHarness = this.stackHarnessFactory.createClientHarness(transportFactory, rv,
@@ -328,13 +327,6 @@ public MessageChannelInternal createNewChannel(ChannelID id) {
328327
}
329328
};
330329

331-
// XXX: since we don't create multiple listeners per commsMgr, its OK to set
332-
// L2's callbackPort here. Otherwise, have interface method and set after starting
333-
// commsMgr listener.
334-
if (!this.healthCheckerConfig.isCallbackPortListenerNeeded()) {
335-
this.callbackPort = addr.getPort();
336-
}
337-
338330
final ChannelManagerImpl channelManager = new ChannelManagerImpl(transportDisconnectRemovesChannel, channelFactory);
339331
return new NetworkListenerImpl(addr, this, channelManager, msgFactory, reuseAddr,
340332
connectionIdFactory, wireProtoMsgSnk, activeProvider, validation);

common/src/main/java/com/tc/net/protocol/tcm/MessageTransportFactoryImpl.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.tc.net.protocol.transport.ReconnectionRejectedHandler;
2828
import com.tc.net.protocol.transport.ServerMessageTransport;
2929
import com.tc.net.protocol.transport.TransportHandshakeErrorHandler;
30+
import com.tc.net.protocol.transport.TransportHandshakeMessage;
3031
import com.tc.net.protocol.transport.TransportHandshakeMessageFactory;
3132
import com.tc.net.protocol.transport.WireProtocolAdaptorFactory;
3233
import com.tc.net.protocol.transport.WireProtocolAdaptorFactoryImpl;
@@ -38,21 +39,19 @@ public class MessageTransportFactoryImpl implements MessageTransportFactory {
3839
private final ConnectionHealthChecker connectionHealthChecker;
3940
private final TCConnectionManager connectionMgr;
4041
private final int timeout;
41-
private final int callbackport;
4242
private final TransportHandshakeErrorHandler defaultHandshakeErrorHandler;
4343
private final ReconnectionRejectedHandler reconnectionRejectedHandler;
4444

4545
public MessageTransportFactoryImpl(TransportHandshakeMessageFactory transportMessageFactory,
4646
ConnectionHealthChecker connectionHealthChecker,
4747
TCConnectionManager connectionManager,
48-
int timeout, int callbackPort,
48+
int timeout,
4949
TransportHandshakeErrorHandler defaultHandshakeErrorHandler,
5050
ReconnectionRejectedHandler reconnectionRejectedBehaviour) {
5151
this.transportMessageFactory = transportMessageFactory;
5252
this.connectionHealthChecker = connectionHealthChecker;
5353
this.connectionMgr = connectionManager;
5454
this.timeout = timeout;
55-
this.callbackport = callbackPort;
5655
this.defaultHandshakeErrorHandler = defaultHandshakeErrorHandler;
5756
this.reconnectionRejectedHandler = reconnectionRejectedBehaviour;
5857
}
@@ -61,17 +60,16 @@ public MessageTransportFactoryImpl(TransportHandshakeMessageFactory transportMes
6160
public ClientMessageTransport createNewTransport() {
6261
ClientMessageTransport cmt = createClientMessageTransport(
6362
defaultHandshakeErrorHandler, transportMessageFactory,
64-
new WireProtocolAdaptorFactoryImpl(), callbackport);
63+
new WireProtocolAdaptorFactoryImpl());
6564
cmt.addTransportListener(connectionHealthChecker);
6665
return cmt;
6766
}
6867

6968
protected ClientMessageTransport createClientMessageTransport(TransportHandshakeErrorHandler handshakeErrorHandler,
7069
TransportHandshakeMessageFactory messageFactory,
71-
WireProtocolAdaptorFactory wireProtocolAdaptorFactory,
72-
int callbackPortNum) {
70+
WireProtocolAdaptorFactory wireProtocolAdaptorFactory) {
7371
return new ClientMessageTransport(this.connectionMgr, handshakeErrorHandler, transportMessageFactory,
74-
wireProtocolAdaptorFactory, callbackPortNum, this.timeout, reconnectionRejectedHandler);
72+
wireProtocolAdaptorFactory, this.timeout, reconnectionRejectedHandler);
7573
}
7674

7775
@Override

common/src/main/java/com/tc/net/protocol/transport/ClientMessageTransport.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,14 @@ public class ClientMessageTransport extends MessageTransportBase {
5959
private CompletableFuture<NetworkStackID> opener;
6060
private CompletableFuture<SynAckMessage> waitForSynAckResult;
6161
private final WireProtocolAdaptorFactory wireProtocolAdaptorFactory;
62-
private final int callbackPort;
6362
private final int timeout;
6463
private final ReconnectionRejectedHandler reconnectionRejectedHandler;
6564

6665
public ClientMessageTransport(TCConnectionManager clientConnectionEstablisher,
6766
TransportHandshakeErrorHandler handshakeErrorHandler,
6867
TransportHandshakeMessageFactory messageFactory,
69-
WireProtocolAdaptorFactory wireProtocolAdaptorFactory, int callbackPort, int timeout) {
70-
this(clientConnectionEstablisher, handshakeErrorHandler, messageFactory, wireProtocolAdaptorFactory, callbackPort, timeout,
68+
WireProtocolAdaptorFactory wireProtocolAdaptorFactory, int timeout) {
69+
this(clientConnectionEstablisher, handshakeErrorHandler, messageFactory, wireProtocolAdaptorFactory, timeout,
7170
ReconnectionRejectedHandlerL1.SINGLETON);
7271
}
7372

@@ -78,13 +77,12 @@ public ClientMessageTransport(TCConnectionManager clientConnectionEstablisher,
7877
public ClientMessageTransport(TCConnectionManager connectionManager,
7978
TransportHandshakeErrorHandler handshakeErrorHandler,
8079
TransportHandshakeMessageFactory messageFactory,
81-
WireProtocolAdaptorFactory wireProtocolAdaptorFactory, int callbackPort, int timeout,
80+
WireProtocolAdaptorFactory wireProtocolAdaptorFactory, int timeout,
8281
ReconnectionRejectedHandler reconnectionRejectedHandler) {
8382

8483
super(MessageTransportState.STATE_START, handshakeErrorHandler, messageFactory, LoggerFactory.getLogger(ClientMessageTransport.class));
8584
this.wireProtocolAdaptorFactory = wireProtocolAdaptorFactory;
8685
this.connectionManager = connectionManager;
87-
this.callbackPort = callbackPort;
8886
this.timeout = timeout;
8987
this.reconnectionRejectedHandler = reconnectionRejectedHandler;
9088
}
@@ -309,7 +307,6 @@ private void handleSynAck(WireProtocolMessage message) {
309307
}
310308
getConnection().setTransportEstablished();
311309
setSynAckResult(synAck);
312-
setRemoteCallbackPort(synAck.getCallbackPort());
313310
}
314311
}
315312

@@ -400,7 +397,7 @@ private Future<SynAckMessage> sendSyn() throws TransportHandshakeException {
400397
// get the stack layer list and pass it in
401398
short stackLayerFlags = getCommunicationStackFlags(this);
402399
TransportHandshakeMessage syn = this.messageFactory.createSyn(getConnectionID(), getConnection(),
403-
stackLayerFlags, this.callbackPort);
400+
stackLayerFlags);
404401
// send syn message
405402
try {
406403
this.sendToConnection(syn);

common/src/main/java/com/tc/net/protocol/transport/ConnectionHealthCheckerContextImpl.java

Lines changed: 5 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ class ConnectionHealthCheckerContextImpl implements ConnectionHealthCheckerConte
6767

6868
// Context info
6969
private final HealthCheckerConfig config;
70-
private final int callbackPort;
7170
private final String remoteNodeDesc;
7271

7372
// Socket Connect probes
@@ -96,15 +95,8 @@ public ConnectionHealthCheckerContextImpl(MessageTransportBase mtb, HealthChecke
9695
this.remoteNodeDesc = mtb.getRemoteAddress().toString();
9796
logger.info("Health monitoring agent started for " + remoteNodeDesc);
9897
currentState = INIT;
99-
callbackPort = transport.getRemoteCallbackPort();
10098
configFactor = 1;
101-
// only verify the callback port if the callback is different
102-
// from the connection port
103-
if (callbackPort != transport.getRemoteAddress().getPort()) {
104-
initCallbackPortVerification();
105-
} else {
106-
changeState(START);
107-
}
99+
changeState(START);
108100
}
109101

110102
@Override
@@ -113,29 +105,6 @@ public synchronized void close() {
113105
sockectConnect.stop();
114106
}
115107

116-
// RMP-343
117-
private void initCallbackPortVerification() {
118-
if (config.isSocketConnectOnPingFail()) {
119-
SocketConnectStartStatus status = initSocketConnectProbe();
120-
if (status == SocketConnectStartStatus.FAILED) {
121-
// 1. callback port handshaked and not reachable -- upgrade the config factor
122-
callbackPortVerificationFailed();
123-
} else if (status == SocketConnectStartStatus.NOT_STARTED) {
124-
// 2. callback port not handshaked -- just log it, no config upgrade, move to next state
125-
changeState(START);
126-
} else if (status == SocketConnectStartStatus.STARTED) {
127-
// async socket connect to callback port has started. HC state remains at INIT. state change happens on
128-
// connection events
129-
} else {
130-
throw new AssertionError("initCallbackPortVerification: Unexpected SocketConnectStart Status");
131-
}
132-
} else {
133-
logger
134-
.info("HealthCheck SocketConnect disabled for " + remoteNodeDesc + ". HealthCheckCallbackPort not verified");
135-
changeState(START);
136-
}
137-
}
138-
139108
/* all callers of this method are already synchronized */
140109
private void changeState(State newState) {
141110
if (logger.isDebugEnabled() && currentState != newState) {
@@ -188,15 +157,8 @@ protected TCConnection getNewConnection(TCConnectionManager connManager) throws
188157
protected HealthCheckerSocketConnect getHealthCheckerSocketConnector(TCConnection connection,
189158
MessageTransportBase transportBase,
190159
Logger loger, HealthCheckerConfig cnfg) {
191-
if (TransportHandshakeMessage.NO_CALLBACK_PORT == callbackPort) {
192-
logger.info("No HealthCheckCallbackPort handshaked for node " + remoteNodeDesc);
193-
return new NullHealthCheckerSocketConnectImpl();
194-
}
195-
196-
// TODO: do we need to exchange the address as well ??? (since it might be different than the remote IP on this
197-
// conn)
198-
InetSocketAddress sa = new InetSocketAddress(transportBase.getRemoteAddress().getAddress(), callbackPort);
199-
return new HealthCheckerSocketConnectImpl(sa, connection, remoteNodeDesc + "(callbackport:" + callbackPort + ")",
160+
InetSocketAddress sa = transportBase.getRemoteAddress();
161+
return new HealthCheckerSocketConnectImpl(sa, connection, remoteNodeDesc,
200162
loger, cnfg.getSocketConnectTimeout());
201163
}
202164

@@ -310,17 +272,14 @@ public synchronized boolean probeIfAlive() {
310272
}
311273

312274
private void callbackPortVerificationFailed() {
313-
transport.setRemoteCallbackPort(TransportHandshakeMessage.NO_CALLBACK_PORT);
314275
updateConfigFactor(CONFIG_UPGRADE_FACTOR);
315276
changeState(START);
316-
logger.debug("HealthCheckCallbackPort verification FAILED for " + remoteNodeDesc + "(callbackport: " + callbackPort
317-
+ ")");
277+
logger.debug("HealthCheckCallbackPort verification FAILED for " + remoteNodeDesc);
318278
}
319279

320280
private void callbackPortVerificationSuccess() {
321281
changeState(START);
322-
logger.debug("HealthCheckCallbackPort verification PASSED for " + remoteNodeDesc + "(callbackport: " + callbackPort
323-
+ ")");
282+
logger.debug("HealthCheckCallbackPort verification PASSED for " + remoteNodeDesc);
324283
}
325284

326285
@Override

common/src/main/java/com/tc/net/protocol/transport/DisabledHealthCheckerConfigImpl.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,6 @@ public int getSocketConnectTimeout() {
6565
throw new AssertionError("Disabled HealthChecker");
6666
}
6767

68-
@Override
69-
public boolean isCallbackPortListenerNeeded() {
70-
return false;
71-
}
72-
7368
@Override
7469
public boolean isCheckTimeEnabled() {
7570
throw new AssertionError("Disabled HealthChecker");

common/src/main/java/com/tc/net/protocol/transport/HealthCheckerConfig.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,6 @@ public interface HealthCheckerConfig {
5252

5353
int getSocketConnectTimeout();
5454

55-
/**
56-
* RMP-343: L2 SocketConnect L1
57-
*/
58-
boolean isCallbackPortListenerNeeded();
5955
/**
6056
* Checking time difference between hosts enabled/disabled.
6157
*/

common/src/main/java/com/tc/net/protocol/transport/HealthCheckerConfigClientImpl.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,8 @@ public HealthCheckerConfigClientImpl(String name, String bindPort) {
3131
super(name);
3232
}
3333

34-
public HealthCheckerConfigClientImpl(long idle, long interval, int probes, String name, boolean extraCheck,
34+
public HealthCheckerConfigClientImpl(long idle, long interval, int probes, String name,
3535
int socketConnectMaxCount, int socketConnectTimeout) {
36-
super(idle, interval, probes, name, extraCheck, socketConnectMaxCount, socketConnectTimeout);
37-
}
38-
39-
@Override
40-
public boolean isCallbackPortListenerNeeded() {
41-
return true;
36+
super(idle, interval, probes, name, false, socketConnectMaxCount, socketConnectTimeout);
4237
}
4338
}

common/src/main/java/com/tc/net/protocol/transport/HealthCheckerConfigImpl.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -139,11 +139,6 @@ public int getSocketConnectTimeout() {
139139
return this.socketConnectTimeout;
140140
}
141141

142-
@Override
143-
public boolean isCallbackPortListenerNeeded() {
144-
return false;
145-
}
146-
147142
@Override
148143
public boolean isCheckTimeEnabled() {
149144
return this.checkTimeEnabled;

common/src/main/java/com/tc/net/protocol/transport/MessageTransport.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,5 @@ public interface MessageTransport extends NetworkLayer, PrettyPrintable {
5050

5151
public String getCommunicationStackNames(NetworkLayer parentLayer);
5252

53-
public void setRemoteCallbackPort(int callbackPort);
54-
55-
public int getRemoteCallbackPort();
56-
5753
public void initConnectionID(ConnectionID cid);
5854
}

common/src/main/java/com/tc/net/protocol/transport/MessageTransportBase.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ abstract class MessageTransportBase extends AbstractMessageTransport implements
5555

5656
private final AtomicReference<TCConnectionEvent> connectionCloseEvent = new AtomicReference<>();
5757
private volatile ConnectionHealthCheckerContext healthCheckerContext = new ConnectionHealthCheckerContextDummyImpl();
58-
private int remoteCallbackPort = TransportHandshakeMessage.NO_CALLBACK_PORT;
5958

6059
protected MessageTransportBase(MessageTransportState initialState,
6160
TransportHandshakeErrorHandler handshakeErrorHandler,
@@ -386,16 +385,6 @@ public String getStackLayerName() {
386385
return NAME_TRANSPORT_LAYER;
387386
}
388387

389-
@Override
390-
public synchronized int getRemoteCallbackPort() {
391-
return this.remoteCallbackPort;
392-
}
393-
394-
@Override
395-
public synchronized void setRemoteCallbackPort(int remoteCallbackPort) {
396-
this.remoteCallbackPort = remoteCallbackPort;
397-
}
398-
399388
@Override
400389
public synchronized final void initConnectionID(ConnectionID cid) {
401390
connectionId = cid;

0 commit comments

Comments
 (0)