diff --git a/logs/logs.go b/logs/logs.go
index 07839c7753..29976d3416 100644
--- a/logs/logs.go
+++ b/logs/logs.go
@@ -65,13 +65,13 @@ type LogBackend interface {
 // e.g. a particular log stream in cloudwatchlogs.
 type LogDest interface {
     Publish(events []LogEvent) error
-    NotifySourceStopped()
 }
 
 // LogAgent is the agent handles pure log pipelines
 type LogAgent struct {
     Config      *config.Config
     backends    map[string]LogBackend
+    destNames   map[LogDest]string
     collections []LogCollection
     retentionAlreadyAttempted map[string]bool
 }
@@ -80,6 +80,7 @@ func NewLogAgent(c *config.Config) *LogAgent {
     return &LogAgent{
         Config:                    c,
         backends:                  make(map[string]LogBackend),
+        destNames:                 make(map[LogDest]string),
         retentionAlreadyAttempted: make(map[string]bool),
     }
 }
@@ -135,6 +136,7 @@ func (l *LogAgent) Run(ctx context.Context) {
             }
             retention = l.checkRetentionAlreadyAttempted(retention, logGroup)
             dest := backend.CreateDest(logGroup, logStream, retention, logGroupClass, src)
+            l.destNames[dest] = dname
             log.Printf("I! [logagent] piping log from %s/%s(%s) to %s with retention %d", logGroup, logStream, description, dname, retention)
             go l.runSrcToDest(src, dest)
         }
