You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
* initial code to support s3 encode
* vendor updates to support s3 encode
* added timed creation of objects
* added initial unit test
* refactor
* addressed some review comments
* added description of object store usage in README
* corrected usage of Duration in Ticker
* fixed race condition in test
Copy file name to clipboardExpand all lines: README.md
+41Lines changed: 41 additions & 0 deletions
Display the source diff
Display the rich diff
Original file line number
Diff line number
Diff line change
@@ -782,6 +782,47 @@ parameters:
782
782
783
783
> Note: to view loki flow-logs in `grafana`: Use the `Explore` tab and choose the `loki` datasource. In the `Log Browser` enter `{job="flowlogs-pipeline"}` and press `Run query`
784
784
785
+
### Object Store encoder
786
+
787
+
The object store encoder allows to export flows into an object store using the S3 API.
788
+
A batch of flow logs received in some time interval are collected and stored in a single object.
789
+
The configuration provides the URL of the object store, credentials to access the object store, the bucket in the object store into which the objects should be placed, and parameters (key/value pairs) to be stored as metadata of the created objects.
790
+
Object names are constructed according to the following format:
The `{stream-id}` is derived from the time flowlogs-pipeline started to run.
796
+
The syntax of a sample configuration file is as follows:
797
+
798
+
```
799
+
parameters:
800
+
- name: encodeS3
801
+
encode:
802
+
type: s3
803
+
s3:
804
+
endpoint: 1.2.3.4:9000
805
+
bucket: bucket1
806
+
account: tenant1
807
+
accessKeyId: accessKey1
808
+
secretAccessKey: secretAccessKey1
809
+
writeTimeout: 60s
810
+
batchSize: 100
811
+
objectHeaderParameters:
812
+
key1: val1
813
+
key2: val2
814
+
key3: val3
815
+
key4: val4
816
+
```
817
+
818
+
The key/value pairs in `objectHeaderParameters` may contain arbitrary configuration information that the administrator wants to save as metadata for the produced objects, such as `tenant_id` or `network_interface_id`.
819
+
The content of the object consists of object header fields followed by the actual flow logs.
820
+
The object header contains the following fields: `version`, `capture_start_time`, `capture_end_time`, `number_of_flow_logs`, plus all the fields provided in the configuration under the `objectHeaderParameters`.
821
+
822
+
If no flow logs arrive within the `writeTimeout` period, then an object is created with no flows.
823
+
An object is created either when we have accumulated `batchSize` flow logs or when `writeTimeout` seconds have passed.
Copy file name to clipboardExpand all lines: pkg/api/api.go
+2Lines changed: 2 additions & 0 deletions
Original file line number
Diff line number
Diff line change
@@ -25,6 +25,7 @@ const (
25
25
GRPCType="grpc"
26
26
FakeType="fake"
27
27
KafkaType="kafka"
28
+
S3Type="s3"
28
29
StdoutType="stdout"
29
30
LokiType="loki"
30
31
AggregateType="aggregates"
@@ -52,6 +53,7 @@ const (
52
53
typeAPIstruct {
53
54
PromEncodePromEncode`yaml:"prom" doc:"## Prometheus encode API\nFollowing is the supported API format for prometheus encode:\n"`
54
55
KafkaEncodeEncodeKafka`yaml:"kafka" doc:"## Kafka encode API\nFollowing is the supported API format for kafka encode:\n"`
56
+
S3EncodeEncodeS3`yaml:"s3" doc:"## S3 encode API\nFollowing is the supported API format for S3 encode:\n"`
55
57
IngestCollectorIngestCollector`yaml:"collector" doc:"## Ingest collector API\nFollowing is the supported API format for the NetFlow / IPFIX collector:\n"`
56
58
IngestKafkaIngestKafka`yaml:"kafka" doc:"## Ingest Kafka API\nFollowing is the supported API format for the kafka ingest:\n"`
57
59
IngestGRPCProtoIngestGRPCProto`yaml:"grpc" doc:"## Ingest GRPC from Network Observability eBPF Agent\nFollowing is the supported API format for the Network Observability eBPF ingest:\n"`
* Licensed under the Apache License, Version 2.0 (the "License");
5
+
* you may not use this file except in compliance with the License.
6
+
* You may obtain a copy of the License at
7
+
*
8
+
* http://www.apache.org/licenses/LICENSE-2.0
9
+
*
10
+
* Unless required by applicable law or agreed to in writing, software
11
+
* distributed under the License is distributed on an "AS IS" BASIS,
12
+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
+
* See the License for the specific language governing permissions and
14
+
* limitations under the License.
15
+
*
16
+
*/
17
+
18
+
package api
19
+
20
+
typeEncodeS3struct {
21
+
Accountstring`yaml:"account" json:"account" doc:"tenant id for this flow collector"`
22
+
Endpointstring`yaml:"endpoint" json:"endpoint" doc:"address of s3 server"`
23
+
AccessKeyIdstring`yaml:"accessKeyId" json:"accessKeyId" doc:"username to connect to server"`
24
+
SecretAccessKeystring`yaml:"secretAccessKey" json:"secretAccessKey" doc:"password to connect to server"`
25
+
Bucketstring`yaml:"bucket" json:"bucket" doc:"bucket into which to store objects"`
26
+
WriteTimeoutDuration`yaml:"writeTimeout,omitempty" json:"writeTimeout,omitempty" doc:"timeout (in seconds) for write operation"`
27
+
BatchSizeint`yaml:"batchSize,omitempty" json:"batchSize,omitempty" doc:"limit on how many flows will be buffered before being sent to an object"`
28
+
ObjectHeaderParametersmap[string]interface{} `yaml:"objectHeaderParameters,omitempty" json:"objectHeaderParameters,omitempty" doc:"parameters to include in object header (key/value pairs)"`
0 commit comments