From ae23a2208a704309b78ee53d3ad747f16cdd23f7 Mon Sep 17 00:00:00 2001 From: Gertjan Al <12525070+gertjanal@users.noreply.github.com> Date: Sun, 23 Nov 2025 21:34:46 +0100 Subject: [PATCH] Add metrics for active worker and active coordinator counts --- .../src/main/java/io/trino/node/AllNodes.java | 4 +++- .../io/trino/node/CoordinatorNodeManager.java | 18 +++++++++++++++++- .../main/java/io/trino/node/InternalNode.java | 13 +++++++++++++ .../trino/node/TestingInternalNodeManager.java | 9 ++++++++- .../java/io/trino/server/ServerMainModule.java | 4 +++- core/trino-spi/pom.xml | 6 ++++++ .../src/main/java/io/trino/spi/Node.java | 2 ++ .../trino/spooling/filesystem/TestingNode.java | 6 ++++++ 8 files changed, 58 insertions(+), 4 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/node/AllNodes.java b/core/trino-main/src/main/java/io/trino/node/AllNodes.java index 392a5ee14cf2..035330340d30 100644 --- a/core/trino-main/src/main/java/io/trino/node/AllNodes.java +++ b/core/trino-main/src/main/java/io/trino/node/AllNodes.java @@ -25,7 +25,8 @@ public record AllNodes( Set drainingNodes, Set drainedNodes, Set shuttingDownNodes, - Set activeCoordinators) + Set activeCoordinators, + Set activeWorkers) { public AllNodes { @@ -35,5 +36,6 @@ public record AllNodes( drainingNodes = ImmutableSet.copyOf(requireNonNull(drainingNodes, "drainingNodes is null")); shuttingDownNodes = ImmutableSet.copyOf(requireNonNull(shuttingDownNodes, "shuttingDownNodes is null")); activeCoordinators = ImmutableSet.copyOf(requireNonNull(activeCoordinators, "activeCoordinators is null")); + activeWorkers = ImmutableSet.copyOf(requireNonNull(activeWorkers, "activeWorkers is null")); } } diff --git a/core/trino-main/src/main/java/io/trino/node/CoordinatorNodeManager.java b/core/trino-main/src/main/java/io/trino/node/CoordinatorNodeManager.java index 1b11984a8c30..076815fd37f4 100644 --- a/core/trino-main/src/main/java/io/trino/node/CoordinatorNodeManager.java +++ b/core/trino-main/src/main/java/io/trino/node/CoordinatorNodeManager.java @@ -236,7 +236,11 @@ private synchronized void refreshNodesInternal() .filter(InternalNode::isCoordinator) .collect(toImmutableSet()); - AllNodes allNodes = new AllNodes(activeNodes, inactiveNodes, drainingNodes, drainedNodes, shuttingDownNodes, coordinators); + Set workers = activeNodes.stream() + .filter(InternalNode::isWorker) + .collect(toImmutableSet()); + + AllNodes allNodes = new AllNodes(activeNodes, inactiveNodes, drainingNodes, drainedNodes, shuttingDownNodes, coordinators, workers); // only update if all nodes actually changed (note: this does not include the connectors registered with the nodes) if (!allNodes.equals(this.allNodes)) { // assign allNodes to a local variable for use in the callback below @@ -284,6 +288,18 @@ public int getShuttingDownNodeCount() return getAllNodes().shuttingDownNodes().size(); } + @Managed + public int getActiveCoordinatorCount() + { + return getAllNodes().activeCoordinators().size(); + } + + @Managed + public int getActiveWorkerCount() + { + return getAllNodes().activeWorkers().size(); + } + @VisibleForTesting synchronized Set getInvalidNodes() { diff --git a/core/trino-main/src/main/java/io/trino/node/InternalNode.java b/core/trino-main/src/main/java/io/trino/node/InternalNode.java index 954d27edd6b7..2a126ad21b0f 100644 --- a/core/trino-main/src/main/java/io/trino/node/InternalNode.java +++ b/core/trino-main/src/main/java/io/trino/node/InternalNode.java @@ -41,15 +41,22 @@ public class InternalNode private final URI internalUri; private final NodeVersion nodeVersion; private final boolean coordinator; + private final boolean worker; private final long longHashCode; public InternalNode(String nodeIdentifier, URI internalUri, NodeVersion nodeVersion, boolean coordinator) + { + this(nodeIdentifier, internalUri, nodeVersion, coordinator, !coordinator); + } + + public InternalNode(String nodeIdentifier, URI internalUri, NodeVersion nodeVersion, boolean coordinator, boolean worker) { nodeIdentifier = emptyToNull(nullToEmpty(nodeIdentifier).trim()); this.nodeIdentifier = requireNonNull(nodeIdentifier, "nodeIdentifier is null or empty"); this.internalUri = requireNonNull(internalUri, "internalUri is null"); this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null"); this.coordinator = coordinator; + this.worker = worker; this.longHashCode = new XxHash64(coordinator ? 1 : 0) .update(nodeIdentifier.getBytes(UTF_8)) .update(internalUri.toString().getBytes(UTF_8)) @@ -102,6 +109,12 @@ public boolean isCoordinator() return coordinator; } + @Override + public boolean isWorker() + { + return worker; + } + public NodeVersion getNodeVersion() { return nodeVersion; diff --git a/core/trino-main/src/main/java/io/trino/node/TestingInternalNodeManager.java b/core/trino-main/src/main/java/io/trino/node/TestingInternalNodeManager.java index 7b6a7f84f52f..3f3bc8f9306b 100644 --- a/core/trino-main/src/main/java/io/trino/node/TestingInternalNodeManager.java +++ b/core/trino-main/src/main/java/io/trino/node/TestingInternalNodeManager.java @@ -40,7 +40,7 @@ public class TestingInternalNodeManager implements InternalNodeManager { - public static final InternalNode CURRENT_NODE = new InternalNode("local", URI.create("local://127.0.0.1:8080"), NodeVersion.UNKNOWN, true); + public static final InternalNode CURRENT_NODE = new InternalNode("local", URI.create("local://127.0.0.1:8080"), NodeVersion.UNKNOWN, true, true); private final InternalNode currentNode; private final ExecutorService nodeStateEventExecutor; @@ -63,6 +63,7 @@ public TestingInternalNodeManager(InternalNode currentNode) ImmutableSet.of(), ImmutableSet.of(), ImmutableSet.of(), + ImmutableSet.of(currentNode), ImmutableSet.of(currentNode)); this.nodeStateEventExecutor = newSingleThreadExecutor(daemonThreadsNamed("node-state-events-%s")); } @@ -100,6 +101,9 @@ public synchronized void addNodes(Collection internalNodes) ImmutableSet.of(), newActiveNodes.stream() .filter(InternalNode::isCoordinator) + .collect(toImmutableSet()), + newActiveNodes.stream() + .filter(InternalNode::isWorker) .collect(toImmutableSet()))); } @@ -119,6 +123,9 @@ public synchronized void removeNode(InternalNode internalNode) ImmutableSet.of(), newActiveNodes.stream() .filter(InternalNode::isCoordinator) + .collect(toImmutableSet()), + newActiveNodes.stream() + .filter(InternalNode::isWorker) .collect(toImmutableSet()))); } diff --git a/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java b/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java index a997796094b6..711ef750c998 100644 --- a/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java +++ b/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java @@ -486,13 +486,15 @@ public static InternalNode currentInternalNode( HttpServerInfo httpServerInfo, NodeVersion nodeVersion, ServerConfig serverConfig, + NodeSchedulerConfig nodeSchedulerConfig, InternalCommunicationConfig internalCommunicationConfig) { return new InternalNode( nodeInfo.getNodeId(), internalCommunicationConfig.isHttpsRequired() ? httpServerInfo.getHttpsUri() : httpServerInfo.getHttpUri(), nodeVersion, - serverConfig.isCoordinator()); + serverConfig.isCoordinator(), + !serverConfig.isCoordinator() || nodeSchedulerConfig.isIncludeCoordinator()); } private static class RegisterFunctionBundles diff --git a/core/trino-spi/pom.xml b/core/trino-spi/pom.xml index 14ab18f30ce2..dbfaa442b514 100644 --- a/core/trino-spi/pom.xml +++ b/core/trino-spi/pom.xml @@ -297,6 +297,12 @@ method void io.trino.spi.block.PageBuilderStatus::<init>() Method is unnecessary + + true + java.method.addedToInterface + method boolean io.trino.spi.Node::isWorker() + Added the boolean method isWorker + diff --git a/core/trino-spi/src/main/java/io/trino/spi/Node.java b/core/trino-spi/src/main/java/io/trino/spi/Node.java index 45162802445a..a653e90435e7 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/Node.java +++ b/core/trino-spi/src/main/java/io/trino/spi/Node.java @@ -24,4 +24,6 @@ public interface Node String getVersion(); boolean isCoordinator(); + + boolean isWorker(); } diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestingNode.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestingNode.java index 3017c4087d76..655c04b2c7fd 100644 --- a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestingNode.java +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestingNode.java @@ -57,4 +57,10 @@ public boolean isCoordinator() { return false; } + + @Override + public boolean isWorker() + { + return true; + } }