Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion core/trino-main/src/main/java/io/trino/node/AllNodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ public record AllNodes(
Set<InternalNode> drainingNodes,
Set<InternalNode> drainedNodes,
Set<InternalNode> shuttingDownNodes,
Set<InternalNode> activeCoordinators)
Set<InternalNode> activeCoordinators,
Set<InternalNode> activeWorkers)
{
public AllNodes
{
Expand All @@ -35,5 +36,6 @@ public record AllNodes(
drainingNodes = ImmutableSet.copyOf(requireNonNull(drainingNodes, "drainingNodes is null"));
shuttingDownNodes = ImmutableSet.copyOf(requireNonNull(shuttingDownNodes, "shuttingDownNodes is null"));
activeCoordinators = ImmutableSet.copyOf(requireNonNull(activeCoordinators, "activeCoordinators is null"));
activeWorkers = ImmutableSet.copyOf(requireNonNull(activeWorkers, "activeWorkers is null"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,11 @@ private synchronized void refreshNodesInternal()
.filter(InternalNode::isCoordinator)
.collect(toImmutableSet());

AllNodes allNodes = new AllNodes(activeNodes, inactiveNodes, drainingNodes, drainedNodes, shuttingDownNodes, coordinators);
Set<InternalNode> workers = activeNodes.stream()
.filter(InternalNode::isWorker)
.collect(toImmutableSet());

AllNodes allNodes = new AllNodes(activeNodes, inactiveNodes, drainingNodes, drainedNodes, shuttingDownNodes, coordinators, workers);
// only update if all nodes actually changed (note: this does not include the connectors registered with the nodes)
if (!allNodes.equals(this.allNodes)) {
// assign allNodes to a local variable for use in the callback below
Expand Down Expand Up @@ -284,6 +288,18 @@ public int getShuttingDownNodeCount()
return getAllNodes().shuttingDownNodes().size();
}

@Managed
public int getActiveCoordinatorCount()
{
return getAllNodes().activeCoordinators().size();
}

@Managed
public int getActiveWorkerCount()
{
return getAllNodes().activeWorkers().size();
}

@VisibleForTesting
synchronized Set<InternalNode> getInvalidNodes()
{
Expand Down
13 changes: 13 additions & 0 deletions core/trino-main/src/main/java/io/trino/node/InternalNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,22 @@ public class InternalNode
private final URI internalUri;
private final NodeVersion nodeVersion;
private final boolean coordinator;
private final boolean worker;
private final long longHashCode;

public InternalNode(String nodeIdentifier, URI internalUri, NodeVersion nodeVersion, boolean coordinator)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constructor is used in quite some test files. Shall I remove this constructor and update the tests, or shall I keep it this way?

{
this(nodeIdentifier, internalUri, nodeVersion, coordinator, !coordinator);
}

public InternalNode(String nodeIdentifier, URI internalUri, NodeVersion nodeVersion, boolean coordinator, boolean worker)
{
nodeIdentifier = emptyToNull(nullToEmpty(nodeIdentifier).trim());
this.nodeIdentifier = requireNonNull(nodeIdentifier, "nodeIdentifier is null or empty");
this.internalUri = requireNonNull(internalUri, "internalUri is null");
this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
this.coordinator = coordinator;
this.worker = worker;
this.longHashCode = new XxHash64(coordinator ? 1 : 0)
.update(nodeIdentifier.getBytes(UTF_8))
.update(internalUri.toString().getBytes(UTF_8))
Expand Down Expand Up @@ -102,6 +109,12 @@ public boolean isCoordinator()
return coordinator;
}

@Override
public boolean isWorker()
{
return worker;
}

public NodeVersion getNodeVersion()
{
return nodeVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
public class TestingInternalNodeManager
implements InternalNodeManager
{
public static final InternalNode CURRENT_NODE = new InternalNode("local", URI.create("local://127.0.0.1:8080"), NodeVersion.UNKNOWN, true);
public static final InternalNode CURRENT_NODE = new InternalNode("local", URI.create("local://127.0.0.1:8080"), NodeVersion.UNKNOWN, true, true);

private final InternalNode currentNode;
private final ExecutorService nodeStateEventExecutor;
Expand All @@ -63,6 +63,7 @@ public TestingInternalNodeManager(InternalNode currentNode)
ImmutableSet.of(),
ImmutableSet.of(),
ImmutableSet.of(),
ImmutableSet.of(currentNode),
ImmutableSet.of(currentNode));
this.nodeStateEventExecutor = newSingleThreadExecutor(daemonThreadsNamed("node-state-events-%s"));
}
Expand Down Expand Up @@ -100,6 +101,9 @@ public synchronized void addNodes(Collection<InternalNode> internalNodes)
ImmutableSet.of(),
newActiveNodes.stream()
.filter(InternalNode::isCoordinator)
.collect(toImmutableSet()),
newActiveNodes.stream()
.filter(InternalNode::isWorker)
.collect(toImmutableSet())));
}

Expand All @@ -119,6 +123,9 @@ public synchronized void removeNode(InternalNode internalNode)
ImmutableSet.of(),
newActiveNodes.stream()
.filter(InternalNode::isCoordinator)
.collect(toImmutableSet()),
newActiveNodes.stream()
.filter(InternalNode::isWorker)
.collect(toImmutableSet())));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,13 +486,15 @@ public static InternalNode currentInternalNode(
HttpServerInfo httpServerInfo,
NodeVersion nodeVersion,
ServerConfig serverConfig,
NodeSchedulerConfig nodeSchedulerConfig,
InternalCommunicationConfig internalCommunicationConfig)
{
return new InternalNode(
nodeInfo.getNodeId(),
internalCommunicationConfig.isHttpsRequired() ? httpServerInfo.getHttpsUri() : httpServerInfo.getHttpUri(),
nodeVersion,
serverConfig.isCoordinator());
serverConfig.isCoordinator(),
!serverConfig.isCoordinator() || nodeSchedulerConfig.isIncludeCoordinator());
}

private static class RegisterFunctionBundles
Expand Down
6 changes: 6 additions & 0 deletions core/trino-spi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,12 @@
<old>method void io.trino.spi.block.PageBuilderStatus::&lt;init&gt;()</old>
<justification>Method is unnecessary</justification>
</item>
<item>
<ignore>true</ignore>
<code>java.method.addedToInterface</code>
<new>method boolean io.trino.spi.Node::isWorker()</new>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unnecessary to add it, since the worker means not coordinator. we already has isCoordinator, why not just reuse it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because a node can be a worker and coordinator at the same time using node-scheduler.include-coordinator as described here: https://trino.io/docs/current/installation/deployment.html#config-properties

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is still a coordinator, that is allow schedule task on it but it is not a worker

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With node-scheduler.include-coordinator=true, the UI adds it to the active worker count, but that number is not available via the metrics. The only metric available is the number of all nodes in the cluster.

So with node-scheduler.include-coordinator=true, the active node count is the number of available workers, with node-scheduler.include-coordinator=false` (the default), the active node count - the number of coordinators is the number of available workers. This PR introduces a new metric that is always consistent with the number of active workers as shown in the UI

<justification>Added the boolean method isWorker</justification>
</item>
</differences>
</revapi.differences>
</analysisConfiguration>
Expand Down
2 changes: 2 additions & 0 deletions core/trino-spi/src/main/java/io/trino/spi/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ public interface Node
String getVersion();

boolean isCoordinator();

boolean isWorker();
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,10 @@ public boolean isCoordinator()
{
return false;
}

@Override
public boolean isWorker()
{
return true;
}
}
Loading