Skip to content

Commit 176d522

Browse files
committed
Expose goflow2 workers and sockets config
Fixes #951 (partly)
1 parent 85b4629 commit 176d522

File tree

3 files changed

+41
-21
lines changed

3 files changed

+41
-21
lines changed

docs/api.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ Following is the supported API format for the NetFlow / IPFIX collector:
9090
port: the port number to listen on, for IPFIX/NetFlow v9. Omit or set to 0 to disable IPFIX/NetFlow v9 ingestion
9191
portLegacy: the port number to listen on, for legacy NetFlow v5. Omit or set to 0 to disable NetFlow v5 ingestion
9292
batchMaxLen: the number of accumulated flows before being forwarded for processing
93+
workers: the number of netflow/ipfix decoding workers
94+
sockets: the number of listening sockets
9395
</pre>
9496
## Ingest Kafka API
9597
Following is the supported API format for the kafka ingest:

pkg/api/ingest_collector.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,6 @@ type IngestCollector struct {
2222
Port int `yaml:"port,omitempty" json:"port,omitempty" doc:"the port number to listen on, for IPFIX/NetFlow v9. Omit or set to 0 to disable IPFIX/NetFlow v9 ingestion"`
2323
PortLegacy int `yaml:"portLegacy,omitempty" json:"portLegacy,omitempty" doc:"the port number to listen on, for legacy NetFlow v5. Omit or set to 0 to disable NetFlow v5 ingestion"`
2424
BatchMaxLen int `yaml:"batchMaxLen,omitempty" json:"batchMaxLen,omitempty" doc:"the number of accumulated flows before being forwarded for processing"`
25+
Workers int `yaml:"workers,omitempty" json:"workers,omitempty" doc:"the number of netflow/ipfix decoding workers"`
26+
Sockets int `yaml:"sockets,omitempty" json:"sockets,omitempty" doc:"the number of listening sockets"`
2527
}

pkg/pipeline/ingest/ingest_collector.go

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ type ingestCollector struct {
4747
hostname string
4848
port int
4949
portLegacy int
50+
workers int
51+
sockets int
5052
in chan map[string]interface{}
5153
exitChan <-chan struct{}
5254
metrics *metrics
@@ -126,30 +128,34 @@ func (c *ingestCollector) initCollectorListener(ctx context.Context) {
126128
}
127129
defer tpl.Close(ctx)
128130

129-
go func() {
130-
sNF := utils.NewStateNetFlow()
131-
sNF.Format = formatter
132-
sNF.Transport = transporter
133-
sNF.Logger = log.StandardLogger()
134-
sNF.TemplateSystem = tpl
135-
136-
log.Infof("listening for netflow on host %s, port = %d", c.hostname, c.port)
137-
err = sNF.FlowRoutine(1, c.hostname, c.port, false)
138-
log.Fatal(err)
139-
}()
131+
log.Infof("listening for netflow on host %s, port = %d", c.hostname, c.port)
132+
for i := 0; i < c.sockets; i++ {
133+
go func() {
134+
sNF := utils.NewStateNetFlow()
135+
sNF.Format = formatter
136+
sNF.Transport = transporter
137+
sNF.Logger = log.StandardLogger()
138+
sNF.TemplateSystem = tpl
139+
140+
err = sNF.FlowRoutine(c.workers, c.hostname, c.port, c.sockets > 1)
141+
log.Fatal(err)
142+
}()
143+
}
140144
}
141145

142146
if c.portLegacy > 0 {
143-
go func() {
144-
sLegacyNF := utils.NewStateNFLegacy()
145-
sLegacyNF.Format = formatter
146-
sLegacyNF.Transport = transporter
147-
sLegacyNF.Logger = log.StandardLogger()
148-
149-
log.Infof("listening for legacy netflow on host %s, port = %d", c.hostname, c.portLegacy)
150-
err = sLegacyNF.FlowRoutine(1, c.hostname, c.portLegacy, false)
151-
log.Fatal(err)
152-
}()
147+
log.Infof("listening for legacy netflow on host %s, port = %d", c.hostname, c.portLegacy)
148+
for i := 0; i < c.sockets; i++ {
149+
go func() {
150+
sLegacyNF := utils.NewStateNFLegacy()
151+
sLegacyNF.Format = formatter
152+
sLegacyNF.Transport = transporter
153+
sLegacyNF.Logger = log.StandardLogger()
154+
155+
err = sLegacyNF.FlowRoutine(c.workers, c.hostname, c.portLegacy, c.sockets > 1)
156+
log.Fatal(err)
157+
}()
158+
}
153159
}
154160
}
155161

@@ -177,10 +183,18 @@ func NewIngestCollector(opMetrics *operational.Metrics, params config.StageParam
177183
if jsonIngestCollector.Port == 0 && jsonIngestCollector.PortLegacy == 0 {
178184
return nil, fmt.Errorf("no ingest port specified")
179185
}
186+
if jsonIngestCollector.Workers == 0 {
187+
jsonIngestCollector.Workers = 1
188+
}
189+
if jsonIngestCollector.Sockets == 0 {
190+
jsonIngestCollector.Sockets = 1
191+
}
180192

181193
log.Infof("hostname = %s", jsonIngestCollector.HostName)
182194
log.Infof("port = %d", jsonIngestCollector.Port)
183195
log.Infof("portLegacy = %d", jsonIngestCollector.PortLegacy)
196+
log.Infof("workers = %d", jsonIngestCollector.Workers)
197+
log.Infof("sockets = %d", jsonIngestCollector.Sockets)
184198

185199
in := make(chan map[string]interface{}, channelSize)
186200
metrics := newMetrics(opMetrics, params.Name, params.Ingest.Type, func() int { return len(in) })
@@ -189,6 +203,8 @@ func NewIngestCollector(opMetrics *operational.Metrics, params config.StageParam
189203
hostname: jsonIngestCollector.HostName,
190204
port: jsonIngestCollector.Port,
191205
portLegacy: jsonIngestCollector.PortLegacy,
206+
workers: jsonIngestCollector.Workers,
207+
sockets: jsonIngestCollector.Sockets,
192208
exitChan: pUtils.ExitChannel(),
193209
in: in,
194210
metrics: metrics,

0 commit comments

Comments
 (0)