Skip to content

Commit 8a7af21

Browse files
authored
refact pkg/acquisition: cloudwatch configuration (#4058)
1 parent ee5d5c1 commit 8a7af21

File tree

1 file changed

+69
-28
lines changed

1 file changed

+69
-28
lines changed

pkg/acquisition/modules/cloudwatch/config.go

Lines changed: 69 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -39,52 +39,91 @@ type Configuration struct {
3939
AwsRegion string `yaml:"aws_region,omitempty"`
4040
}
4141

42-
func (s *Source) UnmarshalConfig(yamlConfig []byte) error {
43-
s.Config = Configuration{}
44-
if err := yaml.UnmarshalWithOptions(yamlConfig, &s.Config, yaml.Strict()); err != nil {
45-
return fmt.Errorf("cannot parse CloudwatchSource configuration: %s", yaml.FormatError(err, false, false))
42+
func ConfigurationFromYAML(y []byte) (Configuration, []ValidationWarning, error) {
43+
var cfg Configuration
44+
45+
if err := yaml.UnmarshalWithOptions(y, &cfg, yaml.Strict()); err != nil {
46+
return cfg, nil, fmt.Errorf("cannot parse: %s", yaml.FormatError(err, false, false))
47+
}
48+
49+
cfg.SetDefaults()
50+
51+
warns, err := cfg.Validate()
52+
if err != nil {
53+
return cfg, warns, err
4654
}
4755

48-
if s.Config.GroupName == "" {
49-
return errors.New("group_name is mandatory for CloudwatchSource")
56+
return cfg, warns, nil
57+
}
58+
59+
func (c *Configuration) SetDefaults() {
60+
if c.Mode == "" {
61+
c.Mode = configuration.TAIL_MODE
5062
}
5163

52-
if s.Config.Mode == "" {
53-
s.Config.Mode = configuration.TAIL_MODE
64+
if c.DescribeLogStreamsLimit == nil {
65+
c.DescribeLogStreamsLimit = &def_DescribeLogStreamsLimit
5466
}
5567

56-
if s.Config.DescribeLogStreamsLimit == nil {
57-
s.Config.DescribeLogStreamsLimit = &def_DescribeLogStreamsLimit
68+
if c.PollNewStreamInterval == nil {
69+
c.PollNewStreamInterval = &def_PollNewStreamInterval
5870
}
5971

60-
if s.Config.PollNewStreamInterval == nil {
61-
s.Config.PollNewStreamInterval = &def_PollNewStreamInterval
72+
if c.MaxStreamAge == nil {
73+
c.MaxStreamAge = &def_MaxStreamAge
6274
}
6375

64-
if s.Config.MaxStreamAge == nil {
65-
s.Config.MaxStreamAge = &def_MaxStreamAge
76+
if c.PollStreamInterval == nil {
77+
c.PollStreamInterval = &def_PollStreamInterval
6678
}
6779

68-
if s.Config.PollStreamInterval == nil {
69-
s.Config.PollStreamInterval = &def_PollStreamInterval
80+
if c.StreamReadTimeout == nil {
81+
c.StreamReadTimeout = &def_StreamReadTimeout
7082
}
7183

72-
if s.Config.StreamReadTimeout == nil {
73-
s.Config.StreamReadTimeout = &def_StreamReadTimeout
84+
if c.GetLogEventsPagesLimit == nil {
85+
c.GetLogEventsPagesLimit = &def_GetLogEventsPagesLimit
86+
}
87+
88+
if c.AwsApiCallTimeout == nil {
89+
c.AwsApiCallTimeout = &def_AwsApiCallTimeout
90+
}
91+
92+
if c.AwsConfigDir == nil {
93+
c.AwsConfigDir = &def_AwsConfigDir
94+
}
95+
}
96+
97+
98+
type ValidationWarning string
99+
100+
func (c *Configuration) Validate() ([]ValidationWarning, error) {
101+
var warns []ValidationWarning
102+
103+
if c.GroupName == "" {
104+
return warns, errors.New("group_name is mandatory for CloudwatchSource")
74105
}
75106

76-
if s.Config.GetLogEventsPagesLimit == nil {
77-
s.Config.GetLogEventsPagesLimit = &def_GetLogEventsPagesLimit
107+
if *c.MaxStreamAge > *c.StreamReadTimeout {
108+
warns = append(warns, "max_stream_age > stream_read_timeout, stream might keep being opened/closed")
78109
}
79110

80-
if s.Config.AwsApiCallTimeout == nil {
81-
s.Config.AwsApiCallTimeout = &def_AwsApiCallTimeout
111+
return warns, nil
112+
}
113+
114+
115+
func (s *Source) UnmarshalConfig(yamlConfig []byte) error {
116+
cfg, warns, err := ConfigurationFromYAML(yamlConfig)
117+
if err != nil {
118+
return err
82119
}
83120

84-
if s.Config.AwsConfigDir == nil {
85-
s.Config.AwsConfigDir = &def_AwsConfigDir
121+
for _, w := range warns {
122+
s.logger.Warn(w)
86123
}
87124

125+
s.Config = cfg
126+
88127
return nil
89128
}
90129

@@ -98,6 +137,8 @@ func (s *Source) Configure(ctx context.Context, yamlConfig []byte, logger *log.E
98137

99138
s.logger = logger.WithField("group", s.Config.GroupName)
100139

140+
// XXX not really useful logging
141+
101142
s.logger.Debugf("Starting configuration for Cloudwatch group %s", s.Config.GroupName)
102143
s.logger.Tracef("describelogstreams_limit set to %d", *s.Config.DescribeLogStreamsLimit)
103144
s.logger.Tracef("poll_new_stream_interval set to %v", *s.Config.PollNewStreamInterval)
@@ -106,13 +147,13 @@ func (s *Source) Configure(ctx context.Context, yamlConfig []byte, logger *log.E
106147
s.logger.Tracef("stream_read_timeout set to %v", *s.Config.StreamReadTimeout)
107148
s.logger.Tracef("getlogeventspages_limit set to %v", *s.Config.GetLogEventsPagesLimit)
108149
s.logger.Tracef("aws_api_timeout set to %v", *s.Config.AwsApiCallTimeout)
150+
s.logger.Tracef("aws_config_dir set to %s", *s.Config.AwsConfigDir)
109151

110-
if *s.Config.MaxStreamAge > *s.Config.StreamReadTimeout {
111-
s.logger.Warningf("max_stream_age > stream_read_timeout, stream might keep being opened/closed")
112-
}
152+
return s.setupAWS(ctx)
153+
}
113154

114-
s.logger.Tracef("aws_config_dir set to %s", *s.Config.AwsConfigDir)
115155

156+
func (s *Source) setupAWS(ctx context.Context) error {
116157
if *s.Config.AwsConfigDir != "" {
117158
_, err := os.Stat(*s.Config.AwsConfigDir)
118159
if err != nil {

0 commit comments

Comments
 (0)