Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/querying/sql-metadata-tables.md
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ Servers table lists all discovered servers in the cluster.
|start_time|STRING|Timestamp in ISO8601 format when the server was announced in the cluster|
|version|VARCHAR|Druid version running on the server|
|labels|VARCHAR|Labels for the server configured using the property [`druid.labels`](../configuration/index.md)|
|available_processors|BIGINT|Total number of CPU processors available to the server|
|total_memory|BIGINT|Total memory in bytes available to the server|

To retrieve information about all servers, use the query:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1018,9 +1018,8 @@ protected String getBrokerUrl()

/**
* curr_size on historicals changes because cluster state is not isolated across
* different
* integration tests, zero it out for consistent test results
* version and start_time are not configurable therefore we zero them as well
* different integration tests, zero it out for consistent test results.
* version, start_time, available_processors, total_memory are not configurable therefore we zero them as well
*/
protected static List<Map<String, Object>> getServersWithoutNonConfigurableFields(List<Map<String, Object>> servers)
{
Expand All @@ -1031,6 +1030,8 @@ protected static List<Map<String, Object>> getServersWithoutNonConfigurableField
newServer.put("curr_size", 0);
newServer.put("start_time", "0");
newServer.put("version", "0.0.0");
newServer.put("available_processors", 0);
newServer.put("total_memory", 0);
return newServer;
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
"is_leader": null,
"start_time": "0",
"version": "0.0.0",
"labels": null
"labels": null,
"available_processors": 0,
"total_memory": 0
},
{
"server": "%%BROKER%%:8282",
Expand All @@ -25,6 +27,8 @@
"is_leader": null,
"start_time": "0",
"version": "0.0.0",
"labels": null
"labels": null,
"available_processors": 0,
"total_memory": 0
}
]
13 changes: 13 additions & 0 deletions processing/src/main/java/org/apache/druid/utils/JvmUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.primitives.Ints;
import com.google.inject.Inject;
import com.sun.management.OperatingSystemMXBean;

import java.io.File;
import java.lang.management.ManagementFactory;
Expand All @@ -44,6 +45,7 @@ public class JvmUtils
private static RuntimeInfo RUNTIME_INFO = new RuntimeInfo();

private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean();
private static final OperatingSystemMXBean OPERATING_SYSTEM_MX_BEAN = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();

private static int computeMajorVersion()
{
Expand Down Expand Up @@ -139,4 +141,15 @@ public static List<URL> systemClassPath()
).collect(Collectors.toList());
return jobURLs;
}

