Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 40 additions & 14 deletions pkg/acquisition/modules/kinesis/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,38 +31,64 @@ type Configuration struct {
MaxRetries int `yaml:"max_retries"`
}

func (s *Source) UnmarshalConfig(yamlConfig []byte) error {
s.Config = Configuration{}
func ConfigurationFromYAML(y []byte) (Configuration, error) {
var cfg Configuration

if err := yaml.UnmarshalWithOptions(y, &cfg, yaml.Strict()); err != nil {
return cfg, fmt.Errorf("cannot parse: %s", yaml.FormatError(err, false, false))
}

err := yaml.UnmarshalWithOptions(yamlConfig, &s.Config, yaml.Strict())
cfg.SetDefaults()
cfg.Normalize()

err := cfg.Validate()
if err != nil {
return fmt.Errorf("cannot parse kinesis datasource configuration: %s", yaml.FormatError(err, false, false))
return cfg, err
}

if s.Config.Mode == "" {
s.Config.Mode = configuration.TAIL_MODE
return cfg, nil
}

func (c *Configuration) SetDefaults() {
if c.Mode == "" {
c.Mode = configuration.TAIL_MODE
}
}

if s.Config.StreamName == "" && !s.Config.UseEnhancedFanOut {
func (c *Configuration) Normalize() {
if c.MaxRetries <= 0 {
c.MaxRetries = 10
}
}

func (s *Source) UnmarshalConfig(yamlConfig []byte) error {
cfg, err := ConfigurationFromYAML(yamlConfig)
if err != nil {
return err
}

s.Config = cfg

return nil
}

func (c *Configuration) Validate() error {
if c.StreamName == "" && !c.UseEnhancedFanOut {
return errors.New("stream_name is mandatory when use_enhanced_fanout is false")
}

if s.Config.StreamARN == "" && s.Config.UseEnhancedFanOut {
if c.StreamARN == "" && c.UseEnhancedFanOut {
return errors.New("stream_arn is mandatory when use_enhanced_fanout is true")
}

if s.Config.ConsumerName == "" && s.Config.UseEnhancedFanOut {
if c.ConsumerName == "" && c.UseEnhancedFanOut {
return errors.New("consumer_name is mandatory when use_enhanced_fanout is true")
}

if s.Config.StreamARN != "" && s.Config.StreamName != "" {
if c.StreamARN != "" && c.StreamName != "" {
return errors.New("stream_arn and stream_name are mutually exclusive")
}

if s.Config.MaxRetries <= 0 {
s.Config.MaxRetries = 10
}

return nil
}

Expand Down