diff --git a/pkg/acquisition/modules/kinesis/config.go b/pkg/acquisition/modules/kinesis/config.go index ae22c48acbd..5a55917b9d2 100644 --- a/pkg/acquisition/modules/kinesis/config.go +++ b/pkg/acquisition/modules/kinesis/config.go @@ -31,38 +31,64 @@ type Configuration struct { MaxRetries int `yaml:"max_retries"` } -func (s *Source) UnmarshalConfig(yamlConfig []byte) error { - s.Config = Configuration{} +func ConfigurationFromYAML(y []byte) (Configuration, error) { + var cfg Configuration + + if err := yaml.UnmarshalWithOptions(y, &cfg, yaml.Strict()); err != nil { + return cfg, fmt.Errorf("cannot parse: %s", yaml.FormatError(err, false, false)) + } - err := yaml.UnmarshalWithOptions(yamlConfig, &s.Config, yaml.Strict()) + cfg.SetDefaults() + cfg.Normalize() + + err := cfg.Validate() if err != nil { - return fmt.Errorf("cannot parse kinesis datasource configuration: %s", yaml.FormatError(err, false, false)) + return cfg, err } - if s.Config.Mode == "" { - s.Config.Mode = configuration.TAIL_MODE + return cfg, nil +} + +func (c *Configuration) SetDefaults() { + if c.Mode == "" { + c.Mode = configuration.TAIL_MODE } +} - if s.Config.StreamName == "" && !s.Config.UseEnhancedFanOut { +func (c *Configuration) Normalize() { + if c.MaxRetries <= 0 { + c.MaxRetries = 10 + } +} + +func (s *Source) UnmarshalConfig(yamlConfig []byte) error { + cfg, err := ConfigurationFromYAML(yamlConfig) + if err != nil { + return err + } + + s.Config = cfg + + return nil +} + +func (c *Configuration) Validate() error { + if c.StreamName == "" && !c.UseEnhancedFanOut { return errors.New("stream_name is mandatory when use_enhanced_fanout is false") } - if s.Config.StreamARN == "" && s.Config.UseEnhancedFanOut { + if c.StreamARN == "" && c.UseEnhancedFanOut { return errors.New("stream_arn is mandatory when use_enhanced_fanout is true") } - if s.Config.ConsumerName == "" && s.Config.UseEnhancedFanOut { + if c.ConsumerName == "" && c.UseEnhancedFanOut { return errors.New("consumer_name is mandatory when use_enhanced_fanout is true") } - if s.Config.StreamARN != "" && s.Config.StreamName != "" { + if c.StreamARN != "" && c.StreamName != "" { return errors.New("stream_arn and stream_name are mutually exclusive") } - if s.Config.MaxRetries <= 0 { - s.Config.MaxRetries = 10 - } - return nil }