Skip to content

Commit 0ed03d7

Browse files
bjrara and sky333999 authored
Fix race conditions in applicationsignals processor (#1758)
Co-authored-by: Kaushik Surya <[email protected]>
1 parent 92115e8 commit 0ed03d7

File tree

4 files changed: 125 additions and 79 deletions.

Makefile

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -211,7 +211,6 @@ PKG_WITH_DATA_RACE += internal/tls
211211
PKG_WITH_DATA_RACE += plugins/inputs/logfile
212212
PKG_WITH_DATA_RACE += plugins/inputs/logfile/tail
213213
PKG_WITH_DATA_RACE += plugins/outputs/cloudwatch$$
214-
PKG_WITH_DATA_RACE += plugins/processors/awsapplicationsignals
215214
PKG_WITH_DATA_RACE += plugins/processors/ec2tagger
216215
PKG_WITH_DATA_RACE_PATTERN := $(shell echo '$(PKG_WITH_DATA_RACE)' | tr ' ' '|')
217216
test-data-race:

plugins/processors/awsapplicationsignals/internal/cardinalitycontrol/metrics_limiter.go

Lines changed: 81 additions & 74 deletions
Original file line number | Diff line number | Diff line change
@@ -51,8 +51,7 @@ type MetricsLimiter struct {
5151

5252
logger *zap.Logger
5353
ctx context.Context
54-
mapLock sync.RWMutex
55-
services map[string]*service
54+
services sync.Map
5655
}
5756

5857
func NewMetricsLimiter(config *config.LimiterConfig, logger *zap.Logger) Limiter {
@@ -70,7 +69,7 @@ func NewMetricsLimiter(config *config.LimiterConfig, logger *zap.Logger) Limiter
7069

7170
logger: logger,
7271
ctx: ctx,
73-
services: map[string]*service{},
72+
services: sync.Map{},
7473
}
7574

7675
go func() {
@@ -97,18 +96,16 @@ func (m *MetricsLimiter) Admit(metricName string, attributes, resourceAttributes
9796
}
9897
admitted := true
9998

100-
m.mapLock.RLock()
101-
svc := m.services[serviceName]
102-
m.mapLock.RUnlock()
103-
if svc == nil {
104-
m.mapLock.Lock()
105-
svc = m.services[serviceName]
106-
if svc == nil {
107-
svc = newService(serviceName, m.DropThreshold, m.RotationInterval, m.ctx, m.logger)
108-
m.services[serviceName] = svc
99+
val, loaded := m.services.Load(serviceName)
100+
if !loaded {
101+
valToStore := newService(serviceName, m.DropThreshold, m.RotationInterval, m.ctx, m.logger)
102+
val, loaded = m.services.LoadOrStore(serviceName, valToStore)
103+
if loaded {
104+
valToStore.cancelFunc()
105+
m.logger.Info(fmt.Sprintf("[%s] cancel newly created service entry as an existing one is found", serviceName))
109106
}
110-
m.mapLock.Unlock()
111107
}
108+
svc := val.(*service)
112109

113110
metricData := newMetricData(serviceName, metricName, labels)
114111

@@ -118,8 +115,10 @@ func (m *MetricsLimiter) Admit(metricName string, attributes, resourceAttributes
118115
return true, nil
119116
}
120117

121-
if !svc.admitMetricData(metricData) {
122-
svc.rollupMetricData(attributes)
118+
svc.rwLock.Lock()
119+
defer svc.rwLock.Unlock()
120+
if !svc.admitMetricDataLocked(metricData) {
121+
svc.rollupMetricDataLocked(attributes)
123122

124123
svc.totalRollup++
125124
admitted = false
@@ -130,10 +129,6 @@ func (m *MetricsLimiter) Admit(metricName string, attributes, resourceAttributes
130129
}
131130

132131
svc.totalMetricSent++
133-
134-
svc.rwLock.RLock()
135-
defer svc.rwLock.RUnlock()
136-
137132
svc.totalCount++
138133
svc.InsertMetricDataToPrimary(metricData)
139134
svc.InsertMetricDataToSecondary(metricData)
@@ -156,23 +151,19 @@ func (m *MetricsLimiter) filterAWSDeclaredAttributes(attributes, resourceAttribu
156151
}
157152

158153
func (m *MetricsLimiter) removeStaleServices() {
159-
var svcToRemove []string
160-
for name, svc := range m.services {
161-
if svc.rotations > 3 {
162-
if svc.countSnapshot[0] == svc.countSnapshot[1] && svc.countSnapshot[1] == svc.countSnapshot[2] {
163-
svc.cancelFunc()
164-
svcToRemove = append(svcToRemove, name)
165-
}
154+
m.services.Range(func(key, value any) bool {
155+
svc, ok := value.(*service)
156+
if !ok {
157+
m.logger.Warn("failed to convert type with key" + key.(string) + ".")
158+
return true
166159
}
167-
}
168-
169-
m.mapLock.Lock()
170-
defer m.mapLock.Unlock()
171-
172-
for _, name := range svcToRemove {
173-
m.logger.Info("remove stale service " + name + ".")
174-
delete(m.services, name)
175-
}
160+
if svc.isStale() {
161+
svc.cancelFunc()
162+
m.logger.Info("remove stale service " + key.(string) + ".")
163+
m.services.Delete(key)
164+
}
165+
return true
166+
})
176167
}
177168

178169
type service struct {
@@ -290,7 +281,7 @@ func (t *topKMetrics) Push(oldMetric, newMetric *MetricData) {
290281
// Check if this oldMetric is the new minimum, find the new minMetric after the updates
291282
if t.minMetric.hashKey == hashValue {
292283
// Find the new minMetrics after update the frequency
293-
t.minMetric = t.findMinMetric()
284+
t.minMetric = t.findMinMetricLocked()
294285
}
295286
return
296287
}
@@ -300,7 +291,7 @@ func (t *topKMetrics) Push(oldMetric, newMetric *MetricData) {
300291
if newMetric.frequency > t.minMetric.frequency {
301292
delete(t.metricMap, t.minMetric.hashKey)
302293
t.metricMap[hashValue] = newMetric
303-
t.minMetric = t.findMinMetric()
294+
t.minMetric = t.findMinMetricLocked()
304295
}
305296
} else {
306297
// Check if this newMetric is the new minimum.
@@ -311,8 +302,17 @@ func (t *topKMetrics) Push(oldMetric, newMetric *MetricData) {
311302
}
312303
}
313304

314-
// findMinMetric removes and returns the key-value pair with the minimum value.
315-
func (t *topKMetrics) findMinMetric() *MetricData {
305+
func (t *topKMetrics) Admit(metric *MetricData) bool {
306+
_, found := t.metricMap[metric.hashKey]
307+
if len(t.metricMap) < t.sizeLimit || found {
308+
return true
309+
}
310+
return false
311+
}
312+
313+
// findMinMetricLocked removes and returns the key-value pair with the minimum value.
314+
// It assumes the caller already holds the read/write lock.
315+
func (t *topKMetrics) findMinMetricLocked() *MetricData {
316316
// Find the new minimum metric and smallest frequency.
317317
var newMinMetric *MetricData
318318
smallestFrequency := int(^uint(0) >> 1) // Initialize with the maximum possible integer value
@@ -326,15 +326,11 @@ func (t *topKMetrics) findMinMetric() *MetricData {
326326
return newMinMetric
327327
}
328328

329-
func (s *service) admitMetricData(metric *MetricData) bool {
330-
_, found := s.primaryTopK.metricMap[metric.hashKey]
331-
if len(s.primaryTopK.metricMap) < s.primaryTopK.sizeLimit || found {
332-
return true
333-
}
334-
return false
329+
func (s *service) admitMetricDataLocked(metric *MetricData) bool {
330+
return s.primaryTopK.Admit(metric)
335331
}
336332

337-
func (s *service) rollupMetricData(attributes pcommon.Map) {
333+
func (s *service) rollupMetricDataLocked(attributes pcommon.Map) {
338334
for _, indexAttr := range awsDeclaredMetricAttributes {
339335
if (indexAttr == common.CWMetricAttributeEnvironment) || (indexAttr == common.CWMetricAttributeLocalService) || (indexAttr == common.CWMetricAttributeRemoteService) {
340336
continue
@@ -349,6 +345,44 @@ func (s *service) rollupMetricData(attributes pcommon.Map) {
349345
}
350346
}
351347

348+
func (s *service) rotateVisitRecords() error {
349+
s.rwLock.Lock()
350+
defer s.rwLock.Unlock()
351+
352+
cmsDepth := s.primaryCMS.depth
353+
cmsWidth := s.primaryCMS.width
354+
topKLimit := s.primaryTopK.sizeLimit
355+
356+
nextPrimaryCMS := s.secondaryCMS
357+
nextPrimaryTopK := s.secondaryTopK
358+
359+
s.secondaryCMS = NewCountMinSketch(cmsDepth, cmsWidth)
360+
s.secondaryTopK = newTopKMetrics(topKLimit)
361+
362+
if nextPrimaryCMS != nil && nextPrimaryTopK != nil {
363+
s.primaryCMS = nextPrimaryCMS
364+
s.primaryTopK = nextPrimaryTopK
365+
} else {
366+
s.logger.Info(fmt.Sprintf("[%s] secondary visit records are nil.", s.name))
367+
}
368+
369+
s.countSnapshot[s.rotations%3] = s.totalCount
370+
s.rotations++
371+
372+
return nil
373+
}
374+
375+
func (s *service) isStale() bool {
376+
s.rwLock.RLock()
377+
defer s.rwLock.RUnlock()
378+
if s.rotations > 3 {
379+
if s.countSnapshot[0] == s.countSnapshot[1] && s.countSnapshot[1] == s.countSnapshot[2] {
380+
return true
381+
}
382+
}
383+
return false
384+
}
385+
352386
// As a starting point, you can use rules of thumb, such as setting the depth to be around 4-6 times the logarithm of the expected number of distinct items and the width based on your memory constraints. However, these are rough guidelines, and the optimal size will depend on your unique application and requirements.
353387
func newService(name string, limit int, rotationInterval time.Duration, parentCtx context.Context, logger *zap.Logger) *service {
354388
depth := defaultCMSDepth
@@ -374,7 +408,7 @@ func newService(name string, limit int, rotationInterval time.Duration, parentCt
374408
select {
375409
case <-rotationTicker.C:
376410
svc.logger.Info(fmt.Sprintf("[%s] rotating visit records, current rotation %d", name, svc.rotations))
377-
if err := rotateVisitRecords(svc); err != nil {
411+
if err := svc.rotateVisitRecords(); err != nil {
378412
svc.logger.Error(fmt.Sprintf("[%s] failed to rotate visit records.", name), zap.Error(err))
379413
}
380414
case <-ctx.Done():
@@ -389,30 +423,3 @@ func newService(name string, limit int, rotationInterval time.Duration, parentCt
389423
svc.logger.Info(fmt.Sprintf("[%s] service entry is created.\n", name))
390424
return svc
391425
}
392-
393-
func rotateVisitRecords(svc *service) error {
394-
svc.rwLock.Lock()
395-
defer svc.rwLock.Unlock()
396-
397-
cmsDepth := svc.primaryCMS.depth
398-
cmsWidth := svc.primaryCMS.width
399-
topKLimit := svc.primaryTopK.sizeLimit
400-
401-
nextPrimaryCMS := svc.secondaryCMS
402-
nextPrimaryTopK := svc.secondaryTopK
403-
404-
svc.secondaryCMS = NewCountMinSketch(cmsDepth, cmsWidth)
405-
svc.secondaryTopK = newTopKMetrics(topKLimit)
406-
407-
if nextPrimaryCMS != nil && nextPrimaryTopK != nil {
408-
svc.primaryCMS = nextPrimaryCMS
409-
svc.primaryTopK = nextPrimaryTopK
410-
} else {
411-
svc.logger.Info(fmt.Sprintf("[%s] secondary visit records are nil.", svc.name))
412-
}
413-
414-
svc.countSnapshot[svc.rotations%3] = svc.totalCount
415-
svc.rotations++
416-
417-
return nil
418-
}

plugins/processors/awsapplicationsignals/internal/cardinalitycontrol/metrics_limiter_test.go

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -164,7 +164,12 @@ func TestClearStaleService(t *testing.T) {
164164
cancel()
165165

166166
metricsLimiter := limiter.(*MetricsLimiter)
167-
assert.Equal(t, 0, len(metricsLimiter.services))
167+
serviceCount := 0
168+
metricsLimiter.services.Range(func(_, _ interface{}) bool {
169+
serviceCount++
170+
return true
171+
})
172+
assert.Equal(t, 0, serviceCount)
168173
}
169174

170175
func TestInheritanceAfterRotation(t *testing.T) {
@@ -220,14 +225,18 @@ func TestRotationInterval(t *testing.T) {
220225
// wait for secondary to be created
221226
time.Sleep(7 * time.Second)
222227
for i := 0; i < 5; i++ {
228+
svc.rwLock.Lock()
223229
svc.secondaryCMS.matrix[0][0] = 1
230+
svc.rwLock.Unlock()
224231

225232
// wait for rotation
226233
time.Sleep(5 * time.Second)
227234

228235
// verify secondary is promoted to primary
236+
svc.rwLock.Lock()
229237
assert.Equal(t, 0, svc.secondaryCMS.matrix[0][0])
230238
assert.Equal(t, 1, svc.primaryCMS.matrix[0][0])
239+
svc.rwLock.Unlock()
231240
}
232241
}
233242

plugins/processors/awsapplicationsignals/processor_test.go

Lines changed: 34 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -5,14 +5,18 @@ package awsapplicationsignals
55

66
import (
77
"context"
8+
"fmt"
89
"sync"
910
"testing"
11+
"time"
1012

1113
"github.com/stretchr/testify/assert"
1214
"go.opentelemetry.io/collector/pdata/pmetric"
1315
"go.opentelemetry.io/collector/pdata/ptrace"
1416
"go.uber.org/zap"
17+
"golang.org/x/exp/rand"
1518

19+
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/common"
1620
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/config"
1721
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/rules"
1822
)
@@ -134,31 +138,58 @@ func TestProcessMetricsLowercase(t *testing.T) {
134138

135139
func TestProcessMetricsWithConcurrency(t *testing.T) {
136140
logger, _ := zap.NewDevelopment()
141+
ctx := context.Background()
137142
ap := &awsapplicationsignalsprocessor{
138143
logger: logger,
139144
config: &config.Config{
140145
Resolvers: []config.Resolver{config.NewGenericResolver("")},
141-
Rules: testRules,
146+
Rules: []rules.Rule{},
147+
Limiter: &config.LimiterConfig{
148+
Threshold: 2,
149+
Disabled: false,
150+
LogDroppedMetrics: false,
151+
RotationInterval: 10 * time.Millisecond,
152+
GarbageCollectionInterval: 20 * time.Millisecond,
153+
ParentContext: ctx,
154+
},
142155
},
143156
}
144157

145-
ctx := context.Background()
146158
ap.StartMetrics(ctx, nil)
147159

148160
var wg sync.WaitGroup
149-
for i := 0; i < 100; i++ {
161+
for i := 0; i < 10000; i++ {
150162
wg.Add(1)
151163
go func() {
152164
defer wg.Done()
165+
166+
time.Sleep(time.Duration(rand.Intn(50)*100) * time.Millisecond)
167+
153168
lowercaseMetrics := pmetric.NewMetrics()
154169
errorMetric := lowercaseMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
155170
errorMetric.SetName("error")
171+
errorGauge := errorMetric.SetEmptyGauge().DataPoints().AppendEmpty()
172+
errorGauge.SetIntValue(1)
173+
errorGauge.Attributes().PutStr("Telemetry.Source", "UnitTest")
174+
errorGauge.Attributes().PutStr(common.CWMetricAttributeLocalService, fmt.Sprintf("UnitTest%d", rand.Intn(200)))
156175
latencyMetric := lowercaseMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
157176
latencyMetric.SetName("latency")
177+
histogram := latencyMetric.SetEmptyExponentialHistogram().DataPoints().AppendEmpty()
178+
histogram.SetSum(1)
179+
histogram.SetCount(1)
180+
histogram.SetMin(0)
181+
histogram.SetMax(1)
182+
histogram.Attributes().PutStr("Telemetry.Source", "UnitTest")
183+
histogram.Attributes().PutStr(common.CWMetricAttributeLocalService, fmt.Sprintf("UnitTest%d", rand.Intn(200)))
158184
faultMetric := lowercaseMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
185+
faultGauge := faultMetric.SetEmptyGauge().DataPoints().AppendEmpty()
186+
faultGauge.SetIntValue(1)
187+
faultGauge.Attributes().PutStr("Telemetry.Source", "UnitTest")
188+
faultGauge.Attributes().PutStr(common.CWMetricAttributeLocalService, fmt.Sprintf("UnitTest%d", rand.Intn(200)))
159189
faultMetric.SetName("fault")
160190

161191
ap.processMetrics(ctx, lowercaseMetrics)
192+
162193
assert.Equal(t, "Error", lowercaseMetrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Name())
163194
assert.Equal(t, "Latency", lowercaseMetrics.ResourceMetrics().At(1).ScopeMetrics().At(0).Metrics().At(0).Name())
164195
assert.Equal(t, "Fault", lowercaseMetrics.ResourceMetrics().At(2).ScopeMetrics().At(0).Metrics().At(0).Name())

0 commit comments

Comments
 (0)