Skip to content

Commit 1deb0ab

Browse files
committed
+ pool-size-test
1 parent daf214d commit 1deb0ab

File tree

4 files changed

+140
-92
lines changed

4 files changed

+140
-92
lines changed

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java

Lines changed: 54 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.HashMap;
3838
import java.util.HashSet;
3939
import java.util.Iterator;
40+
import java.util.LinkedHashSet;
4041
import java.util.LinkedList;
4142
import java.util.List;
4243
import java.util.Map;
@@ -220,14 +221,20 @@ class ServerImpl extends TcpDiscoveryImpl {
220221
/** Maximal interval of connection check to next node in the ring. */
221222
private static final long MAX_CON_CHECK_INTERVAL = 500;
222223

223-
/** Minimal size of pool to ping remote DC at the connection recovery. */
224-
private static final int RMT_DC_PING_MIN_POOL_SIZE = Math.max(8, Runtime.getRuntime().availableProcessors() / 2);
224+
/**
225+
* @see #connCheckTick
226+
* @see #effectiveExchangeTimeout()
227+
*/
228+
private static final int CONNECTION_RECOVERY_TICKS = 3;
225229

226230
/**
227231
* Part of the connection recovery timeout to ping remote DC. We can't spend the whole timeout.
228232
* We need some time reserved to close the ring to local DC.
233+
*
234+
* @see #CONNECTION_RECOVERY_TICKS
235+
* @see #connCheckTick
229236
*/
230-
private static final double RMT_DC_PING_RECOVER_TIMEOUT_RATIO = 0.65;
237+
private static final double RMT_DC_PING_TIMEOUT_RATIO = 0.5;
231238

232239
/** Number of retries to ping remote DC. Must not be less than 2. */
233240
private static final int RMT_DC_PING_TRIES = 3;
@@ -238,12 +245,20 @@ class ServerImpl extends TcpDiscoveryImpl {
238245
/** Interval of checking connection to next node in the ring. */
239246
private long connCheckInterval;
240247

241-
/** Fundamental value for connection checking actions. */
248+
/**
249+
* Fundamental timeout tick for actions related to the ring connection recovery. Often, we cannot use the whole
250+
* connection recovery timeout for a single action. We may have to check several nodes in a row.
251+
* The smaller this value, the more connection checking actions we can do within the connection recovery timeout.
252+
* That is, more nodes can be checked, but each such action gets a shorter timeout.
253+
*/
242254
private long connCheckTick;
243255

244256
/** */
245257
private final IgniteThreadPoolExecutor utilityPool;
246258

259+
/** Pool size to ping remote DC if a corner node loses the ring connection. */
260+
private final int pingRmtDcPoolSz;
261+
247262
/** Nodes ring. */
248263
@GridToStringExclude
249264
private final TcpDiscoveryNodesRing ring = new TcpDiscoveryNodesRing();
@@ -343,9 +358,11 @@ class ServerImpl extends TcpDiscoveryImpl {
343358
/**
344359
* @param adapter Adapter.
345360
*/
346-
ServerImpl(TcpDiscoverySpi adapter, int utilityPoolSize) {
361+
ServerImpl(TcpDiscoverySpi adapter, int utilityPoolSize, int pingRmtDcPoolSize) {
347362
super(adapter);
348363

364+
// Using a size-changing pool with an unbounded task queue is useless because it prefers to put a task in the queue
365+
// instead of creating a new thread.
349366
utilityPool = new IgniteThreadPoolExecutor("disco-pool",
350367
spi.ignite().name(),
351368
utilityPoolSize,
@@ -362,6 +379,8 @@ class ServerImpl extends TcpDiscoveryImpl {
362379

363380
cliConnEnabled = props.get(0);
364381
srvConnEnabled = props.get(1);
382+
383+
pingRmtDcPoolSz = pingRmtDcPoolSize;
365384
}
366385

367386
/** {@inheritDoc} */
@@ -429,8 +448,7 @@ class ServerImpl extends TcpDiscoveryImpl {
429448

430449
lastRingMsgSentTime = 0;
431450

432-
// Foundumental timeout value for actions related to connection check.
433-
connCheckTick = effectiveExchangeTimeout() / 3;
451+
connCheckTick = effectiveExchangeTimeout() / CONNECTION_RECOVERY_TICKS;
434452

435453
// Since we take in account time of last sent message, the interval should be quite short to give enough piece
436454
// of failure detection timeout as send-and-acknowledge timeout of the message to send.
@@ -3294,12 +3312,13 @@ private void sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) {
32943312

32953313
sendMessageToClients(msg);
32963314

3297-
List<TcpDiscoveryNode> failedNodes;
3315+
// Must be sequential (ordered): a List, a LinkedHashSet, etc.
3316+
Collection<TcpDiscoveryNode> failedNodes;
32983317

32993318
TcpDiscoverySpiState state;
33003319

33013320
synchronized (mux) {
3302-
failedNodes = U.arrayList(ServerImpl.this.failedNodes.keySet());
3321+
failedNodes = new LinkedHashSet<>(ServerImpl.this.failedNodes.keySet());
33033322

33043323
state = spiState;
33053324
}
@@ -3438,7 +3457,7 @@ else if (!sndState.isStartingPoint())
34383457
boolean previousNode = sndState.markLastFailedNodeAlive();
34393458

34403459
if (previousNode)
3441-
failedNodes.remove(failedNodes.size() - 1);
3460+
failedNodes.remove(F.last(failedNodes));
34423461
else {
34433462
newNextNode = false;
34443463

@@ -3524,7 +3543,7 @@ else if (!sndState.isStartingPoint())
35243543
debugLog(msg, "Initialized connection with next node: " + next.id());
35253544

35263545
errs = null;
3527-
sndState = null;
3546+
35283547
success = true;
35293548

35303549
next.lastSuccessfulAddress(addr);
@@ -3800,7 +3819,7 @@ else if (!failedNextNode && sndState != null && sndState.isBackward()) {
38003819
", next=" + next + ']');
38013820

38023821
if (prev)
3803-
failedNodes.remove(failedNodes.size() - 1);
3822+
failedNodes.remove(F.last(failedNodes));
38043823
else {
38053824
newNextNode = false;
38063825

@@ -8248,7 +8267,13 @@ private class CrossRingMessageSendState {
82488267
/** Keeps number of attempt to ping node. Negative value means that node successfully responded. */
82498268
@Nullable private Map<TcpDiscoveryNode, Integer> rmtDcPingRes;
82508269

8251-
/** Thread pool to ping remote DC. */
8270+
/**
8271+
* Thread pool to ping remote DC. We do not use {@link #utilityPool} because we need significantly more threads
8272+
* to ping a remote DC during the connection recovery. The thread pools here come with an unlimited task queue.
8273+
* With such a task queue, thread pools prefer putting a task in its queue instead of creating a new worker thread.
8274+
* To utilize more threads we have to keep the core pool size large enough. But we don't need a wide discovery
8275+
* thread pool for ordinary, typical tasks.
8276+
*/
82528277
@Nullable private volatile ThreadPoolExecutor rmtDcPingPool;
82538278

82548279
/** Stop remote DC ping flag. */
@@ -8322,35 +8347,30 @@ private boolean markLastFailedNodeAlive() {
83228347

83238348
/** */
83248349
private void pingRemoteDC(List<TcpDiscoveryNode> nodesToPing) {
8325-
rmtDcPingMaxTimeNs = System.nanoTime()
8326-
+ (long)((failTimeNanos - System.nanoTime()) * RMT_DC_PING_RECOVER_TIMEOUT_RATIO);
8350+
rmtDcPingMaxTimeNs = System.nanoTime() + (long)((failTimeNanos - System.nanoTime()) * RMT_DC_PING_TIMEOUT_RATIO);
83278351

83288352
assert !remoteDcPingStarted();
83298353
assert !stopRmtDcPing;
83308354
assert !F.isEmpty(nodesToPing);
83318355

8332-
int poolSz = Math.max(RMT_DC_PING_MIN_POOL_SIZE, Runtime.getRuntime().availableProcessors()) / 2;
8333-
83348356
AtomicInteger thrdIdx = new AtomicInteger();
83358357

8336-
rmtDcPingPool = new ThreadPoolExecutor(poolSz, poolSz, 0, TimeUnit.MILLISECONDS,
8358+
rmtDcPingPool = new ThreadPoolExecutor(pingRmtDcPoolSz, pingRmtDcPoolSz, 0, TimeUnit.MILLISECONDS,
83378359
new LinkedBlockingQueue<>(), r -> {
8338-
Thread t = new Thread(r, "disco-remote-dc-ping-worker-" + thrdIdx.incrementAndGet());
8339-
8360+
Thread t = new Thread(r, "disco-remote-dc-ping-worker-" + thrdIdx.getAndIncrement());
83408361
t.setDaemon(true);
8341-
83428362
return t;
83438363
});
83448364

83458365
if (log.isInfoEnabled()) {
8346-
log.info("During the connection recovery, starting ping of DC '"
8347-
+ nodesToPing.get(0).dataCenterId() + "'. Nodes number to ping: " + nodesToPing.size()
8348-
+ ". Timeout: " + U.nanosToMillis(rmtDcPingMaxTimeNs - System.nanoTime()) + "ms.");
8366+
log.info("During the connection recovery, starting ping of DC '" + nodesToPing.get(0).dataCenterId()
8367+
+ "'. Nodes number to ping: " + nodesToPing.size() + ". Timeout: "
8368+
+ U.nanosToMillis(rmtDcPingMaxTimeNs - System.nanoTime()) + "ms.");
83498369
}
83508370

83518371
rmtDcPingRes = new ConcurrentHashMap<>(nodesToPing.size(), 1.0f);
83528372

8353-
// Merk every node to ping with 0 attempts.
8373+
// Fill the map and mark every node to ping with 0 attempts.
83548374
nodesToPing.forEach(n -> rmtDcPingRes.put(n, 0));
83558375

83568376
// We should assume that the ping won't receive any response or exception and would spend its whole timeout.
@@ -8383,7 +8403,7 @@ private synchronized void scheduleNodePingJob(TcpDiscoveryNode node, int steps)
83838403
/** */
83848404
private void pingNodeJob(TcpDiscoveryNode node, int steps) {
83858405
// Total allowed ping timeout per step.
8386-
double stepTimeoutNs = ((failTimeNanos - System.nanoTime()) * RMT_DC_PING_RECOVER_TIMEOUT_RATIO) / steps;
8406+
double stepTimeoutNs = ((failTimeNanos - System.nanoTime()) * RMT_DC_PING_TIMEOUT_RATIO) / steps;
83878407

83888408
long attemptDelayNs = (long)(stepTimeoutNs * RMT_DC_PING_ATTEMPT_DELAY_RATIO) / (RMT_DC_PING_TRIES - 1);
83898409

@@ -8405,12 +8425,9 @@ private void pingNodeJob(TcpDiscoveryNode node, int steps) {
84058425
Thread.sleep(U.nanosToMillis(attemptDelayNs));
84068426

84078427
if (log.isDebugEnabled()) {
8408-
log.debug("During the connection recovery, pinging " + node
8409-
+ " of DC '" + node.dataCenterId()
8410-
+ "'. Attempt: " + attempt
8411-
+ ". Timeout per address (ms): " + addrsTimeoutMs
8412-
+ ". Total time left (ms): " + remoteDcPingTimeLeft()
8413-
+ ". Nodes to ping left: " + nodesToPingLeft() + '.');
8428+
log.debug("During the connection recovery, pinging " + node + " of DC '" + node.dataCenterId()
8429+
+ "'. Attempt: " + attempt + ". Timeout per address (ms): " + addrsTimeoutMs
8430+
+ ". Total time left (ms): " + remoteDcPingTimeLeft() + ". Nodes to ping left: " + nodesToPingLeft() + '.');
84148431
}
84158432

84168433
for (InetSocketAddress addrs : nodeAddrs) {
@@ -8426,9 +8443,8 @@ private void pingNodeJob(TcpDiscoveryNode node, int steps) {
84268443
rmtDcPingRes.put(node, -1);
84278444

84288445
if (log.isDebugEnabled()) {
8429-
log.debug("During the connection recovery, node " + node
8430-
+ " of DC '" + node.dataCenterId() + "' has responded to the ping " +
8431-
"at attempt " + attempt + '.');
8446+
log.debug("During the connection recovery, node " + node + " of DC '" + node.dataCenterId()
8447+
+ "' has responded to the ping at attempt " + attempt + '.');
84328448
}
84338449

84348450
// At least one node's address reached.
@@ -8445,9 +8461,9 @@ private void pingNodeJob(TcpDiscoveryNode node, int steps) {
84458461

84468462
if (rmtDcPingRes.get(node) > 0) {
84478463
if (log.isDebugEnabled()) {
8448-
log.debug("During the connection recovery, node " + node
8449-
+ " of DC '" + node.dataCenterId() + "' didn't respond to the ping"
8450-
+ " at attempt " + attempt + " within the timeout " + addrsTimeoutMs + "ms.");
8464+
log.debug("During the connection recovery, node " + node + " of DC '" + node.dataCenterId()
8465+
+ "' didn't respond to the ping at attempt " + attempt + " within the timeout " + addrsTimeoutMs
8466+
+ "ms.");
84518467
}
84528468

84538469
if (!remoteDcPingStopped() && attempt < RMT_DC_PING_TRIES)

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,10 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery
303303
public static final int DFLT_DISCOVERY_METRICS_QNT_WARN = 500;
304304

305305
/** */
306-
private static final int DFLT_UTLITY_POOL_SIZE = 2;
306+
public static final int DFLT_UTLITY_POOL_SIZE = 2;
307+
308+
/** Pool size to ping remote DC at the connection recovery. */
309+
public static final int DFLT_RMT_DC_PING_POOL_SIZE = Math.max(8, Runtime.getRuntime().availableProcessors() / 2);
307310

308311
/** Ssl message pattern for StreamCorruptedException. */
309312
private static Pattern sslMsgPattern = Pattern.compile("invalid stream header: 150\\d0\\d00");
@@ -2156,7 +2159,7 @@ protected void initializeImpl() {
21562159
if (sockTimeout == 0)
21572160
sockTimeout = DFLT_SOCK_TIMEOUT;
21582161

2159-
impl = new ServerImpl(this, DFLT_UTLITY_POOL_SIZE);
2162+
impl = new ServerImpl(this, DFLT_UTLITY_POOL_SIZE, DFLT_RMT_DC_PING_POOL_SIZE);
21602163
}
21612164

21622165
metricsUpdateFreq = ignite.configuration().getMetricsUpdateFrequency();

0 commit comments

Comments
 (0)