Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ linters:
- cyclop
- errname
- exhaustive
- exportloopref
- copyloopvar
- gocritic
- gofmt
- gosimple
Expand All @@ -16,15 +16,17 @@ linters:
- stylecheck
- typecheck
- unused
run:
go: "1.22"
linters-settings:
stylecheck:
go: "1.22"
gocritic:
enabled-checks:
- hugeParam
- rangeExprCopy
- rangeValCopy
- indexAlloc
- deprecatedComment
settings:
ifElseChain:
minThreshold: 3
cyclop:
max-complexity: 20
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ ifneq ($(CLEAN_BUILD),)
LDFLAGS ?= -X 'main.buildVersion=${VERSION}-${BUILD_SHA}' -X 'main.buildDate=${BUILD_DATE}'
endif

GOLANGCI_LINT_VERSION = v1.56.2
GOLANGCI_LINT_VERSION = v1.61.0

FLP_BIN_FILE=flowlogs-pipeline
CG_BIN_FILE=confgenerator
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -921,10 +921,13 @@ Usage:

General
help Display this help.
prereqs Check if prerequisites are met, and install missing dependencies
prereqs-kind Check if prerequisites are met for running kind, and install missing dependencies
vendors Check go vendors

Develop
lint Lint the code
compile Compile main flowlogs-pipeline and config generator
build Build flowlogs-pipeline executable and update the docs
docs Update flowlogs-pipeline documentation
clean Clean
Expand Down
16 changes: 10 additions & 6 deletions cmd/apitodoc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ func iterate(output io.Writer, data interface{}, indent int) {
if err != nil {
dataTypeName = "(unknown)"
}
if dataType == reflect.Slice || dataType == reflect.Map {
//nolint:exhaustive
switch dataType {
case reflect.Slice, reflect.Map:
// DEBUG code: fmt.Fprintf(output, "%s %s <-- %s \n",strings.Repeat(" ",4*indent),dataTypeName,dataType )
zeroElement := reflect.Zero(reflect.ValueOf(data).Type().Elem()).Interface()
iterate(output, zeroElement, newIndent)
return
} else if dataType == reflect.Struct {
case reflect.Struct:
// DEBUG code: fmt.Fprintf(output,"%s %s <-- %s \n",strings.Repeat(" ",4*indent),dataTypeName,dataType )
for i := 0; i < d.NumField(); i++ {
val := reflect.Indirect(reflect.ValueOf(data))
Expand All @@ -63,16 +65,18 @@ func iterate(output io.Writer, data interface{}, indent int) {
}
}
return
} else if dataType == reflect.Ptr {
case reflect.Ptr:
// DEBUG code: fmt.Fprintf(output, "%s %s <-- %s \n", strings.Repeat(" ", 4*indent), dataTypeName, dataType)
elemType := reflect.TypeOf(data).Elem()
zeroElement := reflect.Zero(elemType).Interface()
// Since we only "converted" Ptr to Struct and the actual output is done in the next iteration, we call
// iterate() with the same `indent` as the current level
iterate(output, zeroElement, indent)
} else if strings.HasPrefix(dataTypeName, "api.") && strings.HasSuffix(dataTypeName, "Enum") {
// set placeholder for enum
fmt.Fprintf(output, "placeholder @%s:%d@\n", strings.TrimPrefix(dataTypeName, "api."), 4*newIndent)
default:
if strings.HasPrefix(dataTypeName, "api.") && strings.HasSuffix(dataTypeName, "Enum") {
// set placeholder for enum
fmt.Fprintf(output, "placeholder @%s:%d@\n", strings.TrimPrefix(dataTypeName, "api."), 4*newIndent)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion docs/operational-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ Each table below provides documentation for an exported flowlogs-pipeline operat
|:---|:---|
| **Description** | Counter of hits per secondary network index for Kubernetes enrichment |
| **Type** | counter |
| **Labels** | kind, network, warning |
| **Labels** | kind, namespace, network, warning |


### stage_duration_ms
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/encode_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ type EncodeS3 struct {
Secure bool `yaml:"secure,omitempty" json:"secure,omitempty" doc:"true for https, false for http (default: false)"`
ObjectHeaderParameters map[string]interface{} `yaml:"objectHeaderParameters,omitempty" json:"objectHeaderParameters,omitempty" doc:"parameters to include in object header (key/value pairs)"`
// TBD: (TLS?) security parameters
//TLS *ClientTLS `yaml:"tls" json:"tls" doc:"TLS client configuration (optional)"`
// TLS *ClientTLS `yaml:"tls" json:"tls" doc:"TLS client configuration (optional)"`
}
2 changes: 1 addition & 1 deletion pkg/confgen/confgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (cg *ConfGen) ParseDefinition(name string, bytes []byte) error {
return err
}

//skip if their skip tag match
// skip if their skip tag match
for _, skipTag := range cg.opts.SkipWithTags {
for _, tag := range defFile.Tags {
if skipTag == tag {
Expand Down
8 changes: 4 additions & 4 deletions pkg/confgen/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (cg *ConfGen) generateVisualizeText(vgs []VisualizationGrafana) string {
for _, vs := range vgs {
title := vs.Title
dashboard := vs.Dashboard
section = section + fmt.Sprintf("| **Visualized as** | \"%s\" on dashboard `%s` |\n", title, dashboard)
section += fmt.Sprintf("| **Visualized as** | \"%s\" on dashboard `%s` |\n", title, dashboard)
}

return section
Expand All @@ -42,7 +42,7 @@ func (cg *ConfGen) generatePromEncodeText(metrics api.MetricsItems) string {
for i := range metrics {
mType := metrics[i].Type
name := cg.config.Encode.Prom.Prefix + metrics[i].Name
section = section + fmt.Sprintf("| **Exposed as** | `%s` of type `%s` |\n", name, mType)
section += fmt.Sprintf("| **Exposed as** | `%s` of type `%s` |\n", name, mType)
}

return section
Expand All @@ -57,7 +57,7 @@ func (cg *ConfGen) generateOperationText(definitions api.AggregateDefinitions) s
if operationKey != "" {
operationKey = fmt.Sprintf("field `%s`", operationKey)
}
section = section + fmt.Sprintf("| **OperationType** | aggregate by `%s` and `%s` %s |\n", by, operation, operationKey)
section += fmt.Sprintf("| **OperationType** | aggregate by `%s` and `%s` %s |\n", by, operation, operationKey)
}

return section
Expand All @@ -70,7 +70,7 @@ func (cg *ConfGen) generateDoc(fileName string) error {
replacer := strings.NewReplacer("-", " ", "_", " ")
name := replacer.Replace(filepath.Base(metric.FileName[:len(metric.FileName)-len(filepath.Ext(metric.FileName))]))

labels := strings.Join(metric.Tags[:], ", ")
labels := strings.Join(metric.Tags, ", ")
// TODO: add support for multiple operations
operation := cg.generateOperationText(metric.Aggregates.Rules)
expose := cg.generatePromEncodeText(metric.PromEncode.Metrics)
Expand Down
2 changes: 1 addition & 1 deletion pkg/confgen/grafana_jsonnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (cg *ConfGen) GenerateGrafanaJSON() (string, error) {
if err != nil {
return "", err
}
panelsJSON = panelsJSON + jsonStr
panelsJSON += jsonStr
}
return panelsJSON, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (e *EncodeProm) checkConfUpdate() {
break
}
default:
//Nothing to do
// Nothing to do
return
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/encode/encode_prom_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

type Predicate func(flow config.GenericMap) bool

var variableExtractor, _ = regexp.Compile(`\$\(([^\)]+)\)`)
var variableExtractor = regexp.MustCompile(`\$\(([^\)]+)\)`)

type MetricInfo struct {
*api.MetricsItem
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/encode/metrics_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (m *MetricsCommonStruct) prepareMetric(mci MetricsCommonInterface, flow con
return nil, 0, ""
}
if info.ValueScale != 0 {
floatVal = floatVal / info.ValueScale
floatVal /= info.ValueScale
}

entryLabels, key := extractLabelsAndKey(flow, info)
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/encode/opentelemetry/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (e *EncodeOtlpLogs) LogWrite(entry config.GenericMap) {
msg := string(msgByteArray)
// TODO: Decide whether the content should be delivered as Body or as Attributes
lrc := logs.LogRecordConfig{
//Timestamp: &now, // take timestamp from entry, if present?
// Timestamp: &now, // take timestamp from entry, if present?
ObservedTimestamp: now,
SeverityNumber: &sn,
SeverityText: &st,
Expand Down
48 changes: 23 additions & 25 deletions pkg/pipeline/extract/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,31 +149,29 @@ func (aggregate *Aggregate) UpdateByEntry(entry config.GenericMap, normalizedVal
if operation == OperationCount {
groupState.totalValue = float64(groupState.totalCount + 1)
groupState.recentOpValue = float64(groupState.recentCount + 1)
} else {
if operationKey != "" {
value, ok := entry[operationKey]
if ok {
valueString := util.ConvertToString(value)
if valueFloat64, err := strconv.ParseFloat(valueString, 64); err != nil {
// Log as debug to avoid performance impact
log.Debugf("UpdateByEntry error when parsing float '%s': %v", valueString, err)
} else {
switch operation {
case OperationSum:
groupState.totalValue += valueFloat64
groupState.recentOpValue += valueFloat64
case OperationMax:
groupState.totalValue = math.Max(groupState.totalValue, valueFloat64)
groupState.recentOpValue = math.Max(groupState.recentOpValue, valueFloat64)
case OperationMin:
groupState.totalValue = math.Min(groupState.totalValue, valueFloat64)
groupState.recentOpValue = math.Min(groupState.recentOpValue, valueFloat64)
case OperationAvg:
groupState.totalValue = (groupState.totalValue*float64(groupState.totalCount) + valueFloat64) / float64(groupState.totalCount+1)
groupState.recentOpValue = (groupState.recentOpValue*float64(groupState.recentCount) + valueFloat64) / float64(groupState.recentCount+1)
case OperationRawValues:
groupState.recentRawValues = append(groupState.recentRawValues, valueFloat64)
}
} else if operationKey != "" {
value, ok := entry[operationKey]
if ok {
valueString := util.ConvertToString(value)
if valueFloat64, err := strconv.ParseFloat(valueString, 64); err != nil {
// Log as debug to avoid performance impact
log.Debugf("UpdateByEntry error when parsing float '%s': %v", valueString, err)
} else {
switch operation {
case OperationSum:
groupState.totalValue += valueFloat64
groupState.recentOpValue += valueFloat64
case OperationMax:
groupState.totalValue = math.Max(groupState.totalValue, valueFloat64)
groupState.recentOpValue = math.Max(groupState.recentOpValue, valueFloat64)
case OperationMin:
groupState.totalValue = math.Min(groupState.totalValue, valueFloat64)
groupState.recentOpValue = math.Min(groupState.recentOpValue, valueFloat64)
case OperationAvg:
groupState.totalValue = (groupState.totalValue*float64(groupState.totalCount) + valueFloat64) / float64(groupState.totalCount+1)
groupState.recentOpValue = (groupState.recentOpValue*float64(groupState.recentCount) + valueFloat64) / float64(groupState.recentCount+1)
case OperationRawValues:
groupState.recentRawValues = append(groupState.recentRawValues, valueFloat64)
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/pipeline/extract/aggregate/aggregates.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ func (aggregates *Aggregates) addAggregate(aggregateDefinition *api.AggregateDef
expiryTime: expiryTime.Duration,
}

appendedAggregates := append(aggregates.Aggregates, aggregate)
return appendedAggregates
return append(aggregates.Aggregates, aggregate)
}

func (aggregates *Aggregates) cleanupExpiredEntriesLoop() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/extract/conntrack/conntrack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -927,7 +927,7 @@ func assertHashOrder(t *testing.T, expected []uint64, actualRecords []config.Gen

func hex2int(hexStr string) uint64 {
// remove 0x suffix if found in the input string
cleaned := strings.Replace(hexStr, "0x", "", -1)
cleaned := strings.ReplaceAll(hexStr, "0x", "")

// base 16 for hexadecimal
result, _ := strconv.ParseUint(cleaned, 16, 64)
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/extract/timebased/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (fs *FilterStruct) CalculateValue(l *list.List, oldestValidTime time.Time)
}
}
if fs.Rule.OperationType == api.FilterOperationAvg && nItems > 0 {
currentValue = currentValue / float64(nItems)
currentValue /= float64(nItems)
}
if fs.Rule.OperationType == api.FilterOperationCnt {
currentValue = float64(nItems)
Expand Down
6 changes: 2 additions & 4 deletions pkg/pipeline/extract/timebased/timebased.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,8 @@ func CreateIndexKeysAndFilters(rules []api.TimebasedFilterRule) (map[string]*Ind
}
tmpIndexKeyStructs[filterRule.IndexKey] = rStruct
log.Debugf("new IndexKeyTable: name = %s = %v", filterRule.IndexKey, *rStruct)
} else {
if filterRule.TimeInterval.Duration > rStruct.maxTimeInterval {
rStruct.maxTimeInterval = filterRule.TimeInterval.Duration
}
} else if filterRule.TimeInterval.Duration > rStruct.maxTimeInterval {
rStruct.maxTimeInterval = filterRule.TimeInterval.Duration
}
// verify the validity of the OperationType field in the filterRule
switch filterRule.OperationType {
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/transform/kubernetes/enrich.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ const nodeZoneLabelName = "topology.kubernetes.io/zone"

func fillInK8sZone(outputEntry config.GenericMap, rule *api.K8sRule, kubeInfo *inf.Info, zonePrefix string) {
if !rule.AddZone {
//Nothing to do
// Nothing to do
return
}
switch kubeInfo.Type {
Expand All @@ -121,7 +121,7 @@ func fillInK8sZone(outputEntry config.GenericMap, rule *api.K8sRule, kubeInfo *i
return

case inf.TypeService:
//A service is not assigned to a dedicated zone, skipping
// A service is not assigned to a dedicated zone, skipping
return
}
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/pipeline/transform/location/location_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func Test_GetLocation(t *testing.T) {
}

func Test_unzip(t *testing.T) {
//success
// success
buf := new(bytes.Buffer)
zipWriter := zip.NewWriter(buf)
_, _ = zipWriter.Create("test_file_in_zip")
Expand All @@ -137,29 +137,29 @@ func Test_unzip(t *testing.T) {
err := unzip("/tmp/test_zip.zip", "/tmp/")
require.Nil(t, err)

//failed unzip
// failed unzip
err = unzip("fake_test", "fake_test")
require.Error(t, err)

//failed os.MkdirAll
// failed os.MkdirAll
_osio.MkdirAll = func(string, os.FileMode) error { return fmt.Errorf("test") }
err = unzip("/tmp/test_zip.zip", "/tmp/")
require.Error(t, err)
_osio.MkdirAll = os.MkdirAll

//failed os.OpenFile
// failed os.OpenFile
_osio.OpenFile = func(string, int, os.FileMode) (*os.File, error) { return nil, fmt.Errorf("test") }
err = unzip("/tmp/test_zip.zip", "/tmp/")
require.Error(t, err)
_osio.OpenFile = os.OpenFile

//failed io.Copy
// failed io.Copy
_osio.Copy = func(io.Writer, io.Reader) (int64, error) { return 0, fmt.Errorf("test") }
err = unzip("/tmp/test_zip.zip", "/tmp/")
require.Error(t, err)
_osio.Copy = io.Copy

//failed os.MkdirAll dir
// failed os.MkdirAll dir
_osio.MkdirAll = func(string, os.FileMode) error { return fmt.Errorf("test") }
buf = new(bytes.Buffer)
zipWriter = zip.NewWriter(buf)
Expand Down
Loading
Loading