@@ -80,9 +80,9 @@ func (r *PendingPodsRecorder) Clear() {
80
80
r .recorder .Set (float64 (0 ))
81
81
}
82
82
83
- // histgramVecMetric is the data structure passed in the buffer channel between the main framework thread
83
+ // histogramVecMetric is the data structure passed in the buffer channel between the main framework thread
84
84
// and the metricsRecorder goroutine.
85
- type histgramVecMetric struct {
85
+ type histogramVecMetric struct {
86
86
metric * metrics.HistogramVec
87
87
labelValues []string
88
88
value float64
@@ -102,7 +102,7 @@ type gaugeVecMetricKey struct {
102
102
// MetricAsyncRecorder records metric in a separate goroutine to avoid overhead in the critical path.
103
103
type MetricAsyncRecorder struct {
104
104
// bufferCh is a channel that serves as a metrics buffer before the metricsRecorder goroutine reports it.
105
- bufferCh chan * histgramVecMetric
105
+ bufferCh chan * histogramVecMetric
106
106
// if bufferSize is reached, incoming metrics will be discarded.
107
107
bufferSize int
108
108
// how often the recorder runs to flush the metrics.
@@ -125,7 +125,7 @@ type MetricAsyncRecorder struct {
125
125
126
126
func NewMetricsAsyncRecorder (bufferSize int , interval time.Duration , stopCh <- chan struct {}) * MetricAsyncRecorder {
127
127
recorder := & MetricAsyncRecorder {
128
- bufferCh : make (chan * histgramVecMetric , bufferSize ),
128
+ bufferCh : make (chan * histogramVecMetric , bufferSize ),
129
129
bufferSize : bufferSize ,
130
130
interval : interval ,
131
131
stopCh : stopCh ,
@@ -156,8 +156,9 @@ func (r *MetricAsyncRecorder) ObserveQueueingHintDurationAsync(pluginName, event
156
156
func (r * MetricAsyncRecorder ) ObserveInFlightEventsAsync (eventLabel string , valueToAdd float64 ) {
157
157
r .aggregatedInflightEventMetric [gaugeVecMetricKey {metricName : InFlightEvents .Name , labelValue : eventLabel }] += int (valueToAdd )
158
158
159
- // Only flush the metric to the channal if the interval is reached.
159
+ // Only flush the metric to the channel if the interval is reached.
160
160
// The values are flushed to Prometheus in the run() function, which runs once the interval time.
161
+ // Note: we implement this flushing here, not in FlushMetrics, because, if we did so, we would need to implement a lock for the map, which we want to avoid.
161
162
if time .Since (r .aggregatedInflightEventMetricLastFlushTime ) > r .interval {
162
163
for key , value := range r .aggregatedInflightEventMetric {
163
164
newMetric := & gaugeVecMetric {
@@ -171,11 +172,13 @@ func (r *MetricAsyncRecorder) ObserveInFlightEventsAsync(eventLabel string, valu
171
172
}
172
173
}
173
174
r .aggregatedInflightEventMetricLastFlushTime = time .Now ()
175
+ // reset
176
+ r .aggregatedInflightEventMetric = make (map [gaugeVecMetricKey ]int )
174
177
}
175
178
}
176
179
177
180
func (r * MetricAsyncRecorder ) observeMetricAsync (m * metrics.HistogramVec , value float64 , labelsValues ... string ) {
178
- newMetric := & histgramVecMetric {
181
+ newMetric := & histogramVecMetric {
179
182
metric : m ,
180
183
labelValues : labelsValues ,
181
184
value : value ,
0 commit comments