diff --git a/docs/changelog/132675.yaml b/docs/changelog/132675.yaml
new file mode 100644
index 0000000000000..a451a27334be7
--- /dev/null
+++ b/docs/changelog/132675.yaml
@@ -0,0 +1,5 @@
+pr: 132675
+summary: Add second max queue latency stat to `ClusterInfo`
+area: Allocation
+type: enhancement
+issues: []
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java
index 6fd3133686b64..958bfd8756dac 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java
@@ -27,6 +27,7 @@
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.shard.IndexShard;
@@ -59,6 +60,8 @@
 import static java.util.Collections.emptySet;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.unmodifiableSet;
+import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
+import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
 import static org.elasticsearch.common.util.set.Sets.newHashSet;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -346,6 +349,8 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
                 WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
                 WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
             )
+            // Manually control cluster info refreshes
+            .put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m")
             .build();
         var masterName = internalCluster().startMasterOnlyNode(settings);
         var dataNodeName = internalCluster().startDataOnlyNode(settings);
@@ -369,11 +374,8 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
             }
         );
 
-        // Do some writes to create some write thread pool activity.
-        final String indexName = randomIdentifier();
-        for (int i = 0; i < randomIntBetween(1, 1000); i++) {
-            index(indexName, Integer.toString(i), Collections.singletonMap("foo", "bar"));
-        }
+        // Generate some writes to get some non-zero write thread pool stats.
+        doALotOfDataNodeWrites();
 
         // Force a refresh of the ClusterInfo state to collect fresh info from the data nodes.
         final InternalClusterInfoService masterClusterInfoService = asInstanceOf(
@@ -387,7 +389,7 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
 
         final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools();
         logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
-        assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collectedg
+        assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected
         var dataNodeId = getNodeId(dataNodeName);
         var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
         assertNotNull(nodeUsageStatsForThreadPool);
@@ -400,4 +402,174 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
         assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThan(0f));
         assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
     }
+
+    /**
+     * The {@link TransportNodeUsageStatsForThreadPoolsAction} returns the max value of two kinds of queue latencies:
+     * {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#getMaxQueueLatencyMillisSinceLastPollAndReset()} and
+     * {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#peekMaxQueueLatencyInQueueMillis()}. The latter looks at currently queued
+     * tasks, and the former tracks the queue latency of tasks when they are taken off of the queue to start execution.
+     */
+    public void testMaxQueueLatenciesInClusterInfo() throws Exception {
+        var settings = Settings.builder()
+            .put(
+                WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
+                WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
+            )
+            // Manually control cluster info refreshes
+            .put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m")
+            .build();
+        var masterName = internalCluster().startMasterOnlyNode(settings);
+        var dataNodeName = internalCluster().startDataOnlyNode(settings);
+        ensureStableCluster(2);
+        assertEquals(internalCluster().getMasterName(), masterName);
+        assertNotEquals(internalCluster().getMasterName(), dataNodeName);
+        logger.info("---> master node: " + masterName + ", data node: " + dataNodeName);
+
+        // Block indexing on the data node by submitting write thread pool tasks equal to the number of write threads.
+        var barrier = blockDataNodeIndexing(dataNodeName);
+        try {
+            // An arbitrary number of tasks. They will queue because all the write threads are already occupied; strictly speaking, a
+            // single queued task would be enough for this test.
+            int numberOfTasks = randomIntBetween(1, 5);
+            Thread[] threadsToJoin = new Thread[numberOfTasks];
+            String indexName = randomIdentifier();
+            createIndex(
+                indexName,
+                // NB: Set 0 replicas so that there aren't any stray GlobalCheckpointSyncAction tasks on the write thread pool.
+                Settings.builder().put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 5)).put(SETTING_NUMBER_OF_REPLICAS, 0).build()
+            );
+            for (int i = 0; i < numberOfTasks; ++i) {
+                threadsToJoin[i] = startParallelSingleWrite(indexName);
+            }
+
+            // Reach into the data node's write thread pool to check that tasks have reached the queue.
+            var dataNodeThreadPool = internalCluster().getInstance(ThreadPool.class, dataNodeName);
+            var writeExecutor = dataNodeThreadPool.executor(ThreadPool.Names.WRITE);
+            assert writeExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor;
+            var trackingWriteExecutor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) writeExecutor;
+            assertBusy(
+                // Wait for the parallel threads' writes to get queued in the write thread pool.
+                () -> assertThat(
+                    "Write thread pool dump: " + trackingWriteExecutor,
+                    trackingWriteExecutor.peekMaxQueueLatencyInQueueMillis(),
+                    greaterThan(0L)
+                )
+            );
+
+            // Force a refresh of the ClusterInfo state to collect fresh info from the data node.
+            final InternalClusterInfoService masterClusterInfoService = asInstanceOf(
+                InternalClusterInfoService.class,
+                internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class)
+            );
+            final ClusterInfo clusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService);
+
+            // Since tasks are actively queued right now, #peekMaxQueueLatencyInQueueMillis, which is called from the
+            // TransportNodeUsageStatsForThreadPoolsAction that a ClusterInfoService refresh initiates, should return a max queue
+            // latency > 0.
+            {
+                final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools();
+                logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
+                assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data node should be collected
+                var dataNodeId = getNodeId(dataNodeName);
+                var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
+                assertNotNull(nodeUsageStatsForThreadPool);
+                logger.info("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool);
+
+                assertEquals(dataNodeId, nodeUsageStatsForThreadPool.nodeId());
+                var writeThreadPoolStats = nodeUsageStatsForThreadPool.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
+                assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats);
+                assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0));
+                assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0f));
+                assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThan(0L));
+            }
+
+            // Now release the data node's indexing, and drain the queued tasks. Max queue latency of executed (not queued) tasks is
+            // reset by each TransportNodeUsageStatsForThreadPoolsAction call (#getMaxQueueLatencyMillisSinceLastPollAndReset), so the
+            // new queue latencies will be present in the next call. There will be nothing in the queue to peek at now, so the max queue
+            // latency reported by TransportNodeUsageStatsForThreadPoolsAction will reflect
+            // #getMaxQueueLatencyMillisSinceLastPollAndReset and not #peekMaxQueueLatencyInQueueMillis.
+            barrier.await();
+            for (int i = 0; i < numberOfTasks; ++i) {
+                threadsToJoin[i].join();
+            }
+            assertThat(
+                "Unexpectedly found a task queued for the write thread pool. Write thread pool dump: " + trackingWriteExecutor,
+                trackingWriteExecutor.peekMaxQueueLatencyInQueueMillis(),
+                equalTo(0L)
+            );
+
+            final ClusterInfo nextClusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService);
+            {
+                final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = nextClusterInfo
+                    .getNodeUsageStatsForThreadPools();
+                logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
+                assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected
+                var dataNodeId = getNodeId(dataNodeName);
+                var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
+                assertNotNull(nodeUsageStatsForThreadPool);
+                logger.info("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool);
+
+                assertEquals(dataNodeId, nodeUsageStatsForThreadPool.nodeId());
+                var writeThreadPoolStats = nodeUsageStatsForThreadPool.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
+                assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats);
+                assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0));
+                assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThan(0f));
+                assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
+            }
+        } finally {
+            // Ensure that the write threads have been released by breaking the barrier for any tasks still waiting on it. If the
+            // callers have already all been successfully released, then resetting the barrier has nothing left to release.
+            logger.info("---> Ensuring release of the barrier on write thread pool tasks");
+            barrier.reset();
+        }
+
+        // Now that there's nothing in the queue, and no activity since the last ClusterInfo refresh, the max latency returned should be
+        // zero. Verify this.
+        final InternalClusterInfoService masterClusterInfoService = asInstanceOf(
+            InternalClusterInfoService.class,
+            internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class)
+        );
+        final ClusterInfo clusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService);
+        {
+            final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools();
+            logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
+            assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected
+            var dataNodeId = getNodeId(dataNodeName);
+            var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
+            assertNotNull(nodeUsageStatsForThreadPool);
+            logger.info("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool);
+
+            assertEquals(dataNodeId, nodeUsageStatsForThreadPool.nodeId());
+            var writeThreadPoolStats = nodeUsageStatsForThreadPool.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
+            assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats);
+            assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0));
+            assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), equalTo(0f));
+            assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), equalTo(0L));
+        }
+    }
+
+    /**
+     * Do some writes to create some write thread pool activity.
+     */
+    private void doALotOfDataNodeWrites() {
+        final String indexName = randomIdentifier();
+        final int randomInt = randomIntBetween(500, 1000);
+        for (int i = 0; i < randomInt; i++) {
+            index(indexName, Integer.toString(i), Collections.singletonMap("foo", "bar"));
+        }
+    }
+
+    /**
+     * Starts a single index request on a parallel thread and returns the reference so {@link Thread#join()} can be called eventually.
+     */
+    private Thread startParallelSingleWrite(String indexName) {
+        Thread running = new Thread(() -> doSingleWrite(indexName));
+        running.start();
+        return running;
+    }
+
+    private void doSingleWrite(String indexName) {
+        final int randomId = randomIntBetween(500, 1000);
+        index(indexName, Integer.toString(randomId), Collections.singletonMap("foo", "bar"));
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java
index 29bc8efbbb192..93a5c6f7dad88 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java
@@ -104,7 +104,10 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation(
             (float) trackingForWriteExecutor.pollUtilization(
                 TaskExecutionTimeTrackingEsThreadPoolExecutor.UtilizationTrackingPurpose.ALLOCATION
             ),
-            trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset()
+            Math.max(
+                trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset(),
+                trackingForWriteExecutor.peekMaxQueueLatencyInQueueMillis()
+            )
         );
 
         Map perThreadPool = new HashMap<>();
diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java
index 20f480af172bd..e37f411789fd0 100644
--- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java
@@ -151,7 +151,8 @@ public int getCurrentQueueSize() {
      * Returns the max queue latency seen since the last time that this method was called. Every call will reset the max seen back to zero.
      * Latencies are only observed as tasks are taken off of the queue. This means that tasks in the queue will not contribute to the max
      * latency until they are unqueued and handed to a thread to execute. To see the latency of tasks still in the queue, use
-     * {@link #peekMaxQueueLatencyInQueue}. If there have been no tasks in the queue since the last call, then zero latency is returned.
+     * {@link #peekMaxQueueLatencyInQueueMillis}. If there have been no tasks in the queue since the last call, then zero latency is
+     * returned.
      */
     public long getMaxQueueLatencyMillisSinceLastPollAndReset() {
         if (trackMaxQueueLatency == false) {
             return 0;
@@ -164,23 +165,29 @@ public long getMaxQueueLatencyMillisSinceLastPollAndReset() {
      * Returns the queue latency of the next task to be executed that is still in the task queue. Essentially peeks at the front of the
      * queue and calculates how long it has been there. Returns zero if there is no queue.
      */
-    public long peekMaxQueueLatencyInQueue() {
+    public long peekMaxQueueLatencyInQueueMillis() {
         if (trackMaxQueueLatency == false) {
             return 0;
         }
+
         var queue = getQueue();
-        if (queue.isEmpty()) {
+        assert queue instanceof LinkedTransferQueue || queue instanceof SizeBlockingQueue
+            : "Not the type of queue expected: " + queue.getClass();
+        var linkedTransferOrSizeBlockingQueue = queue instanceof LinkedTransferQueue
+            ? (LinkedTransferQueue) queue
+            : (SizeBlockingQueue) queue;
+
+        var task = linkedTransferOrSizeBlockingQueue.peek();
+        if (task == null) {
+            // There's nothing in the queue right now.
             return 0;
         }
-        assert queue instanceof LinkedTransferQueue : "Not the type of queue expected: " + queue.getClass();
-        var linkedTransferQueue = (LinkedTransferQueue) queue;
-        var task = linkedTransferQueue.peek();
         assert task instanceof WrappedRunnable : "Not the type of task expected: " + task.getClass();
         var wrappedTask = ((WrappedRunnable) task).unwrap();
         assert wrappedTask instanceof TimedRunnable : "Not the type of task expected: " + task.getClass();
         var timedTask = (TimedRunnable) wrappedTask;
-        return timedTask.getTimeSinceCreationNanos();
+        return TimeUnit.NANOSECONDS.toMillis(timedTask.getTimeSinceCreationNanos());
     }
 
     /**
diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
index 58ac4635b2a4e..2a8c42c95cd0e 100644
--- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
+++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
@@ -1024,6 +1024,23 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
             return builder;
         }
 
+        @Override
+        public String toString() {
+            return "Info[name="
+                + name
+                + ",type="
+                + type
+                + ",min="
+                + min
+                + ",max="
+                + max
+                + ",keepAlive="
+                + keepAlive
+                + ",queueSize="
+                + queueSize
+                + "]";
+        }
+
     }
 
     /**
diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java
index b4b33d1265bcb..408050b01453d 100644
--- a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java
+++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java
@@ -95,7 +95,7 @@ public void testExecutionEWMACalculation() throws Exception {
 
     /**
      * Verifies that we can peek at the task in front of the task queue to fetch the duration that the oldest task has been queued.
-     * Tests {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#peekMaxQueueLatencyInQueue}.
+     * Tests {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#peekMaxQueueLatencyInQueueMillis}.
      */
     public void testFrontOfQueueLatency() throws Exception {
         ThreadContext context = new ThreadContext(Settings.EMPTY);
@@ -135,7 +135,7 @@ public void testFrontOfQueueLatency() throws Exception {
         logger.info("--> executor: {}", executor);
 
         // Check that the peeking at a non-existence queue returns zero.
-        assertEquals("Zero should be returned when there is no queue", 0, executor.peekMaxQueueLatencyInQueue());
+        assertEquals("Zero should be returned when there is no queue", 0, executor.peekMaxQueueLatencyInQueueMillis());
 
         // Submit two tasks, into the thread pool with a single worker thread. The second one will be queued (because the pool only has
         // one thread) and can be peeked at.
@@ -143,10 +143,10 @@ public void testFrontOfQueueLatency() throws Exception {
 
         executor.execute(() -> {});
         waitForTimeToElapse();
-        var frontOfQueueDuration = executor.peekMaxQueueLatencyInQueue();
+        var frontOfQueueDuration = executor.peekMaxQueueLatencyInQueueMillis();
         assertThat("Expected a task to be queued", frontOfQueueDuration, greaterThan(0L));
         waitForTimeToElapse();
-        var updatedFrontOfQueueDuration = executor.peekMaxQueueLatencyInQueue();
+        var updatedFrontOfQueueDuration = executor.peekMaxQueueLatencyInQueueMillis();
         assertThat(
             "Expected a second peek to report a longer duration",
             updatedFrontOfQueueDuration,
@@ -156,7 +156,7 @@ public void testFrontOfQueueLatency() throws Exception {
         // Release the first task that's running, and wait for the second to start -- then it is ensured that the queue will be empty.
         safeAwait(barrier);
         safeAwait(barrier);
-        assertEquals("Queue should be emptied", 0, executor.peekMaxQueueLatencyInQueue());
+        assertEquals("Queue should be emptied", 0, executor.peekMaxQueueLatencyInQueueMillis());
     } finally {
         ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
     }
@@ -463,8 +463,8 @@ long getQueueTimeNanos() {
      */
     private static void waitForTimeToElapse() throws InterruptedException {
         final var startNanoTime = System.nanoTime();
-        while ((System.nanoTime() - startNanoTime) < 1) {
-            Thread.sleep(Duration.ofNanos(1));
+        while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanoTime, TimeUnit.NANOSECONDS) < 1) {
+            Thread.sleep(Duration.ofMillis(1));
         }
     }
 }
diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
index 5c2f9646f03b3..7b36165a4f5a3 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
@@ -163,6 +163,7 @@
 import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
 import org.elasticsearch.test.store.MockFSIndexStore;
 import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportInterceptor;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequestHandler;
@@ -204,11 +205,14 @@
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
@@ -2915,4 +2919,39 @@ protected static void deletePipeline(String id) {
             )
         );
     }
+
+    /**
+     * Submits as many tasks to the given data node's write thread pool as there are write threads. These tasks will wait on the barrier
+     * that is returned, which waits for total-write-threads + 1 callers. The caller can release the tasks by calling
+     * {@code barrier.await()}, or forcibly release them by breaking the barrier with {@code barrier.reset()}.
+     */
+    public CyclicBarrier blockDataNodeIndexing(String dataNodeName) {
+        // Block the executor workers to simulate long-running write tasks
+        var threadpool = internalCluster().getInstance(ThreadPool.class, dataNodeName);
+        var executor = threadpool.executor(ThreadPool.Names.WRITE);
+        final var executorInfo = threadpool.info(ThreadPool.Names.WRITE);
+        final var executorThreads = executorInfo.getMax();
+        var barrier = new CyclicBarrier(executorThreads + 1);
+        for (int i = 0; i < executorThreads; i++) {
+            executor.execute(() -> longAwait(barrier));
+        }
+        logger.info(
+            "---> Submitted ["
+                + executorThreads
+                + "] tasks to the write thread pool that will wait on a barrier until released. Write thread pool info: "
+                + executorInfo
+        );
+        return barrier;
+    }
+
+    private static void longAwait(CyclicBarrier barrier) {
+        try {
+            barrier.await(30, TimeUnit.SECONDS);
+        } catch (BrokenBarrierException | TimeoutException e) {
+            throw new AssertionError(e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new AssertionError(e);
+        }
+    }
 }
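
Reviewer note (illustrative only, not part of the patch): the new stat is simply the max of the two executor-level signals, as added in TransportNodeUsageStatsForThreadPoolsAction. A minimal Java sketch of that combination follows; QueueLatencySource is a hypothetical stand-in for TaskExecutionTimeTrackingEsThreadPoolExecutor, with method names mirroring the diff.

    // Hypothetical interface standing in for the tracking executor; only the two signals used by the patch.
    interface QueueLatencySource {
        long getMaxQueueLatencyMillisSinceLastPollAndReset(); // max latency of tasks dequeued since the last poll, then reset
        long peekMaxQueueLatencyInQueueMillis();              // latency of the task currently at the head of the queue
    }

    final class MaxQueueLatencySketch {
        // A task that has been sitting in the queue but has not started executing yet is invisible to the
        // poll-and-reset value, so the peek value is folded in as well before reporting to the master.
        static long reportedMaxQueueLatencyMillis(QueueLatencySource executor) {
            return Math.max(
                executor.getMaxQueueLatencyMillisSinceLastPollAndReset(),
                executor.peekMaxQueueLatencyInQueueMillis()
            );
        }
    }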