33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
6- * You may obtain a copy ofthe License at
6+ * You may obtain a copy of the License at
77 *
88 * http://www.apache.org/licenses/LICENSE-2.0
99 *
1010 * Unless required by applicable law or agreed to in writing, software
1111 * distributed under the License is distributed on an "AS IS" BASIS,
1212 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13- * See the License for the specificlanguage governing permissions and
13+ * See the License for the specific language governing permissions and
1414 * limitations under the License.
1515 *
1616 */
1717
1818package decode
1919
2020import (
21+ "encoding/json"
22+ "github.com/netobserv/flowlogs2metrics/pkg/api"
2123 "github.com/netobserv/flowlogs2metrics/pkg/config"
2224 log "github.com/sirupsen/logrus"
2325 "strings"
@@ -51,11 +53,12 @@ func (c *decodeAws) Decode(in []interface{}) []config.GenericMap {
5153 out := make ([]config.GenericMap , 0 )
5254 nItems := len (in )
5355 log .Debugf ("nItems = %d" , nItems )
54- for _ , line := range in {
56+ for i , line := range in {
5557 lineSlice := strings .Fields (line .(string ))
5658 nFields := len (lineSlice )
57- if nFields > len (c .keyTags ) {
58- nFields = len (c .keyTags )
59+ if nFields != len (c .keyTags ) {
60+ log .Errorf ("decodeAws Decode: wrong number of fields in line %d" , i + 1 )
61+ continue
5962 }
6063 record := make (config.GenericMap )
6164 for i := 0 ; i < nFields ; i ++ {
@@ -71,12 +74,23 @@ func (c *decodeAws) Decode(in []interface{}) []config.GenericMap {
7174// NewDecodeAws create a new decode
7275func NewDecodeAws () (Decoder , error ) {
7376 log .Debugf ("entering NewDecodeAws" )
74- RecordKeys := config .Opt .PipeLine .Ingest .Aws .Fields
75- if RecordKeys == nil {
76- RecordKeys = defaultKeys
77+ var recordKeys []string
78+ fieldsString := config .Opt .PipeLine .Decode .Aws
79+ log .Debugf ("fieldsString = %v" , fieldsString )
80+ if fieldsString != "" {
81+ var awsFields api.EncodeAwsStruct
82+ err := json .Unmarshal ([]byte (fieldsString ), & awsFields )
83+ if err != nil {
84+ log .Errorf ("NewDecodeAws: error in unmarshalling fields: %v" , err )
85+ return nil , err
86+ }
87+ recordKeys = awsFields .Fields
88+ } else {
89+ recordKeys = defaultKeys
7790 }
78- log .Debugf ("RecordKeys = %v" , RecordKeys )
91+
92+ log .Debugf ("recordKeys = %v" , recordKeys )
7993 return & decodeAws {
80- keyTags : RecordKeys ,
94+ keyTags : recordKeys ,
8195 }, nil
8296}
0 commit comments