Skip to content

Commit c3f98f0

Browse files
committed
Add metrics for active worker and active coordinator counts
1 parent aa239da commit c3f98f0

File tree

7 files changed

+62
-18
lines changed

7 files changed

+62
-18
lines changed

core/trino-main/src/main/java/io/trino/node/AllNodes.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ public record AllNodes(
2525
Set<InternalNode> drainingNodes,
2626
Set<InternalNode> drainedNodes,
2727
Set<InternalNode> shuttingDownNodes,
28-
Set<InternalNode> activeCoordinators)
28+
Set<InternalNode> activeCoordinators,
29+
Set<InternalNode> activeWorkers)
2930
{
3031
public AllNodes
3132
{
@@ -35,5 +36,6 @@ public record AllNodes(
3536
drainingNodes = ImmutableSet.copyOf(requireNonNull(drainingNodes, "drainingNodes is null"));
3637
shuttingDownNodes = ImmutableSet.copyOf(requireNonNull(shuttingDownNodes, "shuttingDownNodes is null"));
3738
activeCoordinators = ImmutableSet.copyOf(requireNonNull(activeCoordinators, "activeCoordinators is null"));
39+
activeWorkers = ImmutableSet.copyOf(requireNonNull(activeWorkers, "activeWorkers is null"));
3840
}
3941
}

core/trino-main/src/main/java/io/trino/node/CoordinatorNodeManager.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,11 @@ private synchronized void refreshNodesInternal()
236236
.filter(InternalNode::isCoordinator)
237237
.collect(toImmutableSet());
238238

239-
AllNodes allNodes = new AllNodes(activeNodes, inactiveNodes, drainingNodes, drainedNodes, shuttingDownNodes, coordinators);
239+
Set<InternalNode> workers = activeNodes.stream()
240+
.filter(InternalNode::isWorker)
241+
.collect(toImmutableSet());
242+
243+
AllNodes allNodes = new AllNodes(activeNodes, inactiveNodes, drainingNodes, drainedNodes, shuttingDownNodes, coordinators, workers);
240244
// only update if all nodes actually changed (note: this does not include the connectors registered with the nodes)
241245
if (!allNodes.equals(this.allNodes)) {
242246
// assign allNodes to a local variable for use in the callback below
@@ -284,6 +288,18 @@ public int getShuttingDownNodeCount()
284288
return getAllNodes().shuttingDownNodes().size();
285289
}
286290

291+
@Managed
292+
public int getActiveCoordinatorCount()
293+
{
294+
return getAllNodes().activeCoordinators().size();
295+
}
296+
297+
@Managed
298+
public int getActiveWorkerCount()
299+
{
300+
return getAllNodes().activeWorkers().size();
301+
}
302+
287303
@VisibleForTesting
288304
synchronized Set<InternalNode> getInvalidNodes()
289305
{

core/trino-main/src/main/java/io/trino/node/InternalNode.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package io.trino.node;
1515

16+
import com.google.common.annotations.VisibleForTesting;
1617
import io.airlift.slice.XxHash64;
1718
import io.trino.client.NodeVersion;
1819
import io.trino.spi.HostAddress;
@@ -41,15 +42,23 @@ public class InternalNode
4142
private final URI internalUri;
4243
private final NodeVersion nodeVersion;
4344
private final boolean coordinator;
45+
private final boolean worker;
4446
private final long longHashCode;
4547

46-
public InternalNode(String nodeIdentifier, URI internalUri, NodeVersion nodeVersion, boolean coordinator)
48+
@VisibleForTesting
49+
InternalNode(String nodeIdentifier, URI internalUri, NodeVersion nodeVersion, boolean coordinator)
50+
{
51+
this(nodeIdentifier, internalUri, nodeVersion, coordinator, !coordinator);
52+
}
53+
54+
public InternalNode(String nodeIdentifier, URI internalUri, NodeVersion nodeVersion, boolean coordinator, boolean worker)
4755
{
4856
nodeIdentifier = emptyToNull(nullToEmpty(nodeIdentifier).trim());
4957
this.nodeIdentifier = requireNonNull(nodeIdentifier, "nodeIdentifier is null or empty");
5058
this.internalUri = requireNonNull(internalUri, "internalUri is null");
5159
this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
5260
this.coordinator = coordinator;
61+
this.worker = worker;
5362
this.longHashCode = new XxHash64(coordinator ? 1 : 0)
5463
.update(nodeIdentifier.getBytes(UTF_8))
5564
.update(internalUri.toString().getBytes(UTF_8))
@@ -102,6 +111,12 @@ public boolean isCoordinator()
102111
return coordinator;
103112
}
104113

114+
@Override
115+
public boolean isWorker()
116+
{
117+
return worker;
118+
}
119+
105120
public NodeVersion getNodeVersion()
106121
{
107122
return nodeVersion;

core/trino-main/src/main/java/io/trino/node/TestingInternalNodeManager.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
public class TestingInternalNodeManager
4141
implements InternalNodeManager
4242
{
43-
public static final InternalNode CURRENT_NODE = new InternalNode("local", URI.create("local://127.0.0.1:8080"), NodeVersion.UNKNOWN, true);
43+
public static final InternalNode CURRENT_NODE = new InternalNode("local", URI.create("local://127.0.0.1:8080"), NodeVersion.UNKNOWN, true, true);
4444

4545
private final InternalNode currentNode;
4646
private final ExecutorService nodeStateEventExecutor;
@@ -63,6 +63,7 @@ public TestingInternalNodeManager(InternalNode currentNode)
6363
ImmutableSet.of(),
6464
ImmutableSet.of(),
6565
ImmutableSet.of(),
66+
ImmutableSet.of(currentNode),
6667
ImmutableSet.of(currentNode));
6768
this.nodeStateEventExecutor = newSingleThreadExecutor(daemonThreadsNamed("node-state-events-%s"));
6869
}
@@ -100,6 +101,9 @@ public synchronized void addNodes(Collection<InternalNode> internalNodes)
100101
ImmutableSet.of(),
101102
newActiveNodes.stream()
102103
.filter(InternalNode::isCoordinator)
104+
.collect(toImmutableSet()),
105+
newActiveNodes.stream()
106+
.filter(InternalNode::isWorker)
103107
.collect(toImmutableSet())));
104108
}
105109

