Skip to content

Commit f711714

Browse files
authored
Merge pull request #225 from jotak/no-decode-stage
Remove Decoder stage (breaking change)
2 parents 4549b72 + 947fda2 commit f711714

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+183
-715
lines changed

README.md

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -112,14 +112,12 @@ FLP is a framework. The main FLP object is the **pipeline**. FLP **pipeline** ca
112112

113113
The pipeline is constructed of a sequence of stages. Each stage is classified into one of the following types:
114114
- **ingest** - obtain flows from some source, one entry per line
115-
- **decode** - parse input lines into a known format, e.g., dictionary (map) of AWS or goflow data
116115
- **transform** - convert entries into a standard format; can include multiple transform stages
117116
- **write** - provide the means to write the data to some target, e.g. loki, standard output, etc
118117
- **extract** - derive a set of metrics from the imported flows
119118
- **encode** - make the data available in appropriate format (e.g. prometheus)
120119

121120
The first stage in a pipeline must be an **ingest** stage.
122-
The **ingest** stage is typically followed by a **decode** stage, unless the **ingest** stage also performs the decoding.
123121
Each stage (other than an **ingest** stage) specifies the stage it follows.
124122
Multiple stages may follow from a particular stage, thus allowing the same data to be consumed by multiple parallel pipelines.
125123
For example, multiple **transform** stages may be performed and the results may be output to different targets.
@@ -132,14 +130,12 @@ A full configuration file with the data consumed by two different transforms mi
132130
```yaml
133131
pipeline:
134132
- name: ingest1
135-
- name: decode1
136-
follows: ingest1
137133
- name: generic1
138-
follows: decode1
134+
follows: ingest1
139135
- name: write1
140136
follows: generic1
141137
- name: generic2
142-
follows: decode1
138+
follows: ingest1
143139
- name: write2
144140
follows: generic2
145141
parameters:
@@ -148,9 +144,8 @@ parameters:
148144
type: file_loop
149145
file:
150146
filename: hack/examples/ocp-ipfix-flowlogs.json
151-
- name: decode1
152-
decode:
153-
type: json
147+
decoder:
148+
type: json
154149
- name: generic1
155150
transform:
156151
type: generic
@@ -200,19 +195,16 @@ For example:
200195
log-level: info
201196
pipeline:
202197
- name: ingest_file
203-
- name: decode_json
204-
follows: ingest_file
205198
- name: write_stdout
206-
follows: write_stdout
207-
parameters
208-
- name ingest_file
199+
follows: ingest_file
200+
parameters:
201+
- name: ingest_file
209202
ingest:
210203
type: file
211204
file:
212205
filename: hack/examples/ocp-ipfix-flowlogs.json
213-
- name: decode_json
214-
decode:
215-
type: json
206+
decoder:
207+
type: json
216208
- name: write_stdout
217209
write:
218210
type: stdout
@@ -224,11 +216,11 @@ parameters
224216

225217
2. Using command line parameters:
226218

227-
`./flowlogs-pipeline --pipeline "[{\"name\":\"ingest1\"},{\"follows\":\"ingest1\",\"name\":\"decode1\"},{\"follows\":\"decode1\",\"name\":\"write1\"}]" --parameters "[{\"ingest\":{\"file\":{\"filename\":\"hack/examples/ocp-ipfix-flowlogs.json\"},\"type\":\"file\"},\"name\":\"ingest1\"},{\"decode\":{\"type\":\"json\"},\"name\":\"decode1\"},{\"name\":\"write1\",\"write\":{\"type\":\"stdout\"}}]"`
219+
`./flowlogs-pipeline --pipeline "[{\"name\":\"ingest1\"},{\"follows\":\"ingest1\",\"name\":\"write1\"}]" --parameters "[{\"ingest\":{\"file\":{\"filename\":\"hack/examples/ocp-ipfix-flowlogs.json\"},\"decoder\":{\"type\":\"json\"},\"type\":\"file\"},\"name\":\"ingest1\"},{\"name\":\"write1\",\"write\":{\"type\":\"stdout\"}}]"`
228220

229221
Options included in the command line override the options specified in the config file.
230222

231-
`flowlogs-pipeline --log-level debug --pipeline "[{\"name\":\"ingest1\"},{\"follows\":\"ingest1\",\"name\":\"decode1\"},{\"follows\":\"decode1\",\"name\":\"write1\"}]" --config <configFile>`
223+
`flowlogs-pipeline --log-level debug --pipeline "[{\"name\":\"ingest1\"},{\"follows\":\"ingest1\",\"name\":\"write1\"}]" --config <configFile>`
232224

233225
3. TODO: environment variables
234226

contrib/benchmarks/baseline/config.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ pipeline:
44
type: file
55
file:
66
filename: ../../contrib/benchmarks/baseline/log-lines.json
7-
decode:
8-
type: json
7+
decoder:
8+
type: json
99
encode:
1010
type: none
1111
extract:

contrib/benchmarks/transform/config.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ pipeline:
44
type: file
55
file:
66
filename: ../../contrib/benchmarks/baseline/log-lines.json
7-
decode:
8-
type: json
7+
decoder:
8+
type: json
99
encode:
1010
type: none
1111
extract:

contrib/kubernetes/flowlogs-pipeline.conf.yaml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@ parameters:
88
portLegacy: 2056
99
type: collector
1010
name: ingest_collector
11-
- decode:
12-
type: json
13-
name: decode_json
1411
- name: transform_generic
1512
transform:
1613
generic:
@@ -328,8 +325,6 @@ parameters:
328325
pipeline:
329326
- name: ingest_collector
330327
- follows: ingest_collector
331-
name: decode_json
332-
- follows: decode_json
333328
name: transform_generic
334329
- follows: transform_generic
335330
name: transform_network

docs/api.md

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ Following is the supported API format for the kafka ingest:
5959
groupBalancers: list of balancing strategies (range, roundRobin, rackAffinity)
6060
startOffset: FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition
6161
batchReadTimeout: how often (in milliseconds) to process input
62+
decoder: decoder to use (E.g. json or protobuf)
63+
type: (enum) one of the following:
64+
json: JSON decoder
65+
protobuf: Protobuf decoder
6266
</pre>
6367
## Ingest GRPC from Network Observability eBPF Agent
6468
Following is the supported API format for the Network Observability eBPF ingest:
@@ -68,13 +72,6 @@ Following is the supported API format for the Network Observability eBPF ingest:
6872
port: the port number to listen on
6973
bufferLength: the length of the ingest channel buffer, in groups of flows, containing each group hundreds of flows (default: 100)
7074
</pre>
71-
## Aws ingest API
72-
Following is the supported API format for Aws flow entries:
73-
74-
<pre>
75-
aws:
76-
fields: list of aws flow log fields
77-
</pre>
7875
## Transform Generic API
7976
Following is the supported API format for generic transformations:
8077

hack/deploy-and-monitor-k8s-network-workload.sh

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@ pipeline:
3131
port: 2055
3232
portLegacy: 2056
3333
type: collector
34-
decode:
35-
type: json
3634
encode:
3735
type: none
3836
extract:

hack/examples/ocp-ipfix-config.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ pipeline:
33
type: file
44
file:
55
filename: hack/examples/ocp-ipfix-flowlogs.json
6-
decode:
7-
type: json
6+
decoder:
7+
type: json
88
transform:
99
type: generic

pkg/api/api.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,6 @@ const (
2424
CollectorType = "collector"
2525
GRPCType = "grpc"
2626
KafkaType = "kafka"
27-
JSONType = "json"
28-
PBType = "protobuf"
29-
AWSType = "aws"
3027
StdoutType = "stdout"
3128
LokiType = "loki"
3229
AggregateType = "aggregates"
@@ -56,7 +53,6 @@ type API struct {
5653
IngestCollector IngestCollector `yaml:"collector" doc:"## Ingest collector API\nFollowing is the supported API format for the NetFlow / IPFIX collector:\n"`
5754
IngestKafka IngestKafka `yaml:"kafka" doc:"## Ingest Kafka API\nFollowing is the supported API format for the kafka ingest:\n"`
5855
IngestGRPCProto IngestGRPCProto `yaml:"grpc" doc:"## Ingest GRPC from Network Observability eBPF Agent\nFollowing is the supported API format for the Network Observability eBPF ingest:\n"`
59-
DecodeAws DecodeAws `yaml:"aws" doc:"## Aws ingest API\nFollowing is the supported API format for Aws flow entries:\n"`
6056
TransformGeneric TransformGeneric `yaml:"generic" doc:"## Transform Generic API\nFollowing is the supported API format for generic transformations:\n"`
6157
TransformFilter TransformFilter `yaml:"filter" doc:"## Transform Filter API\nFollowing is the supported API format for filter transformations:\n"`
6258
TransformNetwork TransformNetwork `yaml:"network" doc:"## Transform Network API\nFollowing is the supported API format for network transformations:\n"`

pkg/api/decode_aws.go

Lines changed: 0 additions & 22 deletions
This file was deleted.

pkg/api/decoder.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package api
2+
3+
type Decoder struct {
4+
Type string `yaml:"type" json:"type" enum:"DecoderEnum" doc:"one of the following:"`
5+
}
6+
7+
type DecoderEnum struct {
8+
JSON string `yaml:"json" json:"json" doc:"JSON decoder"`
9+
Protobuf string `yaml:"protobuf" json:"protobuf" doc:"Protobuf decoder"`
10+
}
11+
12+
func DecoderName(decoder string) string {
13+
return GetEnumName(DecoderEnum{}, decoder)
14+
}

0 commit comments

Comments
 (0)