Skip to content

Commit 22e0767

Browse files
committed
refact pkg/acquisition: kinesis configuration
1 parent d88be56 commit 22e0767

File tree

1 file changed

+40
-14
lines changed

1 file changed

+40
-14
lines changed

pkg/acquisition/modules/kinesis/config.go

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,38 +31,64 @@ type Configuration struct {
3131
MaxRetries int `yaml:"max_retries"`
3232
}
3333

34-
func (s *Source) UnmarshalConfig(yamlConfig []byte) error {
35-
s.Config = Configuration{}
34+
func ConfigurationFromYAML(y []byte) (Configuration, error) {
35+
var cfg Configuration
36+
37+
if err := yaml.UnmarshalWithOptions(y, &cfg, yaml.Strict()); err != nil {
38+
return cfg, fmt.Errorf("cannot parse: %s", yaml.FormatError(err, false, false))
39+
}
3640

37-
err := yaml.UnmarshalWithOptions(yamlConfig, &s.Config, yaml.Strict())
41+
cfg.SetDefaults()
42+
cfg.Normalize()
43+
44+
err := cfg.Validate()
3845
if err != nil {
39-
return fmt.Errorf("cannot parse kinesis datasource configuration: %s", yaml.FormatError(err, false, false))
46+
return cfg, err
4047
}
4148

42-
if s.Config.Mode == "" {
43-
s.Config.Mode = configuration.TAIL_MODE
49+
return cfg, nil
50+
}
51+
52+
func (c *Configuration) SetDefaults() {
53+
if c.Mode == "" {
54+
c.Mode = configuration.TAIL_MODE
4455
}
56+
}
4557

46-
if s.Config.StreamName == "" && !s.Config.UseEnhancedFanOut {
58+
func (c *Configuration) Normalize() {
59+
if c.MaxRetries <= 0 {
60+
c.MaxRetries = 10
61+
}
62+
}
63+
64+
func (s *Source) UnmarshalConfig(yamlConfig []byte) error {
65+
cfg, err := ConfigurationFromYAML(yamlConfig)
66+
if err != nil {
67+
return err
68+
}
69+
70+
s.Config = cfg
71+
72+
return nil
73+
}
74+
75+
func (c *Configuration) Validate() error {
76+
if c.StreamName == "" && !c.UseEnhancedFanOut {
4777
return errors.New("stream_name is mandatory when use_enhanced_fanout is false")
4878
}
4979

50-
if s.Config.StreamARN == "" && s.Config.UseEnhancedFanOut {
80+
if c.StreamARN == "" && c.UseEnhancedFanOut {
5181
return errors.New("stream_arn is mandatory when use_enhanced_fanout is true")
5282
}
5383

54-
if s.Config.ConsumerName == "" && s.Config.UseEnhancedFanOut {
84+
if c.ConsumerName == "" && c.UseEnhancedFanOut {
5585
return errors.New("consumer_name is mandatory when use_enhanced_fanout is true")
5686
}
5787

58-
if s.Config.StreamARN != "" && s.Config.StreamName != "" {
88+
if c.StreamARN != "" && c.StreamName != "" {
5989
return errors.New("stream_arn and stream_name are mutually exclusive")
6090
}
6191

62-
if s.Config.MaxRetries <= 0 {
63-
s.Config.MaxRetries = 10
64-
}
65-
6692
return nil
6793
}
6894

0 commit comments

Comments
 (0)