diff --git a/pkg/scheduler/in_memory_build_queue.go b/pkg/scheduler/in_memory_build_queue.go index 8a11b57c..14c052a4 100644 --- a/pkg/scheduler/in_memory_build_queue.go +++ b/pkg/scheduler/in_memory_build_queue.go @@ -5,7 +5,9 @@ import ( "context" "encoding/json" "fmt" + "maps" "math" + "slices" "sort" "strconv" "sync" @@ -46,57 +48,69 @@ import ( var ( inMemoryBuildQueuePrometheusMetrics sync.Once - inMemoryBuildQueueInFlightDeduplicationsTotal = prometheus.NewCounterVec( + schedulerRegistry = prometheus.NewRegistry() + + inMemoryBuildQueueInFlightDeduplicationsTotalTemplate = counterTemplate{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", Name: "in_memory_build_queue_in_flight_deduplications_total", Help: "Number of times an Execute() request of a cacheable action was performed, and whether it was in-flight deduplicated against an existing task.", }, - []string{"instance_name_prefix", "platform", "size_class", "outcome"}) + []string{"outcome"}, + } - inMemoryBuildQueueInvocationsCreatedTotal = prometheus.NewCounterVec( + inMemoryBuildQueueInvocationsCreatedTotalTemplate = counterTemplate{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", Name: "in_memory_build_queue_invocations_created_total", Help: "Number of times an invocation object was created by creating a size class queue or scheduling a task through Execute().", }, - []string{"instance_name_prefix", "platform", "size_class", "depth"}) - inMemoryBuildQueueInvocationsActivatedTotal = prometheus.NewCounterVec( + []string{"depth"}, + } + + inMemoryBuildQueueInvocationsActivatedTotalTemplate = counterTemplate{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", Name: "in_memory_build_queue_invocations_activated_total", Help: "Number of times an invocation object transitioned from being idle to having queued or executing operations.", }, - []string{"instance_name_prefix", "platform", "size_class", "depth"}) - 
inMemoryBuildQueueInvocationsDeactivatedTotal = prometheus.NewCounterVec( + []string{"depth"}, + } + + inMemoryBuildQueueInvocationsDeactivatedTotalTemplate = counterTemplate{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", Name: "in_memory_build_queue_invocations_deactivated_total", Help: "Number of times an invocation object transitioned from having queued or executing operations to being idle.", }, - []string{"instance_name_prefix", "platform", "size_class", "depth"}) - inMemoryBuildQueueInvocationsRemovedTotal = prometheus.NewCounterVec( + []string{"depth"}, + } + + inMemoryBuildQueueInvocationsRemovedTotalTemplate = counterTemplate{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", Name: "in_memory_build_queue_invocations_removed_total", Help: "Number of times an invocation object was removed.", }, - []string{"instance_name_prefix", "platform", "size_class", "depth"}) + []string{"depth"}, + } - inMemoryBuildQueueTasksScheduledTotal = prometheus.NewCounterVec( + inMemoryBuildQueueTasksScheduledTotalTemplate = counterTemplate{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", Name: "in_memory_build_queue_tasks_scheduled_total", Help: "Number of times tasks were scheduled, either by calling Execute() or through initial size class selection retries.", }, - []string{"instance_name_prefix", "platform", "size_class", "assignment", "do_not_cache"}) - inMemoryBuildQueueTasksQueuedDurationSeconds = prometheus.NewHistogramVec( + []string{"assignment", "do_not_cache"}, + } + + inMemoryBuildQueueTasksQueuedDurationSecondsTemplate = histogramTemplate{ prometheus.HistogramOpts{ Namespace: "buildbarn", Subsystem: "builder", @@ -104,8 +118,10 @@ var ( Help: "Time in seconds that tasks were queued before executing.", Buckets: util.DecimalExponentialBuckets(-3, 6, 2), }, - []string{"instance_name_prefix", "platform", "size_class"}) - inMemoryBuildQueueTasksExecutingDurationSeconds = prometheus.NewHistogramVec( + 
[]string{}, + } + + inMemoryBuildQueueTasksExecutingDurationSecondsTemplate = histogramTemplate{ prometheus.HistogramOpts{ Namespace: "buildbarn", Subsystem: "builder", @@ -113,8 +129,10 @@ var ( Help: "Time in seconds that tasks were executing before completing.", Buckets: util.DecimalExponentialBuckets(-3, 6, 2), }, - []string{"instance_name_prefix", "platform", "size_class", "result", "grpc_code"}) - inMemoryBuildQueueTasksExecutingRetries = prometheus.NewHistogramVec( + []string{"result", "grpc_code"}, + } + + inMemoryBuildQueueTasksExecutingRetriesTemplate = histogramTemplate{ prometheus.HistogramOpts{ Namespace: "buildbarn", Subsystem: "builder", @@ -122,8 +140,10 @@ var ( Help: "Number of times that tasks were retried before completing.", Buckets: prometheus.LinearBuckets(0, 1, 11), }, - []string{"instance_name_prefix", "platform", "size_class", "result", "grpc_code"}) - inMemoryBuildQueueTasksCompletedDurationSeconds = prometheus.NewHistogramVec( + []string{"result", "grpc_code"}, + } + + inMemoryBuildQueueTasksCompletedDurationSecondsTemplate = histogramTemplate{ prometheus.HistogramOpts{ Namespace: "buildbarn", Subsystem: "builder", @@ -131,34 +151,40 @@ var ( Help: "Time in seconds that tasks were completed before being removed.", Buckets: util.DecimalExponentialBuckets(-3, 6, 2), }, - []string{"instance_name_prefix", "platform", "size_class"}) + []string{}, + } - inMemoryBuildQueueWorkersCreatedTotal = prometheus.NewCounterVec( + inMemoryBuildQueueWorkersCreatedTotalTemplate = counterTemplate{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", Name: "in_memory_build_queue_workers_created_total", Help: "Number of workers created by Synchronize().", }, - []string{"instance_name_prefix", "platform", "size_class"}) - inMemoryBuildQueueWorkersTerminatingTotal = prometheus.NewCounterVec( + []string{}, + } + + inMemoryBuildQueueWorkersTerminatingTotalTemplate = counterTemplate{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: 
"builder", Name: "in_memory_build_queue_workers_terminating_total", Help: "Number of workers that have entered the terminating state.", }, - []string{"instance_name_prefix", "platform", "size_class"}) - inMemoryBuildQueueWorkersRemovedTotal = prometheus.NewCounterVec( + []string{}, + } + + inMemoryBuildQueueWorkersRemovedTotalTemplate = counterTemplate{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", Name: "in_memory_build_queue_workers_removed_total", Help: "Number of workers removed due to expiration.", }, - []string{"instance_name_prefix", "platform", "size_class", "state"}) + []string{"state"}, + } - inMemoryBuildQueueWorkerInvocationStickinessRetained = prometheus.NewHistogramVec( + inMemoryBuildQueueWorkerInvocationStickinessRetainedTemplate = histogramTemplate{ prometheus.HistogramOpts{ Namespace: "buildbarn", Subsystem: "builder", @@ -166,9 +192,48 @@ var ( Help: "How many levels of worker invocation stickiness were respected, as configured through worker_invocation_stickiness_limits.", Buckets: prometheus.LinearBuckets(0, 1, 11), }, - []string{"instance_name_prefix", "platform", "size_class"}) + []string{}, + } ) +// counterTemplate is a utility struct for generating and registering prometheus.CounterVec +// resources from a set of options, and required labels to appear in the resulting vec. +type counterTemplate struct { + opts prometheus.CounterOpts + requiredLabels []string +} + +// createMetric creates an instance of the counter vec, including whatever additional labels are provided. +func (ct *counterTemplate) createMetric(additionalLabels []string) *prometheus.CounterVec { + return prometheus.NewCounterVec(ct.opts, append(ct.requiredLabels, additionalLabels...)) +} + +// createAndRegisterMetric performs the same task as createMetric, but also registers the metric in the provided registry. 
+func (ct *counterTemplate) createAndRegisterMetric(additionalLabels []string, registry *prometheus.Registry) *prometheus.CounterVec { + metric := ct.createMetric(additionalLabels) + registry.MustRegister(metric) + return metric +} + +// histogramTemplate is a utility struct for generating and registering prometheus.HistogramVec +// resources from a set of options, and required labels to appear in the resulting vec. +type histogramTemplate struct { + opts prometheus.HistogramOpts + requiredLabels []string +} + +// createMetric creates an instance of the histogram vec, including whatever additional labels are provided. +func (ht *histogramTemplate) createMetric(additionalLabels []string) *prometheus.HistogramVec { + return prometheus.NewHistogramVec(ht.opts, append(ht.requiredLabels, additionalLabels...)) +} + +// createAndRegisterMetric performs the same task as createMetric, but also registers the metric in the provided registry. +func (ht *histogramTemplate) createAndRegisterMetric(additionalLabels []string, registry *prometheus.Registry) *prometheus.HistogramVec { + metric := ht.createMetric(additionalLabels) + registry.MustRegister(metric) + return metric +} + // InMemoryBuildQueueConfiguration contains all the tunable settings of // the InMemoryBuildQueue. type InMemoryBuildQueueConfiguration struct { @@ -284,24 +349,7 @@ var inMemoryBuildQueueCapabilitiesProvider = capabilities.NewStaticProvider(&rem // execution requests. All of these are created by sending it RPCs. 
func NewInMemoryBuildQueue(contentAddressableStorage blobstore.BlobAccess, clock clock.Clock, uuidGenerator util.UUIDGenerator, configuration *InMemoryBuildQueueConfiguration, maximumMessageSizeBytes int, actionRouter routing.ActionRouter, executeAuthorizer, modifyDrainsAuthorizer, killOperationsAuthorizer auth.Authorizer) *InMemoryBuildQueue { inMemoryBuildQueuePrometheusMetrics.Do(func() { - prometheus.MustRegister(inMemoryBuildQueueInFlightDeduplicationsTotal) - - prometheus.MustRegister(inMemoryBuildQueueInvocationsCreatedTotal) - prometheus.MustRegister(inMemoryBuildQueueInvocationsActivatedTotal) - prometheus.MustRegister(inMemoryBuildQueueInvocationsDeactivatedTotal) - prometheus.MustRegister(inMemoryBuildQueueInvocationsRemovedTotal) - - prometheus.MustRegister(inMemoryBuildQueueTasksScheduledTotal) - prometheus.MustRegister(inMemoryBuildQueueTasksQueuedDurationSeconds) - prometheus.MustRegister(inMemoryBuildQueueTasksExecutingDurationSeconds) - prometheus.MustRegister(inMemoryBuildQueueTasksExecutingRetries) - prometheus.MustRegister(inMemoryBuildQueueTasksCompletedDurationSeconds) - - prometheus.MustRegister(inMemoryBuildQueueWorkersCreatedTotal) - prometheus.MustRegister(inMemoryBuildQueueWorkersTerminatingTotal) - prometheus.MustRegister(inMemoryBuildQueueWorkersRemovedTotal) - - prometheus.MustRegister(inMemoryBuildQueueWorkerInvocationStickinessRetained) + prometheus.MustRegister(schedulerRegistry) }) return &InMemoryBuildQueue{ @@ -330,6 +378,23 @@ var ( _ buildqueuestate.BuildQueueStateServer = (*InMemoryBuildQueue)(nil) ) +// getPlatformProperties returns the list of platform properties from a remoteexecution.Platform +// as a map. Multiple instances of the same property name are concatenated with a comma delimiter. 
+func getPlatformProperties(platform *remoteexecution.Platform) map[string]string { + if platform == nil { + return make(map[string]string, 0) + } + properties := make(map[string]string, len(platform.Properties)) + for _, property := range platform.Properties { + if currVal, ok := properties[property.Name]; ok { + properties[property.Name] = fmt.Sprintf("%s,%s", currVal, property.Value) + } else { + properties[property.Name] = property.Value + } + } + return properties +} + // RegisterPredeclaredPlatformQueue adds a platform queue to // InMemoryBuildQueue that remains present, regardless of whether // workers appear. @@ -352,6 +417,7 @@ func (bq *InMemoryBuildQueue) RegisterPredeclaredPlatformQueue(instanceNamePrefi if err != nil { return err } + platformProperties := getPlatformProperties(platformMessage) bq.enter(bq.clock.Now()) defer bq.leave() @@ -360,7 +426,7 @@ func (bq *InMemoryBuildQueue) RegisterPredeclaredPlatformQueue(instanceNamePrefi return status.Error(codes.AlreadyExists, "A queue with the same instance name prefix or platform already exists") } - pq := bq.addPlatformQueue(platformKey, workerInvocationStickinessLimits, maximumQueuedBackgroundLearningOperations, backgroundLearningOperationPriority) + pq := bq.addPlatformQueue(platformKey, platformProperties, workerInvocationStickinessLimits, maximumQueuedBackgroundLearningOperations, backgroundLearningOperationPriority) for _, sizeClass := range sizeClasses { pq.addSizeClassQueue(bq, sizeClass, false) } @@ -461,7 +527,7 @@ func (bq *InMemoryBuildQueue) Execute(in *remoteexecution.ExecuteRequest, out re // Task is already associated with the current // invocation. Simply wait on the operation that // already exists. 
- scq.inFlightDeduplicationsSameInvocation.Inc() + scq.metrics.inFlightDeduplicationsSameInvocation.Inc() return o.waitExecution(bq, out) } @@ -479,7 +545,7 @@ func (bq *InMemoryBuildQueue) Execute(in *remoteexecution.ExecuteRequest, out re default: panic("Task in unexpected stage") } - scq.inFlightDeduplicationsOtherInvocation.Inc() + scq.metrics.inFlightDeduplicationsOtherInvocation.Inc() return o.waitExecution(bq, out) } @@ -529,7 +595,7 @@ func (bq *InMemoryBuildQueue) Execute(in *remoteexecution.ExecuteRequest, out re } if !action.DoNotCache { bq.inFlightDeduplicationMap[actionDigest] = t - scq.inFlightDeduplicationsNew.Inc() + scq.metrics.inFlightDeduplicationsNew.Inc() } i := scq.getOrCreateInvocation(bq, invocationKeys) o := t.newOperation(bq, in.ExecutionPolicy.GetPriority(), i, false) @@ -578,6 +644,7 @@ func (bq *InMemoryBuildQueue) Synchronize(ctx context.Context, request *remotewo if err != nil { return nil, err } + platformProperties := getPlatformProperties(request.Platform) workerKey := newWorkerKey(request.WorkerId) bq.enter(bq.clock.Now()) @@ -619,7 +686,7 @@ func (bq *InMemoryBuildQueue) Synchronize(ctx context.Context, request *remotewo // pair has not been observed before. Create a // new platform queue containing a single size // class queue. - pq = bq.addPlatformQueue(platformKey, nil, 0, 0) + pq = bq.addPlatformQueue(platformKey, platformProperties, nil, 0, 0) } scq = pq.addSizeClassQueue(bq, request.SizeClass, true) } @@ -645,7 +712,7 @@ func (bq *InMemoryBuildQueue) Synchronize(ctx context.Context, request *remotewo } i.idleWorkersCount++ scq.workers[workerKey] = w - scq.workersCreatedTotal.Inc() + scq.metrics.workersCreatedTotal.Inc() } // Install cleanup handlers to ensure stale workers and queues @@ -1271,12 +1338,15 @@ func (bq *InMemoryBuildQueue) getIdleSynchronizeResponse() *remoteworker.Synchro } } +// + // addPlatformQueue creates a new platform queue for a given platform. 
-func (bq *InMemoryBuildQueue) addPlatformQueue(platformKey platform.Key, workerInvocationStickinessLimits []time.Duration, maximumQueuedBackgroundLearningOperations int, backgroundLearningOperationPriority int32) *platformQueue { +func (bq *InMemoryBuildQueue) addPlatformQueue(platformKey platform.Key, properties map[string]string, workerInvocationStickinessLimits []time.Duration, maximumQueuedBackgroundLearningOperations int, backgroundLearningOperationPriority int32) *platformQueue { pq := &platformQueue{ - platformKey: platformKey, - instanceNamePatcher: digest.NewInstanceNamePatcher(platformKey.GetInstanceNamePrefix(), digest.EmptyInstanceName), - workerInvocationStickinessLimits: workerInvocationStickinessLimits, + platformKey: platformKey, + properties: properties, + instanceNamePatcher: digest.NewInstanceNamePatcher(platformKey.GetInstanceNamePrefix(), digest.EmptyInstanceName), + workerInvocationStickinessLimits: workerInvocationStickinessLimits, maximumQueuedBackgroundLearningOperations: maximumQueuedBackgroundLearningOperations, backgroundLearningOperationPriority: backgroundLearningOperationPriority, } @@ -1350,6 +1420,7 @@ func (k *sizeClassKey) getSizeClassQueueName() *buildqueuestate.SizeClassQueueNa // instance/platform for which one or more workers exist. type platformQueue struct { platformKey platform.Key + properties map[string]string instanceNamePatcher digest.InstanceNamePatcher workerInvocationStickinessLimits []time.Duration maximumQueuedBackgroundLearningOperations int @@ -1374,7 +1445,13 @@ func (pq *platformQueue) addSizeClassQueue(bq *InMemoryBuildQueue, sizeClass uin "platform": platformStr, "size_class": sizeClassStr, } - tasksScheduledTotal := inMemoryBuildQueueTasksScheduledTotal.MustCurryWith(platformLabels) + for p, pv := range pq.properties { + // do not overwrite existing labels, in case of weirdly named properties. 
+ if _, ok := platformLabels[p]; !ok { + platformLabels[p] = pv + } + } + metrics := newSizeClassQueueMetrics(platformLabels) scq := &sizeClassQueue{ platformQueue: pq, sizeClass: sizeClass, @@ -1389,31 +1466,15 @@ func (pq *platformQueue) addSizeClassQueue(bq *InMemoryBuildQueue, sizeClass uin drains: map[string]*buildqueuestate.DrainState{}, undrainWakeup: make(chan struct{}), - inFlightDeduplicationsSameInvocation: inMemoryBuildQueueInFlightDeduplicationsTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr, "SameInvocation"), - inFlightDeduplicationsOtherInvocation: inMemoryBuildQueueInFlightDeduplicationsTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr, "OtherInvocation"), - inFlightDeduplicationsNew: inMemoryBuildQueueInFlightDeduplicationsTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr, "New"), - - tasksScheduledWorker: newTasksScheduledCounterVec(tasksScheduledTotal, "Worker"), - tasksScheduledQueue: newTasksScheduledCounterVec(tasksScheduledTotal, "Queue"), - tasksQueuedDurationSeconds: inMemoryBuildQueueTasksQueuedDurationSeconds.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr), - tasksExecutingDurationSeconds: inMemoryBuildQueueTasksExecutingDurationSeconds.MustCurryWith(platformLabels), - tasksExecutingRetries: inMemoryBuildQueueTasksExecutingRetries.MustCurryWith(platformLabels), - tasksCompletedDurationSeconds: inMemoryBuildQueueTasksCompletedDurationSeconds.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr), - - workersCreatedTotal: inMemoryBuildQueueWorkersCreatedTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr), - workersTerminatingTotal: inMemoryBuildQueueWorkersTerminatingTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr), - workersRemovedIdleTotal: inMemoryBuildQueueWorkersRemovedTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr, "Idle"), - workersRemovedExecutingTotal: 
inMemoryBuildQueueWorkersRemovedTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr, "Executing"), - - workerInvocationStickinessRetained: inMemoryBuildQueueWorkerInvocationStickinessRetained.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr), + metrics: metrics, } scq.rootInvocation.sizeClassQueue = scq scq.incrementInvocationsCreatedTotal(0) // Force creation of all metrics associated with this platform // queue to make recording rules work. - scq.tasksExecutingDurationSeconds.WithLabelValues("Success", "") - scq.tasksExecutingRetries.WithLabelValues("Success", "") + scq.metrics.tasksExecutingDurationSeconds.WithLabelValues("Success", "") + scq.metrics.tasksExecutingRetries.WithLabelValues("Success", "") // Insert the new size class queue into the platform queue. // Keep the size class queues sorted, so that they are provided @@ -1460,6 +1521,89 @@ type invocationsMetrics struct { removedTotal prometheus.Counter } +type sizeClassQueueMetrics struct { + registry *prometheus.Registry + inFlightDeduplicationsSameInvocation prometheus.Counter + inFlightDeduplicationsOtherInvocation prometheus.Counter + inFlightDeduplicationsNew prometheus.Counter + + tasksScheduledWorker tasksScheduledCounterVec + tasksScheduledQueue tasksScheduledCounterVec + + tasksQueuedDurationSeconds prometheus.Observer + tasksExecutingDurationSeconds prometheus.ObserverVec + tasksExecutingRetries prometheus.ObserverVec + tasksCompletedDurationSeconds prometheus.Observer + + workersCreatedTotal prometheus.Counter + workersTerminatingTotal prometheus.Counter + workersRemovedIdleTotal prometheus.Counter + workersRemovedExecutingTotal prometheus.Counter + + workerInvocationStickinessRetained prometheus.Observer + + invocationsCreatedTotal *prometheus.CounterVec + invocationsActivatedTotal *prometheus.CounterVec + invocationsDeactivatedTotal *prometheus.CounterVec + invocationsRemovedTotal *prometheus.CounterVec +} + +// newSizeClassQueueMetrics creates a new set of 
sizeClassQueueMetrics and registers all +// newly created metrics with the associated registry. The registry should be unregistered from +// schedulerRegistry if the size class queue is ever removed. +func newSizeClassQueueMetrics(properties map[string]string) sizeClassQueueMetrics { + propertyKeys := slices.Collect[string](maps.Keys(properties)) + registry := prometheus.NewRegistry() + schedulerRegistry.MustRegister(registry) + + inMemoryBuildQueueInFlightDeduplicationsTotal := inMemoryBuildQueueInFlightDeduplicationsTotalTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueInvocationsCreatedTotal := inMemoryBuildQueueInvocationsCreatedTotalTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueInvocationsActivatedTotal := inMemoryBuildQueueInvocationsActivatedTotalTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueInvocationsDeactivatedTotal := inMemoryBuildQueueInvocationsDeactivatedTotalTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueInvocationsRemovedTotal := inMemoryBuildQueueInvocationsRemovedTotalTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueTasksScheduledTotal := inMemoryBuildQueueTasksScheduledTotalTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueTasksQueuedDurationSeconds := inMemoryBuildQueueTasksQueuedDurationSecondsTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueTasksExecutingDurationSeconds := inMemoryBuildQueueTasksExecutingDurationSecondsTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueTasksExecutingRetries := inMemoryBuildQueueTasksExecutingRetriesTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueTasksCompletedDurationSeconds := inMemoryBuildQueueTasksCompletedDurationSecondsTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueWorkersCreatedTotal := 
inMemoryBuildQueueWorkersCreatedTotalTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueWorkersTerminatingTotal := inMemoryBuildQueueWorkersTerminatingTotalTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueWorkersRemovedTotal := inMemoryBuildQueueWorkersRemovedTotalTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueWorkerInvocationStickinessRetained := inMemoryBuildQueueWorkerInvocationStickinessRetainedTemplate.createAndRegisterMetric(propertyKeys, registry) + + tasksScheduledTotal := inMemoryBuildQueueTasksScheduledTotal.MustCurryWith(properties) + update := func(base, updates map[string]string) map[string]string { + clone := maps.Clone(base) + maps.Insert(clone, maps.All(updates)) + return clone + } + return sizeClassQueueMetrics{ + registry: registry, + inFlightDeduplicationsSameInvocation: inMemoryBuildQueueInFlightDeduplicationsTotal.With(update(properties, map[string]string{"outcome": "SameInvocation"})), + inFlightDeduplicationsOtherInvocation: inMemoryBuildQueueInFlightDeduplicationsTotal.With(update(properties, map[string]string{"outcome": "OtherInvocation"})), + inFlightDeduplicationsNew: inMemoryBuildQueueInFlightDeduplicationsTotal.With(update(properties, map[string]string{"outcome": "New"})), + + tasksScheduledWorker: newTasksScheduledCounterVec(tasksScheduledTotal, "Worker"), + tasksScheduledQueue: newTasksScheduledCounterVec(tasksScheduledTotal, "Queue"), + tasksQueuedDurationSeconds: inMemoryBuildQueueTasksQueuedDurationSeconds.With(properties), + tasksExecutingDurationSeconds: inMemoryBuildQueueTasksExecutingDurationSeconds.MustCurryWith(properties), + tasksExecutingRetries: inMemoryBuildQueueTasksExecutingRetries.MustCurryWith(properties), + tasksCompletedDurationSeconds: inMemoryBuildQueueTasksCompletedDurationSeconds.With(properties), + + workersCreatedTotal: inMemoryBuildQueueWorkersCreatedTotal.With(properties), + workersTerminatingTotal: 
inMemoryBuildQueueWorkersTerminatingTotal.With(properties), + workersRemovedIdleTotal: inMemoryBuildQueueWorkersRemovedTotal.With(update(properties, map[string]string{"state": "Idle"})), + workersRemovedExecutingTotal: inMemoryBuildQueueWorkersRemovedTotal.With(update(properties, map[string]string{"state": "Executing"})), + + workerInvocationStickinessRetained: inMemoryBuildQueueWorkerInvocationStickinessRetained.With(properties), + + invocationsCreatedTotal: inMemoryBuildQueueInvocationsCreatedTotal.MustCurryWith(properties), + invocationsActivatedTotal: inMemoryBuildQueueInvocationsActivatedTotal.MustCurryWith(properties), + invocationsDeactivatedTotal: inMemoryBuildQueueInvocationsDeactivatedTotal.MustCurryWith(properties), + invocationsRemovedTotal: inMemoryBuildQueueInvocationsRemovedTotal.MustCurryWith(properties), + } +} + type sizeClassQueue struct { platformQueue *platformQueue sizeClass uint32 @@ -1476,25 +1620,9 @@ type sizeClassQueue struct { undrainWakeup chan struct{} // Prometheus metrics. 
- inFlightDeduplicationsSameInvocation prometheus.Counter - inFlightDeduplicationsOtherInvocation prometheus.Counter - inFlightDeduplicationsNew prometheus.Counter + metrics sizeClassQueueMetrics invocationsMetrics []invocationsMetrics - - tasksScheduledWorker tasksScheduledCounterVec - tasksScheduledQueue tasksScheduledCounterVec - tasksQueuedDurationSeconds prometheus.Observer - tasksExecutingDurationSeconds prometheus.ObserverVec - tasksExecutingRetries prometheus.ObserverVec - tasksCompletedDurationSeconds prometheus.Observer - - workersCreatedTotal prometheus.Counter - workersTerminatingTotal prometheus.Counter - workersRemovedIdleTotal prometheus.Counter - workersRemovedExecutingTotal prometheus.Counter - - workerInvocationStickinessRetained prometheus.Observer } func (scq *sizeClassQueue) getKey() sizeClassKey { @@ -1516,6 +1644,7 @@ func (scq *sizeClassQueue) remove(bq *InMemoryBuildQueue) { "Workers for this instance name, platform and size class disappeared while task was queued", ).Proto()) scq.invocationsMetrics[0].removedTotal.Inc() + schedulerRegistry.Unregister(scq.metrics.registry) delete(bq.sizeClassQueues, scq.getKey()) pq := scq.platformQueue @@ -1547,9 +1676,9 @@ func (scq *sizeClassQueue) removeStaleWorker(bq *InMemoryBuildQueue, workerKey w w := scq.workers[workerKey] scq.markWorkerTerminating(w) if t := w.currentTask; t == nil { - scq.workersRemovedIdleTotal.Inc() + scq.metrics.workersRemovedIdleTotal.Inc() } else { - scq.workersRemovedExecutingTotal.Inc() + scq.metrics.workersRemovedExecutingTotal.Inc() t.complete(bq, &remoteexecution.ExecuteResponse{ Status: status.Newf(codes.Unavailable, "Worker %s disappeared while task was executing", workerKey).Proto(), }, false) @@ -1603,16 +1732,15 @@ func (scq *sizeClassQueue) getOrCreateInvocation(bq *InMemoryBuildQueue, invocat // with zero. 
func (scq *sizeClassQueue) incrementInvocationsCreatedTotal(depth int) { if len(scq.invocationsMetrics) == depth { - instanceNamePrefix, platformStr, sizeClassStr := scq.platformQueue.getSizeClassQueueLabels(scq.sizeClass) depthStr := strconv.FormatInt(int64(depth), 10) scq.invocationsMetrics = append( scq.invocationsMetrics, invocationsMetrics{ - createdTotal: inMemoryBuildQueueInvocationsCreatedTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr, depthStr), - activatedTotal: inMemoryBuildQueueInvocationsActivatedTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr, depthStr), - deactivatedTotal: inMemoryBuildQueueInvocationsDeactivatedTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr, depthStr), - removedTotal: inMemoryBuildQueueInvocationsRemovedTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr, depthStr), + createdTotal: scq.metrics.invocationsCreatedTotal.WithLabelValues(depthStr), + activatedTotal: scq.metrics.invocationsActivatedTotal.WithLabelValues(depthStr), + deactivatedTotal: scq.metrics.invocationsDeactivatedTotal.WithLabelValues(depthStr), + removedTotal: scq.metrics.invocationsRemovedTotal.WithLabelValues(depthStr), }) } @@ -1621,7 +1749,7 @@ func (scq *sizeClassQueue) incrementInvocationsCreatedTotal(depth int) { func (scq *sizeClassQueue) markWorkerTerminating(w *worker) { if !w.terminating { - scq.workersTerminatingTotal.Inc() + scq.metrics.workersTerminatingTotal.Inc() w.terminating = true } } @@ -2385,7 +2513,7 @@ func (t *task) schedule(bq *InMemoryBuildQueue) { // TODO: Do we want to provide a histogram // on how far the new invocation is removed // from the original one? 
- t.registerQueuedStageStarted(bq, &scq.tasksScheduledWorker) + t.registerQueuedStageStarted(bq, &scq.metrics.tasksScheduledWorker) i.idleSynchronizingWorkers[0].worker.assignUnqueuedTaskAndWakeUp(bq, t, 0) return } @@ -2397,7 +2525,7 @@ func (t *task) schedule(bq *InMemoryBuildQueue) { // Queue the operation, so that workers // can pick it up when they become // available. - t.registerQueuedStageStarted(bq, &scq.tasksScheduledQueue) + t.registerQueuedStageStarted(bq, &scq.metrics.tasksScheduledQueue) for _, o := range t.operations { o.enqueue() } @@ -2602,7 +2730,7 @@ func (t *task) registerQueuedStageStarted(bq *InMemoryBuildQueue, tasksScheduled // task finishing the QUEUED stage. func (t *task) registerQueuedStageFinished(bq *InMemoryBuildQueue) { scq := t.getCurrentSizeClassQueue() - scq.tasksQueuedDurationSeconds.Observe(bq.now.Sub(t.currentStageStartTime).Seconds()) + scq.metrics.tasksQueuedDurationSeconds.Observe(bq.now.Sub(t.currentStageStartTime).Seconds()) t.currentStageStartTime = bq.now } @@ -2610,8 +2738,8 @@ func (t *task) registerQueuedStageFinished(bq *InMemoryBuildQueue) { // the task finishing the EXECUTING stage. func (t *task) registerExecutingStageFinished(bq *InMemoryBuildQueue, result, grpcCode string) { scq := t.getCurrentSizeClassQueue() - scq.tasksExecutingDurationSeconds.WithLabelValues(result, grpcCode).Observe(bq.now.Sub(t.currentStageStartTime).Seconds()) - scq.tasksExecutingRetries.WithLabelValues(result, grpcCode).Observe(float64(t.retryCount)) + scq.metrics.tasksExecutingDurationSeconds.WithLabelValues(result, grpcCode).Observe(bq.now.Sub(t.currentStageStartTime).Seconds()) + scq.metrics.tasksExecutingRetries.WithLabelValues(result, grpcCode).Observe(float64(t.retryCount)) t.currentStageStartTime = bq.now } @@ -2619,7 +2747,7 @@ func (t *task) registerExecutingStageFinished(bq *InMemoryBuildQueue, result, gr // the task finishing the COMPLETED stage, meaning the task got removed. 
func (t *task) registerCompletedStageFinished(bq *InMemoryBuildQueue) { scq := t.getCurrentSizeClassQueue() - scq.tasksCompletedDurationSeconds.Observe(bq.now.Sub(t.currentStageStartTime).Seconds()) + scq.metrics.tasksCompletedDurationSeconds.Observe(bq.now.Sub(t.currentStageStartTime).Seconds()) t.currentStageStartTime = bq.now } @@ -2781,7 +2909,7 @@ func (w *worker) assignNextQueuedTask(bq *InMemoryBuildQueue, scq *sizeClassQueu // One or more operations are enqueued in this // invocation directly. Pick the most preferable // operation. - scq.workerInvocationStickinessRetained.Observe(float64(stickinessRetained)) + scq.metrics.workerInvocationStickinessRetained.Observe(float64(stickinessRetained)) w.assignQueuedTask(bq, i.queuedOperations[0].task, stickinessRetained) return true } else if len(i.queuedChildren) > 0 {