/**
* Get the total memory of the machine it is running on. This function is container aware.
* If the machine is running in a container, the function will return the total memory of the container.
* If the machine is not running in a container, the function will return the total memory of the machine.
* @return the total memory of the machine it is running on in bytes.
*/
public static long getTotalMemory()
{
return OPERATING_SYSTEM_MX_BEAN.getTotalPhysicalMemorySize();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import org.apache.druid.client.DruidServer;
import org.apache.druid.jackson.StringObjectPairList;
Expand All @@ -32,6 +33,7 @@
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.DruidNode;
import org.apache.druid.utils.JvmUtils;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
Expand All @@ -50,10 +52,13 @@
public class DiscoveryDruidNode
{
private static final Logger LOG = new Logger(DiscoveryDruidNode.class);
private static final int UNKNOWN_VALUE = -1;

private final DruidNode druidNode;
private final NodeRole nodeRole;
private final DateTime startTime;
private final Integer availableProcessors;
private final Long totalMemory;

/**
* Map of service name -> DruidServices.
Expand All @@ -65,20 +70,36 @@ public class DiscoveryDruidNode
*/
private final Map<String, DruidService> services = new HashMap<>();

/**
* Constructor for tests. In production, the @Inject constructor is used instead.
*/
@VisibleForTesting
public DiscoveryDruidNode(
DruidNode druidNode,
NodeRole nodeRole,
Map<String, DruidService> services,
DateTime startTime
)
{
this(druidNode, nodeRole, services, startTime, Runtime.getRuntime().availableProcessors(), JvmUtils.getTotalMemory());
}

public DiscoveryDruidNode(
DruidNode druidNode,
NodeRole nodeRole,
Map<String, DruidService> services
)
{
this(druidNode, nodeRole, services, DateTimes.nowUtc());
this(druidNode, nodeRole, services, DateTimes.nowUtc(), Runtime.getRuntime().availableProcessors(), JvmUtils.getTotalMemory());
}

public DiscoveryDruidNode(
DruidNode druidNode,
NodeRole nodeRole,
Map<String, DruidService> services,
DateTime startTime
DateTime startTime,
Integer availableProcessors,
Long totalMemory
)
{
this.druidNode = druidNode;
Expand All @@ -88,6 +109,10 @@ public DiscoveryDruidNode(
this.services.putAll(services);
}
this.startTime = startTime;

// Happens if service is running older version of Druid
this.availableProcessors = availableProcessors != null ? availableProcessors : UNKNOWN_VALUE;
this.totalMemory = totalMemory != null ? totalMemory : UNKNOWN_VALUE;
}

@JsonCreator
Expand All @@ -96,6 +121,8 @@ private static DiscoveryDruidNode fromJson(
@JsonProperty("nodeType") NodeRole nodeRole,
@JsonProperty("services") Map<String, StringObjectPairList> rawServices,
@JsonProperty("startTime") DateTime startTime,
@JsonProperty("availableProcessors") Integer availableProcessors,
@JsonProperty("totalMemory") Long totalMemory,
@JacksonInject ObjectMapper jsonMapper
)
{
Expand All @@ -111,7 +138,7 @@ private static DiscoveryDruidNode fromJson(
}
}
}
return new DiscoveryDruidNode(druidNode, nodeRole, services, startTime);
return new DiscoveryDruidNode(druidNode, nodeRole, services, startTime, availableProcessors, totalMemory);
}

