Skip to content

Commit 25a2d06

Browse files
author
Myron Scott
committed
update reference checking for connections
1 parent 928bdd3 commit 25a2d06

File tree

11 files changed

+60
-191
lines changed

11 files changed

+60
-191
lines changed

common/src/main/java/com/tc/async/api/SEDA.java

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,12 @@ public class SEDA {
3333

3434
public SEDA(TCThreadGroup threadGroup) {
3535
this.threadGroup = threadGroup;
36-
this.stageManager = new StageManagerImpl(threadGroup, new QueueFactory(), new StageListener() {
37-
@Override
38-
public void stageStalled(String name, long delay, int queueDepth) {
39-
stageWarning(new StallWarning(name, delay, queueDepth));
40-
}
41-
});
36+
this.stageManager = new StageManagerImpl(threadGroup, new QueueFactory());
37+
}
38+
39+
public SEDA(TCThreadGroup threadGroup, StageListener listener) {
40+
this.threadGroup = threadGroup;
41+
this.stageManager = new StageManagerImpl(threadGroup, new QueueFactory(), listener);
4242
}
4343

4444
public StageManager getStageManager() {
@@ -48,25 +48,4 @@ public StageManager getStageManager() {
4848
protected TCThreadGroup getThreadGroup() {
4949
return this.threadGroup;
5050
}
51-
52-
public void stageWarning(Object description) {
53-
54-
}
55-
56-
private static class StallWarning {
57-
private final String name;
58-
private final long delay;
59-
private final int depth;
60-
61-
public StallWarning(String name, long delay, int depth) {
62-
this.name = name;
63-
this.delay = delay;
64-
this.depth = depth;
65-
}
66-
67-
@Override
68-
public String toString() {
69-
return "StallWarning{" + "name=" + name + ", delay=" + delay + ", depth=" + depth + '}';
70-
}
71-
}
7251
}

common/src/main/java/com/tc/net/protocol/transport/ConnectionWatcher.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,13 @@ public ConnectionWatcher(ClientMessageChannel target, ClientConnectionEstablishe
4646

4747
private boolean checkForStop() {
4848
Reference<? extends ClientMessageChannel> target = stopQueue.poll();
49-
if (target != null) {
49+
while (target != null) {
5050
if (target == targetHolder) {
5151
stopped.set();
5252
LOGGER.warn("unreferenced connection left open {} {}", targetHolder.get(), connection);
5353
cce.shutdown();
5454
}
55+
target = stopQueue.poll();
5556
}
5657
return stopped.isSet();
5758
}

tc-client/src/main/java/com/tc/object/ClientBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ ClientHandshakeManager createClientHandshakeManager(Logger logger,
6161
String clientRevision,
6262
ClientEntityManager entity);
6363

64-
ClientEntityManager createClientEntityManager(ClientMessageChannel channel, StageManager stages);
64+
ClientEntityManager createClientEntityManager(ClientMessageChannel channel);
6565

6666
void setClientConnectionErrorListener(ClientConnectionErrorListener listener);
6767

tc-client/src/main/java/com/tc/object/ClientConfigurationContext.java

Lines changed: 0 additions & 30 deletions
This file was deleted.

tc-client/src/main/java/com/tc/object/ClientEntityManagerImpl.java

Lines changed: 27 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
*/
1919
package com.tc.object;
2020

21-
import com.tc.async.api.Stage;
22-
import com.tc.async.api.StageManager;
2321
import com.tc.bytes.TCByteBufferFactory;
2422
import com.tc.exception.EntityBusyException;
2523
import com.tc.exception.EntityReferencedException;
@@ -61,22 +59,19 @@
6159
import com.tc.util.Assert;
6260
import com.tc.util.Util;
6361
import java.io.IOException;
64-
import java.lang.ref.WeakReference;
6562
import java.nio.ByteBuffer;
6663

6764
import java.util.EnumSet;
6865
import java.util.LinkedHashMap;
6966
import java.util.Map;
7067
import java.util.Objects;
71-
import java.util.Optional;
7268
import java.util.Set;
7369
import java.util.concurrent.ConcurrentHashMap;
7470
import java.util.concurrent.ConcurrentMap;
7571
import java.util.concurrent.ExecutorService;
7672
import java.util.concurrent.Executors;
7773
import java.util.concurrent.RejectedExecutionException;
7874
import java.util.concurrent.TimeUnit;
79-
import java.util.concurrent.TimeoutException;
8075
import java.util.concurrent.atomic.LongAdder;
8176
import java.util.function.Supplier;
8277

@@ -91,38 +86,31 @@
9186
public class ClientEntityManagerImpl implements ClientEntityManager {
9287
private final Logger logger;
9388

94-
private final WeakReference<ClientMessageChannel> channel;
89+
private final ClientMessageChannel channel;
9590
private final ConcurrentMap<TransactionID, InFlightMessage> inFlightMessages;
9691
private final TransactionSource transactionSource;
9792

9893
private final ClientEntityStateManager stateManager;
9994
private final ConcurrentMap<ClientInstanceID, EntityClientEndpointImpl<?, ?>> objectStoreMap;
100-
101-
private final StageManager stages;
102-
95+
10396
private final ExecutorService endpointCloser = Executors.newWorkStealingPool();
10497

10598
private final LongAdder msgCount = new LongAdder();
10699
private final LongAdder inflights = new LongAdder();
107100
private final LongAdder addWindow = new LongAdder();
108101

109-
public ClientEntityManagerImpl(ClientMessageChannel channel, StageManager mgr) {
110-
this.channel = new WeakReference<>(channel);
111-
this.logger = new ClientIDLogger(() -> channel().map(ClientMessageChannel::getClientID).orElse(ClientID.NULL_ID), LoggerFactory.getLogger(ClientEntityManager.class));
102+
public ClientEntityManagerImpl(ClientMessageChannel channel) {
103+
this.channel = channel;
104+
this.logger = new ClientIDLogger(() -> channel.getClientID(), LoggerFactory.getLogger(ClientEntityManager.class));
112105
this.inFlightMessages = new ConcurrentHashMap<>();
113106
this.transactionSource = new TransactionSource();
114107
this.stateManager = new ClientEntityStateManager();
115108
this.objectStoreMap = new ConcurrentHashMap<>(10240, 0.75f, 128);
116-
this.stages = mgr;
117-
}
118-
119-
private Optional<ClientMessageChannel> channel() {
120-
return Optional.ofNullable(channel.get());
121109
}
122110

123111
@Override
124112
public synchronized boolean isValid() {
125-
return !stateManager.isShutdown() && channel().map(ClientMessageChannel::isOpen).orElse(false);
113+
return !stateManager.isShutdown() && channel.isOpen();
126114
}
127115

128116
private boolean enqueueMessage(InFlightMessage msg) throws RejectedExecutionException {
@@ -261,24 +249,24 @@ private Invocation.Task invoke(EntityID eid, EntityDescriptor entityDescriptor,
261249
map.put("averagePending", inflights.sum()/msgCount.sum());
262250
map.put("averageServerWindow", addWindow.sum()/msgCount.sum());
263251
}
264-
channel().ifPresent(channel -> {
265-
Object stats = channel().map(c -> c.getAttachment("ChannelStats")).orElse(null);
266-
Map<String, Object> sub = new LinkedHashMap<>();
267-
sub.put("connection", channel().map(ClientMessageChannel::getConnectionID));
268-
sub.put("local", channel.getLocalAddress());
269-
sub.put("remote", channel.getRemoteAddress());
270-
sub.put("product", channel.getProductID());
271-
sub.put("client", channel.getClientID());
272-
if (stateManager.isShutdown()) {
273-
sub.put("pendingMessages", "<shutdown>");
274-
} else {
275-
sub.put("pendingMessages", inFlightMessages.size());
276-
}
277-
map.put("channel", sub);
278-
if (stats instanceof PrettyPrintable) {
279-
sub.put("stats", ((PrettyPrintable)stats).getStateMap());
280-
}
281-
});
252+
253+
Object stats = channel.getAttachment("ChannelStats");
254+
Map<String, Object> sub = new LinkedHashMap<>();
255+
sub.put("connection", channel.getConnectionID());
256+
sub.put("local", channel.getLocalAddress());
257+
sub.put("remote", channel.getRemoteAddress());
258+
sub.put("product", channel.getProductID());
259+
sub.put("client", channel.getClientID());
260+
if (stateManager.isShutdown()) {
261+
sub.put("pendingMessages", "<shutdown>");
262+
} else {
263+
sub.put("pendingMessages", inFlightMessages.size());
264+
}
265+
map.put("channel", sub);
266+
if (stats instanceof PrettyPrintable) {
267+
sub.put("stats", ((PrettyPrintable)stats).getStateMap());
268+
}
269+
282270
return map;
283271
}
284272

@@ -359,12 +347,6 @@ public synchronized void initializeHandshake(ClientHandshakeMessage handshakeMes
359347
handshakeMessage.addReconnectReference(context);
360348
}
361349

362-
Stage<VoltronEntityMultiResponse> responderMulti = stages.getStage(ClientConfigurationContext.VOLTRON_ENTITY_MULTI_RESPONSE_STAGE, VoltronEntityMultiResponse.class);
363-
if (!responderMulti.isEmpty()) {
364-
FlushResponse flush = new FlushResponse();
365-
responderMulti.getSink().addToSink(flush);
366-
flush.waitForAccess();
367-
}
368350
// Walk the inFlightMessages, adding them all to the handshake, since we need them to be replayed.
369351
for (InFlightMessage inFlight : this.inFlightMessages.values()) {
370352
if (inFlight.commit()) {
@@ -512,12 +494,12 @@ private Invocation.Task queueInFlightMessage(EntityID eid, Supplier<NetworkVoltr
512494
inFlight.sent();
513495
if (!inFlight.send()) {
514496
logger.debug("message not sent. Make sure resend happens " + inFlight);
515-
if (channel().map(c -> !c.getProductID().isReconnectEnabled()).orElse(false)) {
497+
if (!channel.getProductID().isReconnectEnabled()) {
516498
throwClosedExceptionOnMessage(inFlight, "connection not capable of resend");
517499
}
500+
} else {
501+
throwClosedExceptionOnMessage(inFlight, "Connection closed before sending message");
518502
}
519-
} else {
520-
throwClosedExceptionOnMessage(inFlight, "Connection closed before sending message");
521503
}
522504
return () -> {
523505
if (inFlight.cancel()) {
@@ -543,7 +525,6 @@ private NetworkVoltronEntityMessage createMessageWithoutClientInstance(EntityID
543525
}
544526

545527
private NetworkVoltronEntityMessage createMessageWithDescriptor(EntityID entityID, EntityDescriptor entityDescriptor, boolean requiresReplication, byte[] config, VoltronEntityMessage.Type type, Set<VoltronEntityMessage.Acks> acks) {
546-
ClientMessageChannel channel = channel().orElseThrow(() -> new ConnectionClosedException("Connection closed"));
547528
NetworkVoltronEntityMessage message = (NetworkVoltronEntityMessage) channel.createMessage(TCMessageType.VOLTRON_ENTITY_MESSAGE);
548529
ClientID clientID = channel.getClientID();
549530
TransactionID transactionID = transactionSource.create();

0 commit comments

Comments
 (0)