Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,14 @@ dependencies {
exclude group: 'com.fasterxml.jackson.datatype', module: 'jackson-datatype-jsr310'
exclude group: 'com.fasterxml.jackson.datatype', module: 'jackson-datatype-guava'
}

api (group: 'com.azure', name: 'azure-monitor-query-metrics') {
// exclude libraries already provided by Kestra
exclude group: 'com.fasterxml.jackson.core'
exclude group: 'com.fasterxml.jackson.dataformat', module: 'jackson-dataformat-xml'
exclude group: 'com.fasterxml.jackson.datatype', module: 'jackson-datatype-jdk8'
exclude group: 'com.fasterxml.jackson.datatype', module: 'jackson-datatype-jsr310'
exclude group: 'com.fasterxml.jackson.datatype', module: 'jackson-datatype-guava'
}
//Azure resource manager
implementation 'com.azure.resourcemanager:azure-resourcemanager-datafactory:1.2.0'
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package io.kestra.plugin.azure.monitoring;

import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenCredential;
import com.azure.core.credential.TokenRequestContext;
import com.azure.monitor.query.metrics.MetricsClient;
import com.azure.monitor.query.metrics.MetricsClientBuilder;
import com.azure.monitor.query.metrics.MetricsServiceVersion;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.http.HttpRequest;
import io.kestra.core.http.HttpResponse;
import io.kestra.core.http.client.HttpClient;
import io.kestra.core.http.client.HttpClientException;
import io.kestra.core.http.client.configurations.HttpConfiguration;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.azure.AbstractAzureIdentityConnection;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.Map;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public abstract class AbstractMonitoringTask extends AbstractAzureIdentityConnection {
@Schema(
title = "Azure Monitor Metrics regional endpoint",
description = "Must be the regional endpoint (e.g. https://westeurope.metrics.monitor.azure.com)"
)
@NotNull
protected Property<String> endpoint;

protected MetricsClient queryClient(RunContext runContext) throws IllegalVariableEvaluationException {
TokenCredential baseCredential = this.credentials(runContext);

TokenCredential scopedCredential = requestContext -> {
requestContext.setScopes(Collections.singletonList("https://metrics.monitor.azure.com/.default"));
return baseCredential.getToken(requestContext);
};

return new MetricsClientBuilder()
.credential(scopedCredential)
.endpoint(runContext.render(endpoint).as(String.class).orElseThrow())
.serviceVersion(MetricsServiceVersion.getLatest())
.buildClient();
}

protected HttpResponse<Map<String, Object>> ingestMetrics(RunContext runContext, String path, Map<String, Object> body) throws Exception {
var rEndpoint = runContext.render(endpoint).as(String.class).orElseThrow();
TokenCredential credential = this.credentials(runContext);

AccessToken token = credential
.getToken(new TokenRequestContext().addScopes("https://monitor.azure.com/.default"))
.block();

if (token == null) {
throw new IllegalStateException("Failed to acquire Azure access token for ingestion");
}

URI uri = URI.create(rEndpoint + path);

HttpRequest.HttpRequestBuilder builder = HttpRequest.builder()
.uri(uri)
.method("POST")
.addHeader("Authorization", "Bearer " + token.getToken())
.addHeader("Content-Type", "application/json")
.body(HttpRequest.JsonRequestBody.builder().content(body).build());

try (HttpClient client = HttpClient.builder().runContext(runContext).configuration(HttpConfiguration.builder().build()).build()) {
return client.request(builder.build());
} catch (IOException | HttpClientException e) {
throw new RuntimeException("Failed to post data to Azure Monitor ingestion API", e);
}
}
}
76 changes: 76 additions & 0 deletions src/main/java/io/kestra/plugin/azure/monitoring/Push.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.kestra.plugin.azure.monitoring;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.util.Map;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(title = "Query metrics from Azure Monitor.")
@Plugin(
examples = {
@Example(
title = "Query CPU utilization from Azure Monitor for multiple VMs",
code = """
id: azure_monitor_query
namespace: company.team
tasks:
- id: query
type: io.kestra.plugin.azure.monitoring.Query
tenantId: "{{ secret('AZURE_TENANT_ID') }}"
clientId: "{{ secret('AZURE_CLIENT_ID') }}"
clientSecret: "{{ secret('AZURE_CLIENT_SECRET') }}"
resourceIds:
- "/subscriptions/xxx/resourceGroups/rg/providers/Microsoft.Compute/virtualMachines/vm1"
- "/subscriptions/xxx/resourceGroups/rg/providers/Microsoft.Compute/virtualMachines/vm2"
metricNames:
- "Percentage CPU"
metricsNamespace: "Microsoft.Compute/virtualMachines"
window: PT5M
aggregations:
- "Average"
- "Maximum"
"""
)
}
)
public class Push extends AbstractMonitoringTask implements RunnableTask<Push.Output> {
@Schema(title = "DCR ingestion path")
@NotNull
private Property<String> path;

@Schema(title = "Metric data body")
@NotNull
private Property<Map<String, Object>> metrics;

@Override
public Output run(RunContext runContext) throws Exception {
var rPath = runContext.render(path).as(String.class).orElseThrow();
var rMetrics = runContext.render(metrics).asMap(String.class, Object.class);

var response = ingestMetrics(runContext, rPath, rMetrics);

runContext.logger().info("Ingestion request completed with status {}", response.getStatus());

return Output.builder()
.body(response.getBody())
.build();
}

@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
private final Map<String, Object> body;
}
}
209 changes: 209 additions & 0 deletions src/main/java/io/kestra/plugin/azure/monitoring/Query.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package io.kestra.plugin.azure.monitoring;

