Skip to content

Commit e244d9c

Browse files
author
Myron Scott
committed
add new weighting scores to help balance threads
1 parent 443213e commit e244d9c

File tree

4 files changed

+79
-22
lines changed

4 files changed

+79
-22
lines changed

common/src/main/java/com/tc/net/core/CoreNIOServices.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.nio.channels.Selector;
4646
import java.nio.channels.ServerSocketChannel;
4747
import java.nio.channels.SocketChannel;
48+
import java.time.Duration;
4849
import java.util.ArrayList;
4950
import java.util.Iterator;
5051
import java.util.LinkedHashMap;
@@ -170,6 +171,10 @@ private void updateListenerString() {
170171
private synchronized String getListenerString() {
171172
return this.listenerString;
172173
}
174+
175+
public long getCongestionScore() {
176+
return readerComm.getCongestionScore() + writerComm.getCongestionScore();
177+
}
173178

174179
public long getTotalBytesRead() {
175180
return readerComm.getTotalBytesRead() + writerComm.getTotalBytesRead();
@@ -349,6 +354,8 @@ protected class CommThread extends Thread {
349354
private final String name;
350355
private long bytesMoved = 0;
351356
private final COMM_THREAD_MODE mode;
357+
private long congestionScore = 0;
358+
private long lastIdleNanos = System.nanoTime();
352359

353360
public CommThread(COMM_THREAD_MODE mode) {
354361
name = commThreadName + (mode == COMM_THREAD_MODE.NIO_READER ? "_R" : "_W");
@@ -610,23 +617,21 @@ private void selectLoop() throws IOException {
610617

611618
Selector localSelector = this.selector;
612619
Queue<Runnable> localSelectorTasks = this.selectorTasks;
613-
int congestionDetected = 0;
614620

615621
while (true) {
616622
final int numKeys;
617623
try {
618-
if (isReader()) {
619-
int localKeys = localSelector.selectNow();
620-
if (localKeys == 0 && localSelectorTasks.isEmpty()) {
621-
congestionDetected = 0;
624+
int localKeys = localSelector.selectNow();
625+
if (localKeys == 0 && localSelectorTasks.isEmpty()) {
626+
congestionScore = 0;
627+
lastIdleNanos = System.nanoTime();
628+
if (!isStopRequested()) {
622629
localKeys = localSelector.select();
623-
} else {
624-
congestionDetected++;
625630
}
626-
numKeys = localKeys;
627631
} else {
628-
numKeys = localSelector.select();
632+
congestionScore++;
629633
}
634+
numKeys = localKeys;
630635
} catch (IOException ioe) {
631636
throw ioe;
632637
} catch (CancelledKeyException cke) {
@@ -663,7 +668,7 @@ private void selectLoop() throws IOException {
663668
Util.selfInterruptIfNeeded(isInterrupted);
664669

665670
final Set<SelectionKey> selectedKeys = localSelector.selectedKeys();
666-
if ((0 == numKeys) && (0 == selectedKeys.size())) {
671+
if ((0 == numKeys) && (selectedKeys.isEmpty())) {
667672
continue;
668673
}
669674

@@ -719,7 +724,9 @@ private void selectLoop() throws IOException {
719724
}
720725
}
721726
} // for
722-
if (isReader() && congestionDetected > 10 && workerCommMgr != null && workerCommMgr.isUnbalanced()) {
727+
if (isReader() &&
728+
congestionScore > 100 && System.nanoTime() - lastIdleNanos > Duration.ofSeconds(2).toNanos() &&
729+
workerCommMgr != null && workerCommMgr.isOverweight(getWeight())) {
723730
for (SelectionKey key : selectedKeys) {
724731
TCConnectionImpl connect = (TCConnectionImpl)key.attachment();
725732
connect.migrate();
@@ -783,6 +790,10 @@ private void doConnect(SelectionKey key) {
783790
}
784791
}
785792

793+
public long getCongestionScore() {
794+
return congestionScore;
795+
}
796+
786797
public long getTotalBytesRead() {
787798
return this.mode == COMM_THREAD_MODE.NIO_READER ? this.bytesMoved : 0;
788799
}

common/src/main/java/com/tc/net/core/TCWorkerCommManager.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -145,15 +145,12 @@ synchronized void waitDuringPause() throws IOException {
145145
}
146146
}
147147

148-
public boolean isUnbalanced() {
149-
int min = Integer.MAX_VALUE;
150-
int max = 0;
148+
public boolean isOverweight(int weight) {
149+
if (weight <= totalWorkerComm) return false;
151150
for (CoreNIOServices c : workerCommThreads) {
152-
int w = c.getWeight();
153-
min = Math.min(min, w);
154-
max = Math.max(max, w);
151+
if (c.getWeight() * 2 < weight && c.getCongestionScore() == 0) return true;
155152
}
156-
return (max - min) > totalWorkerComm;
153+
return false;
157154
}
158155

159156
public synchronized void pause() {

common/src/test/java/com/tc/net/core/TCWorkerCommManagerTest.java

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public TCWorkerCommManagerTest() {
5959
}
6060

6161
private synchronized ClientMessageTransport createClient(String clientName) {
62-
TCConnectionManager connection = new TCConnectionManagerImpl("Client-TestCommMgr", 0, new ClearTextSocketEndpointFactory());
62+
TCConnectionManager connection = new TCConnectionManagerImpl("Client-TestCommMgr-" + clientName, 0, new ClearTextSocketEndpointFactory());
6363
clientConnectionMgrs.add(connection);
6464
CommunicationsManager commsMgr = new CommunicationsManagerImpl(new NullMessageMonitor(),
6565
new TransportNetworkStackHarnessFactory(),
@@ -131,6 +131,53 @@ public void testSimpleOpenAndClose() throws Exception {
131131
connMgr.shutdown();
132132
}
133133

134+
public void testOverweight() throws Exception {
135+
TCConnectionManager connMgr = new TCConnectionManagerImpl("Target-Server-TestCommsMgr", 2, new ClearTextSocketEndpointFactory());
136+
CommunicationsManager commsMgr = new CommunicationsManagerImpl(new NullMessageMonitor(),
137+
new TransportNetworkStackHarnessFactory(),
138+
connMgr,
139+
new NullConnectionPolicy());
140+
NetworkListener listener = commsMgr.createListener(new InetSocketAddress(0), (c)->true,
141+
new DefaultConnectionIdFactory(), (MessageTransport t)->true);
142+
listener.start(Collections.<ConnectionID>emptySet());
143+
int port = listener.getBindPort();
144+
145+
ClientMessageTransport[] clients = new ClientMessageTransport[32];
146+
147+
for (int x=0;x<clients.length;x++) {
148+
clients[x] = createBasicClient("client" + x);
149+
}
150+
151+
InetSocketAddress serverAddress = InetSocketAddress.createUnresolved("localhost", port);
152+
153+
for (ClientMessageTransport t : clients) {
154+
t.open(serverAddress);
155+
}
156+
157+
waitForConnected(clients);
158+
waitForTotalWeights(commsMgr,2, 32);
159+
160+
for (int x=0;x<clients.length;x++) {
161+
if (x%2 == 0) {
162+
clients[x].close();
163+
}
164+
}
165+
166+
waitForTotalWeights(commsMgr,2, 16);
167+
168+
TCCommImpl comm = (TCCommImpl)connMgr.getTcComm();
169+
Assert.assertEquals(0, comm.getWeightForWorkerComm(0));
170+
Assert.assertEquals(16, comm.getWeightForWorkerComm(1));
171+
172+
TCConnection[] all = connMgr.getAllConnections();
173+
for (int x=0;x<all.length/2;x++) {
174+
((TCConnectionImpl)all[x]).migrate();
175+
}
176+
waitForWeight(commsMgr,0,8);
177+
Assert.assertEquals(comm.getWeightForWorkerComm(0), 8);
178+
Assert.assertEquals(comm.getWeightForWorkerComm(1), 8);
179+
}
180+
134181
public void testReaderandWriterCommThread() throws Exception {
135182
// comms manager with 4 worker comms
136183
logger.debug("Running target test");
@@ -367,8 +414,10 @@ private static void waitForWeight(final CommunicationsManager communicationsMana
367414
CallableWaiter.waitOnCallable(new Callable<Boolean>() {
368415
@Override
369416
public Boolean call() throws Exception {
370-
return ((TCCommImpl)communicationsManager.getConnectionManager()
371-
.getTcComm()).getWeightForWorkerComm(commId) == weight;
417+
int w = ((TCCommImpl)communicationsManager.getConnectionManager()
418+
.getTcComm()).getWeightForWorkerComm(commId);
419+
System.out.println("waiting for id:" + commId + " weight " + w + " expected " + weight);
420+
return w == weight;
372421
}
373422
});
374423
}

galvan-support/src/test/java/org/terracotta/testing/rules/ClientLeakIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ public void testHealthCheckThreadDies() throws Exception {
172172
CLUSTER.getClusterControl().startAllServers();
173173
CLUSTER.getClusterControl().waitForActive();
174174

175-
String connectionName = "LEAKTESTCLIENT";
175+
String connectionName = "LEAKTESTCLIENT2";
176176
Properties props = new Properties();
177177
props.setProperty(ConnectionPropertyNames.CONNECTION_NAME, connectionName);
178178
Connection leak = ConnectionFactory.connect(CLUSTER.getConnectionURI(), props);

0 commit comments

Comments
 (0)