Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,15 @@ public CompletionStage<Response> describeCluster(
KafkaCluster.Fields.NODE_POOLS,
KafkaCluster.Fields.CRUISE_CONTROL_ENABLED,
}))
List<String> fields) {
List<String> fields,

@Parameter(description = "Time range for metrics in minutes")
@QueryParam("duration")
Integer durationMinutes) {
Comment on lines +189 to +191
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe better to add the @DefaultValue and make the arg an int, then the null checking can be removed elsewhere.


requestedFields.accept(fields);

return clusterService.describeCluster(fields)
return clusterService.describeCluster(fields, durationMinutes)
.thenApply(KafkaCluster.KafkaClusterData::new)
.thenApply(Response::ok)
.thenApply(Response.ResponseBuilder::build);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,10 @@ public CompletionStage<Response> describeConfigs(
@ResourcePrivilege(Privilege.GET)
public CompletionStage<Response> getNodeMetrics(
@PathParam("clusterId") String clusterId,
@PathParam("nodeId") String nodeId) {
@PathParam("nodeId") String nodeId,
@QueryParam("duration") @DefaultValue("60") int durationMinutes) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the default be 5 minutes?


return nodeService.getNodeMetrics(nodeId)
return nodeService.getNodeMetrics(nodeId, durationMinutes)
.thenApply(metrics -> new NodeMetrics.MetricsResponse(nodeId, metrics))
.thenApply(Response::ok)
.thenApply(Response.ResponseBuilder::build);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public List<KafkaCluster> listClusters(ListRequestContext<KafkaCluster> listSupp
.toList();
}

public CompletionStage<KafkaCluster> describeCluster(List<String> fields) {
public CompletionStage<KafkaCluster> describeCluster(List<String> fields, Integer durationMinutes) {
Admin adminClient = kafkaContext.admin();
DescribeClusterOptions options = new DescribeClusterOptions()
.includeAuthorizedOperations(fields.contains(KafkaCluster.Fields.AUTHORIZED_OPERATIONS));
Expand All @@ -172,7 +172,7 @@ public CompletionStage<KafkaCluster> describeCluster(List<String> fields) {
threadContext.currentContextExecutor())
.thenApplyAsync(this::addKafkaContextData, threadContext.currentContextExecutor())
.thenApply(this::addKafkaResourceData)
.thenCompose(cluster -> addMetrics(cluster, fields))
.thenCompose(cluster -> addMetrics(cluster, fields, durationMinutes))
.thenApply(this::setManaged)
.thenApplyAsync(
permissionService.addPrivileges(ResourceTypes.Global.KAFKAS, KafkaCluster::getId),
Expand Down Expand Up @@ -354,11 +354,13 @@ KafkaCluster setManaged(KafkaCluster cluster) {
return cluster;
}

CompletionStage<KafkaCluster> addMetrics(KafkaCluster cluster, List<String> fields) {
CompletionStage<KafkaCluster> addMetrics(KafkaCluster cluster, List<String> fields, Integer durationMinutes) {
if (!fields.contains(KafkaCluster.Fields.METRICS)) {
return CompletableFuture.completedStage(cluster);
}

int finalDuration = (durationMinutes != null) ? durationMinutes : 5;

if (kafkaContext.prometheus() == null) {
logger.warnf("Kafka cluster metrics were requested, but Prometheus URL is not configured");
cluster.metrics(null);
Expand All @@ -380,7 +382,12 @@ CompletionStage<KafkaCluster> addMetrics(KafkaCluster cluster, List<String> fiel
throw new UncheckedIOException(e);
}

var rangeResults = metricsService.queryRanges(rangeQuery).toCompletableFuture();
String promInterval = "5m";
if (finalDuration >= 1440) promInterval = "30m";

final String finalizedQuery = rangeQuery.replace("[5m]", "[" + promInterval + "]");

var rangeResults = metricsService.queryRanges(finalizedQuery, finalDuration).toCompletableFuture();
var valueResults = metricsService.queryValues(valueQuery).toCompletableFuture();

return CompletableFuture.allOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,18 @@ CompletionStage<Map<String, List<Metrics.ValueMetric>>> queryValues(String query
});
}

CompletionStage<Map<String, List<Metrics.RangeMetric>>> queryRanges(String query) {
public CompletionStage<Map<String, List<Metrics.RangeMetric>>> queryRanges(String query, int durationMinutes) {
PrometheusAPI prometheusAPI = kafkaContext.prometheus();

return fetchMetrics(
() -> {
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
Instant start = now.minus(30, ChronoUnit.MINUTES);
Instant start = now.minus(durationMinutes, ChronoUnit.MINUTES);
Instant end = now;
return prometheusAPI.queryRange(query, start, end, "25");

String step = calculateStep(durationMinutes);

return prometheusAPI.queryRange(query, start, end, step);
},
(metric, attributes) -> {
List<RangeEntry> values = metric.getJsonArray("values")
Expand All @@ -148,6 +151,16 @@ CompletionStage<Map<String, List<Metrics.RangeMetric>>> queryRanges(String query
});
}


private String calculateStep(int durationMinutes) {
if (durationMinutes <= 15) return "15s";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a nit on the style. It's shorter on one line, but it may be best to use the same form for everything.

Suggested change
if (durationMinutes <= 15) return "15s";
if (durationMinutes <= 15) {
return "15s";
}

if (durationMinutes <= 60) return "1m";
if (durationMinutes <= 360) return "5m";
if (durationMinutes <= 1440) return "15m";
if (durationMinutes <= 2880) return "30m";
return "2h";
}

<M> CompletionStage<Map<String, List<M>>> fetchMetrics(
Supplier<JsonObject> operation,
BiFunction<JsonObject, Map<String, String>, M> builder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ private <M extends Metrics.Metric> void extractNodeMetrics(
});
}

public CompletionStage<Metrics> getNodeMetrics(String nodeId) {
public CompletionStage<Metrics> getNodeMetrics(String nodeId, int durationMinutes) {
if (kafkaContext.prometheus() == null) {
logger.warnf("Metrics requested for node %s, but Prometheus is not configured", nodeId);
return CompletableFuture.completedStage(new Metrics());
Expand All @@ -534,38 +534,39 @@ public CompletionStage<Metrics> getNodeMetrics(String nodeId) {
String namespace = clusterConfig.getNamespace();
String name = clusterConfig.getName();

String rangeQuery;
String rawRangeQuery;
String valueQuery;


try (
var rangesStream = getClass().getResourceAsStream("/metrics/queries/kafkaCluster_ranges.promql");
var valuesStream = getClass().getResourceAsStream("/metrics/queries/kafkaCluster_values.promql")
) {
rangeQuery = new String(rangesStream.readAllBytes(), StandardCharsets.UTF_8)
rawRangeQuery = new String(rangesStream.readAllBytes(), StandardCharsets.UTF_8)
.formatted(namespace, name);
valueQuery = new String(valuesStream.readAllBytes(), StandardCharsets.UTF_8)
.formatted(namespace, name);
} catch (IOException e) {
throw new UncheckedIOException(e);
}

Metrics nodeMetrics = new Metrics();

String promInterval = "5m";
if (durationMinutes >= 1440) promInterval = "30m";
if (durationMinutes >= 10080) promInterval = "2h";

var rangeFuture = metricsService.queryRanges(rangeQuery).toCompletableFuture();
final String finalizedQuery = rawRangeQuery.replace("[5m]", "[" + promInterval + "]");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be better to use the placeholders in the promql templates and pass the interval to the formatted method above (and in KafkaClusterService the same way). That will make it clear in the template that it's set at runtime and also eliminate post-processing on the query strings.


logger.debugf("Executing PromQL: %s", finalizedQuery);

Metrics nodeMetrics = new Metrics();
var rangeFuture = metricsService.queryRanges(finalizedQuery, durationMinutes).toCompletableFuture();
var valueFuture = metricsService.queryValues(valueQuery).toCompletableFuture();

return CompletableFuture.allOf(rangeFuture, valueFuture)
.thenApply(nothing -> {
extractNodeMetrics(
nodeId,
rangeFuture.join(),
nodeMetrics.ranges());

extractNodeMetrics(
nodeId,
valueFuture.join(),
nodeMetrics.values());

extractNodeMetrics(nodeId, rangeFuture.join(), nodeMetrics.ranges());
extractNodeMetrics(nodeId, valueFuture.join(), nodeMetrics.values());
return nodeMetrics;
});
}
Expand Down
17 changes: 12 additions & 5 deletions ui/api/kafka/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,22 @@ export async function getKafkaCluster(
clusterId: string,
params?: {
fields?: string;
duration?: number; // Optional duration
},
): Promise<ApiResponse<ClusterDetail>> {
const queryParams = new URLSearchParams({
"fields[kafkas]":
params?.fields ??
"name,namespace,creationTimestamp,status,kafkaVersion,nodes,listeners,metrics,conditions,nodePools,cruiseControlEnabled",
});

if (params?.duration) {
queryParams.append("duration", params.duration.toString());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be explicit about what the duration is for and to match the other params, what do you think about calling it duration[metrics] ?

}

return fetchData(
`/api/kafkas/${clusterId}`,
new URLSearchParams({
"fields[kafkas]":
params?.fields ??
"name,namespace,creationTimestamp,status,kafkaVersion,nodes,listeners,conditions,nodePools,cruiseControlEnabled",
}),
queryParams,
(rawData: any) => ClusterResponse.parse(rawData).data,
undefined,
{
Expand Down
14 changes: 14 additions & 0 deletions ui/api/nodes/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import {
NodeRoles,
BrokerStatus,
ControllerStatus,
NodeMetricsResponseSchema,
NodeMetrics,
} from "@/api/nodes/schema";
import { filterUndefinedFromObj } from "@/utils/filterUndefinedFromObj";

Expand Down Expand Up @@ -67,3 +69,15 @@ export async function getNodeConfiguration(
(rawData) => ConfigResponseSchema.parse(rawData).data,
);
}

/**
 * Fetches Prometheus-backed metrics for a single node of a Kafka cluster.
 *
 * @param kafkaId  id of the Kafka cluster the node belongs to
 * @param nodeId   id of the node to fetch metrics for
 * @param duration metrics window in minutes, sent as the `duration`
 *                 query parameter
 */
export async function getNodeMetrics(
  kafkaId: string,
  nodeId: number | string,
  duration: number,
): Promise<ApiResponse<NodeMetrics>> {
  // Pass the query string through fetchData's params argument, matching
  // the other actions in this module (e.g. getKafkaCluster), instead of
  // hand-concatenating it into the path.
  return fetchData(
    `/api/kafkas/${kafkaId}/nodes/${nodeId}/metrics`,
    new URLSearchParams({ duration: duration.toString() }),
    (rawData) => NodeMetricsResponseSchema.parse(rawData),
  );
}
42 changes: 38 additions & 4 deletions ui/api/nodes/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ const ControllerStatusSchema = z.union([
z.literal("Unknown"),
]);

const NodePoolsSchema = z.record(z.string(), z.object({
roles: z.array(z.string()),
count: z.number(),
}));
const NodePoolsSchema = z.record(
z.string(),
z.object({
roles: z.array(z.string()),
count: z.number(),
}),
);

export const NodeSchema = z.object({
id: z.string(),
Expand Down Expand Up @@ -116,6 +119,35 @@ const ConfigSchema = z.object({
),
});

// Schema for the `metrics` attribute of a node-metrics response.
// `values` holds instantaneous samples and `ranges` holds time series,
// both keyed by metric name. Each range entry is a list of
// [string, string] pairs — presumably [timestamp, value] as serialized
// by the backend's Prometheus range queries; confirm ordering there.
export const MetricsSchema = z.object({
  values: z.record(
    z.string(),
    z.array(
      z.object({
        value: z.string(),
        nodeId: z.string(),
      }),
    ),
  ),
  ranges: z.record(
    z.string(),
    z.array(
      z.object({
        range: z.array(z.tuple([z.string(), z.string()])),
        // nodeId may be absent on range entries, unlike `values` entries.
        nodeId: z.string().optional(),
      }),
    ),
  ),
});

// Envelope schema for GET .../nodes/{nodeId}/metrics responses:
// JSON:API-style `data.attributes.metrics`. The metrics payload is
// tolerated as absent or null (optional + nullable).
export const NodeMetricsResponseSchema = z.object({
  data: z.object({
    attributes: z.object({
      metrics: MetricsSchema.optional().nullable(),
    }),
  }),
});

export type NodeConfig = z.infer<typeof ConfigSchema>;

export type BrokerStatus = z.infer<typeof BrokerStatusSchema>;
Expand All @@ -129,3 +161,5 @@ export type Statuses = z.infer<typeof StatusesSchema>;
export const ConfigResponseSchema = z.object({
data: ConfigSchema,
});

export type NodeMetrics = z.infer<typeof NodeMetricsResponseSchema>;
13 changes: 13 additions & 0 deletions ui/api/topics/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import {
Topic,
TopicCreateResponse,
TopicCreateResponseSchema,
TopicMetrics,
TopicMetricsResponseSchema,
TopicResponse,
TopicsResponse,
TopicsResponseSchema,
Expand Down Expand Up @@ -206,3 +208,14 @@ export async function setTopicAsViewed(kafkaId: string, topicId: string) {
return viewedTopics;
}
}

/**
 * Fetches metrics for a single topic of a Kafka cluster.
 *
 * NOTE(review): the name should be `getTopicMetrics` per the camelCase
 * convention used elsewhere in this file; renaming is a breaking change
 * for callers, so it is flagged here rather than changed.
 *
 * @param kafkaId id of the Kafka cluster
 * @param topicId id of the topic to fetch metrics for
 */
export async function gettopicMetrics(
  kafkaId: string,
  topicId: number | string,
): Promise<ApiResponse<TopicMetrics>> {
  return fetchData(
    `/api/kafkas/${kafkaId}/topics/${topicId}/metrics`,
    "",
    (rawData) => TopicMetricsResponseSchema.parse(rawData),
  );
}
Loading