@@ -31,38 +31,64 @@ type Configuration struct {
3131 MaxRetries int `yaml:"max_retries"`
3232}
3333
34- func (s * Source ) UnmarshalConfig (yamlConfig []byte ) error {
35- s .Config = Configuration {}
34+ func ConfigurationFromYAML (y []byte ) (Configuration , error ) {
35+ var cfg Configuration
36+
37+ if err := yaml .UnmarshalWithOptions (y , & cfg , yaml .Strict ()); err != nil {
38+ return cfg , fmt .Errorf ("cannot parse: %s" , yaml .FormatError (err , false , false ))
39+ }
3640
37- err := yaml .UnmarshalWithOptions (yamlConfig , & s .Config , yaml .Strict ())
41+ cfg .SetDefaults ()
42+ cfg .Normalize ()
43+
44+ err := cfg .Validate ()
3845 if err != nil {
39- return fmt . Errorf ( "cannot parse kinesis datasource configuration: %s" , yaml . FormatError ( err , false , false ))
46+ return cfg , err
4047 }
4148
42- if s .Config .Mode == "" {
43- s .Config .Mode = configuration .TAIL_MODE
49+ return cfg , nil
50+ }
51+
52+ func (c * Configuration ) SetDefaults () {
53+ if c .Mode == "" {
54+ c .Mode = configuration .TAIL_MODE
4455 }
56+ }
4557
46- if s .Config .StreamName == "" && ! s .Config .UseEnhancedFanOut {
58+ func (c * Configuration ) Normalize () {
59+ if c .MaxRetries <= 0 {
60+ c .MaxRetries = 10
61+ }
62+ }
63+
64+ func (s * Source ) UnmarshalConfig (yamlConfig []byte ) error {
65+ cfg , err := ConfigurationFromYAML (yamlConfig )
66+ if err != nil {
67+ return err
68+ }
69+
70+ s .Config = cfg
71+
72+ return nil
73+ }
74+
75+ func (c * Configuration ) Validate () error {
76+ if c .StreamName == "" && ! c .UseEnhancedFanOut {
4777 return errors .New ("stream_name is mandatory when use_enhanced_fanout is false" )
4878 }
4979
50- if s . Config . StreamARN == "" && s . Config .UseEnhancedFanOut {
80+ if c . StreamARN == "" && c .UseEnhancedFanOut {
5181 return errors .New ("stream_arn is mandatory when use_enhanced_fanout is true" )
5282 }
5383
54- if s . Config . ConsumerName == "" && s . Config .UseEnhancedFanOut {
84+ if c . ConsumerName == "" && c .UseEnhancedFanOut {
5585 return errors .New ("consumer_name is mandatory when use_enhanced_fanout is true" )
5686 }
5787
58- if s . Config . StreamARN != "" && s . Config .StreamName != "" {
88+ if c . StreamARN != "" && c .StreamName != "" {
5989 return errors .New ("stream_arn and stream_name are mutually exclusive" )
6090 }
6191
62- if s .Config .MaxRetries <= 0 {
63- s .Config .MaxRetries = 10
64- }
65-
6692 return nil
6793}
6894
0 commit comments