Skip to content

Commit 24e0f0c

Browse files
authored
refact pkg/acquisition: split docker.go (#4065)
1 parent e234201 commit 24e0f0c

File tree

6 files changed

+501
-466
lines changed

6 files changed

+501
-466
lines changed

pkg/acquisition/docker.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ import (
88

99
var (
1010
// verify interface compliance
11-
_ DataSource = (*dockeracquisition.DockerSource)(nil)
12-
_ DSNConfigurer = (*dockeracquisition.DockerSource)(nil)
13-
_ Fetcher = (*dockeracquisition.DockerSource)(nil)
14-
_ Tailer = (*dockeracquisition.DockerSource)(nil)
15-
_ MetricsProvider = (*dockeracquisition.DockerSource)(nil)
11+
_ DataSource = (*dockeracquisition.Source)(nil)
12+
_ DSNConfigurer = (*dockeracquisition.Source)(nil)
13+
_ Fetcher = (*dockeracquisition.Source)(nil)
14+
_ Tailer = (*dockeracquisition.Source)(nil)
15+
_ MetricsProvider = (*dockeracquisition.Source)(nil)
1616
)
1717

1818
//nolint:gochecknoinits
1919
func init() {
20-
registerDataSource("docker", func() DataSource { return &dockeracquisition.DockerSource{} })
20+
registerDataSource("docker", func() DataSource { return &dockeracquisition.Source{} })
2121
}
Lines changed: 304 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,304 @@
1+
package dockeracquisition
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"net/url"
8+
"regexp"
9+
"strconv"
10+
"time"
11+
12+
dockerTypesSwarm "github.com/moby/moby/api/types/swarm"
13+
"github.com/moby/moby/client"
14+
yaml "github.com/goccy/go-yaml"
15+
log "github.com/sirupsen/logrus"
16+
17+
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
18+
"github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/docker/tracker"
19+
"github.com/crowdsecurity/crowdsec/pkg/metrics"
20+
)
21+
22+
type Configuration struct {
23+
configuration.DataSourceCommonCfg `yaml:",inline"`
24+
25+
CheckInterval string `yaml:"check_interval"`
26+
FollowStdout bool `yaml:"follow_stdout"`
27+
FollowStdErr bool `yaml:"follow_stderr"`
28+
Until string `yaml:"until"`
29+
Since string `yaml:"since"`
30+
DockerHost string `yaml:"docker_host"`
31+
ContainerName []string `yaml:"container_name"`
32+
ContainerID []string `yaml:"container_id"`
33+
ContainerNameRegexp []string `yaml:"container_name_regexp"`
34+
ContainerIDRegexp []string `yaml:"container_id_regexp"`
35+
ServiceName []string `yaml:"service_name"`
36+
ServiceID []string `yaml:"service_id"`
37+
ServiceNameRegexp []string `yaml:"service_name_regexp"`
38+
ServiceIDRegexp []string `yaml:"service_id_regexp"`
39+
UseServiceLabels bool `yaml:"use_service_labels"`
40+
UseContainerLabels bool `yaml:"use_container_labels"`
41+
}
42+
43+
func (dc *Configuration) hasServiceConfig() bool {
44+
return len(dc.ServiceName) > 0 || len(dc.ServiceID) > 0 ||
45+
len(dc.ServiceIDRegexp) > 0 || len(dc.ServiceNameRegexp) > 0 || dc.UseServiceLabels
46+
}
47+
48+
func (dc *Configuration) hasContainerConfig() bool {
49+
return len(dc.ContainerName) > 0 || len(dc.ContainerID) > 0 ||
50+
len(dc.ContainerIDRegexp) > 0 || len(dc.ContainerNameRegexp) > 0 || dc.UseContainerLabels
51+
}
52+
53+
func (d *Source) UnmarshalConfig(yamlConfig []byte) error {
54+
d.Config = Configuration{
55+
FollowStdout: true, // default
56+
FollowStdErr: true, // default
57+
}
58+
59+
if err := yaml.UnmarshalWithOptions(yamlConfig, &d.Config, yaml.Strict()); err != nil {
60+
return fmt.Errorf("while parsing DockerAcquisition configuration: %s", yaml.FormatError(err, false, false))
61+
}
62+
63+
if d.logger != nil {
64+
d.logger.Tracef("DockerAcquisition configuration: %+v", d.Config)
65+
}
66+
67+
// Check if we have any container or service configuration
68+
if !d.Config.hasContainerConfig() && !d.Config.hasServiceConfig() {
69+
return errors.New("no containers or services configuration provided")
70+
}
71+
72+
if d.Config.UseContainerLabels && (len(d.Config.ContainerName) > 0 || len(d.Config.ContainerID) > 0 || len(d.Config.ContainerIDRegexp) > 0 || len(d.Config.ContainerNameRegexp) > 0) {
73+
return errors.New("use_container_labels and container_name, container_id, container_id_regexp, container_name_regexp are mutually exclusive")
74+
}
75+
76+
if d.Config.UseServiceLabels && (len(d.Config.ServiceName) > 0 || len(d.Config.ServiceID) > 0 || len(d.Config.ServiceIDRegexp) > 0 || len(d.Config.ServiceNameRegexp) > 0) {
77+
return errors.New("use_service_labels and service_name, service_id, service_id_regexp, service_name_regexp are mutually exclusive")
78+
}
79+
80+
if d.Config.CheckInterval != "" && d.logger != nil {
81+
d.logger.Warn("check_interval is deprecated, it will be removed in a future version")
82+
}
83+
84+
if d.Config.Mode == "" {
85+
d.Config.Mode = configuration.TAIL_MODE
86+
}
87+
88+
if d.Config.Mode != configuration.CAT_MODE && d.Config.Mode != configuration.TAIL_MODE {
89+
return fmt.Errorf("unsupported mode %s for docker datasource", d.Config.Mode)
90+
}
91+
92+
for _, cont := range d.Config.ContainerNameRegexp {
93+
compiled, err := regexp.Compile(cont)
94+
if err != nil {
95+
return fmt.Errorf("container_name_regexp: %w", err)
96+
}
97+
98+
d.compiledContainerName = append(d.compiledContainerName, compiled)
99+
}
100+
101+
for _, cont := range d.Config.ContainerIDRegexp {
102+
compiled, err := regexp.Compile(cont)
103+
if err != nil {
104+
return fmt.Errorf("container_id_regexp: %w", err)
105+
}
106+
107+
d.compiledContainerID = append(d.compiledContainerID, compiled)
108+
}
109+
110+
for _, svc := range d.Config.ServiceNameRegexp {
111+
compiled, err := regexp.Compile(svc)
112+
if err != nil {
113+
return fmt.Errorf("service_name_regexp: %w", err)
114+
}
115+
116+
d.compiledServiceName = append(d.compiledServiceName, compiled)
117+
}
118+
119+
for _, svc := range d.Config.ServiceIDRegexp {
120+
compiled, err := regexp.Compile(svc)
121+
if err != nil {
122+
return fmt.Errorf("service_id_regexp: %w", err)
123+
}
124+
125+
d.compiledServiceID = append(d.compiledServiceID, compiled)
126+
}
127+
128+
if d.Config.Since == "" {
129+
d.Config.Since = time.Now().UTC().Format(time.RFC3339)
130+
}
131+
132+
d.containerLogsOptions = &client.ContainerLogsOptions{
133+
ShowStdout: d.Config.FollowStdout,
134+
ShowStderr: d.Config.FollowStdErr,
135+
Follow: true,
136+
Since: d.Config.Since,
137+
}
138+
139+
if d.Config.Until != "" {
140+
d.containerLogsOptions.Until = d.Config.Until
141+
}
142+
143+
return nil
144+
}
145+
146+
func (d *Source) Configure(ctx context.Context, yamlConfig []byte, logger *log.Entry, metricsLevel metrics.AcquisitionMetricsLevel) error {
147+
d.logger = logger
148+
d.metricsLevel = metricsLevel
149+
150+
err := d.UnmarshalConfig(yamlConfig)
151+
if err != nil {
152+
return err
153+
}
154+
155+
d.runningContainerState = tracker.NewTracker[*ContainerConfig]()
156+
d.runningServiceState = tracker.NewTracker[*ContainerConfig]()
157+
158+
d.logger.Tracef("Actual DockerAcquisition configuration %+v", d.Config)
159+
160+
opts := []client.Opt{
161+
client.FromEnv,
162+
client.WithAPIVersionNegotiation(),
163+
}
164+
165+
if d.Config.DockerHost != "" {
166+
opts = append(opts, client.WithHost(d.Config.DockerHost))
167+
}
168+
169+
d.Client, err = client.New(opts...)
170+
if err != nil {
171+
return err
172+
}
173+
174+
info, err := d.Client.Info(ctx, client.InfoOptions{})
175+
if err != nil {
176+
return fmt.Errorf("failed to get docker info: %w", err)
177+
}
178+
179+
if info.Info.Swarm.LocalNodeState == dockerTypesSwarm.LocalNodeStateActive && info.Info.Swarm.ControlAvailable {
180+
hasServiceConfig := d.Config.hasServiceConfig()
181+
if hasServiceConfig {
182+
d.isSwarmManager = true
183+
d.logger.Info("node is swarm manager, enabling swarm detection mode")
184+
}
185+
186+
if !hasServiceConfig {
187+
// we set to false cause user didnt provide service configuration even though we are a swarm manager
188+
d.isSwarmManager = false
189+
d.logger.Warn("node is swarm manager, but no service configuration provided - service monitoring will be disabled, if this is unintentional please apply constraints")
190+
}
191+
}
192+
193+
d.backoffFactory = newDockerBackOffFactory()
194+
195+
return nil
196+
}
197+
198+
func (d *Source) ConfigureByDSN(_ context.Context, dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
199+
var err error
200+
201+
parsedURL, err := url.Parse(dsn)
202+
if err != nil {
203+
return fmt.Errorf("failed to parse DSN %s: %w", dsn, err)
204+
}
205+
206+
if parsedURL.Scheme != d.GetName() {
207+
return fmt.Errorf("invalid DSN %s for docker source, must start with %s://", dsn, d.GetName())
208+
}
209+
210+
d.Config = Configuration{
211+
FollowStdout: true,
212+
FollowStdErr: true,
213+
CheckInterval: "1s",
214+
}
215+
d.Config.UniqueId = uuid
216+
d.Config.ContainerName = make([]string, 0)
217+
d.Config.ContainerID = make([]string, 0)
218+
d.runningContainerState = tracker.NewTracker[*ContainerConfig]()
219+
d.runningServiceState = tracker.NewTracker[*ContainerConfig]()
220+
d.Config.Mode = configuration.CAT_MODE
221+
d.logger = logger
222+
d.Config.Labels = labels
223+
224+
opts := []client.Opt{
225+
client.FromEnv,
226+
client.WithAPIVersionNegotiation(),
227+
}
228+
229+
d.containerLogsOptions = &client.ContainerLogsOptions{
230+
ShowStdout: d.Config.FollowStdout,
231+
ShowStderr: d.Config.FollowStdErr,
232+
Follow: false,
233+
}
234+
235+
containerNameOrID := parsedURL.Host
236+
237+
if containerNameOrID == "" {
238+
return fmt.Errorf("empty %s DSN", d.GetName()+"://")
239+
}
240+
241+
d.Config.ContainerName = append(d.Config.ContainerName, containerNameOrID)
242+
// we add it as an ID also so user can provide docker name or docker ID
243+
d.Config.ContainerID = append(d.Config.ContainerID, containerNameOrID)
244+
245+
parameters := parsedURL.Query()
246+
247+
for k, v := range parameters {
248+
switch k {
249+
case "log_level":
250+
if len(v) != 1 {
251+
return errors.New("only one 'log_level' parameters is required, not many")
252+
}
253+
lvl, err := log.ParseLevel(v[0])
254+
if err != nil {
255+
return fmt.Errorf("unknown level %s: %w", v[0], err)
256+
}
257+
d.logger.Logger.SetLevel(lvl)
258+
case "until":
259+
if len(v) != 1 {
260+
return errors.New("only one 'until' parameters is required, not many")
261+
}
262+
d.containerLogsOptions.Until = v[0]
263+
case "since":
264+
if len(v) != 1 {
265+
return errors.New("only one 'since' parameters is required, not many")
266+
}
267+
d.containerLogsOptions.Since = v[0]
268+
case "follow_stdout":
269+
if len(v) != 1 {
270+
return errors.New("only one 'follow_stdout' parameters is required, not many")
271+
}
272+
followStdout, err := strconv.ParseBool(v[0])
273+
if err != nil {
274+
return fmt.Errorf("parsing 'follow_stdout' parameters: %s", err)
275+
}
276+
d.Config.FollowStdout = followStdout
277+
d.containerLogsOptions.ShowStdout = followStdout
278+
case "follow_stderr":
279+
if len(v) != 1 {
280+
return errors.New("only one 'follow_stderr' parameters is required, not many")
281+
}
282+
followStdErr, err := strconv.ParseBool(v[0])
283+
if err != nil {
284+
return fmt.Errorf("parsing 'follow_stderr' parameters: %s", err)
285+
}
286+
d.Config.FollowStdErr = followStdErr
287+
d.containerLogsOptions.ShowStderr = followStdErr
288+
case "docker_host":
289+
if len(v) != 1 {
290+
return errors.New("only one 'docker_host' parameters is required, not many")
291+
}
292+
opts = append(opts, client.WithHost(v[0]))
293+
}
294+
}
295+
296+
d.Client, err = client.New(opts...)
297+
if err != nil {
298+
return err
299+
}
300+
301+
d.backoffFactory = newDockerBackOffFactory()
302+
303+
return nil
304+
}

pkg/acquisition/modules/docker/docker_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ service_id_regexp:
130130

131131
for _, tc := range tests {
132132
t.Run(tc.config, func(t *testing.T) {
133-
f := DockerSource{}
133+
f := Source{}
134134
err := f.Configure(ctx, []byte(tc.config), subLogger, metrics.AcquisitionMetricsLevelNone)
135135
cstest.RequireErrorContains(t, err, tc.expectedErr)
136136
})
@@ -190,7 +190,7 @@ func TestConfigureDSN(t *testing.T) {
190190

191191
for _, test := range tests {
192192
t.Run(test.name, func(t *testing.T) {
193-
f := DockerSource{}
193+
f := Source{}
194194
err := f.ConfigureByDSN(ctx, test.dsn, map[string]string{"type": "testtype"}, subLogger, "")
195195
cstest.AssertErrorContains(t, err, test.expectedErr)
196196
})
@@ -339,7 +339,7 @@ service_name_regexp:
339339

340340
dockerTomb := tomb.Tomb{}
341341
out := make(chan pipeline.Event)
342-
dockerSource := DockerSource{}
342+
dockerSource := Source{}
343343
err := dockerSource.Configure(ctx, []byte(ts.config), subLogger, metrics.AcquisitionMetricsLevelNone)
344344
cstest.AssertErrorContains(t, err, ts.expectedErr)
345345

@@ -490,7 +490,7 @@ use_service_labels: true`,
490490

491491
for _, test := range tests {
492492
t.Run(test.name, func(t *testing.T) {
493-
f := DockerSource{}
493+
f := Source{}
494494
err := f.Configure(ctx, []byte(test.config), subLogger, metrics.AcquisitionMetricsLevelNone)
495495
require.NoError(t, err)
496496

@@ -564,7 +564,7 @@ service_name:
564564
for _, test := range tests {
565565
t.Run(test.name, func(t *testing.T) {
566566
subLogger := log.WithField("type", "docker")
567-
f := DockerSource{
567+
f := Source{
568568
Client: &mockDockerCli{},
569569
}
570570

@@ -692,7 +692,7 @@ func TestOneShot(t *testing.T) {
692692
t.Run(ts.dsn, func(t *testing.T) {
693693
subLogger := log.WithField("type", "docker")
694694

695-
dockerClient := &DockerSource{}
695+
dockerClient := &Source{}
696696
labels := make(map[string]string)
697697
labels["type"] = ts.logType
698698

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package dockeracquisition
2+
3+
import (
4+
"github.com/prometheus/client_golang/prometheus"
5+
6+
"github.com/crowdsecurity/crowdsec/pkg/metrics"
7+
)
8+
9+
func (*Source) GetMetrics() []prometheus.Collector {
10+
return []prometheus.Collector{
11+
metrics.DockerDatasourceLinesRead,
12+
}
13+
}
14+
15+
func (*Source) GetAggregMetrics() []prometheus.Collector {
16+
return []prometheus.Collector{
17+
metrics.DockerDatasourceLinesRead,
18+
}
19+
}

0 commit comments

Comments
 (0)