Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions cmd/shell-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/flant/kube-client/klogtolog"
"github.com/flant/shell-operator/pkg/app"
"github.com/flant/shell-operator/pkg/debug"
"github.com/flant/shell-operator/pkg/filter/jq"
"github.com/flant/shell-operator/pkg/jq"
)

func main() {
Expand All @@ -30,9 +30,7 @@ func main() {

// print version
kpApp.Command("version", "Show version.").Action(func(_ *kingpin.ParseContext) error {
fmt.Printf("%s %s\n", app.AppName, app.Version)
fl := jq.NewFilter()
fmt.Println(fl.FilterInfo())
fmt.Printf("%s %s\n%s\n", app.AppName, app.Version, jq.Info())
return nil
})

Expand Down
96 changes: 93 additions & 3 deletions pkg/filter/filter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,96 @@
/*
Package filter provides functionality for transforming and filtering objects
from plain kubernetes unstructured objects to structured objects, that shell-operator can work with.
- Supports JQ Expressions
- Supports Custom Filter Functions
- Supports Plain Filter Functions

Enriches objects with metadata and calculates a checksum.
*/
package filter

type Filter interface {
ApplyFilter(filterStr string, data map[string]any) ([]byte, error)
FilterInfo() string
import (
"context"
"encoding/json"
"fmt"
"reflect"
"runtime"
"runtime/trace"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

"github.com/flant/shell-operator/pkg/jq"
kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types"
utils_checksum "github.com/flant/shell-operator/pkg/utils/checksum"
)

type FilterFn func(obj *unstructured.Unstructured) (result interface{}, err error)

// RunFn runs a filter function on an object and returns an ObjectAndFilterResult.
func RunFn(filterFn FilterFn, obj *unstructured.Unstructured) (*kemtypes.ObjectAndFilterResult, error) {
defer trace.StartRegion(context.Background(), "FilterRunFn").End()

filtered, err := filterFn(obj)
if err != nil {
return nil, fmt.Errorf("filter function (%s) execution error: %v", funcName(filterFn), err)
}

filteredBytes, err := json.Marshal(filtered)
if err != nil {
return nil, err
}

return &kemtypes.ObjectAndFilterResult{
Object: obj,
Metadata: kemtypes.ObjectAndFilterResultMetadata{
ResourceId: resourceID(obj),
Checksum: utils_checksum.CalculateChecksum(string(filteredBytes)),
},
FilterResult: filtered,
}, nil
}

// RunExpression runs a jq expression on an object and returns an ObjectAndFilterResult.
func RunExpression(expression *jq.Expression, obj *unstructured.Unstructured) (*kemtypes.ObjectAndFilterResult, error) {
defer trace.StartRegion(context.Background(), "FilterRunExpression").End()

filtered, err := jq.ExecuteJQ(expression, obj)
if err != nil {
return nil, fmt.Errorf("jq expression execution error: %v", err)
}

return &kemtypes.ObjectAndFilterResult{
Object: obj,
Metadata: kemtypes.ObjectAndFilterResultMetadata{
ResourceId: resourceID(obj),
Checksum: utils_checksum.CalculateChecksum(string(filtered)),
JqFilter: expression.Query(),
},
FilterResult: filtered,
}, nil
}

// RunPlain runs NO filter function on an object and returns an ObjectAndFilterResult.
func RunPlain(obj *unstructured.Unstructured) (*kemtypes.ObjectAndFilterResult, error) {
// TODO: json operation could be avoided, we can caclculate checksum from the object itself
data, err := json.Marshal(obj)
if err != nil {
return nil, err
}

return &kemtypes.ObjectAndFilterResult{
Object: obj,
Metadata: kemtypes.ObjectAndFilterResultMetadata{
ResourceId: resourceID(obj),
Checksum: utils_checksum.CalculateChecksum(string(data)),
},
}, nil
}

func resourceID(obj *unstructured.Unstructured) string {
return fmt.Sprintf("%s/%s/%s", obj.GetNamespace(), obj.GetKind(), obj.GetName())
}

func funcName(fn FilterFn) string {
return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
}
225 changes: 225 additions & 0 deletions pkg/filter/filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
package filter

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

"github.com/flant/shell-operator/pkg/jq"
)

// Helper function to create test objects
func newTestObject(data map[string]interface{}) *unstructured.Unstructured {
return &unstructured.Unstructured{Object: data}
}

// Test RunFn function - custom filter function
func TestRunFn(t *testing.T) {
t.Run("with custom filter function - success", func(t *testing.T) {
obj := newTestObject(map[string]interface{}{
"apiVersion": "v1",
"kind": "Pod",
"metadata": map[string]interface{}{
"name": "test-pod",
"namespace": "default",
},
"spec": map[string]interface{}{
"containers": []interface{}{
map[string]interface{}{"name": "nginx"},
},
},
})

filterFn := func(o *unstructured.Unstructured) (interface{}, error) {
return map[string]interface{}{"name": o.GetName()}, nil
}

result, err := RunFn(filterFn, obj)
require.NoError(t, err)
assert.NotNil(t, result)
assert.Equal(t, obj, result.Object)
assert.NotEmpty(t, result.Metadata.Checksum)
assert.Equal(t, map[string]interface{}{"name": "test-pod"}, result.FilterResult)
})

t.Run("with custom filter function - error", func(t *testing.T) {
obj := newTestObject(map[string]interface{}{"foo": "bar"})
expectedErr := errors.New("filter failed")

filterFn := func(_ *unstructured.Unstructured) (interface{}, error) {
return nil, expectedErr
}

result, err := RunFn(filterFn, obj)
assert.Error(t, err)
assert.Nil(t, result)
assert.Contains(t, err.Error(), "filter function")
assert.Contains(t, err.Error(), "filter failed")
})

t.Run("with custom filter function - invalid JSON result", func(t *testing.T) {
obj := newTestObject(map[string]interface{}{"foo": "bar"})

// Return something that can't be marshaled to JSON
filterFn := func(_ *unstructured.Unstructured) (interface{}, error) {
return make(chan int), nil // channels can't be marshaled
}

result, err := RunFn(filterFn, obj)
assert.Error(t, err)
assert.Nil(t, result)
})
}

// Test RunExpression function - jq expression filter
func TestRunExpression(t *testing.T) {
t.Run("with jq expression - simple filter", func(t *testing.T) {
obj := newTestObject(map[string]interface{}{
"apiVersion": "v1",
"kind": "Pod",
"metadata": map[string]interface{}{
"name": "test-pod",
"namespace": "default",
},
"spec": map[string]interface{}{
"nodeName": "node-1",
},
})

expr, err := jq.CompileExpression(".spec.nodeName")
require.NoError(t, err)

result, err := RunExpression(expr, obj)
require.NoError(t, err)
assert.NotNil(t, result)
assert.Equal(t, `"node-1"`, string(result.FilterResult.([]byte)))
assert.NotEmpty(t, result.Metadata.Checksum)
assert.Equal(t, ".spec.nodeName", result.Metadata.JqFilter)
})

t.Run("with jq expression - complex filter", func(t *testing.T) {
obj := newTestObject(map[string]interface{}{
"spec": map[string]interface{}{
"containers": []interface{}{
map[string]interface{}{"name": "nginx", "image": "nginx:1.19"},
map[string]interface{}{"name": "sidecar", "image": "sidecar:latest"},
},
},
})

expr, err := jq.CompileExpression(".spec.containers[].name")
require.NoError(t, err)

result, err := RunExpression(expr, obj)
require.NoError(t, err)
assert.NotNil(t, result)
assert.Equal(t, `["nginx","sidecar"]`, string(result.FilterResult.([]byte)))
})

t.Run("with jq expression - filter returns null", func(t *testing.T) {
obj := newTestObject(map[string]interface{}{
"spec": map[string]interface{}{},
})

expr, err := jq.CompileExpression(".spec.nonexistent")
require.NoError(t, err)

result, err := RunExpression(expr, obj)
require.NoError(t, err)
assert.NotNil(t, result)
assert.Equal(t, "null", string(result.FilterResult.([]byte)))
})

t.Run("with jq expression - error", func(t *testing.T) {
obj := newTestObject(map[string]interface{}{
"spec": "not-an-object",
})

expr, err := jq.CompileExpression(".spec.field")
require.NoError(t, err)

result, err := RunExpression(expr, obj)
assert.Error(t, err)
assert.Nil(t, result)
})
}

// Test RunPlain function - no filter
func TestRunPlain(t *testing.T) {
t.Run("no filter - full object checksum", func(t *testing.T) {
obj := newTestObject(map[string]interface{}{
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": map[string]interface{}{
"name": "test-cm",
"namespace": "kube-system",
},
"data": map[string]interface{}{
"key": "value",
},
})

result, err := RunPlain(obj)
require.NoError(t, err)
assert.NotNil(t, result)
assert.Equal(t, obj, result.Object)
assert.NotEmpty(t, result.Metadata.Checksum)
assert.Nil(t, result.FilterResult)
assert.Equal(t, "kube-system/ConfigMap/test-cm", result.Metadata.ResourceId)
})

t.Run("no filter - empty object", func(t *testing.T) {
obj := newTestObject(map[string]interface{}{})

result, err := RunPlain(obj)
require.NoError(t, err)
assert.NotNil(t, result)
assert.NotEmpty(t, result.Metadata.Checksum)
})
}

// Test edge cases and resource ID generation
func TestResourceIdGeneration(t *testing.T) {
t.Run("standard kubernetes object", func(t *testing.T) {
obj := newTestObject(map[string]interface{}{
"apiVersion": "v1",
"kind": "Pod",
"metadata": map[string]interface{}{
"name": "my-pod",
"namespace": "default",
},
})

result, err := RunPlain(obj)
require.NoError(t, err)
assert.Equal(t, "default/Pod/my-pod", result.Metadata.ResourceId)
})

t.Run("cluster-scoped resource", func(t *testing.T) {
obj := newTestObject(map[string]interface{}{
"apiVersion": "v1",
"kind": "Node",
"metadata": map[string]interface{}{
"name": "node-1",
},
})

result, err := RunPlain(obj)
require.NoError(t, err)
assert.Equal(t, "/Node/node-1", result.Metadata.ResourceId)
})

t.Run("missing metadata", func(t *testing.T) {
obj := newTestObject(map[string]interface{}{
"apiVersion": "v1",
"kind": "Unknown",
})

result, err := RunPlain(obj)
require.NoError(t, err)
assert.Equal(t, "/Unknown/", result.Metadata.ResourceId)
})
}
Loading
Loading