@@ -119,6 +123,9 @@ public synchronized void removeNode(InternalNode internalNode)
119123
ImmutableSet.of(),
120124
newActiveNodes.stream()
121125
.filter(InternalNode::isCoordinator)
126+
.collect(toImmutableSet()),
127+
newActiveNodes.stream()
128+
.filter(InternalNode::isWorker)
122129
.collect(toImmutableSet())));
123130
}
124131

core/trino-main/src/main/java/io/trino/server/ServerMainModule.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,13 +486,15 @@ public static InternalNode currentInternalNode(
486486
HttpServerInfo httpServerInfo,
487487
NodeVersion nodeVersion,
488488
ServerConfig serverConfig,
489+
NodeSchedulerConfig nodeSchedulerConfig,
489490
InternalCommunicationConfig internalCommunicationConfig)
490491
{
491492
return new InternalNode(
492493
nodeInfo.getNodeId(),
493494
internalCommunicationConfig.isHttpsRequired() ? httpServerInfo.getHttpsUri() : httpServerInfo.getHttpUri(),
494495
nodeVersion,
495-
serverConfig.isCoordinator());
496+
serverConfig.isCoordinator(),
497+
!serverConfig.isCoordinator() || nodeSchedulerConfig.isIncludeCoordinator());
496498
}
497499

498500
private static class RegisterFunctionBundles

core/trino-main/src/test/java/io/trino/execution/TestNodeScheduler.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,9 @@ public void setUp()
127127
private void setUpNodes()
128128
{
129129
nodeManager.addNodes(
130-
new InternalNode("other1", URI.create("http://10.0.0.1:11"), NodeVersion.UNKNOWN, false),
131-
new InternalNode("other2", URI.create("http://10.0.0.1:12"), NodeVersion.UNKNOWN, false),
132-
new InternalNode("other3", URI.create("http://10.0.0.1:13"), NodeVersion.UNKNOWN, false));
130+
new InternalNode("other1", URI.create("http://10.0.0.1:11"), NodeVersion.UNKNOWN, false, true),
131+
new InternalNode("other2", URI.create("http://10.0.0.1:12"), NodeVersion.UNKNOWN, false, true),
132+
new InternalNode("other3", URI.create("http://10.0.0.1:13"), NodeVersion.UNKNOWN, false, true));
133133
}
134134

