Skip to content

Commit ae23a22

Browse files
committed
Add metrics for active worker and active coordinator counts
1 parent 6114935 commit ae23a22

File tree

8 files changed

+58
-4
lines changed

8 files changed

+58
-4
lines changed

core/trino-main/src/main/java/io/trino/node/AllNodes.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ public record AllNodes(
2525
Set<InternalNode> drainingNodes,
2626
Set<InternalNode> drainedNodes,
2727
Set<InternalNode> shuttingDownNodes,
28-
Set<InternalNode> activeCoordinators)
28+
Set<InternalNode> activeCoordinators,
29+
Set<InternalNode> activeWorkers)
2930
{
3031
public AllNodes
3132
{
@@ -35,5 +36,6 @@ public record AllNodes(
3536
drainingNodes = ImmutableSet.copyOf(requireNonNull(drainingNodes, "drainingNodes is null"));
3637
shuttingDownNodes = ImmutableSet.copyOf(requireNonNull(shuttingDownNodes, "shuttingDownNodes is null"));
3738
activeCoordinators = ImmutableSet.copyOf(requireNonNull(activeCoordinators, "activeCoordinators is null"));
39+
activeWorkers = ImmutableSet.copyOf(requireNonNull(activeWorkers, "activeWorkers is null"));
3840
}
3941
}

core/trino-main/src/main/java/io/trino/node/CoordinatorNodeManager.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,11 @@ private synchronized void refreshNodesInternal()
236236
.filter(InternalNode::isCoordinator)
237237
.collect(toImmutableSet());
238238

239-
AllNodes allNodes = new AllNodes(activeNodes, inactiveNodes, drainingNodes, drainedNodes, shuttingDownNodes, coordinators);
239+
Set<InternalNode> workers = activeNodes.stream()
240+
.filter(InternalNode::isWorker)
241+
.collect(toImmutableSet());
242+
243+
AllNodes allNodes = new AllNodes(activeNodes, inactiveNodes, drainingNodes, drainedNodes, shuttingDownNodes, coordinators, workers);
240244
// only update if all nodes actually changed (note: this does not include the connectors registered with the nodes)
241245
if (!allNodes.equals(this.allNodes)) {
242246
// assign allNodes to a local variable for use in the callback below
@@ -284,6 +288,18 @@ public int getShuttingDownNodeCount()
284288
return getAllNodes().shuttingDownNodes().size();
285289
}
286290

291+
@Managed
292+
public int getActiveCoordinatorCount()
293+
{
294+
return getAllNodes().activeCoordinators().size();
295+
}
296+
297+
@Managed
298+
public int getActiveWorkerCount()
299+
{
300+
return getAllNodes().activeWorkers().size();
301+
}
302+
287303
@VisibleForTesting
288304
synchronized Set<InternalNode> getInvalidNodes()
289305
{

core/trino-main/src/main/java/io/trino/node/InternalNode.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,22 @@ public class InternalNode
4141
private final URI internalUri;
4242
private final NodeVersion nodeVersion;
4343
private final boolean coordinator;
44+
private final boolean worker;
4445
private final long longHashCode;
4546

4647
public InternalNode(String nodeIdentifier, URI internalUri, NodeVersion nodeVersion, boolean coordinator)
48+
{
49+
this(nodeIdentifier, internalUri, nodeVersion, coordinator, !coordinator);
50+
}
51+
52+
public InternalNode(String nodeIdentifier, URI internalUri, NodeVersion nodeVersion, boolean coordinator, boolean worker)
4753
{
4854
nodeIdentifier = emptyToNull(nullToEmpty(nodeIdentifier).trim());
4955
this.nodeIdentifier = requireNonNull(nodeIdentifier, "nodeIdentifier is null or empty");
5056
this.internalUri = requireNonNull(internalUri, "internalUri is null");
5157
this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
5258
this.coordinator = coordinator;
59+
this.worker = worker;
5360
this.longHashCode = new XxHash64(coordinator ? 1 : 0)
5461
.update(nodeIdentifier.getBytes(UTF_8))
5562
.update(internalUri.toString().getBytes(UTF_8))
@@ -102,6 +109,12 @@ public boolean isCoordinator()
102109
return coordinator;
103110
}
104111

112+
@Override
113+
public boolean isWorker()
114+
{
115+
return worker;
116+
}
117+
105118
public NodeVersion getNodeVersion()
106119
{
107120
return nodeVersion;

core/trino-main/src/main/java/io/trino/node/TestingInternalNodeManager.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
public class TestingInternalNodeManager
4141
implements InternalNodeManager
4242
{
43-
public static final InternalNode CURRENT_NODE = new InternalNode("local", URI.create("local://127.0.0.1:8080"), NodeVersion.UNKNOWN, true);
43+
public static final InternalNode CURRENT_NODE = new InternalNode("local", URI.create("local://127.0.0.1:8080"), NodeVersion.UNKNOWN, true, true);
4444

4545
private final InternalNode currentNode;
4646
private final ExecutorService nodeStateEventExecutor;
@@ -63,6 +63,7 @@ public TestingInternalNodeManager(InternalNode currentNode)
6363
ImmutableSet.of(),
6464
ImmutableSet.of(),
6565
ImmutableSet.of(),
66+
ImmutableSet.of(currentNode),
6667
ImmutableSet.of(currentNode));
6768
this.nodeStateEventExecutor = newSingleThreadExecutor(daemonThreadsNamed("node-state-events-%s"));
6869
}
@@ -100,6 +101,9 @@ public synchronized void addNodes(Collection<InternalNode> internalNodes)
100101
ImmutableSet.of(),
101102
newActiveNodes.stream()
102103
.filter(InternalNode::isCoordinator)
104+
.collect(toImmutableSet()),
105+
newActiveNodes.stream()
106+
.filter(InternalNode::isWorker)
103107
.collect(toImmutableSet())));
104108
}
105109

