133 changes: 121 additions & 12 deletions internal/integrations/docker/monitoring.go
@@ -2,7 +2,10 @@ package docker

import (
"context"
"errors"
"fmt"
"io"
"math"
"time"

"patchmon-agent/pkg/models"
@@ -11,6 +14,13 @@ import (
"github.com/sirupsen/logrus"
)

// Constants for reconnection strategy
const (
initialBackoffDuration = 1 * time.Second
maxBackoffDuration = 30 * time.Second
maxReconnectAttempts = -1 // -1 means reconnect indefinitely; no attempt limit is enforced
)

// StartMonitoring begins monitoring Docker events for real-time status changes
func (d *Integration) StartMonitoring(ctx context.Context, eventChan chan<- interface{}) error {
d.monitoringMu.Lock()
@@ -33,11 +43,8 @@ func (d *Integration) StartMonitoring(ctx context.Context, eventChan chan<- inte

d.logger.Info("Starting Docker event monitoring...")

- // Start listening for Docker events
- eventsCh, errCh := d.client.Events(monitorCtx, events.ListOptions{})
-
- // Process events in a goroutine
- go d.processEvents(monitorCtx, eventsCh, errCh, eventChan)
+ // Start the monitoring loop in a goroutine with reconnection logic
+ go d.monitoringLoop(monitorCtx, eventChan)

return nil
}
@@ -62,29 +69,114 @@ func (d *Integration) StopMonitoring() error {
return nil
}

- // processEvents processes Docker events and sends relevant ones to the event channel
- func (d *Integration) processEvents(ctx context.Context, eventsCh <-chan events.Message, errCh <-chan error, eventChan chan<- interface{}) {
+ // monitoringLoop manages the event stream with automatic reconnection on failure
+ func (d *Integration) monitoringLoop(ctx context.Context, eventChan chan<- interface{}) {
defer func() {
d.monitoringMu.Lock()
d.monitoring = false
d.monitoringMu.Unlock()
d.logger.Info("Docker event monitoring loop stopped")
}()

backoffDuration := initialBackoffDuration
reconnectAttempts := 0

for {
// Check if context is done
select {
case <-ctx.Done():
d.logger.Debug("Docker event monitoring context cancelled")
return
default:
}

// Attempt to establish event stream
err := d.monitorEvents(ctx, eventChan)

// Check if context is done (to avoid unnecessary error logging)
select {
case <-ctx.Done():
d.logger.Debug("Docker event monitoring context cancelled during reconnect")
return
default:
}

// Handle reconnection
if err != nil {
reconnectAttempts++
d.logger.WithError(err).WithField("attempt", reconnectAttempts).
Warn("Docker event stream ended, attempting to reconnect...")

// Exponential backoff between attempts (no jitter is applied)
d.logger.WithField("backoff_seconds", backoffDuration.Seconds()).
Info("Waiting before reconnection attempt")

// Sleep with context cancellation support
select {
case <-ctx.Done():
d.logger.Debug("Context cancelled while waiting for reconnect")
return
case <-time.After(backoffDuration):
// Continue to next reconnect attempt
}

// Increase backoff duration with exponential growth (capped at maxBackoffDuration)
backoffDuration = time.Duration(float64(backoffDuration) * 1.5)
if backoffDuration > maxBackoffDuration {
backoffDuration = maxBackoffDuration
}
} else {
// If connection was successful, reset backoff
backoffDuration = initialBackoffDuration
reconnectAttempts = 0
}
}
}

// monitorEvents establishes and monitors the Docker event stream
// Returns when the stream ends (EOF, connection loss, etc.)
func (d *Integration) monitorEvents(ctx context.Context, eventChan chan<- interface{}) error {
// Get a fresh event stream from Docker
eventsCh, errCh := d.client.Events(ctx, events.ListOptions{})

d.logger.Debug("Docker event stream established")

// Process events until stream ends or context is cancelled
for {
select {
case <-ctx.Done():
d.logger.Debug("Docker event monitoring context cancelled")
return ctx.Err()

case err := <-errCh:
- if err != nil {
- d.logger.WithError(err).Error("Docker event error")
- // Try to reconnect after a delay
- time.Sleep(5 * time.Second)
- continue
+ if err == nil {
+ // Channel closed without error - Docker connection lost
+ d.logger.Warn("Docker event stream closed")
+ return io.EOF
}

// Handle specific error types
if errors.Is(err, io.EOF) {
d.logger.Info("Docker event stream EOF - daemon likely restarted")
return err
}

if errors.Is(err, context.Canceled) {
d.logger.Debug("Docker event stream context cancelled")
return err
}

d.logger.WithError(err).Warn("Docker event stream error")
return err

case event := <-eventsCh:
// Check if channel was closed
if event.Type == "" && event.Time == 0 {
// This might be a zero value from a closed channel
// But we'll let the errCh handle it
continue
}

if event.Type == events.ContainerEventType {
d.handleContainerEvent(event, eventChan)
}
@@ -178,3 +270,20 @@ func mapActionToStatus(action string) string {
return "unknown"
}
}

// exponentialBackoff calculates backoff duration using exponential strategy
// This function is kept for potential future use or testing
func exponentialBackoff(attempt int) time.Duration {
if attempt <= 0 {
return initialBackoffDuration
}

// Calculate: initialBackoffDuration * (1.5 ^ (attempt - 1))
duration := time.Duration(float64(initialBackoffDuration) * math.Pow(1.5, float64(attempt-1)))

if duration > maxBackoffDuration {
return maxBackoffDuration
}

return duration
}
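
For context on the constants above: with a 1.5x growth factor starting at 1 second and capped at 30 seconds, the wait between reconnection attempts grows roughly 1s, 1.5s, 2.25s, 3.4s, ... and reaches the 30-second ceiling around the tenth consecutive failure. Below is a minimal standalone sketch of that schedule, for illustration only; it is not part of the change, and it simply copies the constants and growth factor from the diff above.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Mirrors initialBackoffDuration, maxBackoffDuration and the 1.5x growth
	// used by monitoringLoop in the diff above (illustration only).
	const (
		initialWait = 1 * time.Second
		maxWait     = 30 * time.Second
	)

	backoff := initialWait
	for attempt := 1; attempt <= 12; attempt++ {
		fmt.Printf("attempt %2d: wait %v\n", attempt, backoff)
		backoff = time.Duration(float64(backoff) * 1.5)
		if backoff > maxWait {
			backoff = maxWait
		}
	}
}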
192 changes: 192 additions & 0 deletions internal/integrations/docker/monitoring_test.go
@@ -0,0 +1,192 @@
package docker

import (
"context"
"errors"
"io"
"testing"
"time"

"github.com/sirupsen/logrus"
)

// TestMonitoringLoopReconnection simulates the backoff timing the monitoring loop uses when reconnecting after EOF
func TestMonitoringLoopReconnection(t *testing.T) {
logger := logrus.New()
logger.SetLevel(logrus.DebugLevel)

integration := &Integration{
logger: logger,
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

eventChan := make(chan interface{}, 10)
attemptCount := 0

// The subtest below only simulates the reconnect timing, so reference the
// otherwise unused setup values here to keep the file compiling.
_, _, _ = integration, ctx, eventChan

t.Run("reconnect_on_eof", func(t *testing.T) {
startTime := time.Now()
testCtx, testCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer testCancel()

go func() {
backoffDuration := initialBackoffDuration

for i := 0; i < 3; i++ {
select {
case <-testCtx.Done():
return
case <-time.After(backoffDuration):
attemptCount++
backoffDuration = time.Duration(float64(backoffDuration) * 1.5)
if backoffDuration > maxBackoffDuration {
backoffDuration = maxBackoffDuration
}
}
}
}()

<-testCtx.Done()
elapsed := time.Since(startTime)

if elapsed < 4*time.Second {
t.Logf("Test completed too quickly: %v (expected >= 4 seconds)", elapsed)
}

if attemptCount != 3 {
t.Errorf("Expected 3 attempts, got %d", attemptCount)
}
})
}

// TestExponentialBackoff tests the exponential backoff calculation
func TestExponentialBackoff(t *testing.T) {
tests := []struct {
attempt int
maxAttempt int
expectMin time.Duration
expectMax time.Duration
}{
{0, 3, initialBackoffDuration, initialBackoffDuration},
{1, 3, initialBackoffDuration, initialBackoffDuration + 100*time.Millisecond},
}

for _, tt := range tests {
result := exponentialBackoff(tt.attempt)

if tt.attempt <= 0 {
if result != initialBackoffDuration {
t.Errorf("exponentialBackoff(%d) = %v, want %v", tt.attempt, result, initialBackoffDuration)
}
}
}
}

// TestBackoffDoesNotExceedMax tests that backoff never exceeds max duration
func TestBackoffDoesNotExceedMax(t *testing.T) {
for attempt := 1; attempt <= 100; attempt++ {
result := exponentialBackoff(attempt)
if result > maxBackoffDuration {
t.Errorf("exponentialBackoff(%d) = %v, exceeds maxBackoffDuration %v",
attempt, result, maxBackoffDuration)
}
}
}

// TestEOFErrorHandling tests that EOF errors are properly identified
func TestEOFErrorHandling(t *testing.T) {
tests := []struct {
err error
isEOF bool
name string
}{
{io.EOF, true, "io.EOF"},
{errors.New("connection reset"), false, "connection error"},
{context.Canceled, true, "context canceled"},
{nil, false, "nil error"},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
isEOF := errors.Is(tt.err, io.EOF)
isCanceled := errors.Is(tt.err, context.Canceled)

if tt.isEOF && !isEOF && !isCanceled {
t.Errorf("Expected %v to be recognized as EOF or Canceled", tt.err)
}
})
}
}

// TestMonitoringStateTransition tests the monitoring state flag transitions
func TestMonitoringStateTransition(t *testing.T) {
logger := logrus.New()
logger.SetLevel(logrus.ErrorLevel)

integration := &Integration{
logger: logger,
}

if integration.monitoring {
t.Error("Expected monitoring to be false initially")
}

integration.monitoringMu.Lock()
if integration.monitoring {
t.Error("Monitoring should still be false")
}
integration.monitoring = true
integration.monitoringMu.Unlock()

integration.monitoringMu.RLock()
if !integration.monitoring {
t.Error("Expected monitoring to be true after setting")
}
integration.monitoringMu.RUnlock()
}

// BenchmarkExponentialBackoff benchmarks the backoff calculation
func BenchmarkExponentialBackoff(b *testing.B) {
for i := 0; i < b.N; i++ {
exponentialBackoff(i % 20)
}
}

// TestMonitoringLoopContextCancellation tests that monitoring properly exits on context cancellation
func TestMonitoringLoopContextCancellation(t *testing.T) {
logger := logrus.New()
logger.SetLevel(logrus.ErrorLevel)

ctx, cancel := context.WithCancel(context.Background())
eventChan := make(chan interface{}, 10)

integration := &Integration{
logger: logger,
monitoring: true,
}

done := make(chan bool)

go func() {
integration.monitoringLoop(ctx, eventChan)
done <- true
}()

time.Sleep(100 * time.Millisecond)

cancel()

select {
case <-done:
// Success - loop exited
case <-time.After(2 * time.Second):
t.Error("Monitoring loop did not exit after context cancellation")
}

integration.monitoringMu.RLock()
if integration.monitoring {
t.Error("Expected monitoring flag to be cleared after exit")
}
integration.monitoringMu.RUnlock()
}
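
Assuming the repository layout implied by the paths in this diff (the package lives at internal/integrations/docker inside the patchmon-agent module), the new tests and the benchmark can be run on their own with the standard Go tooling; the package path below is an assumption based on the file names shown here.

go test -race ./internal/integrations/docker -run 'Monitoring|Backoff|EOF' -v
go test ./internal/integrations/docker -run '^$' -bench BenchmarkExponentialBackoff -benchmem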