/**
Expand Down Expand Up @@ -188,6 +215,18 @@ public DateTime getStartTime()
return startTime;
}

@JsonProperty
public Integer getAvailableProcessors()
{
return availableProcessors;
}

@JsonProperty
public Long getTotalMemory()
{
return totalMemory;
}

@Nullable
@JsonIgnore
public <T extends DruidService> T getService(String key, Class<T> clazz)
Expand Down Expand Up @@ -235,13 +274,15 @@ public boolean equals(Object o)
DiscoveryDruidNode that = (DiscoveryDruidNode) o;
return Objects.equals(druidNode, that.druidNode) &&
Objects.equals(nodeRole, that.nodeRole) &&
Objects.equals(services, that.services);
Objects.equals(services, that.services) &&
Objects.equals(availableProcessors, that.availableProcessors) &&
Objects.equals(totalMemory, that.totalMemory);
}

@Override
public int hashCode()
{
return Objects.hash(druidNode, nodeRole, services);
return Objects.hash(druidNode, nodeRole, services, availableProcessors, totalMemory);
}

@Override
Expand All @@ -252,6 +293,8 @@ public String toString()
", nodeRole='" + nodeRole + '\'' +
", services=" + services + '\'' +
", startTime=" + startTime +
", availableProcessors=" + availableProcessors +
", totalMemory=" + totalMemory +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public DiscoveryDruidNodeTest()
public void testEquals()
{
EqualsVerifier.forClass(DiscoveryDruidNode.class)
.withNonnullFields("druidNode", "nodeRole", "services")
.withNonnullFields("druidNode", "nodeRole", "services", "availableProcessors", "totalMemory")
.withIgnoredFields("startTime")
.usingGetClass()
.verify();
Expand Down Expand Up @@ -156,7 +156,9 @@ public void testDeserializeWithDataNodeServiceWithAWrongPropertyOrder() throws J
+ " \"serverType\" : \"broker\",\n"
+ " \"priority\" : 0\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"availableProcessors\" : 3,\n"
+ " \"totalMemory\" : 1234\n"
+ "}";
Assert.assertEquals(
new DiscoveryDruidNode(
Expand All @@ -175,7 +177,10 @@ public void testDeserializeWithDataNodeServiceWithAWrongPropertyOrder() throws J
ImmutableMap.of(
"dataNodeService",
new DataNodeService("_default_tier", 1000000000, ServerType.BROKER, 0)
)
),
null,
3,
1234L
),
mapper.readValue(json, DiscoveryDruidNode.class)
);
Expand Down Expand Up @@ -206,7 +211,9 @@ public void testDeserialize_duplicateProperties_shouldSucceedToDeserialize() thr
+ " \"serverType\" : \"broker\",\n"
+ " \"priority\" : 0\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"availableProcessors\" : 6,\n"
+ " \"totalMemory\" : 5432\n"
+ "}";
Assert.assertEquals(
new DiscoveryDruidNode(
Expand All @@ -225,7 +232,10 @@ public void testDeserialize_duplicateProperties_shouldSucceedToDeserialize() thr
ImmutableMap.of(
"dataNodeService",
new DataNodeService("_default_tier", 1000000000, ServerType.BROKER, 0)
)
),
null,
6,
5432L
),
mapper.readValue(json, DiscoveryDruidNode.class)
);
Expand Down Expand Up @@ -257,7 +267,9 @@ public void testDeserialize_duplicateKeysWithDifferentValus_shouldIgnoreDataNode
+ " \"serverType\" : \"broker\",\n"
+ " \"priority\" : 0\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"availableProcessors\" : 4,\n"
+ " \"totalMemory\" : 246810\n"
+ "}";
Assert.assertEquals(
new DiscoveryDruidNode(
Expand All @@ -273,7 +285,10 @@ public void testDeserialize_duplicateKeysWithDifferentValus_shouldIgnoreDataNode
null
),
NodeRole.BROKER,
ImmutableMap.of()
ImmutableMap.of(),
null,
4,
246810L
),
mapper.readValue(json, DiscoveryDruidNode.class)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ public class SystemSchema extends AbstractSchema
.add("start_time", ColumnType.STRING)
.add("version", ColumnType.STRING)
.add("labels", ColumnType.STRING)
.add("available_processors", ColumnType.LONG)
.add("total_memory", ColumnType.LONG)
.build();

static final RowSignature SERVER_SEGMENTS_SIGNATURE = RowSignature
Expand Down Expand Up @@ -653,7 +655,9 @@ private Object[] buildRowForNonDataServer(DiscoveryDruidNode discoveryDruidNode)
null,
toStringOrNull(discoveryDruidNode.getStartTime()),
node.getVersion(),
node.getLabels() == null ? null : JacksonUtils.writeValueAsString(jsonMapper, node.getLabels())
node.getLabels() == null ? null : JacksonUtils.writeValueAsString(jsonMapper, node.getLabels()),
(long) discoveryDruidNode.getAvailableProcessors(),
discoveryDruidNode.getTotalMemory()
};
}

Expand All @@ -678,7 +682,9 @@ private Object[] buildRowForNonDataServerWithLeadership(
isLeader ? 1L : 0L,
toStringOrNull(discoveryDruidNode.getStartTime()),
node.getVersion(),
node.getLabels() == null ? null : JacksonUtils.writeValueAsString(jsonMapper, node.getLabels())
node.getLabels() == null ? null : JacksonUtils.writeValueAsString(jsonMapper, node.getLabels()),
(long) discoveryDruidNode.getAvailableProcessors(),
discoveryDruidNode.getTotalMemory()
};
}

Expand Down Expand Up @@ -715,7 +721,9 @@ private Object[] buildRowForDiscoverableDataServer(
null,
toStringOrNull(discoveryDruidNode.getStartTime()),
node.getVersion(),
node.getLabels() == null ? null : JacksonUtils.writeValueAsString(jsonMapper, node.getLabels())
node.getLabels() == null ? null : JacksonUtils.writeValueAsString(jsonMapper, node.getLabels()),
(long) discoveryDruidNode.getAvailableProcessors(),
discoveryDruidNode.getTotalMemory()
};
}

Expand Down
Loading
Loading