Skip to content

Commit 6a16e3f

Browse files
authored
Fallback to iterative DescribeLogGroups if batch not supported (#1915)
1 parent d962aad commit 6a16e3f

File tree

2 files changed

+134
-7
lines changed

2 files changed

+134
-7
lines changed

plugins/outputs/cloudwatchlogs/internal/pusher/target.go

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ const (
2424
baseRetryDelay = 1 * time.Second
2525
maxRetryDelayTarget = 10 * time.Second
2626
numBackoffRetries = 5
27+
28+
errMessageLogGroupIdentifierNotSupported = "Input filter on Log group identifiers is not supported."
2729
)
2830

2931
type Target struct {
@@ -185,21 +187,31 @@ func (m *targetManager) processDescribeLogGroup() {
185187
case target := <-m.dlg:
186188
batch[target.Group] = target
187189
if len(batch) == logGroupIdentifierLimit {
188-
m.updateTargetBatch(batch)
190+
m.updateTargets(batch)
189191
// Reset batch
190192
batch = make(map[string]Target, logGroupIdentifierLimit)
191193
}
192194
case <-t.C:
193195
if len(batch) > 0 {
194-
m.updateTargetBatch(batch)
196+
m.updateTargets(batch)
195197
// Reset batch
196198
batch = make(map[string]Target, logGroupIdentifierLimit)
197199
}
198200
}
199201
}
200202
}
201203