135135
@AfterEach
@@ -171,9 +171,9 @@ public void testTopologyAwareScheduling()
171171
{
172172
NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);
173173
InternalNodeManager nodeManager = TestingInternalNodeManager.createDefault(
174-
new InternalNode("node1", URI.create("http://host1.rack1:11"), NodeVersion.UNKNOWN, false),
175-
new InternalNode("node2", URI.create("http://host2.rack1:12"), NodeVersion.UNKNOWN, false),
176-
new InternalNode("node3", URI.create("http://host3.rack2:13"), NodeVersion.UNKNOWN, false));
174+
new InternalNode("node1", URI.create("http://host1.rack1:11"), NodeVersion.UNKNOWN, false, true),
175+
new InternalNode("node2", URI.create("http://host2.rack1:12"), NodeVersion.UNKNOWN, false, true),
176+
new InternalNode("node3", URI.create("http://host3.rack2:13"), NodeVersion.UNKNOWN, false, true));
177177

178178
// contents of taskMap indicate the node-task map for the current stage
179179
Map<InternalNode, RemoteTask> taskMap = new HashMap<>();
@@ -312,7 +312,7 @@ public void testBasicAssignment()
312312
public void testMaxSplitsPerNode()
313313
{
314314
setUpNodes();
315-
InternalNode newNode = new InternalNode("other4", URI.create("http://10.0.0.1:14"), NodeVersion.UNKNOWN, false);
315+
InternalNode newNode = new InternalNode("other4", URI.create("http://10.0.0.1:14"), NodeVersion.UNKNOWN, false, true);
316316
nodeManager.addNodes(newNode);
317317

318318
ImmutableList.Builder<Split> initialSplits = ImmutableList.builder();
@@ -371,7 +371,7 @@ public void testBasicAssignmentMaxUnacknowledgedSplitsPerTask()
371371
public void testMaxSplitsPerNodePerTask()
372372
{
373373
setUpNodes();
374-
InternalNode newNode = new InternalNode("other4", URI.create("http://10.0.0.1:14"), NodeVersion.UNKNOWN, false);
374+
InternalNode newNode = new InternalNode("other4", URI.create("http://10.0.0.1:14"), NodeVersion.UNKNOWN, false, true);
375375
nodeManager.addNodes(newNode);
376376

377377
ImmutableList.Builder<Split> initialSplits = ImmutableList.builder();
@@ -471,9 +471,9 @@ public void testSplitCount()
471471
@Test
472472
public void testOptimizedLocalScheduling()
473473
{
474-
InternalNode node1 = new InternalNode("node1", URI.create("http://10.0.0.1:11"), NodeVersion.UNKNOWN, false);
474+
InternalNode node1 = new InternalNode("node1", URI.create("http://10.0.0.1:11"), NodeVersion.UNKNOWN, false, true);
475475
nodeManager.addNodes(node1);
476-
InternalNode node2 = new InternalNode("node2", URI.create("http://10.0.0.1:12"), NodeVersion.UNKNOWN, false);
476+
InternalNode node2 = new InternalNode("node2", URI.create("http://10.0.0.1:12"), NodeVersion.UNKNOWN, false, true);
477477
nodeManager.addNodes(node2);
478478

479479
Set<Split> splits = new LinkedHashSet<>();
@@ -603,9 +603,9 @@ public void testMaxUnacknowledgedSplitsPerTask()
603603
public void testTopologyAwareFailover()
604604
{
605605
nodeManager = TestingInternalNodeManager.createDefault(
606-
new InternalNode("node1", URI.create("http://host1.rack1:11"), NodeVersion.UNKNOWN, false),
607-
new InternalNode("node2", URI.create("http://host2.rack1:12"), NodeVersion.UNKNOWN, false),
608-
new InternalNode("node3", URI.create("http://host3.rack2:13"), NodeVersion.UNKNOWN, false));
606+
new InternalNode("node1", URI.create("http://host1.rack1:11"), NodeVersion.UNKNOWN, false, true),
607+
new InternalNode("node2", URI.create("http://host2.rack1:12"), NodeVersion.UNKNOWN, false, true),
608+
new InternalNode("node3", URI.create("http://host3.rack2:13"), NodeVersion.UNKNOWN, false, true));
609609
NodeSelectorFactory nodeSelectorFactory = new TopologyAwareNodeSelectorFactory(
610610
new TestNetworkTopology(),
611611
CURRENT_NODE,

core/trino-spi/src/main/java/io/trino/spi/Node.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,6 @@ public interface Node
2424
String getVersion();
2525

2626
boolean isCoordinator();
27+
28+
boolean isWorker();
2729
}

0 commit comments

Comments
 (0)