@@ -146,10 +148,8 @@ func (l *LogAgent) Run(ctx context.Context) {
 }
 
 func (l *LogAgent) runSrcToDest(src LogSrc, dest LogDest) {
-    eventsCh := make(chan LogEvent)
     defer src.Stop()
-    defer dest.NotifySourceStopped()
 
     closed := false
     src.SetOutput(func(e LogEvent) {
@@ -168,11 +168,11 @@ func (l *LogAgent) runSrcToDest(src LogSrc, dest LogDest) {
     for e := range eventsCh {
         err := dest.Publish([]LogEvent{e})
         if err == ErrOutputStopped {
-            log.Printf("I! [logagent] Log destination %v has stopped, finalizing %v/%v", src.Destination(), src.Group(), src.Stream())
+            log.Printf("I! [logagent] Log destination %v has stopped, finalizing %v/%v", l.destNames[dest], src.Group(), src.Stream())
             return
         }
         if err != nil {
-            log.Printf("E! [logagent] Failed to publish log to %v, error: %v", src.Destination(), err)
+            log.Printf("E! [logagent] Failed to publish log to %v, error: %v", l.destNames[dest], err)
             return
         }
     }
diff --git a/plugins/inputs/logfile/logfile.go b/plugins/inputs/logfile/logfile.go
index 43bef59896..efd4a782b8 100644
--- a/plugins/inputs/logfile/logfile.go
+++ b/plugins/inputs/logfile/logfile.go
@@ -41,9 +41,8 @@ type LogFile struct {
     started bool
 }
 
-var _ logs.LogCollection = (*LogFile)(nil)
-
 func NewLogFile() *LogFile {
+
     return &LogFile{
         configs: make(map[*FileConfig]map[string]*tailerSrc),
         done:    make(chan struct{}),
@@ -252,6 +251,11 @@ func (t *LogFile) FindLogSrc() []logs.LogSrc {
             }
         }
 
+        destination := fileconfig.Destination
+        if destination == "" {
+            destination = t.Destination
+        }
+
         src := NewTailerSrc(
             groupName, streamName,
             t.Destination,
diff --git a/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go b/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go
index 770ef5e3f9..09641d651d 100644
--- a/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go
+++ b/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go
@@ -4,6 +4,7 @@
 package cloudwatchlogs
 
 import (
+    "encoding/json"
     "fmt"
     "regexp"
     "strings"
@@ -27,6 +28,7 @@ import (
     "github.com/aws/amazon-cloudwatch-agent/logs"
    "github.com/aws/amazon-cloudwatch-agent/plugins/outputs/cloudwatchlogs/internal/pusher"
     "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
+    "github.com/aws/amazon-cloudwatch-agent/tool/util"
 )
 
 const (
@@ -38,7 +40,10 @@ const (
 
     defaultFlushTimeout = 5 * time.Second
 
-    maxRetryTimeout = 14*24*time.Hour + 10*time.Minute
+    maxRetryTimeout    = 14*24*time.Hour + 10*time.Minute
+    metricRetryTimeout = 2 * time.Minute
+
+    attributesInFields = "attributesInFields"
 )
 
 var (
@@ -69,24 +74,22 @@ type CloudWatchLogs struct {
 
     Log telegraf.Logger `toml:"-"`
 
+    pusherStopChan  chan struct{}
     pusherWaitGroup sync.WaitGroup
     cwDests         sync.Map
     workerPool      pusher.WorkerPool
     targetManager   pusher.TargetManager
     once            sync.Once
     middleware      awsmiddleware.Middleware
-    configurer      *awsmiddleware.Configurer
-    configurerOnce  sync.Once
 }
 
-var _ logs.LogBackend = (*CloudWatchLogs)(nil)
-var _ telegraf.Output = (*CloudWatchLogs)(nil)
-
 func (c *CloudWatchLogs) Connect() error {
     return nil
 }
 
 func (c *CloudWatchLogs) Close() error {
+    close(c.pusherStopChan)
+    c.pusherWaitGroup.Wait()
 
     c.cwDests.Range(func(_, value interface{}) bool {
         if d, ok := value.(*cwDest); ok {
@@ -95,8 +98,6 @@ func (c *CloudWatchLogs) Close() error {
         return true
     })
 
-    c.pusherWaitGroup.Wait()
-
     if c.workerPool != nil {
         c.workerPool.Stop()
     }
@@ -105,8 +106,10 @@ func (c *CloudWatchLogs) Close() error {
 }
 
 func (c *CloudWatchLogs) Write(metrics []telegraf.Metric) error {
-    // we no longer expect this to be used. We now use the OTel awsemfexporter for sending EMF metrics to CloudWatch Logs
-    return fmt.Errorf("unexpected call to Write")
+    for _, m := range metrics {
+        c.writeMetricAsStructuredLog(m)
+    }
+    return nil
 }
 
 func (c *CloudWatchLogs) CreateDest(group, stream string, retention int, logGroupClass string, logSrc logs.LogSrc) logs.LogDest {
@@ -131,13 +134,7 @@ func (c *CloudWatchLogs) CreateDest(group, stream string, retention int, logGrou
 
 func (c *CloudWatchLogs) getDest(t pusher.Target, logSrc logs.LogSrc) *cwDest {
     if cwd, ok := c.cwDests.Load(t); ok {
-        d := cwd.(*cwDest)
-        d.Lock()
-        defer d.Unlock()
-        if !d.stopped {
-            d.refCount++
-            return d
-        }
+        return cwd.(*cwDest)
     }
 
     logThrottleRetryer := retryer.NewLogThrottleRetryer(c.Log)
@@ -153,15 +150,8 @@ func (c *CloudWatchLogs) getDest(t pusher.Target, logSrc logs.LogSrc) *cwDest {
         }
         c.targetManager = pusher.NewTargetManager(c.Log, client)
     })
-    p := pusher.NewPusher(c.Log, t, client, c.targetManager, logSrc, c.workerPool, c.ForceFlushInterval.Duration, maxRetryTimeout, &c.pusherWaitGroup)
-    cwd := &cwDest{
-        pusher:     p,
-        retryer:    logThrottleRetryer,
-        refCount:   1,
-        onStopFunc: func() {
-            c.cwDests.Delete(t)
-        },
-    }
+    p := pusher.NewPusher(c.Log, t, client, c.targetManager, logSrc, c.workerPool, c.ForceFlushInterval.Duration, maxRetryTimeout, c.pusherStopChan, &c.pusherWaitGroup)
+    cwd := &cwDest{pusher: p, retryer: logThrottleRetryer}
     c.cwDests.Store(t, cwd)
     return cwd
 }
@@ -187,10 +177,7 @@ func (c *CloudWatchLogs) createClient(retryer aws.RequestRetryer) *cloudwatchlog
     )
     client.Handlers.Build.PushBackNamed(handlers.NewRequestCompressionHandler([]string{"PutLogEvents"}))
     if c.middleware != nil {
-        c.configurerOnce.Do(func() {
-            c.configurer = awsmiddleware.NewConfigurer(c.middleware.Handlers())
-        })
-        if err := c.configurer.Configure(awsmiddleware.SDKv1(&client.Handlers)); err != nil {
+        if err := awsmiddleware.NewConfigurer(c.middleware.Handlers()).Configure(awsmiddleware.SDKv1(&client.Handlers)); err != nil {
             c.Log.Errorf("Unable to configure middleware on cloudwatch logs client: %v", err)
         } else {
             c.Log.Debug("Configured middleware on AWS client")
@@ -199,67 +186,145 @@ func (c *CloudWatchLogs) createClient(retryer aws.RequestRetryer) *cloudwatchlog
     return client
 }
 
-// Description returns a one-sentence description on the Output
-func (c *CloudWatchLogs) Description() string {
-    return "Configuration for AWS CloudWatchLogs output."
+func (c *CloudWatchLogs) writeMetricAsStructuredLog(m telegraf.Metric) {
+    t, err := c.getTargetFromMetric(m)
+    if err != nil {
+        c.Log.Errorf("Failed to find target: %v", err)
+    }
+    cwd := c.getDest(t, nil)
+    if cwd == nil {
+        c.Log.Warnf("unable to find log destination, group: %v, stream: %v", t.Group, t.Stream)
+        return
+    }
+    cwd.switchToEMF()
+    cwd.pusher.Sender.SetRetryDuration(metricRetryTimeout)
+
+    e := c.getLogEventFromMetric(m)
+    if e == nil {
+        return
+    }
+
+    cwd.AddEvent(e)
 }
 
-var sampleConfig = `
-  ## Amazon REGION
-  region = "us-east-1"
+func (c *CloudWatchLogs) getTargetFromMetric(m telegraf.Metric) (pusher.Target, error) {
+    tags := m.Tags()
+    logGroup, ok := tags[LogGroupNameTag]
+    if !ok {
+        return pusher.Target{}, fmt.Errorf("structuredlog receive a metric with name '%v' without log group name", m.Name())
+    } else {
+        m.RemoveTag(LogGroupNameTag)
+    }
 
-  ## Amazon Credentials
-  ## Credentials are loaded in the following order
-  ## 1) Assumed credentials via STS if role_arn is specified
-  ## 2) explicit credentials from 'access_key' and 'secret_key'
-  ## 3) shared profile from 'profile'
-  ## 4) environment variables
-  ## 5) shared credentials file
-  ## 6) EC2 Instance Profile
-  #access_key = ""
-  #secret_key = ""
-  #token = ""
-  #role_arn = ""
-  #profile = ""
-  #shared_credential_file = ""
+    logStream, ok := tags[LogStreamNameTag]
+    if ok {
+        m.RemoveTag(LogStreamNameTag)
+    } else if logStream == "" {
+        logStream = c.LogStreamName
+    }
 
-  # The log stream name.
-  log_stream_name = ""
-`
+    return pusher.Target{Group: logGroup, Stream: logStream, Class: util.StandardLogGroupClass, Retention: -1}, nil
+}
 
-// SampleConfig returns the default configuration of the Output
-func (c *CloudWatchLogs) SampleConfig() string {
-    return sampleConfig
+func (c *CloudWatchLogs) getLogEventFromMetric(metric telegraf.Metric) *structuredLogEvent {
+    var message string
+    if metric.HasField(LogEntryField) {
+        var ok bool
+        if message, ok = metric.Fields()[LogEntryField].(string); !ok {
+            c.Log.Warnf("The log entry value field is not string type: %v", metric.Fields())
+            return nil
+        }
+    } else {
+        content := map[string]interface{}{}
+        tags := metric.Tags()
+        // build all the attributesInFields
+        if val, ok := tags[attributesInFields]; ok {
+            attributes := strings.Split(val, ",")
+            mFields := metric.Fields()
+            for _, attr := range attributes {
+                if fieldVal, ok := mFields[attr]; ok {
+                    content[attr] = fieldVal
+                    metric.RemoveField(attr)
+                }
+            }
+            metric.RemoveTag(attributesInFields)
+            delete(tags, attributesInFields)
+        }
+
+        // build remaining attributes
+        for k := range tags {
+            content[k] = tags[k]
+        }
+
+        for k, v := range metric.Fields() {
+            var value interface{}
+
+            switch t := v.(type) {
+            case int:
+                value = float64(t)
+            case int32:
+                value = float64(t)
+            case int64:
+                value = float64(t)
+            case uint:
+                value = float64(t)
+            case uint32:
+                value = float64(t)
+            case uint64:
+                value = float64(t)
+            case float64:
+                value = t
+            case bool:
+                value = t
+            case string:
+                value = t
+            case time.Time:
+                value = float64(t.Unix())
+
+            default:
+                c.Log.Errorf("Detected unexpected fields (%s,%v) when encoding structured log event, value type %T is not supported", k, v, v)
+                return nil
+            }
+            content[k] = value
+        }
+
+        jsonMap, err := json.Marshal(content)
+        if err != nil {
+            c.Log.Errorf("Unable to marshal structured log content: %v", err)
+        }
+        message = string(jsonMap)
+    }
+
+    return &structuredLogEvent{
+        msg: message,
+        t:   metric.Time(),
+    }
+}
+
+type structuredLogEvent struct {
+    msg string
+    t   time.Time
+}
+
+func (e *structuredLogEvent) Message() string {
+    return e.msg
+}
+
+func (e *structuredLogEvent) Time() time.Time {
+    return e.t
 }
 
-// cwDest is responsible for publishing logs from log files to a log group + log stream.
-// Logs from more than one log file may be published to the same destination. cwDest closes
-// itself when all log file tailers which referenced this cwDest are closed.
-// All exported functions should practice thread-safety by acquiring lock the cwDest
-// and not calling any other function which requires the lock.
+func (e *structuredLogEvent) Done() {}
+
 type cwDest struct {
     pusher *pusher.Pusher
     sync.Mutex
     isEMF   bool
+    stopped bool
     retryer *retryer.LogThrottleRetryer
-
-    // refCount keeps track of how many LogSrc objects are referencing
-    // this cwDest object at any given time. Once there are no more
-    // references, the cwDest object stops itself, closing all goroutines,
-    // and it can no longer be used
-    refCount   int
-    stopped    bool
-    onStopFunc func()
 }
 
-var _ logs.LogDest = (*cwDest)(nil)
-
 func (cd *cwDest) Publish(events []logs.LogEvent) error {
-    cd.Lock()
-    defer cd.Unlock()
-    if cd.stopped {
-        return logs.ErrOutputStopped
-    }
     for _, e := range events {
         if !cd.isEMF {
             msg := e.Message()
@@ -267,43 +332,20 @@ func (cd *cwDest) Publish(events []logs.LogEvent) error {
                 cd.switchToEMF()
             }
         }
-        cd.addEvent(e)
-    }
-    return nil
-}
-
-func (cd *cwDest) NotifySourceStopped() {
-    cd.Lock()
-    defer cd.Unlock()
-    cd.refCount--
-    if cd.refCount <= 0 {
-        cd.stop()
+        cd.AddEvent(e)
     }
-
-    if cd.refCount < 0 {
-        fmt.Printf("E! Negative refCount on cwDest detected. refCount: %d, logGroup: %s, logStream: %s", cd.refCount, cd.pusher.Group, cd.pusher.Stream)
+    if cd.stopped {
+        return logs.ErrOutputStopped
     }
+    return nil
 }
 
 func (cd *cwDest) Stop() {
-    cd.Lock()
-    defer cd.Unlock()
-    cd.stop()
-}
-
-func (cd *cwDest) stop() {
-    if cd.stopped {
-        return
-    }
     cd.retryer.Stop()
-    cd.pusher.Stop()
     cd.stopped = true
-    if cd.onStopFunc != nil {
-        cd.onStopFunc()
-    }
 }
 
-func (cd *cwDest) addEvent(e logs.LogEvent) {
+func (cd *cwDest) AddEvent(e logs.LogEvent) {
     // Drop events for metric path logs when queue is full
     if cd.isEMF {
         cd.pusher.AddEventNonBlocking(e)
@@ -313,6 +355,8 @@ func (cd *cwDest) addEvent(e logs.LogEvent) {
 }
 
 func (cd *cwDest) switchToEMF() {
+    cd.Lock()
+    defer cd.Unlock()
     if !cd.isEMF {
         cd.isEMF = true
         cwl, ok := cd.pusher.Service.(*cloudwatchlogs.CloudWatchLogs)
@@ -322,10 +366,44 @@ func (cd *cwDest) switchToEMF() {
     }
 }
 
+// Description returns a one-sentence description on the Output
+func (c *CloudWatchLogs) Description() string {
+    return "Configuration for AWS CloudWatchLogs output."
+}
+
+var sampleConfig = `
+  ## Amazon REGION
+  region = "us-east-1"
+
+  ## Amazon Credentials
+  ## Credentials are loaded in the following order
+  ## 1) Assumed credentials via STS if role_arn is specified
+  ## 2) explicit credentials from 'access_key' and 'secret_key'
+  ## 3) shared profile from 'profile'
+  ## 4) environment variables
+  ## 5) shared credentials file
+  ## 6) EC2 Instance Profile
+  #access_key = ""
+  #secret_key = ""
+  #token = ""
+  #role_arn = ""
+  #profile = ""
+  #shared_credential_file = ""
+
+  # The log stream name.
+  log_stream_name = ""
+`
+
+// SampleConfig returns the default configuration of the Output
+func (c *CloudWatchLogs) SampleConfig() string {
+    return sampleConfig
+}
+
 func init() {
     outputs.Add("cloudwatchlogs", func() telegraf.Output {
         return &CloudWatchLogs{
             ForceFlushInterval: internal.Duration{Duration: defaultFlushTimeout},
+            pusherStopChan:     make(chan struct{}),
             cwDests:            sync.Map{},
             middleware: agenthealth.NewAgentHealth(
                 zap.NewNop(),
diff --git a/plugins/outputs/cloudwatchlogs/cloudwatchlogs_test.go b/plugins/outputs/cloudwatchlogs/cloudwatchlogs_test.go
index 66f1643fd0..63b6dfd0d0 100644
--- a/plugins/outputs/cloudwatchlogs/cloudwatchlogs_test.go
+++ b/plugins/outputs/cloudwatchlogs/cloudwatchlogs_test.go
@@ -69,12 +69,13 @@ func TestCreateDestination(t *testing.T) {
     for name, testCase := range testCases {
         t.Run(name, func(t *testing.T) {
             c := &CloudWatchLogs{
-                Log:           testutil.Logger{Name: "test"},
-                LogGroupName:  "G1",
-                LogStreamName: "S1",
-                AccessKey:     "access_key",
-                SecretKey:     "secret_key",
-                cwDests:       sync.Map{},
+                Log:            testutil.Logger{Name: "test"},
+                LogGroupName:   "G1",
+                LogStreamName:  "S1",
+                AccessKey:      "access_key",
+                SecretKey:      "secret_key",
+                pusherStopChan: make(chan struct{}),
+                cwDests:        sync.Map{},
             }
             dest := c.CreateDest(testCase.cfgLogGroup, testCase.cfgLogStream, testCase.cfgLogRetention, testCase.cfgLogClass, testCase.cfgTailerSrc).(*cwDest)
             require.Equal(t, testCase.expectedLogGroup, dest.pusher.Group)
@@ -88,10 +89,11 @@ func TestCreateDestination(t *testing.T) {
 
 func TestDuplicateDestination(t *testing.T) {
     c := &CloudWatchLogs{
-        Log:       testutil.Logger{Name: "test"},
-        AccessKey: "access_key",
-        SecretKey: "secret_key",
-        cwDests:   sync.Map{},
+        Log:            testutil.Logger{Name: "test"},
+        AccessKey:      "access_key",
+        SecretKey:      "secret_key",
+        cwDests:        sync.Map{},
+        pusherStopChan: make(chan struct{}),
     }
     // Given the same log group, log stream, same retention, and logClass
     d1 := c.CreateDest("FILENAME", "", -1, util.InfrequentAccessLogGroupClass, nil)
diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go b/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go
index 1d6edf57e9..f5afeb309a 100644
--- a/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go
+++ b/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go
@@ -109,11 +109,6 @@ func (s *senderPool) Send(batch *logEventBatch) {
     })
 }
 
-func (s *senderPool) Stop() {
-    // workerpool is stopped by the plugin
-    s.sender.Stop()
-}
-
 // SetRetryDuration sets the retry duration on the wrapped Sender.
 func (s *senderPool) SetRetryDuration(duration time.Duration) {
     s.sender.SetRetryDuration(duration)
diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go
index d9f3860967..b706688034 100644
--- a/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go
+++ b/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go
@@ -105,9 +105,10 @@ func TestWorkerPool(t *testing.T) {
 
 func TestSenderPool(t *testing.T) {
     logger := testutil.NewNopLogger()
+    stop := make(chan struct{})
     mockService := new(mockLogsService)
     mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil)
-    s := newSender(logger, mockService, nil, time.Second)
+    s := newSender(logger, mockService, nil, time.Second, stop)
 
     p := NewWorkerPool(12)
     sp := newSenderPool(p, s)
@@ -131,6 +132,5 @@ func TestSenderPool(t *testing.T) {
     }
 
     p.Stop()
-    s.Stop()
     assert.Equal(t, int32(200), completed.Load())
 }
diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go b/plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go
index 57256ae033..33656c3bb2 100644
--- a/plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go
+++ b/plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go
@@ -33,10 +33,11 @@ func NewPusher(
     workerPool WorkerPool,
     flushTimeout time.Duration,
     retryDuration time.Duration,
+    stop <-chan struct{},
     wg *sync.WaitGroup,
 ) *Pusher {
-    s := createSender(logger, service, targetManager, workerPool, retryDuration)
-    q := newQueue(logger, target, flushTimeout, entityProvider, s, wg)
+    s := createSender(logger, service, targetManager, workerPool, retryDuration, stop)
+    q := newQueue(logger, target, flushTimeout, entityProvider, s, stop, wg)
     targetManager.PutRetentionPolicy(target)
     return &Pusher{
         Target: target,
@@ -48,11 +49,6 @@ func NewPusher(
     }
 }
 
-func (p *Pusher) Stop() {
-    p.Queue.Stop()
-    p.Sender.Stop()
-}
-
 // createSender initializes a Sender. Wraps it in a senderPool if a WorkerPool is provided.
 func createSender(
     logger telegraf.Logger,
@@ -60,8 +56,9 @@ func createSender(
     targetManager TargetManager,
     workerPool WorkerPool,
     retryDuration time.Duration,
+    stop <-chan struct{},
 ) Sender {
-    s := newSender(logger, service, targetManager, retryDuration)
+    s := newSender(logger, service, targetManager, retryDuration, stop)
     if workerPool == nil {
         return s
     }
diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go
index 6d63e3c4ff..54f68621f9 100644
--- a/plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go
+++ b/plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go
@@ -20,21 +20,23 @@ const eventCount = 100000
 func TestPusher(t *testing.T) {
     t.Run("WithSender", func(t *testing.T) {
         t.Parallel()
+        stop := make(chan struct{})
         var wg sync.WaitGroup
-        pusher := setupPusher(t, nil, &wg)
+        pusher := setupPusher(t, nil, stop, &wg)
 
         var completed atomic.Int32
         generateEvents(t, pusher, &completed)
 
-        pusher.Stop()
+        close(stop)
         wg.Wait()
     })
 
     t.Run("WithSenderPool", func(t *testing.T) {
         t.Parallel()
+        stop := make(chan struct{})
         var wg sync.WaitGroup
         wp := NewWorkerPool(5)
-        pusher := setupPusher(t, wp, &wg)
+        pusher := setupPusher(t, wp, stop, &wg)
 
         _, isSenderPool := pusher.Sender.(*senderPool)
         assert.True(t, isSenderPool)
@@ -42,41 +44,12 @@ func TestPusher(t *testing.T) {
         var completed atomic.Int32
         generateEvents(t, pusher, &completed)
 
-        pusher.Stop()
+        close(stop)
         wg.Wait()
         wp.Stop()
     })
 }
 
-func TestPusherStop(t *testing.T) {
-    var wg sync.WaitGroup
-
-    s := &mockSender{}
-    s.On("Stop").Return()
-
-    logger := testutil.NewNopLogger()
-    target := Target{}
-    service := new(stubLogsService)
-    service.ple = func(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
-        return &cloudwatchlogs.PutLogEventsOutput{}, nil
-    }
-    mockManager := new(mockTargetManager)
-    q := newQueue(logger, target, time.Second, nil, s, &wg)
-    pusher := &Pusher{
-        Target:         target,
-        Queue:          q,
-        Service:        service,
-        TargetManager:  mockManager,
-        EntityProvider: nil,
-        Sender:         s,
-    }
-
-    pusher.Stop()
-
-    s.AssertCalled(t, "Stop")
-
-}
-
 func generateEvents(t *testing.T, pusher *Pusher, completed *atomic.Int32) {
     t.Helper()
     for i := 0; i < eventCount; i++ {
@@ -90,7 +63,7 @@ func generateEvents(t *testing.T, pusher *Pusher, completed *atomic.Int32) {
     }
 }
 
-func setupPusher(t *testing.T, workerPool WorkerPool, wg *sync.WaitGroup) *Pusher {
+func setupPusher(t *testing.T, workerPool WorkerPool, stop chan struct{}, wg *sync.WaitGroup) *Pusher {
     t.Helper()
     logger := testutil.NewNopLogger()
     target := Target{Group: "G", Stream: "S", Retention: 7}
@@ -112,6 +85,7 @@ func setupPusher(t *testing.T, workerPool WorkerPool, wg *sync.WaitGroup) *Pushe
         workerPool,
         time.Second,
         time.Minute,
+        stop,
         wg,
     )
 
diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/queue.go b/plugins/outputs/cloudwatchlogs/internal/pusher/queue.go
index e8ad65ffdc..da3a28a25a 100644
--- a/plugins/outputs/cloudwatchlogs/internal/pusher/queue.go
+++ b/plugins/outputs/cloudwatchlogs/internal/pusher/queue.go
@@ -17,7 +17,6 @@ import (
 type Queue interface {
     AddEvent(e logs.LogEvent)
     AddEventNonBlocking(e logs.LogEvent)
-    Stop()
 }
 
 type queue struct {
@@ -35,8 +34,7 @@ type queue struct {
     resetTimerCh chan struct{}
     flushTimer   *time.Timer
     flushTimeout atomic.Value
-    stopCh       chan struct{}
-    stopped      bool
+    stop         <-chan struct{}
     lastSentTime atomic.Value
 
     initNonBlockingChOnce sync.Once
@@ -44,14 +42,13 @@ type queue struct {
     wg              *sync.WaitGroup
 }
 
-var _ (Queue) = (*queue)(nil)
-
 func newQueue(
     logger telegraf.Logger,
     target Target,
     flushTimeout time.Duration,
     entityProvider logs.LogEntityProvider,
     sender Sender,
+    stop <-chan struct{},
     wg *sync.WaitGroup,
 ) Queue {
     q := &queue{
@@ -64,7 +61,7 @@ func newQueue(
         flushCh:         make(chan struct{}),
         resetTimerCh:    make(chan struct{}),
         flushTimer:      time.NewTimer(flushTimeout),
-        stopCh:          make(chan struct{}),
+        stop:            stop,
         startNonBlockCh: make(chan struct{}),
         wg:              wg,
     }
@@ -108,30 +105,33 @@ func (q *queue) AddEventNonBlocking(e logs.LogEvent) {
     }
 }
 
-// Stop stops all goroutines associated with this queue instance.
-func (q *queue) Stop() {
-    if q.stopped {
-        return
-    }
-    close(q.stopCh)
-    q.stopped = true
-}
-
 // start is the main loop for processing events and managing the queue.
 func (q *queue) start() {
     defer q.wg.Done()
     mergeChan := make(chan logs.LogEvent)
 
-    go q.merge(mergeChan)
+    // Merge events from both blocking and non-blocking channel
+    go func() {
+        var nonBlockingEventsCh <-chan logs.LogEvent
+        for {
+            select {
+            case e := <-q.eventsCh:
+                mergeChan <- e
+            case e := <-nonBlockingEventsCh:
+                mergeChan <- e
+            case <-q.startNonBlockCh:
+                nonBlockingEventsCh = q.nonBlockingEventsCh
+            case <-q.stop:
+                return
+            }
+        }
+    }()
+
     go q.manageFlushTimer()
 
     for {
         select {
-        case e, ok := <-mergeChan:
-            if !ok {
-                q.send()
-                return
-            }
+        case e := <-mergeChan:
             // Start timer when first event of the batch is added (happens after a flush timer timeout)
             if len(q.batch.events) == 0 {
                 q.resetFlushTimer()
@@ -149,23 +149,10 @@ func (q *queue) start() {
             } else {
                 q.resetFlushTimer()
             }
-        }
-    }
-}
-
-// merge merges events from both blocking and non-blocking channel
-func (q *queue) merge(mergeChan chan logs.LogEvent) {
-    defer close(mergeChan)
-    var nonBlockingEventsCh <-chan logs.LogEvent
-    for {
-        select {
-        case e := <-q.eventsCh:
-            mergeChan <- e
-        case e := <-nonBlockingEventsCh:
-            mergeChan <- e
-        case <-q.startNonBlockCh:
-            nonBlockingEventsCh = q.nonBlockingEventsCh
-        case <-q.stopCh:
+        case <-q.stop:
+            if len(q.batch.events) > 0 {
+                q.send()
+            }
             return
         }
     }
@@ -207,7 +194,7 @@ func (q *queue) manageFlushTimer() {
             if flushTimeout, ok := q.flushTimeout.Load().(time.Duration); ok {
                 q.flushTimer.Reset(flushTimeout)
             }
-        case <-q.stopCh:
+        case <-q.stop:
             q.stopFlushTimer()
             return
         }
diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go
index b5fc04d02e..8b9b4132e1 100644
--- a/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go
+++ b/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go
@@ -86,10 +86,6 @@ func (m *mockSender) RetryDuration() time.Duration {
     return args.Get(0).(time.Duration)
 }
 
-func (m *mockSender) Stop() {
-    m.Called()
-}
-
 func TestAddSingleEvent_WithAccountId(t *testing.T) {
     t.Parallel()
     var wg sync.WaitGroup
@@ -123,7 +119,7 @@ func TestAddSingleEvent_WithAccountId(t *testing.T) {
     }
 
     ep := newMockEntityProvider(expectedEntity)
-    q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, ep, &wg)
+    stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, ep, &wg)
     q.AddEvent(newStubLogEvent("MSG", time.Now()))
     require.False(t, called.Load(), "PutLogEvents has been called too fast, it should wait until FlushTimeout.")
 
@@ -134,8 +130,7 @@ func TestAddSingleEvent_WithAccountId(t *testing.T) {
     time.Sleep(time.Second)
     require.True(t, called.Load(), "PutLogEvents has not been called after FlushTimeout has been reached.")
 
-    q.Stop()
-    sender.Stop()
+    close(stop)
     wg.Wait()
 }
 
@@ -160,7 +155,7 @@ func TestAddSingleEvent_WithoutAccountId(t *testing.T) {
     }
 
     ep := newMockEntityProvider(nil)
-    q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, ep, &wg)
+    stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, ep, &wg)
     q.AddEvent(newStubLogEvent("MSG", time.Now()))
     require.False(t, called.Load(), "PutLogEvents has been called too fast, it should wait until FlushTimeout.")
 
@@ -171,8 +166,7 @@ func TestAddSingleEvent_WithoutAccountId(t *testing.T) {
     time.Sleep(2 * time.Second)
     require.True(t, called.Load(), "PutLogEvents has not been called after FlushTimeout has been reached.")
 
-    q.Stop()
-    sender.Stop()
+    close(stop)
     wg.Wait()
 }
 
@@ -190,15 +184,14 @@ func TestStopQueueWouldDoFinalSend(t *testing.T) {
         return &cloudwatchlogs.PutLogEventsOutput{}, nil
     }
 
-    q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg)
+    stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg)
     q.AddEvent(newStubLogEvent("MSG", time.Now()))
 
     time.Sleep(10 * time.Millisecond)
 
     require.False(t, called.Load(), "PutLogEvents has been called too fast, it should wait until FlushTimeout.")
 
-    q.Stop()
-    sender.Stop()
+    close(stop)
     wg.Wait()
 
     require.True(t, called.Load(), "PutLogEvents has not been called after FlushTimeout has been reached.")
@@ -214,14 +207,13 @@ func TestStopPusherWouldStopRetries(t *testing.T) {
     }
 
     logSink := testutil.NewLogSink()
-    q, sender := testPreparationWithLogger(t, logSink, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg)
+    stop, q := testPreparationWithLogger(t, logSink, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg)
     q.AddEvent(newStubLogEvent("MSG", time.Now()))
 
     time.Sleep(10 * time.Millisecond)
     triggerSend(t, q)
     // stop should try flushing the remaining events with retry disabled
-    q.Stop()
-    sender.Stop()
+    close(stop)
     time.Sleep(50 * time.Millisecond)
     wg.Wait()
 
@@ -256,12 +248,11 @@ func TestLongMessageHandling(t *testing.T) {
         return &cloudwatchlogs.PutLogEventsOutput{}, nil
     }
 
-    q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg)
+    stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg)
     q.AddEvent(newStubLogEvent(longMsg, time.Now()))
 
     triggerSend(t, q)
-    q.Stop()
-    sender.Stop()
+    close(stop)
     wg.Wait()
 }
 
@@ -285,15 +276,14 @@ func TestRequestIsLessThan1MB(t *testing.T) {
         return &cloudwatchlogs.PutLogEventsOutput{}, nil
     }
 
-    q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg)
+    stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg)
     for i := 0; i < 8; i++ {
         q.AddEvent(newStubLogEvent(longMsg, time.Now()))
     }
     time.Sleep(10 * time.Millisecond)
     triggerSend(t, q)
     triggerSend(t, q)
-    q.Stop()
-    sender.Stop()
+    close(stop)
     wg.Wait()
 }
 
@@ -311,7 +301,7 @@ func TestRequestIsLessThan10kEvents(t *testing.T) {
         return &cloudwatchlogs.PutLogEventsOutput{}, nil
     }
 
-    q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg)
+    stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg)
     for i := 0; i < 30000; i++ {
         q.AddEvent(newStubLogEvent(msg, time.Now()))
     }
@@ -319,8 +309,7 @@ func TestRequestIsLessThan10kEvents(t *testing.T) {
     for i := 0; i < 5; i++ {
         triggerSend(t, q)
     }
-    q.Stop()
-    sender.Stop()
+    close(stop)
     wg.Wait()
 }
 
@@ -337,7 +326,7 @@ func TestTimestampPopulation(t *testing.T) {
         return &cloudwatchlogs.PutLogEventsOutput{}, nil
     }
 
-    q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg)
+    stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg)
     for i := 0; i < 3; i++ {
q.AddEvent(newStubLogEvent("msg", time.Time{})) } @@ -345,8 +334,7 @@ func TestTimestampPopulation(t *testing.T) { for i := 0; i < 5; i++ { triggerSend(t, q) } - q.Stop() - sender.Stop() + close(stop) wg.Wait() } @@ -361,7 +349,7 @@ func TestIgnoreOutOfTimeRangeEvent(t *testing.T) { } logSink := testutil.NewLogSink() - q, sender := testPreparationWithLogger(t, logSink, -1, &s, 10*time.Millisecond, 2*time.Hour, nil, &wg) + stop, q := testPreparationWithLogger(t, logSink, -1, &s, 10*time.Millisecond, 2*time.Hour, nil, &wg) q.AddEvent(newStubLogEvent("MSG", time.Now().Add(-15*24*time.Hour))) q.AddEventNonBlocking(newStubLogEvent("MSG", time.Now().Add(2*time.Hour+1*time.Minute))) @@ -374,8 +362,7 @@ func TestIgnoreOutOfTimeRangeEvent(t *testing.T) { } time.Sleep(20 * time.Millisecond) - q.Stop() - sender.Stop() + close(stop) wg.Wait() } @@ -414,7 +401,7 @@ func TestAddMultipleEvents(t *testing.T) { )) } evts[10], evts[90] = evts[90], evts[10] // make events out of order - q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) for _, e := range evts { q.AddEvent(e) } @@ -425,8 +412,7 @@ func TestAddMultipleEvents(t *testing.T) { time.Sleep(time.Second) - q.Stop() - sender.Stop() + close(stop) wg.Wait() } @@ -466,7 +452,7 @@ func TestSendReqWhenEventsSpanMoreThan24Hrs(t *testing.T) { return nil, nil } - q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) q.AddEvent(newStubLogEvent("MSG 25hrs ago", time.Now().Add(-25*time.Hour))) q.AddEvent(newStubLogEvent("MSG 24hrs ago", time.Now().Add(-24*time.Hour))) q.AddEvent(newStubLogEvent("MSG 23hrs ago", time.Now().Add(-23*time.Hour))) @@ -475,8 +461,7 @@ func TestSendReqWhenEventsSpanMoreThan24Hrs(t *testing.T) { time.Sleep(10 * time.Millisecond) q.resetFlushTimer() time.Sleep(20 * time.Millisecond) - q.Stop() - sender.Stop() + close(stop) wg.Wait() } @@ -496,7 +481,7 @@ func TestUnhandledErrorWouldNotResend(t *testing.T) { } logSink := testutil.NewLogSink() - q, sender := testPreparationWithLogger(t, logSink, -1, &s, 10*time.Millisecond, 2*time.Hour, nil, &wg) + stop, q := testPreparationWithLogger(t, logSink, -1, &s, 10*time.Millisecond, 2*time.Hour, nil, &wg) q.AddEvent(newStubLogEvent("msg", time.Now())) time.Sleep(2 * time.Second) @@ -504,8 +489,7 @@ func TestUnhandledErrorWouldNotResend(t *testing.T) { require.True(t, strings.Contains(logLine, "E!"), fmt.Sprintf("Expecting error log with unhandled error, but received '%s' in the log", logLine)) require.True(t, strings.Contains(logLine, "unhandled error"), fmt.Sprintf("Expecting error log with unhandled error, but received '%s' in the log", logLine)) - q.Stop() - sender.Stop() + close(stop) wg.Wait() require.EqualValues(t, 1, cnt.Load(), fmt.Sprintf("Expecting pusher to call send 1 time, but %d times called", cnt.Load())) } @@ -542,7 +526,7 @@ func TestCreateLogGroupAndLogStreamWhenNotFound(t *testing.T) { } logSink := testutil.NewLogSink() - q, sender := testPreparationWithLogger(t, logSink, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + stop, q := testPreparationWithLogger(t, logSink, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) var eventWG sync.WaitGroup eventWG.Add(1) q.AddEvent(&stubLogEvent{message: "msg", timestamp: time.Now(), done: eventWG.Done}) @@ -560,8 +544,7 @@ func TestCreateLogGroupAndLogStreamWhenNotFound(t *testing.T) { require.True(t, foundUnknownErr, fmt.Sprintf("Expecting error log 
with unknown error, but received '%s' in the log", logSink)) - q.Stop() - sender.Stop() + close(stop) wg.Wait() } @@ -580,7 +563,7 @@ func TestLogRejectedLogEntryInfo(t *testing.T) { } logSink := testutil.NewLogSink() - q, sender := testPreparationWithLogger(t, logSink, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + stop, q := testPreparationWithLogger(t, logSink, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) var eventWG sync.WaitGroup eventWG.Add(1) q.AddEvent(&stubLogEvent{message: "msg", timestamp: time.Now(), done: eventWG.Done}) @@ -603,8 +586,7 @@ func TestLogRejectedLogEntryInfo(t *testing.T) { require.True(t, strings.Contains(logLine, "W!"), fmt.Sprintf("Expecting error log events too expired, but received '%s' in the log", logSink.String())) require.True(t, strings.Contains(logLine, "300"), fmt.Sprintf("Expecting error log events too expired, but received '%s' in the log", logSink.String())) - q.Stop() - sender.Stop() + close(stop) wg.Wait() } @@ -630,7 +612,7 @@ func TestAddEventNonBlocking(t *testing.T) { start.Add(time.Duration(i)*time.Millisecond), )) } - q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) time.Sleep(200 * time.Millisecond) // Wait until pusher started, merge channel is blocked for _, e := range evts { @@ -641,8 +623,7 @@ func TestAddEventNonBlocking(t *testing.T) { triggerSend(t, q) time.Sleep(20 * time.Millisecond) - q.Stop() - sender.Stop() + close(stop) wg.Wait() } @@ -658,7 +639,7 @@ func TestResendWouldStopAfterExhaustedRetries(t *testing.T) { } logSink := testutil.NewLogSink() - q, sender := testPreparationWithLogger(t, logSink, -1, &s, 10*time.Millisecond, time.Second, nil, &wg) + stop, q := testPreparationWithLogger(t, logSink, -1, &s, 10*time.Millisecond, time.Second, nil, &wg) q.AddEvent(newStubLogEvent("msg", time.Now())) time.Sleep(2 * time.Second) @@ -667,8 +648,7 @@ func TestResendWouldStopAfterExhaustedRetries(t *testing.T) { expected := fmt.Sprintf("All %v retries to G/S failed for PutLogEvents, request dropped.", cnt.Load()-1) require.True(t, strings.HasSuffix(lastLine, expected), fmt.Sprintf("Expecting error log to end with request dropped, but received '%s' in the log", logSink.String())) - q.Stop() - sender.Stop() + close(stop) wg.Wait() } @@ -687,7 +667,7 @@ func testPreparation( retryDuration time.Duration, entityProvider logs.LogEntityProvider, wg *sync.WaitGroup, -) (*queue, Sender) { +) (chan struct{}, *queue) { return testPreparationWithLogger( t, testutil.NewNopLogger(), @@ -709,19 +689,21 @@ func testPreparationWithLogger( retryDuration time.Duration, entityProvider logs.LogEntityProvider, wg *sync.WaitGroup, -) (*queue, Sender) { +) (chan struct{}, *queue) { t.Helper() + stop := make(chan struct{}) tm := NewTargetManager(logger, service) - s := newSender(logger, service, tm, retryDuration) + s := newSender(logger, service, tm, retryDuration, stop) q := newQueue( logger, Target{"G", "S", util.StandardLogGroupClass, retention}, flushTimeout, entityProvider, s, + stop, wg, ) - return q.(*queue), s + return stop, q.(*queue) } func TestQueueCallbackRegistration(t *testing.T) { @@ -747,6 +729,7 @@ func TestQueueCallbackRegistration(t *testing.T) { }).Return() logger := testutil.NewNopLogger() + stop := make(chan struct{}) q := &queue{ target: Target{"G", "S", util.StandardLogGroupClass, -1}, logger: logger, @@ -757,6 +740,7 @@ func TestQueueCallbackRegistration(t *testing.T) { flushCh: make(chan struct{}), resetTimerCh: make(chan struct{}), 
             flushTimer:      time.NewTimer(10 * time.Millisecond),
+            stop:            stop,
             startNonBlockCh: make(chan struct{}),
             wg:              &wg,
         }
@@ -789,6 +773,7 @@ func TestQueueCallbackRegistration(t *testing.T) {
         }).Return()
 
         logger := testutil.NewNopLogger()
+        stop := make(chan struct{})
         q := &queue{
             target:          Target{"G", "S", util.StandardLogGroupClass, -1},
             logger:          logger,
@@ -799,6 +784,7 @@ func TestQueueCallbackRegistration(t *testing.T) {
             flushCh:         make(chan struct{}),
             resetTimerCh:    make(chan struct{}),
             flushTimer:      time.NewTimer(10 * time.Millisecond),
+            stop:            stop,
             startNonBlockCh: make(chan struct{}),
             wg:              &wg,
         }
diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/sender.go b/plugins/outputs/cloudwatchlogs/internal/pusher/sender.go
index de1bdf6708..365dba91a0 100644
--- a/plugins/outputs/cloudwatchlogs/internal/pusher/sender.go
+++ b/plugins/outputs/cloudwatchlogs/internal/pusher/sender.go
@@ -26,7 +26,6 @@ type Sender interface {
     Send(*logEventBatch)
     SetRetryDuration(time.Duration)
     RetryDuration() time.Duration
-    Stop()
 }
 
 type sender struct {
@@ -34,24 +33,21 @@ type sender struct {
     retryDuration atomic.Value
     targetManager TargetManager
     logger        telegraf.Logger
-    stopCh        chan struct{}
-    stopped       bool
+    stop          <-chan struct{}
 }
 
-var _ (Sender) = (*sender)(nil)
-
 func newSender(
     logger telegraf.Logger,
     service cloudWatchLogsService,
     targetManager TargetManager,
     retryDuration time.Duration,
+    stop <-chan struct{},
 ) Sender {
     s := &sender{
         logger:        logger,
         service:       service,
         targetManager: targetManager,
-        stopCh:        make(chan struct{}),
-        stopped:       false,
+        stop:          stop,
     }
     s.retryDuration.Store(retryDuration)
     return s
@@ -129,7 +125,7 @@ func (s *sender) Send(batch *logEventBatch) {
             s.logger.Warnf("Retried %v time, going to sleep %v before retrying.", retryCountShort+retryCountLong-1, wait)
 
             select {
-            case <-s.stopCh:
+            case <-s.stop:
                 s.logger.Errorf("Stop requested after %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCountShort+retryCountLong-1, batch.Group, batch.Stream)
                 batch.updateState()
                 return
@@ -138,14 +134,6 @@ func (s *sender) Send(batch *logEventBatch) {
     }
 }
 
-func (s *sender) Stop() {
-    if s.stopped {
-        return
-    }
-    close(s.stopCh)
-    s.stopped = true
-}
-
 // SetRetryDuration sets the maximum duration for retrying failed log sends.
 func (s *sender) SetRetryDuration(retryDuration time.Duration) {
     s.retryDuration.Store(retryDuration)
diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go
index 3b469350ef..ccc6d1b3ba 100644
--- a/plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go
+++ b/plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go
@@ -80,9 +80,8 @@ func TestSender(t *testing.T) {
         mockManager := new(mockTargetManager)
         mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Once()
 
-        s := newSender(logger, mockService, mockManager, time.Second)
+        s := newSender(logger, mockService, mockManager, time.Second, make(chan struct{}))
         s.Send(batch)
-        s.Stop()
 
         mockService.AssertExpectations(t)
         assert.True(t, stateCallbackCalled, "State callback was not called in success scenario")
@@ -103,9 +102,8 @@ func TestSender(t *testing.T) {
         mockManager := new(mockTargetManager)
         mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{RejectedLogEventsInfo: rejectedInfo}, nil).Once()
 
-        s := newSender(logger, mockService, mockManager, time.Second)
+        s := newSender(logger, mockService, mockManager, time.Second, make(chan struct{}))
         s.Send(batch)
-        s.Stop()
 
         mockService.AssertExpectations(t)
     })
@@ -122,9 +120,8 @@ func TestSender(t *testing.T) {
         mockManager.On("InitTarget", mock.Anything).Return(nil).Once()
         mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Once()
 
-        s := newSender(logger, mockService, mockManager, time.Second)
+        s := newSender(logger, mockService, mockManager, time.Second, make(chan struct{}))
         s.Send(batch)
-        s.Stop()
 
         mockService.AssertExpectations(t)
         mockManager.AssertExpectations(t)
@@ -149,9 +146,8 @@ func TestSender(t *testing.T) {
         mockService.On("PutLogEvents", mock.Anything).
             Return(&cloudwatchlogs.PutLogEventsOutput{}, &cloudwatchlogs.InvalidParameterException{}).Once()
 
-        s := newSender(logger, mockService, mockManager, time.Second)
+        s := newSender(logger, mockService, mockManager, time.Second, make(chan struct{}))
         s.Send(batch)
-        s.Stop()
 
         mockService.AssertExpectations(t)
         assert.True(t, stateCallbackCalled, "State callback was not called for InvalidParameterException")
@@ -177,9 +173,8 @@ func TestSender(t *testing.T) {
         mockService.On("PutLogEvents", mock.Anything).
             Return(&cloudwatchlogs.PutLogEventsOutput{}, &cloudwatchlogs.DataAlreadyAcceptedException{}).Once()
 
-        s := newSender(logger, mockService, mockManager, time.Second)
+        s := newSender(logger, mockService, mockManager, time.Second, make(chan struct{}))
         s.Send(batch)
-        s.Stop()
 
         mockService.AssertExpectations(t)
         assert.True(t, stateCallbackCalled, "State callback was not called for DataAlreadyAcceptedException")
@@ -205,9 +200,8 @@ func TestSender(t *testing.T) {
         mockService.On("PutLogEvents", mock.Anything).
             Return(&cloudwatchlogs.PutLogEventsOutput{}, errors.New("test")).Once()
 
-        s := newSender(logger, mockService, mockManager, time.Second)
+        s := newSender(logger, mockService, mockManager, time.Second, make(chan struct{}))
         s.Send(batch)
-        s.Stop()
 
         mockService.AssertExpectations(t)
         assert.True(t, stateCallbackCalled, "State callback was not called for non-AWS error")
@@ -225,9 +219,8 @@ func TestSender(t *testing.T) {
         mockService.On("PutLogEvents", mock.Anything).
Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Once() - s := newSender(logger, mockService, mockManager, time.Second) + s := newSender(logger, mockService, mockManager, time.Second, make(chan struct{})) s.Send(batch) - s.Stop() mockService.AssertExpectations(t) }) @@ -251,9 +244,8 @@ func TestSender(t *testing.T) { mockService.On("PutLogEvents", mock.Anything). Return(&cloudwatchlogs.PutLogEventsOutput{}, awserr.New("SomeAWSError", "Some AWS error", nil)).Once() - s := newSender(logger, mockService, mockManager, 100*time.Millisecond) + s := newSender(logger, mockService, mockManager, 100*time.Millisecond, make(chan struct{})) s.Send(batch) - s.Stop() mockService.AssertExpectations(t) assert.True(t, stateCallbackCalled, "State callback was not called when retry attempts were exhausted") @@ -279,11 +271,12 @@ func TestSender(t *testing.T) { mockService.On("PutLogEvents", mock.Anything). Return(&cloudwatchlogs.PutLogEventsOutput{}, awserr.New("SomeAWSError", "Some AWS error", nil)).Once() - s := newSender(logger, mockService, mockManager, time.Second) + stopCh := make(chan struct{}) + s := newSender(logger, mockService, mockManager, time.Second, stopCh) go func() { time.Sleep(50 * time.Millisecond) - s.Stop() + close(stopCh) }() s.Send(batch)