202-
func (m *targetManager) updateTargetBatch(targets map[string]Target) {
204+
func (m *targetManager) updateTargets(targets map[string]Target) {
205+
err := m.updateTargetsBatch(targets)
206+
if err != nil {
207+
m.logger.Debug("falling back to describing log groups by prefix")
208+
m.updateTargetsIteratively(targets)
209+
}
210+
}
211+
212+
// updateTargetsBatch will call DLG for the entire batch (single call). Will return an error if
213+
// DLG by identifiers is not supported.
214+
func (m *targetManager) updateTargetsBatch(targets map[string]Target) error {
203215
identifiers := make([]*string, 0, len(targets))
204216
for logGroup := range targets {
205217
identifiers = append(identifiers, aws.String(logGroup))
@@ -211,7 +223,11 @@ func (m *targetManager) updateTargetBatch(targets map[string]Target) {
211223
for attempt := 0; attempt < numBackoffRetries; attempt++ {
212224
output, err := m.service.DescribeLogGroups(describeLogGroupsInput)
213225
if err != nil {
214-
m.logger.Errorf("failed to describe log group retention for targets %v: %v", targets, err)
226+
if aerr, ok := err.(awserr.Error); ok && aerr.Code() == cloudwatchlogs.ErrCodeInvalidParameterException && aerr.Message() == errMessageLogGroupIdentifierNotSupported {
227+
return err
228+
}
229+
230+
m.logger.Errorf("failed to batch describe log group retention for targets %v: %v", targets, err)
215231
time.Sleep(m.calculateBackoff(attempt))
216232
continue
217233
}
@@ -225,6 +241,49 @@ func (m *targetManager) updateTargetBatch(targets map[string]Target) {
225241
}
226242
break
227243
}
244+
return nil
245+
}
246+
247+
// updateTargetsIteratively will iterate through the targets and call DLG for each target.
248+
func (m *targetManager) updateTargetsIteratively(targets map[string]Target) {
249+
for _, target := range targets {
250+
for attempt := 0; attempt < numBackoffRetries; attempt++ {
251+
currentRetention, err := m.getRetention(target)
252+
if err != nil {
253+
m.logger.Errorf("failed to describe log group retention for target %v: %v", target, err)
254+
time.Sleep(m.calculateBackoff(attempt))
255+
continue
256+
}
257+
258+
if currentRetention != target.Retention && target.Retention > 0 {
259+
m.logger.Debugf("queueing log group %v to update retention policy", target.Group)
260+
m.prp <- target
261+
}
262+
break // no change in retention
263+
}
264+
}
265+
}
266+
267+
func (m *targetManager) getRetention(target Target) (int, error) {
268+
input := &cloudwatchlogs.DescribeLogGroupsInput{
269+
LogGroupNamePrefix: aws.String(target.Group),
270+
}
271+
272+
output, err := m.service.DescribeLogGroups(input)
273+
if err != nil {
274+
return 0, fmt.Errorf("describe log groups failed: %w", err)
275+
}
276+
277+
for _, group := range output.LogGroups {
278+
if *group.LogGroupName == target.Group {
279+
if group.RetentionInDays == nil {
280+
return 0, nil
281+
}
282+
return int(*group.RetentionInDays), nil
283+
}
284+
}
285+
286+
return 0, fmt.Errorf("log group %v not found", target.Group)
228287
}
229288

230289
func (m *targetManager) processPutRetentionPolicy() {

plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -460,14 +460,57 @@ func TestDescribeLogGroupsBatching(t *testing.T) {
460460
batch := make(map[string]Target)
461461
batch["group-1"] = Target{Group: "group-1", Stream: "stream", Retention: 7}
462462
batch["group-2"] = Target{Group: "group-2", Stream: "stream", Retention: 7}
463-
tm.updateTargetBatch(batch)
463+
tm.updateTargets(batch)
464464

465465
// Wait for ticker to fire (slightly longer than 5 seconds)
466466
time.Sleep(5100 * time.Millisecond)
467467

468468
mockService.AssertNotCalled(t, "PutRetentionPolicy")
469469
})
470470

471+
t.Run("FallbackToPrefix", func(t *testing.T) {
472+
mockService := new(mockLogsService)
473+
474+
// First call with identifiers fails with unsupported error
475+
mockService.On("DescribeLogGroups", mock.MatchedBy(func(input *cloudwatchlogs.DescribeLogGroupsInput) bool {
476+
return len(input.LogGroupIdentifiers) > 0
477+
})).Return(&cloudwatchlogs.DescribeLogGroupsOutput{},
478+
awserr.New(cloudwatchlogs.ErrCodeInvalidParameterException, errMessageLogGroupIdentifierNotSupported, nil)).Once()
479+
480+
// Fallback calls with prefix for each group
481+
mockService.On("DescribeLogGroups", mock.MatchedBy(func(input *cloudwatchlogs.DescribeLogGroupsInput) bool {
482+
return input.LogGroupNamePrefix != nil && *input.LogGroupNamePrefix == "group-1"
483+
})).Return(&cloudwatchlogs.DescribeLogGroupsOutput{
484+
LogGroups: []*cloudwatchlogs.LogGroup{
485+
{LogGroupName: aws.String("group-1"), RetentionInDays: aws.Int64(1)},
486+
},
487+
}, nil).Once()
488+
489+
mockService.On("DescribeLogGroups", mock.MatchedBy(func(input *cloudwatchlogs.DescribeLogGroupsInput) bool {
490+
return input.LogGroupNamePrefix != nil && *input.LogGroupNamePrefix == "group-2"
491+
})).Return(&cloudwatchlogs.DescribeLogGroupsOutput{
492+
LogGroups: []*cloudwatchlogs.LogGroup{
493+
{LogGroupName: aws.String("group-2"), RetentionInDays: aws.Int64(7)},
494+
},
495+
}, nil).Once()
496+
497+
mockService.On("PutRetentionPolicy", mock.MatchedBy(func(input *cloudwatchlogs.PutRetentionPolicyInput) bool {
498+
return *input.LogGroupName == "group-1" && *input.RetentionInDays == 7
499+
})).Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, nil).Once()
500+
501+
manager := NewTargetManager(logger, mockService)
502+
tm := manager.(*targetManager)
503+
504+
batch := make(map[string]Target)
505+
batch["group-1"] = Target{Group: "group-1", Stream: "stream", Retention: 7}
506+
batch["group-2"] = Target{Group: "group-2", Stream: "stream", Retention: 7}
507+
508+
tm.updateTargets(batch)
509+
time.Sleep(100 * time.Millisecond)
510+
511+
mockService.AssertExpectations(t)
512+
})
513+
471514
t.Run("RetentionPolicyUpdate", func(t *testing.T) {
472515
mockService := new(mockLogsService)
473516

@@ -497,7 +540,7 @@ func TestDescribeLogGroupsBatching(t *testing.T) {
497540
batch["group-1"] = Target{Group: "group-1", Stream: "stream", Retention: 7}
498541
batch["group-2"] = Target{Group: "group-2", Stream: "stream", Retention: 7}
499542

500-
tm.updateTargetBatch(batch)
543+
tm.updateTargets(batch)
501544
time.Sleep(100 * time.Millisecond)
502545

503546
mockService.AssertExpectations(t)
@@ -523,13 +566,38 @@ func TestDescribeLogGroupsBatching(t *testing.T) {
523566
batch := make(map[string]Target)
524567
batch["group-1"] = Target{Group: "group-1", Stream: "stream", Retention: 7}
525568

526-
tm.updateTargetBatch(batch)
569+
tm.updateTargets(batch)
527570
// Sleep enough for retry
528571
time.Sleep(2 * time.Second)
529572

530573
mockService.AssertExpectations(t)
531574
})
532575

576+
t.Run("FallbackToPrefix/OtherError", func(t *testing.T) {
577+
mockService := new(mockLogsService)
578+
579+
// First call with identifiers fails with different error (should not fallback)
580+
mockService.On("DescribeLogGroups", mock.MatchedBy(func(input *cloudwatchlogs.DescribeLogGroupsInput) bool {
581+
return len(input.LogGroupIdentifiers) > 0
582+
})).Return(&cloudwatchlogs.DescribeLogGroupsOutput{},
583+
awserr.New(cloudwatchlogs.ErrCodeInvalidParameterException, "Different error message", nil)).Times(numBackoffRetries)
584+
585+
manager := NewTargetManager(logger, mockService)
586+
tm := manager.(*targetManager)
587+
588+
batch := make(map[string]Target)
589+
batch["group-1"] = Target{Group: "group-1", Stream: "stream", Retention: 7}
590+
591+
tm.updateTargets(batch)
592+
time.Sleep(2 * time.Second)
593+
594+
mockService.AssertExpectations(t)
595+
// Should not call prefix-based DescribeLogGroups
596+
mockService.AssertNotCalled(t, "DescribeLogGroups", mock.MatchedBy(func(input *cloudwatchlogs.DescribeLogGroupsInput) bool {
597+
return input.LogGroupNamePrefix != nil
598+
}))
599+
})
600+
533601
}
534602

535603
func TestCalculateBackoff(t *testing.T) {

0 commit comments

Comments
 (0)