Skip to content

feat(core): Expose service instance metrics via API and add new pending worker messages metric.#14495

Open
ammeek wants to merge 38 commits intodevelopfrom
feat/scale-workers-to-zero
Open

feat(core): Expose service instance metrics via API and add new pending worker messages metric.#14495
ammeek wants to merge 38 commits intodevelopfrom
feat/scale-workers-to-zero

Conversation

@ammeek
Copy link

@ammeek ammeek commented Feb 9, 2026

✨ Description

What does this PR change?
This PR adds the ability to query service instance metrics via the web server.
This PR also add a new metric for the amount of unconsumed messages on the worker queue.

🔗 Related Issue

Part of #424

🛠️ Backend Checklist

If this PR does not include any backend changes, delete this entire section.

  • Code compiles successfully and passes all checks
  • All unit and integration tests pass

@ammeek ammeek requested review from a team and fhussonnois February 9, 2026 11:24
@github-project-automation github-project-automation bot moved this to To review in Pull Requests Feb 9, 2026
@github-actions
Copy link
Contributor

github-actions bot commented Feb 9, 2026

🐋 Docker image

ghcr.io/kestra-io/kestra-pr:14495
docker run --pull=always --rm -it -p 8080:8080 --user=root -v /var/run/docker.sock:/var/run/docker.sock -v /tmp:/tmp ghcr.io/kestra-io/kestra-pr:14495 server local

🧪 Java Unit Tests

TestsPassed ✅Skipped ⚠️FailedTime ⏱
Java Tests Report3902 ran3887 ✅15 ⚠️0 ❌42m 40s 557ms

@github-actions
Copy link
Contributor

github-actions bot commented Feb 9, 2026

Tests report quick summary:

success ✅ > tests: 3902, success: 3887, skipped: 15, failed: 0

unfold for details
Project Status Success Skipped Failed
cli success ✅ 80 0 0
core success ✅ 1856 1 0
executor success ✅ 4 0 0
jdbc success ✅ 12 0 0
jdbc-h2 success ✅ 473 0 0
jdbc-mysql success ✅ 476 0 0
jdbc-postgres success ✅ 476 0 0
processor success ✅ 7 0 0
runner-memory success ✅ 1 0 0
scheduler success ✅ 23 0 0
script success ✅ 11 0 0
storage-local success ✅ 64 0 0
webserver success ✅ 414 0 0
worker success ✅ 4 0 0

Develocity build scan: https://develocity.kestra.io/s/qyjkzscd7yope

Copy link
Member

@loicmathieu loicmathieu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a full review, @fhussonnois will do the final one.
But I think you mix consumerGroup with queueType, the worker group will be passed using a consumerGroup.

@ammeek
Copy link
Author

ammeek commented Feb 16, 2026

@fhussonnois and @loicmathieu this pr is now ready for review.

QueueInterface<WorkerJob> workerJobQueue = workerJobQueueProvider.get();
this.register(
getQueueLagForConsumerGroup(WORKERJOB_NAMED, null, Worker.class, workerJobQueue),
MetricRegistry.TAG_WORKER_GROUP, "default",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue is that if a user creates a worker group with the name default which is totally acceptable, it will clash with it

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@loicmathieu This seems like a valid issue. Unfortunately we are currently using the Micronaut management metric endpoint to fetch this value for the scaling work. The metrics endpoint only supports filtering for specific tag value not metrics without values. This means if we remove this value we won't be able to tell the difference between the cumulative metric value for all tags and the metric value for workers that are not inside of a specifically designed worker group. I can see three ways to resolve this issue, I've ordered them in preference from top to bottom:

  • Add a new endpoint which will allow us to filter metric values with grater granularity. If we where to do this I would suggest that we make it match the k8 metric provider spec as we can then expose the values as external metrics which could be useful in the future.
  • instead of defaulting to 'default' for as the tag value for workers that are not inside of a specifically designed worker group we could instead use the string value of 'null' this is significantly less likely to picked by a end user.
  • We could add a restriction to the names allow for worker groups to stop people from picking default. This solution seem the least appealing as it add a restriction to the user and decreases UX.

Copy link
Member

@loicmathieu loicmathieu Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use (default) in a bunch of other places, it may be a good idea to use this.
Unfortunately, we didn't add validation to the Worker group key so we accept any character.

My point is default has a high risk of being used, null is a bit akward so something else you be found that use special char so it has less risk to be used by a user.

(default) as when we log a worker group with no key, <default> of __default__ all work for me but not just default.

.expireAfterWrite(Duration.ofSeconds(30))
.build();

private final Set<String> availableWorkerGroups = new HashSet<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a concurrent collection

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But anyway, I'm not fan of maintaining the list of worker groups here, we already have that inside the WorkerGroupExecutor

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will remove this set and always fetch worker groups from the MetaStore, they are in cache their anyway


private final ExecutorService poolExecutor;
private final ExecutorService asyncPoolExecutor;
private final WorkerGroupExecutorInterface workerGroupExecutor;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems not used


scheduler.run();
assertTrue(queueCount.await(15, TimeUnit.SECONDS));
assertTrue(queueCount.await(30, TimeUnit.SECONDS));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a good idea, it's already too long

}


@Scheduled(fixedDelay = "30000s", initialDelay = "30s")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You refresh each 8h?

Comment on lines 70 to 73
if (metric.tags().stream().map(Metric.Tag::key).noneMatch(key -> key.equals("instance_id"))) {
tags.add(MetricRegistry.SERVICE_ID);
tags.add(serviceInstance.uid());
metricTags.add(new Metric.Tag(MetricRegistry.SERVICE_ID, serviceInstance.uid()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a good idea, this would lead to metrics explosion

metricKey, k -> new AtomicReference<>()
).set(metric.value());

String filteredName = metric.name().replaceAll("^kestra\\.", "");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you doing that?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@loicmathieu in the service instance table metrics are stored in the following format with the kestra prefix

kestra.metric.name

This means if we push the above value directly into the web servers metric registry then the resulting name will have two kestra prefixes e.g..

kestra.kestra.metric.name

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is configurable, you must get the configured prefix from kestra.metrics.prefix and do a substring


toRemove.forEach(metricKey -> {
log.debug("Removing metric {} from shared metrics, as the associated service instance is no longer active", metricKey);
metricRegistry.removeMeter(sharedMetricsGauges.remove(metricKey));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a good idea, metrics should never be removed

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @loicmathieu what do you think the idea behaviour here should be when there are no service instance pushing a metric for example when all worker are scaled to zero? Should the metric value reset to zero?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, setting the gauge to 0 will work

Comment on lines 32 to 34
private final Map<MetricKey, AtomicReference<Number>> sharedMetricsValues = new HashMap<>();

private final Map<MetricKey, io.micrometer.core.instrument.Gauge> sharedMetricsGauges = new HashMap<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are set and used by different threads so you need to use concurrent collections

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

Status: To review

Development

Successfully merging this pull request may close these issues.

2 participants