Skip to content

Commit 818974e

Browse files
LaurenceJJonesblotusmmetc
authored
enhance: Remove docker acquis internal timer use docker events (#3598)
* enhance: Remove docker acquis internal timer use docker events * enhance: split code and add events to mock client * enhance: fixes * enhance: add check_interval test and add deprecation warning * enhance: warnf -> warn * enhance: Add a retry loop to reconnect to docker events when docker is down * enhance: remove max retries seconds as we have a max count instead * enhance: mr linter mad * enhance: keep trying until we hit max timer * enhance: After a reconnect we always check the containers to attempt to resurrect them; otherwise we would wait for an event that may never come * enhance: Move info outside for loop * enhance: Move info to reconnect goto * enhance: mr linter pls be happy * extract reusable backoff code; apply it to the initial connection, rename method etc. * log messages * connection error --------- Co-authored-by: blotus <[email protected]> Co-authored-by: marco <[email protected]>
1 parent 5fe038d commit 818974e

File tree

2 files changed

+165
-47
lines changed

2 files changed

+165
-47
lines changed

pkg/acquisition/modules/docker/docker.go

Lines changed: 147 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313

1414
dockerTypes "github.com/docker/docker/api/types"
1515
dockerContainer "github.com/docker/docker/api/types/container"
16+
dockerTypesEvents "github.com/docker/docker/api/types/events"
17+
dockerFilter "github.com/docker/docker/api/types/filters"
1618
"github.com/docker/docker/client"
1719
"github.com/prometheus/client_golang/prometheus"
1820
log "github.com/sirupsen/logrus"
@@ -53,7 +55,6 @@ type DockerSource struct {
5355
runningContainerState map[string]*ContainerConfig
5456
compiledContainerName []*regexp.Regexp
5557
compiledContainerID []*regexp.Regexp
56-
CheckIntervalDuration time.Duration
5758
logger *log.Entry
5859
Client client.CommonAPIClient
5960
t *tomb.Tomb
@@ -75,9 +76,8 @@ func (d *DockerSource) GetUuid() string {
7576

7677
func (d *DockerSource) UnmarshalConfig(yamlConfig []byte) error {
7778
d.Config = DockerConfiguration{
78-
FollowStdout: true, // default
79-
FollowStdErr: true, // default
80-
CheckInterval: "1s", // default
79+
FollowStdout: true, // default
80+
FollowStdErr: true, // default
8181
}
8282

8383
err := yaml.UnmarshalStrict(yamlConfig, &d.Config)
@@ -97,9 +97,8 @@ func (d *DockerSource) UnmarshalConfig(yamlConfig []byte) error {
9797
return errors.New("use_container_labels and container_name, container_id, container_id_regexp, container_name_regexp are mutually exclusive")
9898
}
9999

100-
d.CheckIntervalDuration, err = time.ParseDuration(d.Config.CheckInterval)
101-
if err != nil {
102-
return fmt.Errorf("parsing 'check_interval' parameters: %s", d.CheckIntervalDuration)
100+
if d.Config.CheckInterval != "" {
101+
d.logger.Warn("check_interval is deprecated, it will be removed in a future version")
103102
}
104103

105104
if d.Config.Mode == "" {
@@ -495,63 +494,164 @@ func (d *DockerSource) EvalContainer(ctx context.Context, container dockerTypes.
495494
return nil
496495
}
497496

497+
func (d *DockerSource) checkContainers(ctx context.Context, monitChan chan *ContainerConfig, deleteChan chan *ContainerConfig) error {
498+
// to track for garbage collection
499+
runningContainersID := make(map[string]bool)
500+
501+
runningContainers, err := d.Client.ContainerList(ctx, dockerContainer.ListOptions{})
502+
if err != nil {
503+
if strings.Contains(strings.ToLower(err.Error()), "cannot connect to the docker daemon at") {
504+
for idx, container := range d.runningContainerState {
505+
if d.runningContainerState[idx].t.Alive() {
506+
d.logger.Infof("killing tail for container %s", container.Name)
507+
d.runningContainerState[idx].t.Kill(nil)
508+
509+
if err := d.runningContainerState[idx].t.Wait(); err != nil {
510+
d.logger.Infof("error while waiting for death of %s : %s", container.Name, err)
511+
}
512+
}
513+
514+
delete(d.runningContainerState, idx)
515+
}
516+
} else {
517+
log.Errorf("container list err: %s", err)
518+
}
519+
520+
return err
521+
}
522+
523+
for _, container := range runningContainers {
524+
runningContainersID[container.ID] = true
525+
526+
// don't need to re eval an already monitored container
527+
if _, ok := d.runningContainerState[container.ID]; ok {
528+
continue
529+
}
530+
531+
if containerConfig := d.EvalContainer(ctx, container); containerConfig != nil {
532+
monitChan <- containerConfig
533+
}
534+
}
535+
536+
for containerStateID, containerConfig := range d.runningContainerState {
537+
if _, ok := runningContainersID[containerStateID]; !ok {
538+
deleteChan <- containerConfig
539+
}
540+
}
541+
542+
d.logger.Tracef("Reading logs from %d containers", len(d.runningContainerState))
543+
return nil
544+
}
545+
546+
// subscribeEvents will loop until it can successfully call d.Client.Events()
547+
// without immediately receiving an error. It applies exponential backoff on failures.
548+
// Returns the new (eventsChan, errChan) pair or an error if context/tomb is done.
549+
func (d *DockerSource) subscribeEvents(ctx context.Context) (<-chan dockerTypesEvents.Message, <-chan error, error) {
550+
const (
551+
initialBackoff = 2 * time.Second
552+
backoffFactor = 2
553+
maxBackoff = 60 * time.Second
554+
)
555+
556+
f := dockerFilter.NewArgs()
557+
f.Add("type", "container")
558+
559+
options := dockerTypesEvents.ListOptions{
560+
Filters: f,
561+
}
562+
563+
backoff := initialBackoff
564+
retries := 0
565+
566+
d.logger.Infof("Subscribing to Docker events")
567+
568+
for {
569+
// bail out immediately if the context is canceled
570+
select {
571+
case <-ctx.Done():
572+
return nil, nil, ctx.Err()
573+
case <-d.t.Dying():
574+
return nil, nil, errors.New("connection aborted, shutting down docker watcher")
575+
default:
576+
}
577+
578+
// Try to reconnect
579+
eventsChan, errChan := d.Client.Events(ctx, options)
580+
581+
// Retry if the connection is immediately broken
582+
select {
583+
case err := <-errChan:
584+
d.logger.Errorf("Connection to Docker failed (attempt %d): %v", retries+1, err)
585+
586+
retries++
587+
588+
d.logger.Infof("Sleeping %s before next retry", backoff)
589+
590+
// Wait for 'backoff', but still allow cancellation
591+
select {
592+
case <-time.After(backoff):
593+
// Continue after backoff
594+
case <-ctx.Done():
595+
return nil, nil, ctx.Err()
596+
case <-d.t.Dying():
597+
return nil, nil, errors.New("connection aborted, shutting down docker watcher")
598+
}
599+
600+
backoff = max(backoff*backoffFactor, maxBackoff)
601+
602+
continue
603+
default:
604+
// great success!
605+
return eventsChan, errChan, nil
606+
}
607+
}
608+
}
609+
498610
func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *ContainerConfig, deleteChan chan *ContainerConfig) error {
499-
ticker := time.NewTicker(d.CheckIntervalDuration)
500-
d.logger.Infof("Container watcher started, interval: %s", d.CheckIntervalDuration.String())
611+
err := d.checkContainers(ctx, monitChan, deleteChan)
612+
if err != nil {
613+
return err
614+
}
615+
616+
eventsChan, errChan, err := d.subscribeEvents(ctx)
617+
if err != nil {
618+
return err
619+
}
501620

502621
for {
503622
select {
504623
case <-d.t.Dying():
505624
d.logger.Infof("stopping container watcher")
506625
return nil
507-
case <-ticker.C:
508-
// to track for garbage collection
509-
runningContainersID := make(map[string]bool)
510626

511-
runningContainers, err := d.Client.ContainerList(ctx, dockerContainer.ListOptions{})
512-
if err != nil {
513-
if strings.Contains(strings.ToLower(err.Error()), "cannot connect to the docker daemon at") {
514-
for idx, container := range d.runningContainerState {
515-
if d.runningContainerState[idx].t.Alive() {
516-
d.logger.Infof("killing tail for container %s", container.Name)
517-
d.runningContainerState[idx].t.Kill(nil)
518-
519-
if err := d.runningContainerState[idx].t.Wait(); err != nil {
520-
d.logger.Infof("error while waiting for death of %s : %s", container.Name, err)
521-
}
522-
}
523-
524-
delete(d.runningContainerState, idx)
525-
}
526-
} else {
527-
log.Errorf("container list err: %s", err)
627+
case event := <-eventsChan:
628+
if event.Action == dockerTypesEvents.ActionStart || event.Action == dockerTypesEvents.ActionDie {
629+
if err := d.checkContainers(ctx, monitChan, deleteChan); err != nil {
630+
d.logger.Warnf("Failed to check containers: %v", err)
528631
}
632+
}
529633

634+
case err := <-errChan:
635+
if err == nil {
530636
continue
531637
}
532638

533-
for _, container := range runningContainers {
534-
runningContainersID[container.ID] = true
639+
d.logger.Errorf("Docker events error: %v", err)
535640

536-
// don't need to re eval an already monitored container
537-
if _, ok := d.runningContainerState[container.ID]; ok {
538-
continue
539-
}
540-
541-
if containerConfig := d.EvalContainer(ctx, container); containerConfig != nil {
542-
monitChan <- containerConfig
543-
}
641+
// try to reconnect, replacing our channels on success. They are never nil if err is nil.
642+
newEvents, newErr, recErr := d.subscribeEvents(ctx)
643+
if recErr != nil {
644+
return recErr
544645
}
545646

546-
for containerStateID, containerConfig := range d.runningContainerState {
547-
if _, ok := runningContainersID[containerStateID]; !ok {
548-
deleteChan <- containerConfig
549-
}
550-
}
647+
eventsChan, errChan = newEvents, newErr
551648

552-
d.logger.Tracef("Reading logs from %d containers", len(d.runningContainerState))
553-
554-
ticker.Reset(d.CheckIntervalDuration)
649+
d.logger.Info("Successfully reconnected to Docker events")
650+
// We check containers after a reconnection because the docker daemon might have restarted
651+
// and the container tombs may have self deleted
652+
if err := d.checkContainers(ctx, monitChan, deleteChan); err != nil {
653+
d.logger.Warnf("Failed to check containers: %v", err)
654+
}
555655
}
556656
}
557657
}

pkg/acquisition/modules/docker/docker_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
dockerTypes "github.com/docker/docker/api/types"
1515
dockerContainer "github.com/docker/docker/api/types/container"
16+
dockerTypesEvents "github.com/docker/docker/api/types/events"
1617
"github.com/docker/docker/client"
1718
log "github.com/sirupsen/logrus"
1819
"github.com/stretchr/testify/assert"
@@ -50,6 +51,15 @@ source: docker`,
5051
config: `
5152
mode: cat
5253
source: docker
54+
container_name:
55+
- toto`,
56+
expectedErr: "",
57+
},
58+
{
59+
config: `
60+
mode: cat
61+
source: docker
62+
check_interval: 10s
5363
container_name:
5464
- toto`,
5565
expectedErr: "",
@@ -273,6 +283,14 @@ func (cli *mockDockerCli) ContainerInspect(ctx context.Context, c string) (docke
273283
return r, nil
274284
}
275285

286+
// Since we are mocking the docker client, we return channels that will never be used
287+
func (cli *mockDockerCli) Events(ctx context.Context, options dockerTypesEvents.ListOptions) (<-chan dockerTypesEvents.Message, <-chan error) {
288+
eventsChan := make(chan dockerTypesEvents.Message)
289+
errChan := make(chan error)
290+
291+
return eventsChan, errChan
292+
}
293+
276294
func TestOneShot(t *testing.T) {
277295
ctx := t.Context()
278296

0 commit comments

Comments
 (0)