@@ -119,6 +123,9 @@ public synchronized void removeNode(InternalNode internalNode)
119123
ImmutableSet.of(),
120124
newActiveNodes.stream()
121125
.filter(InternalNode::isCoordinator)
126+
.collect(toImmutableSet()),
127+
newActiveNodes.stream()
128+
.filter(InternalNode::isWorker)
122129
.collect(toImmutableSet())));
123130
}
124131

core/trino-main/src/main/java/io/trino/server/ServerMainModule.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,13 +486,15 @@ public static InternalNode currentInternalNode(
486486
HttpServerInfo httpServerInfo,
487487
NodeVersion nodeVersion,
488488
ServerConfig serverConfig,
489+
NodeSchedulerConfig nodeSchedulerConfig,
489490
InternalCommunicationConfig internalCommunicationConfig)
490491
{
491492
return new InternalNode(
492493
nodeInfo.getNodeId(),
493494
internalCommunicationConfig.isHttpsRequired() ? httpServerInfo.getHttpsUri() : httpServerInfo.getHttpUri(),
494495
nodeVersion,
495-
serverConfig.isCoordinator());
496+
serverConfig.isCoordinator(),
497+
!serverConfig.isCoordinator() || nodeSchedulerConfig.isIncludeCoordinator());
496498
}
497499

498500
private static class RegisterFunctionBundles

core/trino-spi/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,12 @@
297297
<old>method void io.trino.spi.block.PageBuilderStatus::&lt;init&gt;()</old>
298298
<justification>Method is unnecessary</justification>
299299
</item>
300+
<item>
301+
<ignore>true</ignore>
302+
<code>java.method.addedToInterface</code>
303+
<new>method boolean io.trino.spi.Node::isWorker()</new>
304+
<justification>Added the boolean method isWorker</justification>
305+
</item>
300306
</differences>
301307
</revapi.differences>
302308
</analysisConfiguration>

core/trino-spi/src/main/java/io/trino/spi/Node.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,6 @@ public interface Node
2424
String getVersion();
2525

2626
boolean isCoordinator();
27+
28+
boolean isWorker();
2729
}

plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestingNode.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,10 @@ public boolean isCoordinator()
5757
{
5858
return false;
5959
}
60+
61+
@Override
62+
public boolean isWorker()
63+
{
64+
return true;
65+
}
6066
}

0 commit comments

Comments
 (0)