Skip to content

Commit bfbd89b

Browse files
jotakronensc
andauthored
Merge confgen & main models (#254) (#267)
* Confgen: init from options (avoid many globals) * Use main model in confgen, use pipeline builder Fixes #254 - Removed duplicated confgen model - Use PipelineBuilder to simplify pipeline generation - Make confgen easier to consume as a lib (e.g. for NOO) - Do not make it necessary to call "Run": parsing definition files should be sufficient - Do not make it necessary to work with files written on disk: work with []byte instead - SkipWithTags should not result in an error when a file is skipped - Add confgen tests * Avoid using globals A side-effect of removing globals in write_loki is that it changes how the config is read, and set with defaults. Instead of unmarshaling a second time to automatically get defaults, we now call an explicit function that sets the default. Also, now removing loki URL default, it now has to be set explicitely * Update ConnTrack builder * More defer cleanup in tests, and use ioutils / temp dir/files Also fixed jsonnet dir actually used as filename prefix rather than directory * User ConfigFileStruct in confgen * Update pkg/config/config.go Co-authored-by: Ronen Schaffer <[email protected]> * Use config.ConfigFileStruct in tests Co-authored-by: Ronen Schaffer <[email protected]>
1 parent cba9b51 commit bfbd89b

37 files changed

+660
-479
lines changed

cmd/confgenerator/main.go

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ var (
3939
logLevel string
4040
envPrefix = "FLP_CONFGEN"
4141
defaultLogFileName = ".confgen"
42+
opts confgen.Options
4243
)
4344

4445
// rootCmd represents the root command
@@ -92,8 +93,8 @@ func initLogger() {
9293
log.SetFormatter(&log.TextFormatter{DisableColors: false, FullTimestamp: true})
9394
}
9495

95-
func dumpConfig() {
96-
configAsJSON, _ := json.MarshalIndent(confgen.Opt, "", "\t")
96+
func dumpConfig(opts *confgen.Options) {
97+
configAsJSON, _ := json.MarshalIndent(opts, "", "\t")
9798
log.Infof("configuration:\n%s\n", configAsJSON)
9899
}
99100

@@ -128,12 +129,12 @@ func initFlags() {
128129

129130
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", fmt.Sprintf("config file (default is $HOME/%s)", defaultLogFileName))
130131
rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "error", "Log level: debug, info, warning, error")
131-
rootCmd.PersistentFlags().StringVar(&confgen.Opt.SrcFolder, "srcFolder", "network_definitions", "source folder")
132-
rootCmd.PersistentFlags().StringVar(&confgen.Opt.DestConfFile, "destConfFile", "/tmp/flowlogs-pipeline.conf.yaml", "destination configuration file")
133-
rootCmd.PersistentFlags().StringVar(&confgen.Opt.DestDocFile, "destDocFile", "/tmp/metrics.md", "destination documentation file (.md)")
134-
rootCmd.PersistentFlags().StringVar(&confgen.Opt.DestGrafanaJsonnetFolder, "destGrafanaJsonnetFolder", "/tmp/jsonnet", "destination grafana jsonnet folder")
135-
rootCmd.PersistentFlags().StringSliceVar(&confgen.Opt.SkipWithTags, "skipWithTags", nil, "Skip definitions with Tags")
136-
rootCmd.PersistentFlags().StringSliceVar(&confgen.Opt.GenerateStages, "generateStages", nil, "Produce only specified stages (ingest, transform_generic, transform_network, extract_aggregate, encode_prom, write_loki")
132+
rootCmd.PersistentFlags().StringVar(&opts.SrcFolder, "srcFolder", "network_definitions", "source folder")
133+
rootCmd.PersistentFlags().StringVar(&opts.DestConfFile, "destConfFile", "/tmp/flowlogs-pipeline.conf.yaml", "destination configuration file")
134+
rootCmd.PersistentFlags().StringVar(&opts.DestDocFile, "destDocFile", "/tmp/metrics.md", "destination documentation file (.md)")
135+
rootCmd.PersistentFlags().StringVar(&opts.DestGrafanaJsonnetFolder, "destGrafanaJsonnetFolder", "/tmp/jsonnet", "destination grafana jsonnet folder")
136+
rootCmd.PersistentFlags().StringSliceVar(&opts.SkipWithTags, "skipWithTags", nil, "Skip definitions with Tags")
137+
rootCmd.PersistentFlags().StringSliceVar(&opts.GenerateStages, "generateStages", nil, "Produce only specified stages (ingest, transform_generic, transform_network, extract_aggregate, encode_prom, write_loki")
137138
}
138139

139140
func main() {
@@ -150,15 +151,10 @@ func run() {
150151
fmt.Printf("Starting %s:\n=====\nBuild Version: %s\nBuild Date: %s\n\n",
151152
filepath.Base(os.Args[0]), BuildVersion, BuildDate)
152153
// Dump the configuration
153-
dumpConfig()
154+
dumpConfig(&opts)
154155
// creating a new configuration generator
155-
confGen, err := confgen.NewConfGen()
156-
if err != nil {
157-
log.Fatalf("failed to initialize NewConfGen %s", err)
158-
os.Exit(1)
159-
}
160-
161-
err = confGen.Run()
156+
confGen := confgen.NewConfGen(&opts)
157+
err := confGen.Run()
162158
if err != nil {
163159
log.Fatalf("failed to initialize NewConfGen %s", err)
164160
os.Exit(1)

cmd/flowlogs-pipeline/main.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ var (
4343
logLevel string
4444
envPrefix = "FLOWLOGS-PIPILNE"
4545
defaultLogFileName = ".flowlogs-pipeline"
46+
opts config.Options
4647
)
4748

4849
// rootCmd represents the root command
@@ -98,8 +99,8 @@ func initLogger() {
9899
log.SetFormatter(&log.TextFormatter{DisableColors: false, FullTimestamp: true, PadLevelText: true, DisableQuote: true})
99100
}
100101

101-
func dumpConfig() {
102-
configAsJSON, _ := json.MarshalIndent(config.Opt, "", " ")
102+
func dumpConfig(opts config.Options) {
103+
configAsJSON, _ := json.MarshalIndent(opts, "", " ")
103104
fmt.Printf("Using configuration:\n%s\n", configAsJSON)
104105
}
105106

@@ -133,9 +134,9 @@ func initFlags() {
133134
cobra.OnInitialize(initConfig)
134135
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", fmt.Sprintf("config file (default is $HOME/%s)", defaultLogFileName))
135136
rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "error", "Log level: debug, info, warning, error")
136-
rootCmd.PersistentFlags().StringVar(&config.Opt.Health.Port, "health.port", "8080", "Health server port")
137-
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine, "pipeline", "", "json of config file pipeline field")
138-
rootCmd.PersistentFlags().StringVar(&config.Opt.Parameters, "parameters", "", "json of config file parameters field")
137+
rootCmd.PersistentFlags().StringVar(&opts.Health.Port, "health.port", "8080", "Health server port")
138+
rootCmd.PersistentFlags().StringVar(&opts.PipeLine, "pipeline", "", "json of config file pipeline field")
139+
rootCmd.PersistentFlags().StringVar(&opts.Parameters, "parameters", "", "json of config file parameters field")
139140
}
140141

141142
func main() {
@@ -159,9 +160,9 @@ func run() {
159160
filepath.Base(os.Args[0]), BuildVersion, BuildDate)
160161

161162
// Dump configuration
162-
dumpConfig()
163+
dumpConfig(opts)
163164

164-
err = config.ParseConfig()
165+
cfg, err := config.ParseConfig(opts)
165166
if err != nil {
166167
log.Errorf("error in parsing config file: %v", err)
167168
os.Exit(1)
@@ -171,14 +172,14 @@ func run() {
171172
utils.SetupElegantExit()
172173

173174
// Create new flows pipeline
174-
mainPipeline, err = pipeline.NewPipeline()
175+
mainPipeline, err = pipeline.NewPipeline(&cfg)
175176
if err != nil {
176177
log.Fatalf("failed to initialize pipeline %s", err)
177178
os.Exit(1)
178179
}
179180

180181
// Start health report server
181-
health.NewHealthServer(mainPipeline)
182+
health.NewHealthServer(&opts, mainPipeline)
182183

183184
// Starts the flows pipeline
184185
mainPipeline.Run()

pkg/api/write_loki.go

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,18 +46,30 @@ type WriteLoki struct {
4646
TimestampScale string `yaml:"timestampScale,omitempty" json:"timestampScale,omitempty" doc:"timestamp units scale (e.g. for UNIX = 1s)"`
4747
}
4848

49-
func GetWriteLokiDefaults() WriteLoki {
50-
return WriteLoki{
51-
URL: "http://loki:3100/",
52-
BatchWait: "1s",
53-
BatchSize: 100 * 1024,
54-
Timeout: "10s",
55-
MinBackoff: "1s",
56-
MaxBackoff: "5m",
57-
MaxRetries: 10,
58-
StaticLabels: model.LabelSet{},
59-
TimestampLabel: "TimeReceived",
60-
TimestampScale: "1s",
49+
func (w *WriteLoki) SetDefaults() {
50+
if w.BatchWait == "" {
51+
w.BatchWait = "1s"
52+
}
53+
if w.BatchSize == 0 {
54+
w.BatchSize = 100 * 1024
55+
}
56+
if w.Timeout == "" {
57+
w.Timeout = "10s"
58+
}
59+
if w.MinBackoff == "" {
60+
w.MinBackoff = "1s"
61+
}
62+
if w.MaxBackoff == "" {
63+
w.MaxBackoff = "1s"
64+
}
65+
if w.MaxRetries == 0 {
66+
w.MaxRetries = 10
67+
}
68+
if w.TimestampLabel == "" {
69+
w.TimestampLabel = "TimeReceived"
70+
}
71+
if w.TimestampScale == "" {
72+
w.TimestampScale = "1s"
6173
}
6274
}
6375

pkg/confgen/confgen.go

Lines changed: 34 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929
"gopkg.in/yaml.v2"
3030
)
3131

32-
var (
32+
const (
3333
definitionExt = ".yaml"
3434
definitionHeader = "#flp_confgen"
3535
configFileName = "config.yaml"
@@ -50,6 +50,7 @@ type Definition struct {
5050
type Definitions []Definition
5151

5252
type ConfGen struct {
53+
opts *Options
5354
config *Config
5455
transformRules api.NetworkTransformRules
5556
aggregateDefinitions aggregate.Definitions
@@ -71,47 +72,52 @@ type DefFile struct {
7172

7273
func (cg *ConfGen) Run() error {
7374
var err error
74-
cg.config, err = cg.ParseConfigFile(Opt.SrcFolder + "/" + configFileName)
75+
cg.config, err = cg.ParseConfigFile(cg.opts.SrcFolder + "/" + configFileName)
7576
if err != nil {
7677
log.Debugf("cg.ParseConfigFile err: %v ", err)
7778
return err
7879
}
7980

80-
definitionFiles := cg.GetDefinitionFiles(Opt.SrcFolder)
81+
definitionFiles := getDefinitionFiles(cg.opts.SrcFolder)
8182
for _, definitionFile := range definitionFiles {
82-
err := cg.parseFile(definitionFile)
83+
b, err := ioutil.ReadFile(definitionFile)
8384
if err != nil {
84-
log.Debugf("cg.parseFile err: %v ", err)
85+
log.Debugf("ioutil.ReadFile err: %v ", err)
86+
continue
87+
}
88+
err = cg.ParseDefinition(definitionFile, b)
89+
if err != nil {
90+
log.Debugf("cg.parseDefinition err: %v ", err)
8591
continue
8692
}
8793
}
8894

89-
cg.Dedupe()
95+
cg.dedupe()
9096

91-
if len(Opt.GenerateStages) != 0 {
92-
config := cg.GenerateTruncatedConfig(Opt.GenerateStages)
93-
err = cg.writeConfigFile(Opt.DestConfFile, config)
97+
if len(cg.opts.GenerateStages) != 0 {
98+
cfg := cg.GenerateTruncatedConfig()
99+
err = cg.writeConfigFile(cg.opts.DestConfFile, cfg)
94100
if err != nil {
95101
log.Debugf("cg.GenerateTruncatedConfig err: %v ", err)
96102
return err
97103
}
98104
return nil
99105
} else {
100106
config := cg.GenerateFlowlogs2PipelineConfig()
101-
err = cg.writeConfigFile(Opt.DestConfFile, config)
107+
err = cg.writeConfigFile(cg.opts.DestConfFile, config)
102108
if err != nil {
103109
log.Debugf("cg.GenerateFlowlogs2PipelineConfig err: %v ", err)
104110
return err
105111
}
106112
}
107113

108-
err = cg.generateDoc(Opt.DestDocFile)
114+
err = cg.generateDoc(cg.opts.DestDocFile)
109115
if err != nil {
110116
log.Debugf("cg.generateDoc err: %v ", err)
111117
return err
112118
}
113119

114-
err = cg.generateGrafanaJsonnet(Opt.DestGrafanaJsonnetFolder)
120+
err = cg.generateGrafanaJsonnet(cg.opts.DestGrafanaJsonnetFolder)
115121
if err != nil {
116122
log.Debugf("cg.generateGrafanaJsonnet err: %v ", err)
117123
return err
@@ -120,62 +126,44 @@ func (cg *ConfGen) Run() error {
120126
return nil
121127
}
122128

123-
func (cg *ConfGen) checkHeader(fileName string) error {
124-
// check header
125-
f, err := os.OpenFile(fileName, os.O_RDONLY, 0644)
126-
if err != nil {
127-
log.Debugf("os.OpenFile error: %v ", err)
128-
return err
129-
}
129+
func checkHeader(bytes []byte) error {
130130
header := make([]byte, len(definitionHeader))
131-
_, err = f.Read(header)
132-
if err != nil || string(header) != definitionHeader {
133-
log.Debugf("Wrong header file: %s ", fileName)
131+
copy(header, bytes)
132+
if string(header) != definitionHeader {
134133
return fmt.Errorf("wrong header")
135134
}
136-
err = f.Close()
137-
if err != nil {
138-
log.Debugf("f.Close err: %v ", err)
139-
return err
140-
}
141-
142135
return nil
143136
}
144137

145-
func (cg *ConfGen) parseFile(fileName string) error {
146-
138+
func (cg *ConfGen) ParseDefinition(name string, bytes []byte) error {
147139
// check header
148-
err := cg.checkHeader(fileName)
140+
err := checkHeader(bytes)
149141
if err != nil {
150-
log.Debugf("cg.checkHeader err: %v ", err)
142+
log.Debugf("%s cg.checkHeader err: %v ", name, err)
151143
return err
152144
}
153145

154146
// parse yaml
155147
var defFile DefFile
156-
yamlFile, err := ioutil.ReadFile(fileName)
148+
err = yaml.Unmarshal(bytes, &defFile)
157149
if err != nil {
158-
log.Debugf("ioutil.ReadFile err: %v ", err)
159-
return err
160-
}
161-
err = yaml.Unmarshal(yamlFile, &defFile)
162-
if err != nil {
163-
log.Debugf("yaml.Unmarshal err: %v ", err)
150+
log.Debugf("%s yaml.Unmarshal err: %v ", name, err)
164151
return err
165152
}
166153

167154
//skip if their skip tag match
168-
for _, skipTag := range Opt.SkipWithTags {
155+
for _, skipTag := range cg.opts.SkipWithTags {
169156
for _, tag := range defFile.Tags {
170157
if skipTag == tag {
171-
return fmt.Errorf("skipping definition %s due to skip tag %s", fileName, tag)
158+
log.Infof("skipping definition %s due to skip tag %s", name, tag)
159+
return nil
172160
}
173161
}
174162
}
175163

176164
// parse definition
177165
definition := Definition{
178-
FileName: fileName,
166+
FileName: name,
179167
Description: defFile.Description,
180168
Details: defFile.Details,
181169
Usage: defFile.Usage,
@@ -215,7 +203,7 @@ func (cg *ConfGen) parseFile(fileName string) error {
215203
return nil
216204
}
217205

218-
func (*ConfGen) GetDefinitionFiles(rootPath string) []string {
206+
func getDefinitionFiles(rootPath string) []string {
219207

220208
var files []string
221209

@@ -235,11 +223,12 @@ func (*ConfGen) GetDefinitionFiles(rootPath string) []string {
235223
return files
236224
}
237225

238-
func NewConfGen() (*ConfGen, error) {
226+
func NewConfGen(opts *Options) *ConfGen {
239227
return &ConfGen{
228+
opts: opts,
240229
transformRules: api.NetworkTransformRules{},
241230
aggregateDefinitions: aggregate.Definitions{},
242231
definitions: Definitions{},
243232
visualizations: Visualizations{},
244-
}, nil
233+
}
245234
}

0 commit comments

Comments
 (0)