diff --git a/build.gradle b/build.gradle index 93abdc9a..edd12bde 100644 --- a/build.gradle +++ b/build.gradle @@ -127,7 +127,14 @@ dependencies { exclude group: 'com.fasterxml.jackson.datatype', module: 'jackson-datatype-jsr310' exclude group: 'com.fasterxml.jackson.datatype', module: 'jackson-datatype-guava' } - + api (group: 'com.azure', name: 'azure-monitor-query-metrics') { + // exclude libraries already provided by Kestra + exclude group: 'com.fasterxml.jackson.core' + exclude group: 'com.fasterxml.jackson.dataformat', module: 'jackson-dataformat-xml' + exclude group: 'com.fasterxml.jackson.datatype', module: 'jackson-datatype-jdk8' + exclude group: 'com.fasterxml.jackson.datatype', module: 'jackson-datatype-jsr310' + exclude group: 'com.fasterxml.jackson.datatype', module: 'jackson-datatype-guava' + } //Azure resource manager implementation 'com.azure.resourcemanager:azure-resourcemanager-datafactory:1.2.0' } diff --git a/src/main/java/io/kestra/plugin/azure/monitoring/AbstractMonitoringTask.java b/src/main/java/io/kestra/plugin/azure/monitoring/AbstractMonitoringTask.java new file mode 100644 index 00000000..7368eb2c --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/monitoring/AbstractMonitoringTask.java @@ -0,0 +1,83 @@ +package io.kestra.plugin.azure.monitoring; + +import com.azure.core.credential.AccessToken; +import com.azure.core.credential.TokenCredential; +import com.azure.core.credential.TokenRequestContext; +import com.azure.monitor.query.metrics.MetricsClient; +import com.azure.monitor.query.metrics.MetricsClientBuilder; +import com.azure.monitor.query.metrics.MetricsServiceVersion; +import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.http.HttpRequest; +import io.kestra.core.http.HttpResponse; +import io.kestra.core.http.client.HttpClient; +import io.kestra.core.http.client.HttpClientException; +import io.kestra.core.http.client.configurations.HttpConfiguration; +import io.kestra.core.models.property.Property; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.azure.AbstractAzureIdentityConnection; +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotNull; +import lombok.*; +import lombok.experimental.SuperBuilder; + +import java.io.IOException; +import java.net.URI; +import java.util.Collections; +import java.util.Map; + +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +public abstract class AbstractMonitoringTask extends AbstractAzureIdentityConnection { + @Schema( + title = "Azure Monitor Metrics regional endpoint", + description = "Must be the regional endpoint (e.g. https://westeurope.metrics.monitor.azure.com)" + ) + @NotNull + protected Property endpoint; + + protected MetricsClient queryClient(RunContext runContext) throws IllegalVariableEvaluationException { + TokenCredential baseCredential = this.credentials(runContext); + + TokenCredential scopedCredential = requestContext -> { + requestContext.setScopes(Collections.singletonList("https://metrics.monitor.azure.com/.default")); + return baseCredential.getToken(requestContext); + }; + + return new MetricsClientBuilder() + .credential(scopedCredential) + .endpoint(runContext.render(endpoint).as(String.class).orElseThrow()) + .serviceVersion(MetricsServiceVersion.getLatest()) + .buildClient(); + } + + protected HttpResponse> ingestMetrics(RunContext runContext, String path, Map body) throws Exception { + var rEndpoint = runContext.render(endpoint).as(String.class).orElseThrow(); + TokenCredential credential = this.credentials(runContext); + + AccessToken token = credential + .getToken(new TokenRequestContext().addScopes("https://monitor.azure.com/.default")) + .block(); + + if (token == null) { + throw new IllegalStateException("Failed to acquire Azure access token for ingestion"); + } + + URI uri = URI.create(rEndpoint + path); + + HttpRequest.HttpRequestBuilder builder = HttpRequest.builder() + .uri(uri) + .method("POST") + .addHeader("Authorization", "Bearer " + token.getToken()) + .addHeader("Content-Type", "application/json") + .body(HttpRequest.JsonRequestBody.builder().content(body).build()); + + try (HttpClient client = HttpClient.builder().runContext(runContext).configuration(HttpConfiguration.builder().build()).build()) { + return client.request(builder.build()); + } catch (IOException | HttpClientException e) { + throw new RuntimeException("Failed to post data to Azure Monitor ingestion API", e); + } + } +} diff --git a/src/main/java/io/kestra/plugin/azure/monitoring/Push.java b/src/main/java/io/kestra/plugin/azure/monitoring/Push.java new file mode 100644 index 00000000..54bda423 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/monitoring/Push.java @@ -0,0 +1,76 @@ +package io.kestra.plugin.azure.monitoring; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.property.Property; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.runners.RunContext; +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotNull; +import lombok.*; +import lombok.experimental.SuperBuilder; + +import java.util.Map; + +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Schema(title = "Query metrics from Azure Monitor.") +@Plugin( + examples = { + @Example( + title = "Query CPU utilization from Azure Monitor for multiple VMs", + code = """ + id: azure_monitor_query + namespace: company.team + tasks: + - id: query + type: io.kestra.plugin.azure.monitoring.Query + tenantId: "{{ secret('AZURE_TENANT_ID') }}" + clientId: "{{ secret('AZURE_CLIENT_ID') }}" + clientSecret: "{{ secret('AZURE_CLIENT_SECRET') }}" + resourceIds: + - "/subscriptions/xxx/resourceGroups/rg/providers/Microsoft.Compute/virtualMachines/vm1" + - "/subscriptions/xxx/resourceGroups/rg/providers/Microsoft.Compute/virtualMachines/vm2" + metricNames: + - "Percentage CPU" + metricsNamespace: "Microsoft.Compute/virtualMachines" + window: PT5M + aggregations: + - "Average" + - "Maximum" + """ + ) + } +) +public class Push extends AbstractMonitoringTask implements RunnableTask { + @Schema(title = "DCR ingestion path") + @NotNull + private Property path; + + @Schema(title = "Metric data body") + @NotNull + private Property> metrics; + + @Override + public Output run(RunContext runContext) throws Exception { + var rPath = runContext.render(path).as(String.class).orElseThrow(); + var rMetrics = runContext.render(metrics).asMap(String.class, Object.class); + + var response = ingestMetrics(runContext, rPath, rMetrics); + + runContext.logger().info("Ingestion request completed with status {}", response.getStatus()); + + return Output.builder() + .body(response.getBody()) + .build(); + } + + @Builder + @Getter + public static class Output implements io.kestra.core.models.tasks.Output { + private final Map body; + } +} diff --git a/src/main/java/io/kestra/plugin/azure/monitoring/Query.java b/src/main/java/io/kestra/plugin/azure/monitoring/Query.java new file mode 100644 index 00000000..b6053287 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/monitoring/Query.java @@ -0,0 +1,209 @@ +package io.kestra.plugin.azure.monitoring; + +import com.azure.core.http.rest.Response; +import com.azure.core.util.Context; +import com.azure.monitor.query.metrics.MetricsClient; +import com.azure.monitor.query.metrics.models.AggregationType; +import com.azure.monitor.query.metrics.models.MetricsQueryResourcesOptions; +import com.azure.monitor.query.metrics.models.MetricsQueryResourcesResult; +import com.azure.monitor.query.metrics.models.MetricsQueryResult; +import com.azure.monitor.query.metrics.models.MetricsQueryTimeInterval; +import com.fasterxml.jackson.core.type.TypeReference; +import io.kestra.core.models.annotations.*; +import io.kestra.core.models.property.Property; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.runners.RunContext; +import io.kestra.core.serializers.JacksonMapper; +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotNull; +import lombok.*; +import lombok.experimental.SuperBuilder; + +import java.time.Duration; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Schema(title = "Query metrics from Azure Monitor.") +@Plugin( + examples = { + @Example( + title = "Query CPU utilization from Azure Monitor for multiple VMs", + code = """ + id: azure_monitor_query + namespace: company.team + tasks: + - id: query + type: io.kestra.plugin.azure.monitoring.Query + tenantId: "{{ secret('AZURE_TENANT_ID') }}" + clientId: "{{ secret('AZURE_CLIENT_ID') }}" + clientSecret: "{{ secret('AZURE_CLIENT_SECRET') }}" + resourceIds: + - "/subscriptions/xxx/resourceGroups/rg/providers/Microsoft.Compute/virtualMachines/vm1" + - "/subscriptions/xxx/resourceGroups/rg/providers/Microsoft.Compute/virtualMachines/vm2" + metricNames: + - "Percentage CPU" + metricsNamespace: "Microsoft.Compute/virtualMachines" + window: PT5M + aggregations: + - "Average" + - "Maximum" + """ + ) + } +) +public class Query extends AbstractMonitoringTask implements RunnableTask { + @Schema(title = "List of Azure Resource IDs to query metrics from") + @NotNull + private Property> resourceIds; + + @Schema(title = "List of metric names to query") + @NotNull + private Property> metricNames; + + @Schema( + title = "Metrics namespace", + description = "The namespace of the metrics, e.g., 'Microsoft.Compute/virtualMachines'" + ) + @NotNull + private Property metricsNamespace; + + @Schema( + title = "Time window for metrics", + description = "Duration looking back from now, e.g., PT5M for 5 minutes" + ) + @Builder.Default + private Property window = Property.ofValue(Duration.ofMinutes(5)); + + @Schema( + title = "Aggregation types", + description = "List of aggregation types: Average, Total, Maximum, Minimum, Count" + ) + private Property> aggregations; + + @Schema( + title = "Time grain for data aggregation", + description = "ISO 8601 duration format, e.g., PT1M for 1 minute" + ) + private Property interval; + + @Schema(title = "Filter expression to apply to the query") + private Property filter; + + @Schema(title = "Top N time series to return") + private Property top; + + @Schema(title = "Order by clause for sorting results") + private Property orderBy; + + @Schema( + title = "Dimension name(s) to rollup results by", + description = "For example, 'City' to combine multiple city dimension values into one timeseries" + ) + private Property rollupBy; + + @Override + public Output run(RunContext runContext) throws Exception { + var rResourceIds = runContext.render(this.resourceIds).asList(String.class); + var rMetricNames = runContext.render(this.metricNames).asList(String.class); + var rMetricsNamespace = runContext.render(this.metricsNamespace).as(String.class).orElseThrow(); + var rWindow = runContext.render(this.window).as(Duration.class).orElse(Duration.ofMinutes(5)); + var rAggregations = runContext.render(this.aggregations).asList(String.class); + var rInterval = runContext.render(this.interval).as(Duration.class).orElse(null); + var rFilter = runContext.render(this.filter).as(String.class).orElse(null); + var rTop = runContext.render(this.top).as(Integer.class).orElse(null); + var rOrderBy = runContext.render(this.orderBy).as(String.class).orElse(null); + var rRollupBy = runContext.render(this.rollupBy).as(String.class).orElse(null); + var endTime = OffsetDateTime.now(ZoneOffset.UTC); + OffsetDateTime startTime = endTime.minus(rWindow); + + AtomicInteger datapoints = new AtomicInteger(); + AtomicInteger metrics = new AtomicInteger(); + + MetricsQueryResourcesOptions options = new MetricsQueryResourcesOptions(); + options.setTimeInterval(new MetricsQueryTimeInterval(startTime, endTime)); + + if (rAggregations != null && !rAggregations.isEmpty()) { + List aggregationTypes = rAggregations.stream() + .map(AggregationType::fromString) + .toList(); + options.setAggregations(aggregationTypes); + } + + if (rInterval != null) { + options.setGranularity(rInterval); + } + + if (rFilter != null) { + options.setFilter(rFilter); + } + + if (rTop != null) { + options.setTop(rTop); + } + + if (rOrderBy != null) { + options.setOrderBy(rOrderBy); + } + + if (rRollupBy != null) { + options.setRollupBy(rRollupBy); + } + + var client = this.queryClient(runContext); + + Response result = client.queryResourcesWithResponse(rResourceIds, rMetricNames, rMetricsNamespace, options, Context.NONE); + + List> metricsResults = result.getValue() + .getMetricsQueryResults() + .stream() + .peek(r -> { + for (var m : r.getMetrics()) { + metrics.incrementAndGet(); + m.getTimeSeries().forEach(ts -> datapoints.addAndGet(ts.getValues().size())); + } + }) + .map(r -> JacksonMapper.ofJson().convertValue(r, new TypeReference>() {})) + .toList(); + + + runContext.logger().info("Fetched {} datapoints across {} metrics from {} resources", datapoints.get(), metrics.get(), result.getValue().getMetricsQueryResults().size()); + + return Output.builder() + .datapoints(datapoints.get()) + .metrics(metrics.get()) + .results(metricsResults) + .resources(metricsResults.size()) + .build(); + } + + @Builder + @Getter + public static class Output implements io.kestra.core.models.tasks.Output { + @Schema( + title = "Total number of datapoints fetched across all metrics and resources" + ) + private final Integer datapoints; + + @Schema( + title = "Total number of unique metrics fetched across all resources" + ) + private final Integer metrics; + + @Schema( + title = "Total number of resources queried" + ) + private final Integer resources; + + @Schema( + title = "List of metrics results" + ) + private final List> results; + } +} \ No newline at end of file diff --git a/src/main/java/io/kestra/plugin/azure/monitoring/Trigger.java b/src/main/java/io/kestra/plugin/azure/monitoring/Trigger.java new file mode 100644 index 00000000..bd800e25 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/monitoring/Trigger.java @@ -0,0 +1,164 @@ +package io.kestra.plugin.azure.monitoring; + +import io.kestra.core.models.annotations.*; +import io.kestra.core.models.conditions.ConditionContext; +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.property.Property; +import io.kestra.core.models.triggers.*; +import io.kestra.core.runners.RunContext; +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotNull; +import lombok.*; +import lombok.experimental.SuperBuilder; +import org.slf4j.Logger; + +import java.time.Duration; +import java.util.List; +import java.util.Optional; + +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Schema(title = "Trigger a flow when Azure Monitor metrics match a query condition.") +@Plugin( + examples = { + @Example( + title = "Trigger when Azure Monitor metric query returns non-empty results", + full = true, + code = """ + id: azure_monitor_trigger + namespace: company.team + + tasks: + - id: each + type: io.kestra.plugin.core.flow.ForEach + values: "{{ trigger.series }}" + tasks: + - id: log + type: io.kestra.plugin.core.log.Log + message: "Datapoint: {{ json(taskrun.value) }}" + + triggers: + - id: watch + type: io.kestra.plugin.azure.monitoring.Trigger + interval: "PT1M" + tenantId: "{{ secret('AZURE_TENANT_ID') }}" + clientId: "{{ secret('AZURE_CLIENT_ID') }}" + clientSecret: "{{ secret('AZURE_CLIENT_SECRET') }}" + resourceIds: + - "/subscriptions/xxx/resourceGroups/rg/providers/Microsoft.Compute/virtualMachines/vm1" + metricNames: + - "Percentage CPU" + metricsNamespace: "Microsoft.Compute/virtualMachines" + window: PT5M + aggregations: + - "Average" + """ + ), + @Example( + title = "Trigger when CPU exceeds threshold with filter", + full = true, + code = """ + id: azure_monitor_cpu_alert + namespace: company.team + + tasks: + - id: alert + type: io.kestra.plugin.core.log.Log + message: "High CPU detected: {{ trigger.count }} datapoints" + + triggers: + - id: watch_cpu + type: io.kestra.plugin.azure.monitoring.Trigger + interval: "PT5M" + tenantId: "{{ secret('AZURE_TENANT_ID') }}" + clientId: "{{ secret('AZURE_CLIENT_ID') }}" + clientSecret: "{{ secret('AZURE_CLIENT_SECRET') }}" + resourceIds: + - "/subscriptions/xxx/resourceGroups/rg/providers/Microsoft.Compute/virtualMachines/vm1" + - "/subscriptions/xxx/resourceGroups/rg/providers/Microsoft.Compute/virtualMachines/vm2" + metricNames: + - "Percentage CPU" + metricsNamespace: "Microsoft.Compute/virtualMachines" + window: PT5M + aggregations: + - "Average" + filter: "Average gt 80" + """ + ) + } +) +public class Trigger extends AbstractTrigger implements PollingTriggerInterface, TriggerOutput { + protected Property tenantId; + + protected Property clientId; + + protected Property clientSecret; + + protected Property subscriptionId; + + @NotNull + protected Property endpoint; + + @Builder.Default + private final Duration interval = Duration.ofSeconds(60); + + private Property> resourceIds; + + private Property> metricNames; + + private Property metricsNamespace; + + @Builder.Default + private Property window = Property.ofValue(Duration.ofMinutes(5)); + + private Property> aggregations; + + private Property granularity; + + private Property filter; + + private Property top; + + private Property orderBy; + + private Property rollupBy; + + @Override + public Optional evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception { + RunContext runContext = conditionContext.getRunContext(); + Logger logger = runContext.logger(); + + Query.Output output = Query.builder() + .id(this.id) + .type(Query.class.getName()) + .tenantId(this.tenantId) + .clientId(this.clientId) + .clientSecret(this.clientSecret) + .resourceIds(this.resourceIds) + .metricNames(this.metricNames) + .metricsNamespace(this.metricsNamespace) + .window(this.window) + .aggregations(this.aggregations) + .interval(this.granularity) + .filter(this.filter) + .top(this.top) + .orderBy(this.orderBy) + .rollupBy(this.rollupBy) + .endpoint(this.endpoint) + .build() + .run(runContext); + + runContext.logger().info("Fetched {} datapoints across {} metrics from {} resources", output.getDatapoints(), output.getMetrics(), output.getResults().size()); + + if (output.getDatapoints() == 0) { + return Optional.empty(); + } + + return Optional.of( + TriggerService.generateExecution(this, conditionContext, context, output) + ); + } +} \ No newline at end of file diff --git a/src/main/java/io/kestra/plugin/azure/monitoring/package-info.java b/src/main/java/io/kestra/plugin/azure/monitoring/package-info.java new file mode 100644 index 00000000..780149e0 --- /dev/null +++ b/src/main/java/io/kestra/plugin/azure/monitoring/package-info.java @@ -0,0 +1,9 @@ +@PluginSubGroup( + description = "This sub-group of plugins contains tasks for using Azure Monitor. \n" + + "Azure Monitor provides full-stack observability across applications, infrastructure, and networks, " + + "and enables querying and pushing metrics using Azure Monitor Query and Ingestion APIs.", + categories = { PluginSubGroup.PluginCategory.CLOUD } +) +package io.kestra.plugin.azure.monitoring; + +import io.kestra.core.models.annotations.PluginSubGroup; diff --git a/src/main/resources/icons/io.kestra.plugin.azure.monitoring.svg b/src/main/resources/icons/io.kestra.plugin.azure.monitoring.svg new file mode 100644 index 00000000..544ab408 --- /dev/null +++ b/src/main/resources/icons/io.kestra.plugin.azure.monitoring.svg @@ -0,0 +1 @@ +Icon-manage-317 \ No newline at end of file diff --git a/src/test/java/io/kestra/plugin/azure/monitoring/QueryTest.java b/src/test/java/io/kestra/plugin/azure/monitoring/QueryTest.java new file mode 100644 index 00000000..2b8979b1 --- /dev/null +++ b/src/test/java/io/kestra/plugin/azure/monitoring/QueryTest.java @@ -0,0 +1,72 @@ +package io.kestra.plugin.azure.monitoring; + +import io.kestra.core.junit.annotations.KestraTest; +import io.kestra.core.models.property.Property; +import io.kestra.core.runners.RunContext; +import io.kestra.core.runners.RunContextFactory; +import io.micronaut.context.annotation.Value; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + +@KestraTest +@Disabled("To run this test provide your service principal credentials") +class QueryTest { + + @Inject + private RunContextFactory runContextFactory; + + @Value("${kestra.variables.globals.azure.monitoring.tenantId}") + protected String tenantId; + + @Value("${kestra.variables.globals.azure.monitoring.clientId}") + protected String clientId; + + @Value("${kestra.variables.globals.azure.monitoring.clientSecret}") + protected String clientSecret; + + @Value("${kestra.variables.globals.azure.monitoring.resourceId}") + protected String resourceId; + + @Test + void testQueryResource() throws Exception { + RunContext runContext = runContextFactory.of(); + + Query task = Query.builder() + .id(QueryTest.class.getSimpleName()) + .type(Query.class.getName()) + .tenantId(Property.ofValue(this.tenantId)) + .clientId(Property.ofValue(this.clientId)) + .clientSecret(Property.ofValue(this.clientSecret)) + .resourceIds(Property.ofValue(List.of(this.resourceId))) + .metricNames(Property.ofValue(List.of("Percentage CPU"))) + .metricsNamespace(Property.ofValue("Microsoft.Compute/virtualMachines")) + .window(Property.ofValue(Duration.ofMinutes(5))) + .aggregations(Property.ofValue(List.of("Average", "Maximum"))) + .build(); + + Query.Output output = task.run(runContext); + + assertThat(output, notNullValue()); + assertThat(output.getDatapoints(), greaterThanOrEqualTo(0)); + assertThat(output.getResources(), notNullValue()); + + Map firstResult = output.getResults().getFirst(); + assertThat(firstResult, hasKey("metrics")); + + @SuppressWarnings("unchecked") + List> metrics = (List>) firstResult.get("metrics"); + assertThat(metrics, not(empty())); + + Map firstMetric = metrics.getFirst(); + assertThat(firstMetric, hasKey("metricName")); + assertThat(firstMetric.get("metricName"), is("Percentage CPU")); + } +} \ No newline at end of file diff --git a/src/test/java/io/kestra/plugin/azure/monitoring/TriggerTest.java b/src/test/java/io/kestra/plugin/azure/monitoring/TriggerTest.java new file mode 100644 index 00000000..04f9dcf9 --- /dev/null +++ b/src/test/java/io/kestra/plugin/azure/monitoring/TriggerTest.java @@ -0,0 +1,65 @@ +package io.kestra.plugin.azure.monitoring; + +import io.kestra.core.junit.annotations.KestraTest; +import io.kestra.core.models.conditions.ConditionContext; +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.property.Property; +import io.kestra.core.runners.RunContextFactory; +import io.kestra.core.utils.TestsUtils; +import io.micronaut.context.annotation.Value; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.hamcrest.CoreMatchers.startsWith; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + +@KestraTest +@Disabled("To run this test provide your service principal credentials") +public class TriggerTest { + @Inject + private RunContextFactory runContextFactory; + + @Value("${kestra.variables.globals.azure.monitoring.tenantId}") + protected String tenantId; + + @Value("${kestra.variables.globals.azure.monitoring.resourceId}") + protected String resourceId; + + @Test + void shouldTriggerWhenMetricsExist() throws Exception { + Trigger trigger = Trigger.builder() + .id("monitor-" + System.currentTimeMillis()) + .type(Trigger.class.getName()) + .tenantId(Property.ofValue(tenantId)) + .endpoint(Property.ofValue("some/endpoint")) + .resourceIds(Property.ofValue(List.of(resourceId))) + .metricNames(Property.ofValue(List.of("Percentage CPU"))) + .metricsNamespace(Property.ofValue("Microsoft.Compute/virtualMachines")) + .window(Property.ofValue(Duration.ofMinutes(5))) + .aggregations(Property.ofValue(List.of("Average"))) + .interval(Duration.ofMinutes(1)) + .build(); + + Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); + + Optional execution = trigger.evaluate(context.getKey(), context.getValue()); + + assertThat(execution.isPresent(), is(true)); + Execution exec = execution.get(); + + assertThat(exec.getTrigger().getId(), startsWith("monitor-")); + assertThat(exec.getTrigger().getVariables(), hasKey("datapoints")); + assertThat((Integer) exec.getTrigger().getVariables().get("datapoints"), greaterThan(0)); + + List> results = (List>) exec.getTrigger().getVariables().get("results"); + assertThat(results, not(empty())); + assertThat(results.getFirst(), hasKey("metrics")); + } +} diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml index 2424d4e3..00a8fa5a 100644 --- a/src/test/resources/application.yml +++ b/src/test/resources/application.yml @@ -33,4 +33,7 @@ kestra: datafactory: tenantId: "" subscriptionId: "" - + monitoring: + tenantId: "" + resourceId: "/subscriptions/YOUR_SUB_ID/resourceGroups/YOUR_RG/providers/Microsoft.Compute/virtualMachines/YOUR_VM" + metricsNamespace: "Microsoft.Compute/virtualMachines" \ No newline at end of file