Skip to content

Commit b5ed15b

Browse files
committed
feat: implement a force flush
1 parent 0ac5d74 commit b5ed15b

File tree

3 files changed

+23
-11
lines changed

3 files changed

+23
-11
lines changed

pkg/scheduler/backend/queue/active_queue.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ func (aq *activeQueue) pop(logger klog.Logger) (*framework.QueuedPodInfo, error)
204204
aq.schedCycle++
205205
// In flight, no concurrent events yet.
206206
if aq.isSchedulingQueueHintEnabled {
207-
aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, 1)
207+
aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, 1, false)
208208
aq.inFlightPods[pInfo.Pod.UID] = aq.inFlightEvents.PushBack(pInfo.Pod)
209209
}
210210

@@ -297,7 +297,7 @@ func (aq *activeQueue) addEventIfPodInFlight(oldPod, newPod *v1.Pod, event frame
297297

298298
_, ok := aq.inFlightPods[newPod.UID]
299299
if ok {
300-
aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label, 1)
300+
aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label, 1, false)
301301
aq.inFlightEvents.PushBack(&clusterEvent{
302302
event: event,
303303
oldObj: oldPod,
@@ -314,7 +314,7 @@ func (aq *activeQueue) addEventIfAnyInFlight(oldObj, newObj interface{}, event f
314314
defer aq.lock.Unlock()
315315

316316
if len(aq.inFlightPods) != 0 {
317-
aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label, 1)
317+
aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label, 1, false)
318318
aq.inFlightEvents.PushBack(&clusterEvent{
319319
event: event,
320320
oldObj: oldObj,
@@ -346,7 +346,6 @@ func (aq *activeQueue) done(pod types.UID) {
346346

347347
// Remove the pod from the list.
348348
aq.inFlightEvents.Remove(inFlightPod)
349-
aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, -1)
350349

351350
aggrMetricsCounter := map[string]int{}
352351
// Remove events which are only referred to by this Pod
@@ -370,8 +369,14 @@ func (aq *activeQueue) done(pod types.UID) {
370369
}
371370

372371
for evLabel, count := range aggrMetricsCounter {
373-
aq.metricsRecorder.ObserveInFlightEventsAsync(evLabel, float64(count))
372+
aq.metricsRecorder.ObserveInFlightEventsAsync(evLabel, float64(count), false)
374373
}
374+
375+
aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, -1,
376+
// If it's the last Pod in inFlightPods, we should force-flush the metrics.
377+
// Otherwise, especially in small clusters, which don't get a new Pod frequently,
378+
// the metrics might not be flushed for a long time.
379+
len(aq.inFlightPods) == 0)
375380
}
376381

377382
// close closes the activeQueue.

pkg/scheduler/metrics/metric_recorder.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,15 +151,16 @@ func (r *MetricAsyncRecorder) ObserveQueueingHintDurationAsync(pluginName, event
151151
}
152152

153153
// ObserveInFlightEventsAsync observes the in_flight_events metric.
154+
//
154155
// Note that this function is not goroutine-safe;
155156
// we don't lock the map deliberately for the performance reason and we assume the queue (i.e., the caller) takes lock before updating the in-flight events.
156-
func (r *MetricAsyncRecorder) ObserveInFlightEventsAsync(eventLabel string, valueToAdd float64) {
157+
func (r *MetricAsyncRecorder) ObserveInFlightEventsAsync(eventLabel string, valueToAdd float64, forceFlush bool) {
157158
r.aggregatedInflightEventMetric[gaugeVecMetricKey{metricName: InFlightEvents.Name, labelValue: eventLabel}] += int(valueToAdd)
158159

159160
// Only flush the metric to the channel if the interval is reached.
160161
// The values are flushed to Prometheus in the run() function, which runs once the interval time.
161162
// Note: we implement this flushing here, not in FlushMetrics, because, if we did so, we would need to implement a lock for the map, which we want to avoid.
162-
if time.Since(r.aggregatedInflightEventMetricLastFlushTime) > r.interval {
163+
if forceFlush || time.Since(r.aggregatedInflightEventMetricLastFlushTime) > r.interval {
163164
for key, value := range r.aggregatedInflightEventMetric {
164165
newMetric := &gaugeVecMetric{
165166
metric: InFlightEvents,

pkg/scheduler/metrics/metric_recorder_test.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,9 @@ func TestInFlightEventAsync(t *testing.T) {
116116
}
117117

118118
podAddLabel := "Pod/Add"
119-
r.ObserveInFlightEventsAsync(podAddLabel, 10)
120-
r.ObserveInFlightEventsAsync(podAddLabel, -1)
121-
r.ObserveInFlightEventsAsync(PodPoppedInFlightEvent, 1)
119+
r.ObserveInFlightEventsAsync(podAddLabel, 10, false)
120+
r.ObserveInFlightEventsAsync(podAddLabel, -1, false)
121+
r.ObserveInFlightEventsAsync(PodPoppedInFlightEvent, 1, false)
122122

123123
if d := cmp.Diff(r.aggregatedInflightEventMetric, map[gaugeVecMetricKey]int{
124124
{metricName: InFlightEvents.Name, labelValue: podAddLabel}: 9,
@@ -130,7 +130,7 @@ func TestInFlightEventAsync(t *testing.T) {
130130
r.aggregatedInflightEventMetricLastFlushTime = time.Now().Add(-time.Hour) // to test flush
131131

132132
// It adds -4 and flushes the metric to the channel.
133-
r.ObserveInFlightEventsAsync(podAddLabel, -4)
133+
r.ObserveInFlightEventsAsync(podAddLabel, -4, false)
134134
if len(r.aggregatedInflightEventMetric) != 0 {
135135
t.Errorf("aggregatedInflightEventMetric should be cleared, but got: %v", r.aggregatedInflightEventMetric)
136136
}
@@ -164,4 +164,10 @@ func TestInFlightEventAsync(t *testing.T) {
164164
}, cmp.AllowUnexported(gaugeVecMetric{}), cmpopts.IgnoreFields(gaugeVecMetric{}, "metric")); d != "" {
165165
t.Errorf("unexpected metrics are sent to aggregatedInflightEventMetricBufferCh: %s", d)
166166
}
167+
168+
// Test force flush
169+
r.ObserveInFlightEventsAsync(podAddLabel, 1, true)
170+
if len(r.aggregatedInflightEventMetric) != 0 {
171+
t.Errorf("aggregatedInflightEventMetric should be force-flushed, but got: %v", r.aggregatedInflightEventMetric)
172+
}
167173
}

0 commit comments

Comments
 (0)