diff --git a/cmd/shell-operator/main.go b/cmd/shell-operator/main.go index 7ee76102..f01fe4b6 100644 --- a/cmd/shell-operator/main.go +++ b/cmd/shell-operator/main.go @@ -10,7 +10,7 @@ import ( "github.com/flant/kube-client/klogtolog" "github.com/flant/shell-operator/pkg/app" "github.com/flant/shell-operator/pkg/debug" - "github.com/flant/shell-operator/pkg/filter/jq" + "github.com/flant/shell-operator/pkg/jq" ) func main() { @@ -30,9 +30,7 @@ func main() { // print version kpApp.Command("version", "Show version.").Action(func(_ *kingpin.ParseContext) error { - fmt.Printf("%s %s\n", app.AppName, app.Version) - fl := jq.NewFilter() - fmt.Println(fl.FilterInfo()) + fmt.Printf("%s %s\n%s\n", app.AppName, app.Version, jq.Info()) return nil }) diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go index 7551a49c..1be58bed 100644 --- a/pkg/filter/filter.go +++ b/pkg/filter/filter.go @@ -1,6 +1,96 @@ +/* +Package filter provides functionality for transforming and filtering objects +from plain kubernetes unstructured objects to structured objects, that shell-operator can work with. + - Supports JQ Expressions + - Supports Custom Filter Functions + - Supports Plain Filter Functions + +Enriches objects with metadata and calculates a checksum. +*/ package filter -type Filter interface { - ApplyFilter(filterStr string, data map[string]any) ([]byte, error) - FilterInfo() string +import ( + "context" + "encoding/json" + "fmt" + "reflect" + "runtime" + "runtime/trace" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + "github.com/flant/shell-operator/pkg/jq" + kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" + utils_checksum "github.com/flant/shell-operator/pkg/utils/checksum" +) + +type FilterFn func(obj *unstructured.Unstructured) (result interface{}, err error) + +// RunFn runs a filter function on an object and returns an ObjectAndFilterResult. +func RunFn(filterFn FilterFn, obj *unstructured.Unstructured) (*kemtypes.ObjectAndFilterResult, error) { + defer trace.StartRegion(context.Background(), "FilterRunFn").End() + + filtered, err := filterFn(obj) + if err != nil { + return nil, fmt.Errorf("filter function (%s) execution error: %v", funcName(filterFn), err) + } + + filteredBytes, err := json.Marshal(filtered) + if err != nil { + return nil, err + } + + return &kemtypes.ObjectAndFilterResult{ + Object: obj, + Metadata: kemtypes.ObjectAndFilterResultMetadata{ + ResourceId: resourceID(obj), + Checksum: utils_checksum.CalculateChecksum(string(filteredBytes)), + }, + FilterResult: filtered, + }, nil +} + +// RunExpression runs a jq expression on an object and returns an ObjectAndFilterResult. +func RunExpression(expression *jq.Expression, obj *unstructured.Unstructured) (*kemtypes.ObjectAndFilterResult, error) { + defer trace.StartRegion(context.Background(), "FilterRunExpression").End() + + filtered, err := jq.ExecuteJQ(expression, obj) + if err != nil { + return nil, fmt.Errorf("jq expression execution error: %v", err) + } + + return &kemtypes.ObjectAndFilterResult{ + Object: obj, + Metadata: kemtypes.ObjectAndFilterResultMetadata{ + ResourceId: resourceID(obj), + Checksum: utils_checksum.CalculateChecksum(string(filtered)), + JqFilter: expression.Query(), + }, + FilterResult: filtered, + }, nil +} + +// RunPlain runs NO filter function on an object and returns an ObjectAndFilterResult. +func RunPlain(obj *unstructured.Unstructured) (*kemtypes.ObjectAndFilterResult, error) { + // TODO: json operation could be avoided, we can caclculate checksum from the object itself + data, err := json.Marshal(obj) + if err != nil { + return nil, err + } + + return &kemtypes.ObjectAndFilterResult{ + Object: obj, + Metadata: kemtypes.ObjectAndFilterResultMetadata{ + ResourceId: resourceID(obj), + Checksum: utils_checksum.CalculateChecksum(string(data)), + }, + }, nil +} + +func resourceID(obj *unstructured.Unstructured) string { + return fmt.Sprintf("%s/%s/%s", obj.GetNamespace(), obj.GetKind(), obj.GetName()) +} + +func funcName(fn FilterFn) string { + return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() } diff --git a/pkg/filter/filter_test.go b/pkg/filter/filter_test.go new file mode 100644 index 00000000..061fc0cf --- /dev/null +++ b/pkg/filter/filter_test.go @@ -0,0 +1,225 @@ +package filter + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + "github.com/flant/shell-operator/pkg/jq" +) + +// Helper function to create test objects +func newTestObject(data map[string]interface{}) *unstructured.Unstructured { + return &unstructured.Unstructured{Object: data} +} + +// Test RunFn function - custom filter function +func TestRunFn(t *testing.T) { + t.Run("with custom filter function - success", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "test-pod", + "namespace": "default", + }, + "spec": map[string]interface{}{ + "containers": []interface{}{ + map[string]interface{}{"name": "nginx"}, + }, + }, + }) + + filterFn := func(o *unstructured.Unstructured) (interface{}, error) { + return map[string]interface{}{"name": o.GetName()}, nil + } + + result, err := RunFn(filterFn, obj) + require.NoError(t, err) + assert.NotNil(t, result) + assert.Equal(t, obj, result.Object) + assert.NotEmpty(t, result.Metadata.Checksum) + assert.Equal(t, map[string]interface{}{"name": "test-pod"}, result.FilterResult) + }) + + t.Run("with custom filter function - error", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{"foo": "bar"}) + expectedErr := errors.New("filter failed") + + filterFn := func(_ *unstructured.Unstructured) (interface{}, error) { + return nil, expectedErr + } + + result, err := RunFn(filterFn, obj) + assert.Error(t, err) + assert.Nil(t, result) + assert.Contains(t, err.Error(), "filter function") + assert.Contains(t, err.Error(), "filter failed") + }) + + t.Run("with custom filter function - invalid JSON result", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{"foo": "bar"}) + + // Return something that can't be marshaled to JSON + filterFn := func(_ *unstructured.Unstructured) (interface{}, error) { + return make(chan int), nil // channels can't be marshaled + } + + result, err := RunFn(filterFn, obj) + assert.Error(t, err) + assert.Nil(t, result) + }) +} + +// Test RunExpression function - jq expression filter +func TestRunExpression(t *testing.T) { + t.Run("with jq expression - simple filter", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "test-pod", + "namespace": "default", + }, + "spec": map[string]interface{}{ + "nodeName": "node-1", + }, + }) + + expr, err := jq.CompileExpression(".spec.nodeName") + require.NoError(t, err) + + result, err := RunExpression(expr, obj) + require.NoError(t, err) + assert.NotNil(t, result) + assert.Equal(t, `"node-1"`, string(result.FilterResult.([]byte))) + assert.NotEmpty(t, result.Metadata.Checksum) + assert.Equal(t, ".spec.nodeName", result.Metadata.JqFilter) + }) + + t.Run("with jq expression - complex filter", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "spec": map[string]interface{}{ + "containers": []interface{}{ + map[string]interface{}{"name": "nginx", "image": "nginx:1.19"}, + map[string]interface{}{"name": "sidecar", "image": "sidecar:latest"}, + }, + }, + }) + + expr, err := jq.CompileExpression(".spec.containers[].name") + require.NoError(t, err) + + result, err := RunExpression(expr, obj) + require.NoError(t, err) + assert.NotNil(t, result) + assert.Equal(t, `["nginx","sidecar"]`, string(result.FilterResult.([]byte))) + }) + + t.Run("with jq expression - filter returns null", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "spec": map[string]interface{}{}, + }) + + expr, err := jq.CompileExpression(".spec.nonexistent") + require.NoError(t, err) + + result, err := RunExpression(expr, obj) + require.NoError(t, err) + assert.NotNil(t, result) + assert.Equal(t, "null", string(result.FilterResult.([]byte))) + }) + + t.Run("with jq expression - error", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "spec": "not-an-object", + }) + + expr, err := jq.CompileExpression(".spec.field") + require.NoError(t, err) + + result, err := RunExpression(expr, obj) + assert.Error(t, err) + assert.Nil(t, result) + }) +} + +// Test RunPlain function - no filter +func TestRunPlain(t *testing.T) { + t.Run("no filter - full object checksum", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]interface{}{ + "name": "test-cm", + "namespace": "kube-system", + }, + "data": map[string]interface{}{ + "key": "value", + }, + }) + + result, err := RunPlain(obj) + require.NoError(t, err) + assert.NotNil(t, result) + assert.Equal(t, obj, result.Object) + assert.NotEmpty(t, result.Metadata.Checksum) + assert.Nil(t, result.FilterResult) + assert.Equal(t, "kube-system/ConfigMap/test-cm", result.Metadata.ResourceId) + }) + + t.Run("no filter - empty object", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{}) + + result, err := RunPlain(obj) + require.NoError(t, err) + assert.NotNil(t, result) + assert.NotEmpty(t, result.Metadata.Checksum) + }) +} + +// Test edge cases and resource ID generation +func TestResourceIdGeneration(t *testing.T) { + t.Run("standard kubernetes object", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "my-pod", + "namespace": "default", + }, + }) + + result, err := RunPlain(obj) + require.NoError(t, err) + assert.Equal(t, "default/Pod/my-pod", result.Metadata.ResourceId) + }) + + t.Run("cluster-scoped resource", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "apiVersion": "v1", + "kind": "Node", + "metadata": map[string]interface{}{ + "name": "node-1", + }, + }) + + result, err := RunPlain(obj) + require.NoError(t, err) + assert.Equal(t, "/Node/node-1", result.Metadata.ResourceId) + }) + + t.Run("missing metadata", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "apiVersion": "v1", + "kind": "Unknown", + }) + + result, err := RunPlain(obj) + require.NoError(t, err) + assert.Equal(t, "/Unknown/", result.Metadata.ResourceId) + }) +} diff --git a/pkg/filter/jq/apply.go b/pkg/filter/jq/apply.go deleted file mode 100644 index 83905572..00000000 --- a/pkg/filter/jq/apply.go +++ /dev/null @@ -1,81 +0,0 @@ -package jq - -import ( - "encoding/json" - "errors" - - "github.com/itchyny/gojq" - - "github.com/flant/shell-operator/pkg/filter" -) - -var _ filter.Filter = (*Filter)(nil) - -func NewFilter() *Filter { - return &Filter{} -} - -type Filter struct{} - -// ApplyFilter runs jq expression provided in jqFilter with jsonData as input. -func (f *Filter) ApplyFilter(jqFilter string, data map[string]any) ([]byte, error) { - query, err := gojq.Parse(jqFilter) - if err != nil { - return nil, err - } - - var workData any - if data == nil { - workData = nil - } else { - workData, err = deepCopyAny(data) - if err != nil { - return nil, err - } - } - - iter := query.Run(workData) - result := make([]any, 0) - for { - v, ok := iter.Next() - if !ok { - break - } - if err, ok := v.(error); ok { - var errGoJq *gojq.HaltError - if errors.As(err, &errGoJq) && errGoJq.Value() == nil { - break - } - return nil, err - } - result = append(result, v) - } - - switch len(result) { - case 0: - return []byte("null"), nil - case 1: - return json.Marshal(result[0]) - default: - return json.Marshal(result) - } -} - -func (f *Filter) FilterInfo() string { - return "jqFilter implementation: using itchyny/gojq" -} - -func deepCopyAny(input any) (any, error) { - if input == nil { - return nil, nil - } - data, err := json.Marshal(input) - if err != nil { - return nil, err - } - var output any - if err := json.Unmarshal(data, &output); err != nil { - return nil, err - } - return output, nil -} diff --git a/pkg/filter/jq/apply_test.go b/pkg/filter/jq/apply_test.go deleted file mode 100644 index 39b24374..00000000 --- a/pkg/filter/jq/apply_test.go +++ /dev/null @@ -1,202 +0,0 @@ -package jq - -import ( - "encoding/json" - "testing" - - . "github.com/onsi/gomega" -) - -func Test_ApplyFilter_SingleDocumentModification(t *testing.T) { - g := NewWithT(t) - filter := NewFilter() - - jqFilter := `. + {"status": "active"}` - - result, err := filter.ApplyFilter(jqFilter, map[string]any{"name": "John", "age": 30}) - g.Expect(err).Should(BeNil()) - - var resultMap any - err = json.Unmarshal(result, &resultMap) - g.Expect(err).Should(BeNil()) - g.Expect(resultMap).Should(Equal(map[string]any{"name": "John", "age": 30.0, "status": "active"})) -} - -func Test_ApplyFilter_ExtractValuesFromDocument(t *testing.T) { - g := NewWithT(t) - filter := NewFilter() - - jqFilter := `.user.details` - - result, err := filter.ApplyFilter(jqFilter, map[string]any{"user": map[string]any{"name": "John", "details": map[string]any{"location": "New York", "occupation": "Developer"}}}) - g.Expect(err).Should(BeNil()) - - var resultMap any - err = json.Unmarshal(result, &resultMap) - g.Expect(err).Should(BeNil()) - g.Expect(resultMap).Should(Equal(map[string]any{"location": "New York", "occupation": "Developer"})) -} - -func Test_ApplyFilter_MultipleJsonDocumentsInArray(t *testing.T) { - g := NewWithT(t) - filter := NewFilter() - - jqFilter := `.users[] | . + {"status": "active"}` - - result, err := filter.ApplyFilter(jqFilter, map[string]any{"users": []any{map[string]any{"name": "John", "status": "inactive"}, map[string]any{"name": "Jane", "status": "inactive"}}}) - g.Expect(err).Should(BeNil()) - - var resultMap []any - err = json.Unmarshal(result, &resultMap) - g.Expect(err).Should(BeNil()) - g.Expect(resultMap).Should(HaveLen(2)) - - expected := []map[string]any{ - {"name": "John", "status": "active"}, - {"name": "Jane", "status": "active"}, - } - - g.Expect(resultMap).Should(ConsistOf(expected)) -} - -func Test_ApplyFilter_InvalidFilter(t *testing.T) { - g := NewWithT(t) - filter := NewFilter() - - // Test invalid jq syntax - invalidSyntax := `invalid syntax` - result, err := filter.ApplyFilter(invalidSyntax, map[string]any{"name": "John"}) - g.Expect(err).ShouldNot(BeNil()) - g.Expect(result).Should(BeNil()) - - // Test invalid jq function - invalidFunction := `. | invalid_function` - result, err = filter.ApplyFilter(invalidFunction, map[string]any{"name": "John"}) - g.Expect(err).ShouldNot(BeNil()) - g.Expect(result).Should(BeNil()) -} - -func Test_ApplyFilter_InvalidJson(t *testing.T) { - g := NewWithT(t) - filter := NewFilter() - - jqFilter := `.name` - - result, err := filter.ApplyFilter(jqFilter, map[string]any{"name": "John"}) - g.Expect(err).Should(BeNil()) - g.Expect(result).ShouldNot(BeNil()) - - var resultMap any - err = json.Unmarshal(result, &resultMap) - g.Expect(err).Should(BeNil()) - g.Expect(resultMap).ShouldNot(BeNil()) -} - -func Test_ApplyFilter_NilInputData(t *testing.T) { - g := NewWithT(t) - filter := NewFilter() - - result, err := filter.ApplyFilter(`.`, nil) - g.Expect(err).Should(BeNil()) - g.Expect(result).ShouldNot(BeNil()) -} - -func Test_ApplyFilter_EmptyFilter(t *testing.T) { - g := NewWithT(t) - filter := NewFilter() - - result, err := filter.ApplyFilter("", map[string]any{"name": "John"}) - g.Expect(err).ShouldNot(BeNil()) - g.Expect(result).Should(BeNil()) -} - -func Test_ApplyFilter_InvalidJsonInDeepCopy(t *testing.T) { - g := NewWithT(t) - filter := NewFilter() - - // Create invalid JSON data that cannot be marshaled - invalidData := map[string]any{ - "channel": make(chan int), // channel cannot be marshaled to JSON - } - - result, err := filter.ApplyFilter(`.`, invalidData) - g.Expect(err).ShouldNot(BeNil()) // Expect an error due to invalid JSON - g.Expect(result).Should(BeNil()) -} - -func Test_ApplyFilter_EmptyResult(t *testing.T) { - g := NewWithT(t) - filter := NewFilter() - - // Filter that returns no results - result, err := filter.ApplyFilter(`.nonexistent`, map[string]any{"name": "John"}) - g.Expect(err).Should(BeNil()) - g.Expect(result).ShouldNot(BeNil()) - - var resultMap any - err = json.Unmarshal(result, &resultMap) - g.Expect(err).Should(BeNil()) - g.Expect(resultMap).Should(BeNil()) // Expect result to be nil (empty) -} - -func Test_ApplyFilter_InvalidJqSyntax(t *testing.T) { - g := NewWithT(t) - filter := NewFilter() - - result, err := filter.ApplyFilter(`invalid syntax`, map[string]any{"name": "John"}) - g.Expect(err).ShouldNot(BeNil()) - g.Expect(result).Should(BeNil()) -} - -func Test_ApplyFilter_InvalidJqFunction(t *testing.T) { - g := NewWithT(t) - filter := NewFilter() - - result, err := filter.ApplyFilter(`. | invalid_function`, map[string]any{"name": "John"}) - g.Expect(err).ShouldNot(BeNil()) - g.Expect(result).Should(BeNil()) -} - -func Test_ApplyFilter_PanicSafety(t *testing.T) { - g := NewWithT(t) - filter := NewFilter() - - defer func() { - if r := recover(); r != nil { - t.Errorf("ApplyFilter panicked: %v", r) - } - }() - - // Test with data that could potentially cause a panic - _, err := filter.ApplyFilter(`.`, map[string]any{"key": func() {}}) - g.Expect(err).ShouldNot(BeNil()) -} - -func Test_deepCopyAny(t *testing.T) { - g := NewWithT(t) - - // Test copying a map - inputMap := map[string]any{"foo": "bar", "num": 42} - copyMap, err := deepCopyAny(inputMap) - g.Expect(err).Should(BeNil()) - g.Expect(copyMap).Should(Equal(map[string]any{"foo": "bar", "num": float64(42)})) - g.Expect(copyMap).ShouldNot(BeIdenticalTo(inputMap)) - - // Test copying a slice - inputSlice := []any{"a", 1, true} - copySlice, err := deepCopyAny(inputSlice) - g.Expect(err).Should(BeNil()) - g.Expect(copySlice).Should(Equal([]any{"a", float64(1), true})) - g.Expect(copySlice).ShouldNot(BeIdenticalTo(inputSlice)) - - // Test copying nil - copyNil, err := deepCopyAny(nil) - g.Expect(err).Should(BeNil()) - g.Expect(copyNil).Should(BeNil()) - - // Test copying a value that cannot be marshaled to JSON - inputInvalid := map[string]any{"ch": make(chan int)} - copyInvalid, err := deepCopyAny(inputInvalid) - g.Expect(err).ShouldNot(BeNil()) - g.Expect(copyInvalid).Should(BeNil()) -} diff --git a/pkg/hook/config/config_v0.go b/pkg/hook/config/config_v0.go index 4fb3d89b..abb4c498 100644 --- a/pkg/hook/config/config_v0.go +++ b/pkg/hook/config/config_v0.go @@ -7,6 +7,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" htypes "github.com/flant/shell-operator/pkg/hook/types" + "github.com/flant/shell-operator/pkg/jq" kubeeventsmanager "github.com/flant/shell-operator/pkg/kube_events_manager" kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" smtypes "github.com/flant/shell-operator/pkg/schedule_manager/types" @@ -107,7 +108,14 @@ func (cv0 *HookConfigV0) ConvertAndCheck(c *HookConfig) error { }) } monitor.WithLabelSelector(kubeCfg.Selector) - monitor.JqFilter = kubeCfg.JqFilter + + if kubeCfg.JqFilter != "" { + filter, err := jq.CompileExpression(kubeCfg.JqFilter) + if err != nil { + return fmt.Errorf("invalid jqFilter: %w", err) + } + monitor.JqFilter = filter + } kubeConfig := htypes.OnKubernetesEventConfig{} kubeConfig.Monitor = monitor diff --git a/pkg/hook/config/config_v1.go b/pkg/hook/config/config_v1.go index 59687fb2..71934a71 100644 --- a/pkg/hook/config/config_v1.go +++ b/pkg/hook/config/config_v1.go @@ -13,6 +13,7 @@ import ( "github.com/flant/shell-operator/pkg/app" htypes "github.com/flant/shell-operator/pkg/hook/types" + "github.com/flant/shell-operator/pkg/jq" kubeeventsmanager "github.com/flant/shell-operator/pkg/kube_events_manager" kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" smtypes "github.com/flant/shell-operator/pkg/schedule_manager/types" @@ -130,7 +131,15 @@ func (cv1 *HookConfigV1) ConvertAndCheck(c *HookConfig) error { monitor.WithFieldSelector((*kemtypes.FieldSelector)(kubeCfg.FieldSelector)) monitor.WithNamespaceSelector((*kemtypes.NamespaceSelector)(kubeCfg.Namespace)) monitor.WithLabelSelector(kubeCfg.LabelSelector) - monitor.JqFilter = kubeCfg.JqFilter + + if kubeCfg.JqFilter != "" { + filter, err := jq.CompileExpression(kubeCfg.JqFilter) + if err != nil { + return fmt.Errorf("invalid jqFilter: %w", err) + } + monitor.JqFilter = filter + } + // executeHookOnEvent is a priority if kubeCfg.ExecuteHookOnEvents != nil { monitor.WithEventTypes(kubeCfg.ExecuteHookOnEvents) diff --git a/pkg/hook/controller/kubernetes_bindings_controller.go b/pkg/hook/controller/kubernetes_bindings_controller.go index 97f500a0..4c47505e 100644 --- a/pkg/hook/controller/kubernetes_bindings_controller.go +++ b/pkg/hook/controller/kubernetes_bindings_controller.go @@ -344,7 +344,9 @@ func ConvertKubeEventToBindingContext(kubeEvent kemtypes.KubeEvent, link *Kubern Type: kubeEvent.Type, Objects: kubeEvent.Objects, } - bc.Metadata.JqFilter = link.BindingConfig.Monitor.JqFilter + if link.BindingConfig.Monitor.JqFilter != nil { + bc.Metadata.JqFilter = link.BindingConfig.Monitor.JqFilter.Query() + } bc.Metadata.BindingType = htypes.OnKubernetesEvent bc.Metadata.IncludeSnapshots = link.BindingConfig.IncludeSnapshotsFrom bc.Metadata.Group = link.BindingConfig.Group @@ -359,7 +361,9 @@ func ConvertKubeEventToBindingContext(kubeEvent kemtypes.KubeEvent, link *Kubern WatchEvent: kEvent, Objects: kubeEvent.Objects, } - bc.Metadata.JqFilter = link.BindingConfig.Monitor.JqFilter + if link.BindingConfig.Monitor.JqFilter != nil { + bc.Metadata.JqFilter = link.BindingConfig.Monitor.JqFilter.Query() + } bc.Metadata.BindingType = htypes.OnKubernetesEvent bc.Metadata.IncludeSnapshots = link.BindingConfig.IncludeSnapshotsFrom bc.Metadata.Group = link.BindingConfig.Group diff --git a/pkg/jq/jq.go b/pkg/jq/jq.go new file mode 100644 index 00000000..c1dfefbd --- /dev/null +++ b/pkg/jq/jq.go @@ -0,0 +1,73 @@ +package jq + +import ( + "encoding/json" + "errors" + + "github.com/itchyny/gojq" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +type Expression struct { + *gojq.Code + query string +} + +func (e *Expression) Query() string { + return e.query +} + +// ExecuteJQ executes jq expression and collect results +func ExecuteJQ(expression *Expression, obj *unstructured.Unstructured) ([]byte, error) { + iter := expression.Run(obj.UnstructuredContent()) + var results []any + + for { + v, ok := iter.Next() + if !ok { + break + } + + // Handle errors from jq execution + if err, ok := v.(error); ok { + // HaltError with nil value means graceful termination + var haltErr *gojq.HaltError + if errors.As(err, &haltErr) && haltErr.Value() == nil { + break + } + return nil, err + } + + results = append(results, v) + } + + // Marshal results based on count + switch len(results) { + case 0: + return []byte("null"), nil + case 1: + return json.Marshal(results[0]) + default: + return json.Marshal(results) + } +} + +func CompileExpression(expression string) (*Expression, error) { + parsedQuery, err := gojq.Parse(expression) + if err != nil { + return nil, err + } + compiledQuery, err := gojq.Compile(parsedQuery) + if err != nil { + return nil, err + } + + return &Expression{ + Code: compiledQuery, + query: expression, + }, nil +} + +func Info() string { + return "jq implementation: using itchyny/gojq" +} diff --git a/pkg/jq/jq_test.go b/pkg/jq/jq_test.go new file mode 100644 index 00000000..92cee5e4 --- /dev/null +++ b/pkg/jq/jq_test.go @@ -0,0 +1,428 @@ +package jq + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +// Helper function to create test objects +func newTestObject(data map[string]interface{}) *unstructured.Unstructured { + return &unstructured.Unstructured{Object: data} +} + +// Test ExecuteJQ function +func TestExecuteJQ(t *testing.T) { + t.Run("simple selection", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "name": "test", + "age": 42, + }) + + expr, err := CompileExpression(".name") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, `"test"`, string(result)) + }) + + t.Run("multiple results", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "items": []interface{}{1, 2, 3}, + }) + + expr, err := CompileExpression(".items[]") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, "[1,2,3]", string(result)) + }) + + t.Run("no results - returns null", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{}) + + expr, err := CompileExpression("empty") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, "null", string(result)) + }) + + t.Run("single null result", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "value": nil, + }) + + expr, err := CompileExpression(".value") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, "null", string(result)) + }) + + t.Run("complex object transformation", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "users": []interface{}{ + map[string]interface{}{"name": "Alice", "age": 30}, + map[string]interface{}{"name": "Bob", "age": 25}, + }, + }) + + expr, err := CompileExpression(".users | map({name, older: (.age > 26)})") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + + var parsed []map[string]interface{} + err = json.Unmarshal(result, &parsed) + require.NoError(t, err) + assert.Len(t, parsed, 2) + assert.Equal(t, "Alice", parsed[0]["name"]) + assert.Equal(t, true, parsed[0]["older"]) + assert.Equal(t, "Bob", parsed[1]["name"]) + assert.Equal(t, false, parsed[1]["older"]) + }) + + t.Run("array construction", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "a": 1, + "b": 2, + }) + + expr, err := CompileExpression("[.a, .b, (.a + .b)]") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, "[1,2,3]", string(result)) + }) + + t.Run("object construction", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "firstName": "John", + "lastName": "Doe", + }) + + expr, err := CompileExpression("{fullName: (.firstName + \" \" + .lastName)}") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.JSONEq(t, `{"fullName":"John Doe"}`, string(result)) + }) + + t.Run("type error", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "value": "string", + }) + + expr, err := CompileExpression(".value.field") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + assert.Error(t, err) + assert.Nil(t, result) + }) + + t.Run("halt error with nil value - graceful termination", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "items": []interface{}{1, 2, 3}, + }) + + // limit(0) produces halt error with nil value + expr, err := CompileExpression(".items[] | limit(0; .)") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, "null", string(result)) + }) + + t.Run("division by zero", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "value": 10, + }) + + expr, err := CompileExpression(".value / 0") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + assert.Error(t, err) + assert.Nil(t, result) + }) + + t.Run("nested array and object", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "data": map[string]interface{}{ + "items": []interface{}{ + map[string]interface{}{ + "tags": []interface{}{"a", "b", "c"}, + }, + map[string]interface{}{ + "tags": []interface{}{"x", "y"}, + }, + }, + }, + }) + + expr, err := CompileExpression(".data.items[].tags[]") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, `["a","b","c","x","y"]`, string(result)) + }) + + t.Run("conditional expression", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "status": "active", + }) + + expr, err := CompileExpression(`if .status == "active" then "running" else "stopped" end`) + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, `"running"`, string(result)) + }) + + t.Run("boolean values", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "enabled": true, + "disabled": false, + }) + + expr, err := CompileExpression("[.enabled, .disabled]") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, "[true,false]", string(result)) + }) + + t.Run("numeric operations", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "a": 10, + "b": 3, + }) + + expr, err := CompileExpression("{sum: (.a + .b), diff: (.a - .b), prod: (.a * .b)}") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.JSONEq(t, `{"sum":13,"diff":7,"prod":30}`, string(result)) + }) + + t.Run("string operations", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "text": "Hello, World!", + }) + + expr, err := CompileExpression(".text | ascii_upcase") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, `"HELLO, WORLD!"`, string(result)) + }) + + t.Run("empty object", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{}) + + expr, err := CompileExpression(".") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, "{}", string(result)) + }) + + t.Run("special characters in strings", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "text": "Line 1\nLine 2\tTabbed", + }) + + expr, err := CompileExpression(".text") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + + var text string + err = json.Unmarshal(result, &text) + require.NoError(t, err) + assert.Equal(t, "Line 1\nLine 2\tTabbed", text) + }) + + t.Run("unicode characters", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "emoji": "๐Ÿš€โœจ๐ŸŽ‰", + "cyrillic": "ะŸั€ะธะฒะตั‚", + }) + + expr, err := CompileExpression("{emoji, cyrillic}") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.JSONEq(t, `{"emoji":"๐Ÿš€โœจ๐ŸŽ‰","cyrillic":"ะŸั€ะธะฒะตั‚"}`, string(result)) + }) + + t.Run("kubernetes object filtering", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "test-pod", + "namespace": "default", + "labels": map[string]interface{}{ + "app": "nginx", + }, + }, + "spec": map[string]interface{}{ + "containers": []interface{}{ + map[string]interface{}{ + "name": "nginx", + "image": "nginx:latest", + }, + }, + }, + }) + + expr, err := CompileExpression(".metadata.name") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, `"test-pod"`, string(result)) + }) + + t.Run("kubernetes object complex filtering", func(t *testing.T) { + obj := newTestObject(map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "test-pod", + "namespace": "default", + }, + "spec": map[string]interface{}{ + "containers": []interface{}{ + map[string]interface{}{ + "name": "nginx", + "image": "nginx:latest", + }, + map[string]interface{}{ + "name": "sidecar", + "image": "sidecar:latest", + }, + }, + }, + }) + + expr, err := CompileExpression(".spec.containers[].name") + require.NoError(t, err) + + result, err := ExecuteJQ(expr, obj) + require.NoError(t, err) + assert.Equal(t, `["nginx","sidecar"]`, string(result)) + }) +} + +// Test CompileExpression function +func TestCompileExpression(t *testing.T) { + t.Run("valid simple expression", func(t *testing.T) { + expr, err := CompileExpression(".field") + require.NoError(t, err) + assert.NotNil(t, expr) + assert.NotNil(t, expr.Code) + assert.Equal(t, ".field", expr.Query()) + }) + + t.Run("valid complex expression", func(t *testing.T) { + expr, err := CompileExpression(`.items[] | select(.type == "pod") | .metadata.name`) + require.NoError(t, err) + assert.NotNil(t, expr) + assert.NotNil(t, expr.Code) + }) + + t.Run("invalid expression - syntax error", func(t *testing.T) { + expr, err := CompileExpression(".field[") + assert.Error(t, err) + assert.Nil(t, expr) + }) + + t.Run("invalid expression - incomplete", func(t *testing.T) { + expr, err := CompileExpression(".") + require.NoError(t, err) + assert.NotNil(t, expr) + }) + + t.Run("empty expression", func(t *testing.T) { + expr, err := CompileExpression("") + assert.Error(t, err) + assert.Nil(t, expr) + }) + + t.Run("expression with function", func(t *testing.T) { + expr, err := CompileExpression(".items | length") + require.NoError(t, err) + assert.NotNil(t, expr) + }) + + t.Run("expression with pipe", func(t *testing.T) { + expr, err := CompileExpression(".data | keys | sort") + require.NoError(t, err) + assert.NotNil(t, expr) + }) + + t.Run("expression with map", func(t *testing.T) { + expr, err := CompileExpression(".items | map(.name)") + require.NoError(t, err) + assert.NotNil(t, expr) + }) + + t.Run("expression with select", func(t *testing.T) { + expr, err := CompileExpression(`.items[] | select(.status == "active")`) + require.NoError(t, err) + assert.NotNil(t, expr) + }) + + t.Run("expression with multiple pipes", func(t *testing.T) { + expr, err := CompileExpression(".items[] | select(.type == \"pod\") | .metadata.name | length") + require.NoError(t, err) + assert.NotNil(t, expr) + }) +} + +// Test Expression struct methods +func TestExpression(t *testing.T) { + t.Run("Query method returns original expression", func(t *testing.T) { + originalQuery := ".metadata.name" + expr, err := CompileExpression(originalQuery) + require.NoError(t, err) + assert.Equal(t, originalQuery, expr.Query()) + }) + + t.Run("Code field is set correctly", func(t *testing.T) { + expr, err := CompileExpression(".field") + require.NoError(t, err) + assert.NotNil(t, expr.Code) + }) +} + +// Test Info function +func TestInfo(t *testing.T) { + t.Run("returns correct implementation info", func(t *testing.T) { + info := Info() + assert.Equal(t, "jq implementation: using itchyny/gojq", info) + }) +} diff --git a/pkg/kube/object_patch/helpers.go b/pkg/kube/object_patch/helpers.go index 21da061e..a9d22c28 100644 --- a/pkg/kube/object_patch/helpers.go +++ b/pkg/kube/object_patch/helpers.go @@ -12,7 +12,7 @@ import ( k8yaml "sigs.k8s.io/yaml" "github.com/flant/kube-client/manifest" - "github.com/flant/shell-operator/pkg/filter" + "github.com/flant/shell-operator/pkg/jq" ) func unmarshalFromJSONOrYAML(specs []byte) ([]OperationSpec, error) { @@ -64,11 +64,11 @@ func unmarshalFromYaml(yamlSpecs []byte) ([]OperationSpec, error) { return specSlice, nil } -func applyJQPatch(jqFilter string, fl filter.Filter, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { - filterResult, err := fl.ApplyFilter(jqFilter, obj.UnstructuredContent()) +func applyJQPatch(jqFilter *jq.Expression, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { + filterResult, err := jq.ExecuteJQ(jqFilter, obj) if err != nil { return nil, fmt.Errorf("failed to apply jqFilter:\n%sto Object:\n%s\n"+ - "error: %s", jqFilter, obj, err) + "error: %s", jqFilter.Query(), obj, err) } retObj := &unstructured.Unstructured{} diff --git a/pkg/kube/object_patch/operation.go b/pkg/kube/object_patch/operation.go index 6f8ab26c..1af5c872 100644 --- a/pkg/kube/object_patch/operation.go +++ b/pkg/kube/object_patch/operation.go @@ -11,7 +11,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" - "github.com/flant/shell-operator/pkg/filter/jq" + "github.com/flant/shell-operator/pkg/jq" ) // OperationSpec a JSON and YAML representation of the operation for shell hooks @@ -288,8 +288,12 @@ func newPatchOperation(patchType types.PatchType, patch any, apiVersion, kind, n func NewPatchWithJQOperation(jqQuery string, apiVersion string, kind string, namespace string, name string, opts ...sdkpkg.PatchCollectorOption) sdkpkg.PatchCollectorOperation { return newFilterOperation(func(u *unstructured.Unstructured) (*unstructured.Unstructured, error) { - filter := jq.NewFilter() - return applyJQPatch(jqQuery, filter, u) + expression, err := jq.CompileExpression(jqQuery) + if err != nil { + return nil, err + } + + return applyJQPatch(expression, u) }, apiVersion, kind, namespace, name, opts...) } diff --git a/pkg/kube_events_manager/filter.go b/pkg/kube_events_manager/filter.go deleted file mode 100644 index 7b704cd0..00000000 --- a/pkg/kube_events_manager/filter.go +++ /dev/null @@ -1,68 +0,0 @@ -package kubeeventsmanager - -import ( - "context" - "encoding/json" - "fmt" - "reflect" - "runtime" - "runtime/trace" - - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - - "github.com/flant/shell-operator/pkg/filter" - kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" - utils_checksum "github.com/flant/shell-operator/pkg/utils/checksum" -) - -// applyFilter filters object json representation with jq expression, calculate checksum -// over result and return ObjectAndFilterResult. If jqFilter is empty, no filter -// is required and checksum is calculated over full json representation of the object. -func applyFilter(jqFilter string, fl filter.Filter, filterFn func(obj *unstructured.Unstructured) (result interface{}, err error), obj *unstructured.Unstructured) (*kemtypes.ObjectAndFilterResult, error) { - defer trace.StartRegion(context.Background(), "ApplyJqFilter").End() - - res := &kemtypes.ObjectAndFilterResult{ - Object: obj, - } - res.Metadata.JqFilter = jqFilter - res.Metadata.ResourceId = resourceId(obj) - - // If filterFn is passed, run it and return result. - if filterFn != nil { - filteredObj, err := filterFn(obj) - if err != nil { - return nil, fmt.Errorf("filterFn (%s) contains an error: %v", runtime.FuncForPC(reflect.ValueOf(filterFn).Pointer()).Name(), err) - } - - filteredBytes, err := json.Marshal(filteredObj) - if err != nil { - return nil, err - } - - res.FilterResult = filteredObj - res.Metadata.Checksum = utils_checksum.CalculateChecksum(string(filteredBytes)) - - return res, nil - } - - // Render obj to JSON text to apply jq filter. - if jqFilter == "" { - data, err := json.Marshal(obj) - if err != nil { - return nil, err - } - res.Metadata.Checksum = utils_checksum.CalculateChecksum(string(data)) - } else { - var err error - var filtered []byte - filtered, err = fl.ApplyFilter(jqFilter, obj.UnstructuredContent()) - if err != nil { - return nil, fmt.Errorf("jqFilter: %v", err) - } - - res.FilterResult = string(filtered) - res.Metadata.Checksum = utils_checksum.CalculateChecksum(string(filtered)) - } - - return res, nil -} diff --git a/pkg/kube_events_manager/filter_test.go b/pkg/kube_events_manager/filter_test.go deleted file mode 100644 index 5c1961ab..00000000 --- a/pkg/kube_events_manager/filter_test.go +++ /dev/null @@ -1,27 +0,0 @@ -package kubeeventsmanager - -import ( - "encoding/json" - "testing" - - "github.com/stretchr/testify/assert" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - - "github.com/flant/shell-operator/pkg/filter/jq" -) - -func TestApplyFilter(t *testing.T) { - t.Run("filter func with error", func(t *testing.T) { - uns := &unstructured.Unstructured{Object: map[string]interface{}{"foo": "bar"}} - filter := jq.NewFilter() - _, err := applyFilter("", filter, filterFuncWithError, uns) - assert.EqualError(t, err, "filterFn (github.com/flant/shell-operator/pkg/kube_events_manager.filterFuncWithError) contains an error: invalid character 'a' looking for beginning of value") - }) -} - -func filterFuncWithError(_ *unstructured.Unstructured) (interface{}, error) { - var s []string - err := json.Unmarshal([]byte("asdasd"), &s) - - return s, err -} diff --git a/pkg/kube_events_manager/monitor_config.go b/pkg/kube_events_manager/monitor_config.go index 27df1a17..e7f7f3ec 100644 --- a/pkg/kube_events_manager/monitor_config.go +++ b/pkg/kube_events_manager/monitor_config.go @@ -5,6 +5,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "github.com/flant/shell-operator/pkg/jq" kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" ) @@ -24,7 +25,7 @@ type MonitorConfig struct { NamespaceSelector *kemtypes.NamespaceSelector LabelSelector *metav1.LabelSelector FieldSelector *kemtypes.FieldSelector - JqFilter string + JqFilter *jq.Expression Logger *log.Logger Mode kemtypes.KubeEventMode KeepFullObjectsInMemory bool diff --git a/pkg/kube_events_manager/resource_informer.go b/pkg/kube_events_manager/resource_informer.go index ebb8a5f6..d1a1db22 100644 --- a/pkg/kube_events_manager/resource_informer.go +++ b/pkg/kube_events_manager/resource_informer.go @@ -17,7 +17,7 @@ import ( "k8s.io/client-go/tools/cache" klient "github.com/flant/kube-client/client" - "github.com/flant/shell-operator/pkg/filter/jq" + "github.com/flant/shell-operator/pkg/filter" kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" "github.com/flant/shell-operator/pkg/metrics" "github.com/flant/shell-operator/pkg/utils/measure" @@ -221,16 +221,7 @@ func (ei *resourceInformer) loadExistedObjects() error { // copy loop var to avoid duplication of pointer in filteredObjects obj := item - var objFilterRes *kemtypes.ObjectAndFilterResult - var err error - func() { - defer measure.Duration(func(d time.Duration) { - ei.metricStorage.HistogramObserve(metrics.KubeJqFilterDurationSeconds, d.Seconds(), ei.Monitor.Metadata.MetricLabels, nil) - })() - filter := jq.NewFilter() - objFilterRes, err = applyFilter(ei.Monitor.JqFilter, filter, ei.Monitor.FilterFunc, &obj) - }() - + objFilterRes, err := ei.transformObject(&obj) if err != nil { return err } @@ -301,15 +292,7 @@ func (ei *resourceInformer) handleWatchEvent(object interface{}, eventType kemty // Always calculate checksum and update cache, because we need an actual state in ei.cachedObjects. - var objFilterRes *kemtypes.ObjectAndFilterResult - var err error - func() { - defer measure.Duration(func(d time.Duration) { - ei.metricStorage.HistogramObserve(metrics.KubeJqFilterDurationSeconds, d.Seconds(), ei.Monitor.Metadata.MetricLabels, nil) - })() - filter := jq.NewFilter() - objFilterRes, err = applyFilter(ei.Monitor.JqFilter, filter, ei.Monitor.FilterFunc, obj) - }() + objFilterRes, err := ei.transformObject(obj) if err != nil { log.Error("handleWatchEvent: applyFilter error", slog.String("debugName", ei.Monitor.Metadata.DebugName), @@ -490,3 +473,24 @@ func (ei *resourceInformer) getCachedObjectsInfoIncrement() CachedObjectsInfo { ei.cachedObjectsIncrement = &CachedObjectsInfo{} return info } + +func (ei *resourceInformer) transformObject(obj *unstructured.Unstructured) (*kemtypes.ObjectAndFilterResult, error) { + recordDuration := measure.Duration(func(d time.Duration) { + ei.metricStorage.HistogramObserve( + metrics.KubeJqFilterDurationSeconds, + d.Seconds(), + ei.Monitor.Metadata.MetricLabels, + nil, + ) + }) + defer recordDuration() + + if ei.Monitor.JqFilter != nil { + return filter.RunExpression(ei.Monitor.JqFilter, obj) + } + if ei.Monitor.FilterFunc != nil { + return filter.RunFn(ei.Monitor.FilterFunc, obj) + } + + return filter.RunPlain(obj) +} diff --git a/pkg/kube_events_manager/types/types.go b/pkg/kube_events_manager/types/types.go index 5bdfb8d7..1f898e1a 100644 --- a/pkg/kube_events_manager/types/types.go +++ b/pkg/kube_events_manager/types/types.go @@ -35,16 +35,18 @@ const ( ) type ObjectAndFilterResult struct { - Metadata struct { - JqFilter string - Checksum string - ResourceId string // Used for sorting - RemoveObject bool - } + Metadata ObjectAndFilterResultMetadata Object *unstructured.Unstructured // here is a pointer because of MarshalJSON receiver FilterResult interface{} } +type ObjectAndFilterResultMetadata struct { + JqFilter string + Checksum string + ResourceId string // Used for sorting + RemoveObject bool +} + // Map constructs a map suitable for use in binding context. func (o ObjectAndFilterResult) Map() map[string]interface{} { m := map[string]interface{}{} diff --git a/pkg/shell-operator/bootstrap.go b/pkg/shell-operator/bootstrap.go index 10121625..c5460d9c 100644 --- a/pkg/shell-operator/bootstrap.go +++ b/pkg/shell-operator/bootstrap.go @@ -9,8 +9,8 @@ import ( "github.com/flant/shell-operator/pkg/app" "github.com/flant/shell-operator/pkg/config" "github.com/flant/shell-operator/pkg/debug" - "github.com/flant/shell-operator/pkg/filter/jq" "github.com/flant/shell-operator/pkg/hook" + "github.com/flant/shell-operator/pkg/jq" kubeeventsmanager "github.com/flant/shell-operator/pkg/kube_events_manager" "github.com/flant/shell-operator/pkg/metrics" schedulemanager "github.com/flant/shell-operator/pkg/schedule_manager" @@ -27,10 +27,9 @@ func Init(logger *log.Logger) (*ShellOperator, error) { // Init logging subsystem. app.SetupLogging(runtimeConfig, logger) - // Log version and jq filtering implementation. + // Log version and filter implementation logger.Info(app.AppStartMessage) - fl := jq.NewFilter() - logger.Debug(fl.FilterInfo()) + logger.Debug(jq.Info()) hooksDir, err := utils.RequireExistingDirectory(app.HooksDir) if err != nil {