Skip to content

Commit 6b2ec4e

Browse files
author
Myron Scott
authored
Merge pull request #1336 from myronkscott/relay
Add relay state and operation to server
2 parents 4b98a79 + d35fb5d commit 6b2ec4e

File tree

64 files changed

+2363
-500
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+2363
-500
lines changed

common/src/main/java/com/tc/net/basic/BasicConnection.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@
6060
import com.tc.net.core.SocketEndpoint;
6161
import com.tc.net.core.SocketEndpointFactory;
6262
import com.tc.net.core.TCSocketEndpointReader;
63+
import java.util.Collections;
64+
import java.util.HashSet;
65+
import java.util.Set;
6366

6467
/**
6568
*
@@ -79,7 +82,7 @@ public class BasicConnection implements TCConnection {
7982
private volatile Socket src;
8083
private boolean established = false;
8184
private boolean connected = false;
82-
private final List<TCConnectionEventListener> listeners = new ArrayList<>();
85+
private final Set<TCConnectionEventListener> listeners = Collections.synchronizedSet(new HashSet<>());
8386
private volatile Thread serviceThread;
8487
private volatile ExecutorService readerExec;
8588
private final String id;
@@ -161,17 +164,13 @@ void markReceived() {
161164
}
162165

163166
@Override
164-
public synchronized void addListener(TCConnectionEventListener listener) {
165-
if (!listeners.contains(listener)) {
166-
listeners.add(listener);
167-
}
167+
public synchronized boolean addListener(TCConnectionEventListener listener) {
168+
return listeners.add(listener);
168169
}
169170

170171
@Override
171-
public synchronized void removeListener(TCConnectionEventListener listener) {
172-
if (listeners.contains(listener)) {
173-
listeners.remove(listener);
174-
}
172+
public synchronized boolean removeListener(TCConnectionEventListener listener) {
173+
return listeners.remove(listener);
175174
}
176175

177176
@Override

common/src/main/java/com/tc/net/core/CoreNIOServices.java

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.nio.channels.Selector;
4646
import java.nio.channels.ServerSocketChannel;
4747
import java.nio.channels.SocketChannel;
48+
import java.time.Duration;
4849
import java.util.ArrayList;
4950
import java.util.Iterator;
5051
import java.util.LinkedHashMap;
@@ -170,6 +171,10 @@ private void updateListenerString() {
170171
private synchronized String getListenerString() {
171172
return this.listenerString;
172173
}
174+
175+
public long getCongestionScore() {
176+
return readerComm.getCongestionScore() + writerComm.getCongestionScore();
177+
}
173178

174179
public long getTotalBytesRead() {
175180
return readerComm.getTotalBytesRead() + writerComm.getTotalBytesRead();
@@ -252,23 +257,33 @@ public void addConnection(TCConnectionImpl connection, SocketChannel channel) {
252257
// MainComm Thread
253258
if (workerCommMgr == null) { return; }
254259

255-
readerComm.unregister(channel);
256260
final CoreNIOServices workerComm = workerCommMgr.getNextWorkerComm();
257-
connection.setCommWorker(workerComm);
258-
workerComm.addConnection(connection);
259-
workerComm.requestReadWriteInterest(connection, channel);
261+
try {
262+
if (connection.setCommWorker(workerComm)) {
263+
readerComm.unregister(channel);
264+
if (connection.removeListener(this)) {
265+
this.clientWeights.decrementAndGet();
266+
}
267+
268+
workerComm.addConnection(connection);
269+
workerComm.requestReadWriteInterest(connection, channel);
270+
}
271+
} finally {
272+
workerComm.deselectForWeighting();
273+
}
260274
}
261275

262276
private void addConnection(TCConnectionImpl connection) {
263-
this.clientWeights.incrementAndGet();
264-
deselectForWeighting();
265-
connection.addListener(this);
277+
if (connection.addListener(this)) {
278+
this.clientWeights.incrementAndGet();
279+
}
266280
}
267281

268282
@Override
269283
public void closeEvent(TCConnectionEvent event) {
284+
if (event.getSource().removeListener(this)) {
270285
this.clientWeights.decrementAndGet();
271-
event.getSource().removeListener(this);
286+
}
272287
}
273288

274289
@Override
@@ -339,6 +354,8 @@ protected class CommThread extends Thread {
339354
private final String name;
340355
private long bytesMoved = 0;
341356
private final COMM_THREAD_MODE mode;
357+
private long congestionScore = 0;
358+
private long lastIdleNanos = System.nanoTime();
342359

343360
public CommThread(COMM_THREAD_MODE mode) {
344361
name = commThreadName + (mode == COMM_THREAD_MODE.NIO_READER ? "_R" : "_W");
@@ -483,7 +500,7 @@ void cleanupChannel(final Channel ch, final Runnable callback) {
483500

484501
if (Thread.currentThread() != this) {
485502
if (logger.isDebugEnabled()) {
486-
logger.debug("queue'ing channel close operation");
503+
logger.debug("queue'ing channel close operation " + getName());
487504
}
488505

489506
addSelectorTask(new Runnable() {
@@ -557,6 +574,7 @@ public void run() {
557574
// this is just a catch all to make sure that no exceptions will be thrown by this method, please do not remove
558575
logger.error("Unhandled exception in cleanupChannel()", e);
559576
} finally {
577+
logger.debug("cleanup runner for " + getName());
560578
try {
561579
if (callback != null) {
562580
callback.run();
@@ -603,7 +621,17 @@ private void selectLoop() throws IOException {
603621
while (true) {
604622
final int numKeys;
605623
try {
606-
numKeys = localSelector.select();
624+
int localKeys = localSelector.selectNow();
625+
if (localKeys == 0 && localSelectorTasks.isEmpty()) {
626+
congestionScore = 0;
627+
lastIdleNanos = System.nanoTime();
628+
if (!isStopRequested()) {
629+
localKeys = localSelector.select();
630+
}
631+
} else {
632+
congestionScore++;
633+
}
634+
numKeys = localKeys;
607635
} catch (IOException ioe) {
608636
throw ioe;
609637
} catch (CancelledKeyException cke) {
@@ -640,7 +668,7 @@ private void selectLoop() throws IOException {
640668
Util.selfInterruptIfNeeded(isInterrupted);
641669

642670
final Set<SelectionKey> selectedKeys = localSelector.selectedKeys();
643-
if ((0 == numKeys) && (0 == selectedKeys.size())) {
671+
if ((0 == numKeys) && (selectedKeys.isEmpty())) {
644672
continue;
645673
}
646674

@@ -676,7 +704,7 @@ private void selectLoop() throws IOException {
676704

677705
if (key.isValid() && !isReader() && key.isWritable()) {
678706
long written = ((TCChannelWriter) key.attachment()).doWrite();
679-
bytesMoved += written;
707+
bytesMoved += written;
680708
}
681709
} catch (CancelledKeyException cke) {
682710
logger.debug("selection key cancelled key@" + key.hashCode());
@@ -694,9 +722,16 @@ private void selectLoop() throws IOException {
694722
TCListenerImpl lsnr = (TCListenerImpl) key.attachment();
695723
// just log
696724
}
697-
698725
}
699726
} // for
727+
if (isReader() &&
728+
congestionScore > 100 && System.nanoTime() - lastIdleNanos > Duration.ofSeconds(2).toNanos() &&
729+
workerCommMgr != null && workerCommMgr.isOverweight(getWeight())) {
730+
for (SelectionKey key : selectedKeys) {
731+
TCConnectionImpl connect = (TCConnectionImpl)key.attachment();
732+
connect.migrate();
733+
}
734+
}
700735
} // while (true)
701736
}
702737

@@ -755,6 +790,10 @@ private void doConnect(SelectionKey key) {
755790
}
756791
}
757792

793+
public long getCongestionScore() {
794+
return congestionScore;
795+
}
796+
758797
public long getTotalBytesRead() {
759798
return this.mode == COMM_THREAD_MODE.NIO_READER ? this.bytesMoved : 0;
760799
}

common/src/main/java/com/tc/net/core/TCConnectionImpl.java

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,7 @@
5555
import java.util.ArrayList;
5656
import java.util.Date;
5757
import java.util.LinkedHashMap;
58-
import java.util.List;
5958
import java.util.Map;
60-
import java.util.concurrent.CopyOnWriteArrayList;
6159
import java.util.concurrent.atomic.AtomicBoolean;
6260
import java.util.concurrent.atomic.AtomicLong;
6361
import java.util.concurrent.atomic.LongAdder;
@@ -68,7 +66,9 @@
6866
import java.util.concurrent.ConcurrentLinkedQueue;
6967
import com.tc.net.protocol.tcm.TCActionNetworkMessage;
7068
import java.util.Collections;
69+
import java.util.Set;
7170
import java.util.concurrent.CompletableFuture;
71+
import java.util.concurrent.ConcurrentHashMap;
7272
import java.util.concurrent.ExecutionException;
7373
import java.util.concurrent.Future;
7474
import java.util.concurrent.locks.ReentrantLock;
@@ -103,7 +103,7 @@ final class TCConnectionImpl implements TCConnection, TCChannelReader, TCChannel
103103
private final AtomicLong lastDataReceiveTime = new AtomicLong(System.currentTimeMillis());
104104
private final LongAdder messagesRead = new LongAdder();
105105
private final AtomicLong connectTime = new AtomicLong(NO_CONNECT_TIME);
106-
private final List<TCConnectionEventListener> eventListeners = new CopyOnWriteArrayList<>();
106+
private final Set<TCConnectionEventListener> eventListeners = ConcurrentHashMap.newKeySet();
107107
private final TCProtocolAdaptor protocolAdaptor;
108108
private final AtomicBoolean isSocketEndpoint = new AtomicBoolean(false);
109109
private final SetOnceFlag closed = new SetOnceFlag();
@@ -201,8 +201,13 @@ final class TCConnectionImpl implements TCConnection, TCChannelReader, TCChannel
201201
return state;
202202
}
203203

204-
public void setCommWorker(CoreNIOServices worker) {
205-
this.commWorker = worker;
204+
public boolean setCommWorker(CoreNIOServices worker) {
205+
if (this.commWorker != worker) {
206+
this.commWorker = worker;
207+
return true;
208+
} else {
209+
return false;
210+
}
206211
}
207212

208213
private Future<Void> closeImpl(Runnable callback) {
@@ -541,6 +546,7 @@ private Runnable createCloseCallback() {
541546
return new Runnable() {
542547
@Override
543548
public void run() {
549+
logger.debug("running close callback");
544550
setConnected(false);
545551
TCConnectionImpl.this.parent.connectionClosed(TCConnectionImpl.this);
546552

@@ -610,15 +616,15 @@ public final String toString() {
610616
}
611617

612618
@Override
613-
public final void addListener(TCConnectionEventListener listener) {
614-
if (listener == null) { return; }
615-
this.eventListeners.add(listener); // don't need sync
619+
public final boolean addListener(TCConnectionEventListener listener) {
620+
if (listener == null) { return false; }
621+
return this.eventListeners.add(listener);
616622
}
617623

618624
@Override
619-
public final void removeListener(TCConnectionEventListener listener) {
620-
if (listener == null) { return; }
621-
this.eventListeners.remove(listener); // don't need sync
625+
public final boolean removeListener(TCConnectionEventListener listener) {
626+
if (listener == null) { return false; }
627+
return this.eventListeners.remove(listener);
622628
}
623629

624630
@Override
@@ -889,6 +895,16 @@ public void setTransportEstablished() {
889895
this.commWorker.addConnection(this, this.channel);
890896
this.transportEstablished.set(true);
891897
}
898+
899+
public void migrate() {
900+
if (this.commWorker.getReaderComm() == Thread.currentThread()) {
901+
this.commWorker.addConnection(this, this.channel);
902+
} else {
903+
this.commWorker.getReaderComm().addSelectorTask(()-> {
904+
this.commWorker.addConnection(this, this.channel);
905+
});
906+
}
907+
}
892908

893909
@Override
894910
public boolean isTransportEstablished() {

common/src/main/java/com/tc/net/core/TCConnectionManagerImpl.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public class TCConnectionManagerImpl implements TCConnectionManager {
6262
protected static final Logger logger = LoggerFactory.getLogger(TCConnectionManager.class);
6363

6464
private final TCCommImpl comm;
65-
private final Set<TCConnection> connections = new HashSet<>();
65+
private final Set<TCConnectionImpl> connections = new HashSet<>();
6666
private final Set<TCListener> listeners = new HashSet<>();
6767
private final SetOnceFlag shutdown = new SetOnceFlag();
6868
private final ConnectionEvents connEvents;
@@ -102,7 +102,7 @@ public TCConnectionManagerImpl(String name, int workerCommCount, SocketEndpointF
102102
return state;
103103
}
104104

105-
protected TCConnection createConnectionImpl(TCProtocolAdaptor adaptor, TCConnectionEventListener listener) {
105+
protected TCConnectionImpl createConnectionImpl(TCProtocolAdaptor adaptor, TCConnectionEventListener listener) {
106106
return new TCConnectionImpl(listener, adaptor, this, comm.nioServiceThreadForNewConnection(), socketParams, socketEndpointFactory);
107107
}
108108

@@ -170,7 +170,7 @@ public final TCListener createListener(InetSocketAddress addr, ProtocolAdaptorFa
170170
public final TCConnection createConnection(TCProtocolAdaptor adaptor) throws IOException {
171171
checkShutdown();
172172

173-
TCConnection rv = createConnectionImpl(adaptor, connEvents);
173+
TCConnectionImpl rv = createConnectionImpl(adaptor, connEvents);
174174
newConnection(rv);
175175

176176
return rv;
@@ -249,7 +249,7 @@ void connectionClosed(TCConnection conn) {
249249
}
250250
}
251251

252-
void newConnection(TCConnection conn) {
252+
void newConnection(TCConnectionImpl conn) {
253253
synchronized (connections) {
254254
connections.add(conn);
255255
}
@@ -329,4 +329,8 @@ TCDirectByteBufferCache getBufferCache() {
329329
public int getBufferCount() {
330330
return buffers.referenced();
331331
}
332+
333+
void distribute() {
334+
connections.forEach(TCConnectionImpl::migrate);
335+
}
332336
}

common/src/main/java/com/tc/net/core/TCWorkerCommManager.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,14 @@ synchronized void waitDuringPause() throws IOException {
145145
}
146146
}
147147

148+
public boolean isOverweight(int weight) {
149+
if (weight <= totalWorkerComm) return false;
150+
for (CoreNIOServices c : workerCommThreads) {
151+
if (c.getWeight() * 2 < weight && c.getCongestionScore() == 0) return true;
152+
}
153+
return false;
154+
}
155+
148156
public synchronized void pause() {
149157
paused = true;
150158
}

common/src/main/java/com/tc/net/core/event/TCConnectionEventCaller.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import com.tc.net.protocol.TCNetworkMessage;
2525
import com.tc.util.concurrent.SetOnceFlag;
2626

27-
import java.util.List;
27+
import java.util.Collection;
2828

2929
// calls each event only once
3030
public class TCConnectionEventCaller {
@@ -44,36 +44,36 @@ public TCConnectionEventCaller(Logger logger) {
4444
this.logger = logger;
4545
}
4646

47-
public void fireErrorEvent(List<TCConnectionEventListener> eventListeners, TCConnection conn, Exception exception,
47+
public void fireErrorEvent(Collection<TCConnectionEventListener> eventListeners, TCConnection conn, Exception exception,
4848
TCNetworkMessage context) {
4949
if (errorEvent.attemptSet()) {
5050
final TCConnectionErrorEvent event = new TCConnectionErrorEvent(conn, exception, context);
5151
fireEvent(eventListeners, event, logger, ERROR);
5252
}
5353
}
5454

55-
public void fireConnectEvent(List<TCConnectionEventListener> eventListeners, TCConnection conn) {
55+
public void fireConnectEvent(Collection<TCConnectionEventListener> eventListeners, TCConnection conn) {
5656
if (connectEvent.attemptSet()) {
5757
final TCConnectionEvent event = new TCConnectionEvent(conn);
5858
fireEvent(eventListeners, event, logger, CONNECT);
5959
}
6060
}
6161

62-
public void fireEndOfFileEvent(List<TCConnectionEventListener> eventListeners, TCConnection conn) {
62+
public void fireEndOfFileEvent(Collection<TCConnectionEventListener> eventListeners, TCConnection conn) {
6363
if (eofEvent.attemptSet()) {
6464
final TCConnectionEvent event = new TCConnectionEvent(conn);
6565
fireEvent(eventListeners, event, logger, EOF);
6666
}
6767
}
6868

69-
public void fireCloseEvent(List<TCConnectionEventListener> eventListeners, TCConnection conn) {
69+
public void fireCloseEvent(Collection<TCConnectionEventListener> eventListeners, TCConnection conn) {
7070
if (closeEvent.attemptSet()) {
7171
final TCConnectionEvent event = new TCConnectionEvent(conn);
7272
fireEvent(eventListeners, event, logger, CLOSE);
7373
}
7474
}
7575

76-
private static void fireEvent(List<TCConnectionEventListener> eventListeners, TCConnectionEvent event, Logger logger, int type) {
76+
private static void fireEvent(Collection<TCConnectionEventListener> eventListeners, TCConnectionEvent event, Logger logger, int type) {
7777
for (TCConnectionEventListener listener : eventListeners) {
7878
try {
7979
switch (type) {

0 commit comments

Comments
 (0)