diff --git a/internal/k8sinventory/go.mod b/internal/k8sinventory/go.mod index 62f87bcd85610..04d82c6c4a8f9 100644 --- a/internal/k8sinventory/go.mod +++ b/internal/k8sinventory/go.mod @@ -1,6 +1,6 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sinventory -go 1.24.4 +go 1.24.0 require ( github.com/stretchr/testify v1.11.1 diff --git a/receiver/k8sobjectsreceiver/config.go b/receiver/k8sobjectsreceiver/config.go index 808558fd48396..3ba7ff18d0691 100644 --- a/receiver/k8sobjectsreceiver/config.go +++ b/receiver/k8sobjectsreceiver/config.go @@ -17,22 +17,18 @@ import ( "k8s.io/client-go/dynamic" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sinventory" ) -type mode string - const ( - PullMode mode = "pull" - WatchMode mode = "watch" - - defaultPullInterval time.Duration = time.Hour - defaultMode mode = PullMode - defaultResourceVersion = "1" + defaultPullInterval time.Duration = time.Hour + defaultMode k8sinventory.Mode = k8sinventory.PullMode + defaultResourceVersion = "1" ) -var modeMap = map[mode]bool{ - PullMode: true, - WatchMode: true, +var modeMap = map[k8sinventory.Mode]bool{ + k8sinventory.PullMode: true, + k8sinventory.WatchMode: true, } type ErrorMode string @@ -47,7 +43,7 @@ type K8sObjectsConfig struct { Name string `mapstructure:"name"` Group string `mapstructure:"group"` Namespaces []string `mapstructure:"namespaces"` - Mode mode `mapstructure:"mode"` + Mode k8sinventory.Mode `mapstructure:"mode"` LabelSelector string `mapstructure:"label_selector"` FieldSelector string `mapstructure:"field_selector"` Interval time.Duration `mapstructure:"interval"` @@ -85,15 +81,15 @@ func (c *Config) Validate() error { return fmt.Errorf("invalid mode: %v", object.Mode) } - if object.Mode == PullMode && object.Interval == 0 { + if object.Mode == k8sinventory.PullMode && object.Interval == 0 { object.Interval = defaultPullInterval } - if object.Mode == PullMode && len(object.ExcludeWatchType) != 0 { + if object.Mode == k8sinventory.PullMode && len(object.ExcludeWatchType) != 0 { return errors.New("the Exclude config can only be used with watch mode") } - if object.Mode == PullMode && c.IncludeInitialState { + if object.Mode == k8sinventory.PullMode && c.IncludeInitialState { return errors.New("include_initial_state can only be used with watch mode") } } diff --git a/receiver/k8sobjectsreceiver/config_test.go b/receiver/k8sobjectsreceiver/config_test.go index d72e4c1efcfe7..3cddabe3d1d28 100644 --- a/receiver/k8sobjectsreceiver/config_test.go +++ b/receiver/k8sobjectsreceiver/config_test.go @@ -16,6 +16,7 @@ import ( apiWatch "k8s.io/apimachinery/pkg/watch" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sinventory" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sobjectsreceiver/internal/metadata" ) @@ -35,14 +36,14 @@ func TestLoadConfig(t *testing.T) { Objects: []*K8sObjectsConfig{ { Name: "pods", - Mode: PullMode, + Mode: k8sinventory.PullMode, Interval: time.Hour, FieldSelector: "status.phase=Running", LabelSelector: "environment in (production),tier in (frontend)", }, { Name: "events", - Mode: WatchMode, + Mode: k8sinventory.WatchMode, Namespaces: []string{"default"}, Group: "events.k8s.io", ExcludeWatchType: []apiWatch.EventType{ @@ -61,13 +62,13 @@ func TestLoadConfig(t *testing.T) { Objects: []*K8sObjectsConfig{ { Name: "pods", - Mode: PullMode, + Mode: k8sinventory.PullMode, ResourceVersion: "1", Interval: time.Hour, }, { Name: "events", - Mode: PullMode, + Mode: k8sinventory.PullMode, Interval: time.Hour, }, }, @@ -82,14 +83,14 @@ func TestLoadConfig(t *testing.T) { Objects: []*K8sObjectsConfig{ { Name: "events", - Mode: WatchMode, + Mode: k8sinventory.WatchMode, Namespaces: []string{"default"}, Group: "events.k8s.io", ResourceVersion: "", }, { Name: "events", - Mode: WatchMode, + Mode: k8sinventory.WatchMode, Namespaces: []string{"default"}, Group: "events.k8s.io", ResourceVersion: "2", @@ -150,7 +151,7 @@ func TestValidate(t *testing.T) { Objects: []*K8sObjectsConfig{ { Name: "pods", - Mode: PullMode, + Mode: k8sinventory.PullMode, ExcludeWatchType: []apiWatch.EventType{ apiWatch.Deleted, }, @@ -177,7 +178,7 @@ func TestValidate(t *testing.T) { Objects: []*K8sObjectsConfig{ { Name: "pods", - Mode: PullMode, + Mode: k8sinventory.PullMode, }, }, }, @@ -209,7 +210,7 @@ func TestDeepCopy(t *testing.T) { Name: "pods", Group: "group", Namespaces: []string{"default"}, - Mode: PullMode, + Mode: k8sinventory.PullMode, FieldSelector: "status.phase=Running", LabelSelector: "environment in (production),tier in (frontend)", Interval: time.Hour, @@ -234,7 +235,7 @@ func TestDeepCopy(t *testing.T) { actual.Name = "changed" actual.Group = "changed" actual.Namespaces[0] = "changed" - actual.Mode = WatchMode + actual.Mode = k8sinventory.WatchMode actual.FieldSelector = "changed" actual.LabelSelector = "changed" actual.Interval = time.Minute @@ -273,7 +274,7 @@ func TestConfigValidationIncludeInitialState(t *testing.T) { Objects: []*K8sObjectsConfig{ { Name: "pods", - Mode: WatchMode, + Mode: k8sinventory.WatchMode, }, }, }, @@ -286,7 +287,7 @@ func TestConfigValidationIncludeInitialState(t *testing.T) { Objects: []*K8sObjectsConfig{ { Name: "pods", - Mode: WatchMode, + Mode: k8sinventory.WatchMode, }, }, }, @@ -299,7 +300,7 @@ func TestConfigValidationIncludeInitialState(t *testing.T) { Objects: []*K8sObjectsConfig{ { Name: "pods", - Mode: PullMode, + Mode: k8sinventory.PullMode, }, }, }, @@ -312,7 +313,7 @@ func TestConfigValidationIncludeInitialState(t *testing.T) { Objects: []*K8sObjectsConfig{ { Name: "pods", - Mode: WatchMode, + Mode: k8sinventory.WatchMode, }, }, }, diff --git a/receiver/k8sobjectsreceiver/go.mod b/receiver/k8sobjectsreceiver/go.mod index a0fddfeeb238d..6daff51e45b7e 100644 --- a/receiver/k8sobjectsreceiver/go.mod +++ b/receiver/k8sobjectsreceiver/go.mod @@ -6,6 +6,7 @@ require ( github.com/google/uuid v1.6.0 github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector v0.140.1 github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.140.1 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sinventory v0.0.0-20251127074440-9df7f9e0b2c4 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.140.1 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.140.1 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/xk8stest v0.140.1 @@ -152,6 +153,8 @@ retract ( v0.65.0 ) +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sinventory => ../../internal/k8sinventory + replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/xk8stest => ../../pkg/xk8stest replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest diff --git a/receiver/k8sobjectsreceiver/receiver.go b/receiver/k8sobjectsreceiver/receiver.go index 5aa337776d1b1..bd09a33d92d2d 100644 --- a/receiver/k8sobjectsreceiver/receiver.go +++ b/receiver/k8sobjectsreceiver/receiver.go @@ -7,7 +7,6 @@ import ( "context" "errors" "fmt" - "net/http" "sync" "time" @@ -16,15 +15,14 @@ import ( "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.uber.org/zap" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" apiWatch "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/watch" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sinventory" + pullobserver "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sinventory/pull" + watchobserver "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sinventory/watch" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sobjectsreceiver/internal/metadata" ) @@ -38,6 +36,7 @@ type k8sobjectsreceiver struct { obsrecv *receiverhelper.ObsReport mu sync.Mutex cancel context.CancelFunc + observerFunc func(ctx context.Context, object *K8sObjectsConfig) (k8sinventory.Observer, error) wg sync.WaitGroup } @@ -61,19 +60,81 @@ func newReceiver(params receiver.Settings, config *Config, consumer consumer.Log objects[i].exclude[item] = true } // Set default interval if in PullMode and interval is 0 - if objects[i].Mode == PullMode && objects[i].Interval == 0 { + if objects[i].Mode == k8sinventory.PullMode && objects[i].Interval == 0 { objects[i].Interval = defaultPullInterval } } - return &k8sobjectsreceiver{ + kr := &k8sobjectsreceiver{ setting: params, config: config, objects: objects, consumer: consumer, obsrecv: obsrecv, mu: sync.Mutex{}, - }, nil + } + + kr.observerFunc = getObserverFunc(kr) + + return kr, nil +} + +func getObserverFunc(kr *k8sobjectsreceiver) func(ctx context.Context, object *K8sObjectsConfig) (k8sinventory.Observer, error) { + return func(ctx context.Context, object *K8sObjectsConfig) (k8sinventory.Observer, error) { + obsConf := k8sinventory.Config{ + Gvr: *object.gvr, + Namespaces: object.Namespaces, + LabelSelector: object.LabelSelector, + FieldSelector: object.FieldSelector, + ResourceVersion: object.ResourceVersion, + } + + switch object.Mode { + case k8sinventory.PullMode: + return pullobserver.New( + kr.client, + pullobserver.Config{ + Config: obsConf, + Interval: object.Interval, + }, + kr.setting.Logger, + func(objects *unstructured.UnstructuredList) { + logs := pullObjectsToLogData(objects, time.Now(), object, kr.setting.BuildInfo.Version) + obsCtx := kr.obsrecv.StartLogsOp(ctx) + logRecordCount := logs.LogRecordCount() + err := kr.consumer.ConsumeLogs(obsCtx, logs) + kr.obsrecv.EndLogsOp(obsCtx, metadata.Type.String(), logRecordCount, err) + }, + ) + case k8sinventory.WatchMode: + return watchobserver.New( + kr.client, + watchobserver.Config{ + Config: k8sinventory.Config{ + Gvr: *object.gvr, + Namespaces: object.Namespaces, + LabelSelector: object.LabelSelector, + FieldSelector: object.FieldSelector, + ResourceVersion: object.ResourceVersion, + }, + IncludeInitialState: kr.config.IncludeInitialState, + Exclude: object.exclude, + }, + kr.setting.Logger, + func(data *apiWatch.Event) { + logs, err := watchObjectsToLogData(data, time.Now(), object, kr.setting.BuildInfo.Version) + if err != nil { + kr.setting.Logger.Error("error converting objects to log data", zap.Error(err)) + } else { + obsCtx := kr.obsrecv.StartLogsOp(ctx) + err := kr.consumer.ConsumeLogs(obsCtx, logs) + kr.obsrecv.EndLogsOp(obsCtx, metadata.Type.String(), 1, err) + } + }, + ) + } + return nil, fmt.Errorf("invalid observer mode: %s", object.Mode) + } } func (kr *k8sobjectsreceiver) Start(ctx context.Context, host component.Host) error { @@ -138,7 +199,9 @@ func (kr *k8sobjectsreceiver) Start(ctx context.Context, host component.Host) er cctx, cancel := context.WithCancel(ctx) kr.cancel = cancel for _, object := range validConfigs { - kr.start(cctx, object) + if err := kr.start(cctx, object); err != nil { + kr.setting.Logger.Error("Could not start receiver for object type", zap.String("object", object.Name)) + } } kr.setting.Logger.Info("Object Receiver started as leader") }, @@ -152,7 +215,9 @@ func (kr *k8sobjectsreceiver) Start(ctx context.Context, host component.Host) er cctx, cancel := context.WithCancel(ctx) kr.cancel = cancel for _, object := range validConfigs { - kr.start(cctx, object) + if err := kr.start(cctx, object); err != nil { + return err + } } } @@ -193,280 +258,16 @@ func (kr *k8sobjectsreceiver) stopWatches() { kr.wg.Wait() } -func (kr *k8sobjectsreceiver) start(ctx context.Context, object *K8sObjectsConfig) { - resource := kr.client.Resource(*object.gvr) - kr.setting.Logger.Info("Started collecting", - zap.Any("gvr", object.gvr), - zap.Any("mode", object.Mode), - zap.Any("namespaces", object.Namespaces)) - - switch object.Mode { - case PullMode: - if len(object.Namespaces) == 0 { - go kr.startPull(ctx, object, resource) - } else { - for _, ns := range object.Namespaces { - go kr.startPull(ctx, object, resource.Namespace(ns)) - } - } - - case WatchMode: - if len(object.Namespaces) == 0 { - go kr.startWatch(ctx, object, resource) - } else { - for _, ns := range object.Namespaces { - go kr.startWatch(ctx, object, resource.Namespace(ns)) - } - } - } -} - -func (kr *k8sobjectsreceiver) startPull(ctx context.Context, config *K8sObjectsConfig, resource dynamic.ResourceInterface) { - stopperChan := make(chan struct{}) - kr.mu.Lock() - kr.stopperChanList = append(kr.stopperChanList, stopperChan) - kr.wg.Add(1) - kr.mu.Unlock() - defer kr.wg.Done() - ticker := newTicker(ctx, config.Interval) - listOption := metav1.ListOptions{ - FieldSelector: config.FieldSelector, - LabelSelector: config.LabelSelector, - } - - if config.ResourceVersion != "" { - listOption.ResourceVersion = config.ResourceVersion - listOption.ResourceVersionMatch = metav1.ResourceVersionMatchExact - } - - defer ticker.Stop() - for { - select { - case <-ticker.C: - objects, err := resource.List(ctx, listOption) - if err != nil { - kr.setting.Logger.Error("error in pulling object", - zap.String("resource", config.gvr.String()), - zap.Error(err)) - continue - } - if len(objects.Items) == 0 { - continue - } - logs := pullObjectsToLogData(objects, time.Now(), config, kr.setting.BuildInfo.Version) - obsCtx := kr.obsrecv.StartLogsOp(ctx) - logRecordCount := logs.LogRecordCount() - err = kr.consumer.ConsumeLogs(obsCtx, logs) - kr.obsrecv.EndLogsOp(obsCtx, metadata.Type.String(), logRecordCount, err) - - case <-stopperChan: - return - case <-ctx.Done(): - return - } - } -} - -func (kr *k8sobjectsreceiver) startWatch(ctx context.Context, config *K8sObjectsConfig, resource dynamic.ResourceInterface) { - stopperChan := make(chan struct{}) - kr.mu.Lock() - kr.stopperChanList = append(kr.stopperChanList, stopperChan) - kr.wg.Add(1) - kr.mu.Unlock() - defer kr.wg.Done() - - if kr.config.IncludeInitialState { - kr.sendInitialState(ctx, config, resource) - } - - watchFunc := cache.WatchFuncWithContext(func(ctx context.Context, options metav1.ListOptions) (apiWatch.Interface, error) { - options.FieldSelector = config.FieldSelector - options.LabelSelector = config.LabelSelector - return resource.Watch(ctx, options) - }) - - cancelCtx, cancel := context.WithCancel(ctx) - defer cancel() - cfgCopy := *config - wait.UntilWithContext(cancelCtx, func(newCtx context.Context) { - resourceVersion, err := getResourceVersion(newCtx, &cfgCopy, resource) - if err != nil { - kr.setting.Logger.Error("could not retrieve a resourceVersion", - zap.String("resource", cfgCopy.gvr.String()), - zap.Error(err)) - cancel() - return - } - - done := kr.doWatch(newCtx, &cfgCopy, resourceVersion, watchFunc, stopperChan) - if done { - cancel() - return - } - - // need to restart with a fresh resource version - cfgCopy.ResourceVersion = "" - }, 0) -} - -// sendInitialState sends the current state of objects as synthetic Added events -func (kr *k8sobjectsreceiver) sendInitialState(ctx context.Context, config *K8sObjectsConfig, resource dynamic.ResourceInterface) { - kr.setting.Logger.Info("sending initial state", - zap.String("resource", config.gvr.String()), - zap.Strings("namespaces", config.Namespaces)) - - listOption := metav1.ListOptions{ - FieldSelector: config.FieldSelector, - LabelSelector: config.LabelSelector, - } - - objects, err := resource.List(ctx, listOption) +func (kr *k8sobjectsreceiver) start(ctx context.Context, object *K8sObjectsConfig) error { + obs, err := kr.observerFunc(ctx, object) if err != nil { - kr.setting.Logger.Error("error in listing objects for initial state", - zap.String("resource", config.gvr.String()), - zap.Error(err)) - return - } - - if len(objects.Items) == 0 { - kr.setting.Logger.Debug("no objects found for initial state", - zap.String("resource", config.gvr.String())) - return - } - - // Convert each object to a synthetic Added event for consistency with watch mode - for _, obj := range objects.Items { - event := &apiWatch.Event{ - Type: apiWatch.Added, - Object: &obj, - } - - logs, err := watchObjectsToLogData(event, time.Now(), config, kr.setting.BuildInfo.Version) - if err != nil { - kr.setting.Logger.Error("error converting initial state object to log data", - zap.String("resource", config.gvr.String()), - zap.Error(err)) - continue - } - - obsCtx := kr.obsrecv.StartLogsOp(ctx) - logRecordCount := logs.LogRecordCount() - err = kr.consumer.ConsumeLogs(obsCtx, logs) - kr.obsrecv.EndLogsOp(obsCtx, metadata.Type.String(), logRecordCount, err) - } - - kr.setting.Logger.Info("initial state sent", - zap.String("resource", config.gvr.String()), - zap.Int("object_count", len(objects.Items))) -} - -// doWatch returns true when watching is done, false when watching should be restarted. -func (kr *k8sobjectsreceiver) doWatch(ctx context.Context, config *K8sObjectsConfig, resourceVersion string, watchFunc cache.WatchFuncWithContext, stopperChan chan struct{}) bool { - watcher, err := watch.NewRetryWatcherWithContext(ctx, resourceVersion, &cache.ListWatch{WatchFuncWithContext: watchFunc}) - if err != nil { - kr.setting.Logger.Error("error in watching object", - zap.String("resource", config.gvr.String()), - zap.Error(err)) - return true - } - - defer watcher.Stop() - res := watcher.ResultChan() - for { - select { - case <-ctx.Done(): - kr.setting.Logger.Info("context canceled, stopping watch", - zap.String("resource", config.gvr.String())) - return true - case data, ok := <-res: - if data.Type == apiWatch.Error { - errObject := apierrors.FromObject(data.Object) - //nolint:errorlint - if errObject.(*apierrors.StatusError).ErrStatus.Code == http.StatusGone { - kr.setting.Logger.Info("received a 410, grabbing new resource version", - zap.Any("data", data)) - // we received a 410 so we need to restart - return false - } - } - - if !ok { - kr.setting.Logger.Warn("Watch channel closed unexpectedly", - zap.String("resource", config.gvr.String())) - return true - } - - if config.exclude[data.Type] { - kr.setting.Logger.Debug("dropping excluded data", - zap.String("type", string(data.Type))) - continue - } - - logs, err := watchObjectsToLogData(&data, time.Now(), config, kr.setting.BuildInfo.Version) - if err != nil { - kr.setting.Logger.Error("error converting objects to log data", zap.Error(err)) - } else { - obsCtx := kr.obsrecv.StartLogsOp(ctx) - cnt := logs.LogRecordCount() - err := kr.consumer.ConsumeLogs(obsCtx, logs) - kr.obsrecv.EndLogsOp(obsCtx, metadata.Type.String(), cnt, err) - } - case <-stopperChan: - watcher.Stop() - return true - } - } -} - -func getResourceVersion(ctx context.Context, config *K8sObjectsConfig, resource dynamic.ResourceInterface) (string, error) { - resourceVersion := config.ResourceVersion - if resourceVersion == "" || resourceVersion == "0" { - // Proper use of the Kubernetes API Watch capability when no resourceVersion is supplied is to do a list first - // to get the initial state and a useable resourceVersion. - // See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes for details. - objects, err := resource.List(ctx, metav1.ListOptions{ - FieldSelector: config.FieldSelector, - LabelSelector: config.LabelSelector, - }) - if err != nil { - return "", fmt.Errorf("could not perform initial list for watch on %v, %w", config.gvr.String(), err) - } - if objects == nil { - return "", errors.New("nil objects returned, this is an error in the k8sobjectsreceiver") - } - - resourceVersion = objects.GetResourceVersion() - - // If we still don't have a resourceVersion we can try 1 as a last ditch effort. - // This also helps our unit tests since the fake client can't handle returning resource versions - // as part of a list of objects. - if resourceVersion == "" || resourceVersion == "0" { - resourceVersion = defaultResourceVersion - } + return err } - return resourceVersion, nil -} -// Start ticking immediately. -// Ref: https://stackoverflow.com/questions/32705582/how-to-get-time-tick-to-tick-immediately -func newTicker(ctx context.Context, repeat time.Duration) *time.Ticker { - ticker := time.NewTicker(repeat) - oc := ticker.C - nc := make(chan time.Time, 1) - go func() { - nc <- time.Now() - for { - select { - case tm := <-oc: - nc <- tm - case <-ctx.Done(): - return - } - } - }() + stopChan := obs.Start(ctx, &kr.wg) + kr.stopperChanList = append(kr.stopperChanList, stopChan) - ticker.C = nc - return ticker + return nil } // handleError handles errors according to the configured error mode diff --git a/receiver/k8sobjectsreceiver/receiver_test.go b/receiver/k8sobjectsreceiver/receiver_test.go index 04b4aaa011262..d39283e43277e 100644 --- a/receiver/k8sobjectsreceiver/receiver_test.go +++ b/receiver/k8sobjectsreceiver/receiver_test.go @@ -17,6 +17,7 @@ import ( apiWatch "k8s.io/apimachinery/pkg/watch" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector/k8sleaderelectortest" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sinventory" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sobjectsreceiver/internal/metadata" ) @@ -76,7 +77,7 @@ func TestErrorModes(t *testing.T) { rCfg.Objects = []*K8sObjectsConfig{ { Name: tt.objectName, - Mode: PullMode, + Mode: k8sinventory.PullMode, }, } @@ -117,7 +118,7 @@ func TestNewReceiver(t *testing.T) { rCfg.Objects = []*K8sObjectsConfig{ { Name: "pods", - Mode: PullMode, + Mode: k8sinventory.PullMode, }, } @@ -156,7 +157,7 @@ func TestPullObject(t *testing.T) { rCfg.Objects = []*K8sObjectsConfig{ { Name: "pods", - Mode: PullMode, + Mode: k8sinventory.PullMode, Interval: time.Second * 30, LabelSelector: "environment=production", }, @@ -194,7 +195,7 @@ func TestWatchObject(t *testing.T) { rCfg.Objects = []*K8sObjectsConfig{ { Name: "pods", - Mode: WatchMode, + Mode: k8sinventory.WatchMode, Namespaces: []string{"default"}, }, } @@ -295,7 +296,7 @@ func TestIncludeInitialState(t *testing.T) { rCfg.Objects = []*K8sObjectsConfig{ { Name: "pods", - Mode: WatchMode, + Mode: k8sinventory.WatchMode, Namespaces: []string{"default"}, }, } @@ -372,7 +373,7 @@ func TestIncludeInitialStateWithPullMode(t *testing.T) { rCfg.Objects = []*K8sObjectsConfig{ { Name: "pods", - Mode: PullMode, + Mode: k8sinventory.PullMode, }, } @@ -405,7 +406,7 @@ func TestExcludeDeletedTrue(t *testing.T) { rCfg.Objects = []*K8sObjectsConfig{ { Name: "pods", - Mode: WatchMode, + Mode: k8sinventory.WatchMode, Namespaces: []string{"default"}, ExcludeWatchType: []apiWatch.EventType{ apiWatch.Deleted, @@ -456,7 +457,7 @@ func TestReceiverWithLeaderElection(t *testing.T) { rCfg.Objects = []*K8sObjectsConfig{ { Name: "pods", - Mode: PullMode, + Mode: k8sinventory.PullMode, }, } rCfg.K8sLeaderElector = &leaderElectorID @@ -535,7 +536,7 @@ func TestWatchWithLeaderElectionStandby(t *testing.T) { rCfg.ErrorMode = PropagateError rCfg.IncludeInitialState = false rCfg.Objects = []*K8sObjectsConfig{ - {Name: "pods", Mode: WatchMode, Namespaces: []string{"default"}}, + {Name: "pods", Mode: k8sinventory.WatchMode, Namespaces: []string{"default"}}, } rCfg.K8sLeaderElector = &leaderElectorID @@ -605,7 +606,7 @@ func TestPullWithLeaderElectionStandby(t *testing.T) { rCfg.Objects = []*K8sObjectsConfig{ { Name: "pods", - Mode: PullMode, + Mode: k8sinventory.PullMode, Interval: 10 * time.Millisecond, // fast pull to make the test snappy }, } @@ -663,7 +664,7 @@ func TestWatchLeaderFlapDuringStartup_NoPanic(t *testing.T) { cfg.ErrorMode = PropagateError cfg.IncludeInitialState = false cfg.Objects = []*K8sObjectsConfig{ - {Name: "pods", Mode: WatchMode, Namespaces: []string{"default"}}, + {Name: "pods", Mode: k8sinventory.WatchMode, Namespaces: []string{"default"}}, } cfg.K8sLeaderElector = &leaderElectorID @@ -721,7 +722,7 @@ func TestPullLeaderFlapDuringStartup_NoPanic(t *testing.T) { cfg.makeDiscoveryClient = getMockDiscoveryClient cfg.ErrorMode = PropagateError cfg.Objects = []*K8sObjectsConfig{ - {Name: "pods", Mode: PullMode, Interval: 5 * time.Millisecond}, + {Name: "pods", Mode: k8sinventory.PullMode, Interval: 5 * time.Millisecond}, } cfg.K8sLeaderElector = &leaderElectorID