import com.azure.core.http.rest.Response;
import com.azure.core.util.Context;
import com.azure.monitor.query.metrics.MetricsClient;
import com.azure.monitor.query.metrics.models.AggregationType;
import com.azure.monitor.query.metrics.models.MetricsQueryResourcesOptions;
import com.azure.monitor.query.metrics.models.MetricsQueryResourcesResult;
import com.azure.monitor.query.metrics.models.MetricsQueryResult;
import com.azure.monitor.query.metrics.models.MetricsQueryTimeInterval;
import com.fasterxml.jackson.core.type.TypeReference;
import io.kestra.core.models.annotations.*;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(title = "Query metrics from Azure Monitor.")
@Plugin(
examples = {
@Example(
title = "Query CPU utilization from Azure Monitor for multiple VMs",
code = """
id: azure_monitor_query
namespace: company.team
tasks:
- id: query
type: io.kestra.plugin.azure.monitoring.Query
tenantId: "{{ secret('AZURE_TENANT_ID') }}"
clientId: "{{ secret('AZURE_CLIENT_ID') }}"
clientSecret: "{{ secret('AZURE_CLIENT_SECRET') }}"
resourceIds:
- "/subscriptions/xxx/resourceGroups/rg/providers/Microsoft.Compute/virtualMachines/vm1"
- "/subscriptions/xxx/resourceGroups/rg/providers/Microsoft.Compute/virtualMachines/vm2"
metricNames:
- "Percentage CPU"
metricsNamespace: "Microsoft.Compute/virtualMachines"
window: PT5M
aggregations:
- "Average"
- "Maximum"
"""
)
}
)
public class Query extends AbstractMonitoringTask implements RunnableTask<Query.Output> {
@Schema(title = "List of Azure Resource IDs to query metrics from")
@NotNull
private Property<List<String>> resourceIds;

@Schema(title = "List of metric names to query")
@NotNull
private Property<List<String>> metricNames;

@Schema(
title = "Metrics namespace",
description = "The namespace of the metrics, e.g., 'Microsoft.Compute/virtualMachines'"
)
@NotNull
private Property<String> metricsNamespace;

@Schema(
title = "Time window for metrics",
description = "Duration looking back from now, e.g., PT5M for 5 minutes"
)
@Builder.Default
private Property<Duration> window = Property.ofValue(Duration.ofMinutes(5));

@Schema(
title = "Aggregation types",
description = "List of aggregation types: Average, Total, Maximum, Minimum, Count"
)
private Property<List<String>> aggregations;

@Schema(
title = "Time grain for data aggregation",
description = "ISO 8601 duration format, e.g., PT1M for 1 minute"
)
private Property<Duration> interval;

@Schema(title = "Filter expression to apply to the query")
private Property<String> filter;

@Schema(title = "Top N time series to return")
private Property<Integer> top;

@Schema(title = "Order by clause for sorting results")
private Property<String> orderBy;

@Schema(
title = "Dimension name(s) to rollup results by",
description = "For example, 'City' to combine multiple city dimension values into one timeseries"
)
private Property<String> rollupBy;

@Override
public Output run(RunContext runContext) throws Exception {
var rResourceIds = runContext.render(this.resourceIds).asList(String.class);
var rMetricNames = runContext.render(this.metricNames).asList(String.class);
var rMetricsNamespace = runContext.render(this.metricsNamespace).as(String.class).orElseThrow();
var rWindow = runContext.render(this.window).as(Duration.class).orElse(Duration.ofMinutes(5));
var rAggregations = runContext.render(this.aggregations).asList(String.class);
var rInterval = runContext.render(this.interval).as(Duration.class).orElse(null);
var rFilter = runContext.render(this.filter).as(String.class).orElse(null);
var rTop = runContext.render(this.top).as(Integer.class).orElse(null);
var rOrderBy = runContext.render(this.orderBy).as(String.class).orElse(null);
var rRollupBy = runContext.render(this.rollupBy).as(String.class).orElse(null);
var endTime = OffsetDateTime.now(ZoneOffset.UTC);
OffsetDateTime startTime = endTime.minus(rWindow);

AtomicInteger datapoints = new AtomicInteger();
AtomicInteger metrics = new AtomicInteger();

MetricsQueryResourcesOptions options = new MetricsQueryResourcesOptions();
options.setTimeInterval(new MetricsQueryTimeInterval(startTime, endTime));

if (rAggregations != null && !rAggregations.isEmpty()) {
List<AggregationType> aggregationTypes = rAggregations.stream()
.map(AggregationType::fromString)
.toList();
options.setAggregations(aggregationTypes);
}

if (rInterval != null) {
options.setGranularity(rInterval);
}

if (rFilter != null) {
options.setFilter(rFilter);
}

if (rTop != null) {
options.setTop(rTop);
}

if (rOrderBy != null) {
options.setOrderBy(rOrderBy);
}

if (rRollupBy != null) {
options.setRollupBy(rRollupBy);
}

var client = this.queryClient(runContext);

Response<MetricsQueryResourcesResult> result = client.queryResourcesWithResponse(rResourceIds, rMetricNames, rMetricsNamespace, options, Context.NONE);

List<Map<String, Object>> metricsResults = result.getValue()
.getMetricsQueryResults()
.stream()
.peek(r -> {
for (var m : r.getMetrics()) {
metrics.incrementAndGet();
m.getTimeSeries().forEach(ts -> datapoints.addAndGet(ts.getValues().size()));
}
})
.map(r -> JacksonMapper.ofJson().convertValue(r, new TypeReference<Map<String, Object>>() {}))
.toList();


runContext.logger().info("Fetched {} datapoints across {} metrics from {} resources", datapoints.get(), metrics.get(), result.getValue().getMetricsQueryResults().size());

return Output.builder()
.datapoints(datapoints.get())
.metrics(metrics.get())
.results(metricsResults)
.resources(metricsResults.size())
.build();
}

@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
@Schema(
title = "Total number of datapoints fetched across all metrics and resources"
)
private final Integer datapoints;

@Schema(
title = "Total number of unique metrics fetched across all resources"
)
private final Integer metrics;

@Schema(
title = "Total number of resources queried"
)
private final Integer resources;

@Schema(
title = "List of metrics results"
)
private final List<Map<String, Object>> results;
}
}
Loading
Loading