From 7eb19e0b88437aef2b520f5d6549e1c3bd9a9e9a Mon Sep 17 00:00:00 2001 From: Jian Qiu Date: Tue, 2 Dec 2025 11:15:10 +0800 Subject: [PATCH] Add log tracing support for contextual logging MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit adds comprehensive log tracing support across the cloudevents subsystem and base controller. Key changes include: - Add logging package with tracing annotation utilities - Transfer log tracing annotations between CloudEvents and ManifestWork objects - Enhance base controller with improved logging context - Add comprehensive test coverage for log tracing functionality - Update base controller default queue key to be more descriptive The log tracing feature enables better observability by propagating contextual logging metadata through the CloudEvents flow. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Signed-off-by: Jian Qiu --- pkg/basecontroller/factory/base_controller.go | 8 +- .../factory/base_controller_test.go | 363 +++++++++++ pkg/basecontroller/factory/factory.go | 2 +- .../work/agent/codec/manifestbundle.go | 11 + .../work/agent/codec/manifestbundle_test.go | 267 +++++++++ .../work/source/client/manifestwork.go | 10 + .../work/source/codec/manifestbundle.go | 13 +- .../work/source/codec/manifestbundle_test.go | 170 ++++++ pkg/cloudevents/generic/clients/baseclient.go | 14 +- pkg/logging/logging.go | 160 +++++ pkg/logging/logging_test.go | 566 ++++++++++++++++++ 11 files changed, 1572 insertions(+), 12 deletions(-) create mode 100644 pkg/basecontroller/factory/base_controller_test.go create mode 100644 pkg/logging/logging.go create mode 100644 pkg/logging/logging_test.go diff --git a/pkg/basecontroller/factory/base_controller.go b/pkg/basecontroller/factory/base_controller.go index a6a2ed77..2b805b24 100644 --- a/pkg/basecontroller/factory/base_controller.go +++ b/pkg/basecontroller/factory/base_controller.go @@ -80,7 +80,7 @@ func (c *baseController) Run(ctx context.Context, workers int) { queueContext, queueContextCancel := context.WithCancel(ctx) for i := 1; i <= workers; i++ { - logger.Info("Starting worker of controller ...", "numberOfWorkers", i) + logger.Info("Starting worker of controller ...", "worker-ID", i) workerWg.Add(1) go func() { defer func() { @@ -157,10 +157,10 @@ func (c *baseController) processNextWorkItem(queueCtx context.Context) { queueKey := key if err := c.sync(queueCtx, syncCtx, queueKey); err != nil { - if logger.V(4).Enabled() || key != "key" { - utilruntime.HandleErrorWithContext(queueCtx, err, "controller failed to sync", "key", key, "error", err) + if logger.V(4).Enabled() || key != DefaultQueueKey { + utilruntime.HandleErrorWithContext(queueCtx, err, "controller failed to sync", "key", key) } else { - utilruntime.HandleErrorWithContext(queueCtx, err, "reconciliation failed", "error", err) + utilruntime.HandleErrorWithContext(queueCtx, err, "reconciliation failed") } c.syncContext.Queue().AddRateLimited(key) return diff --git a/pkg/basecontroller/factory/base_controller_test.go b/pkg/basecontroller/factory/base_controller_test.go new file mode 100644 index 00000000..4ba2b94c --- /dev/null +++ b/pkg/basecontroller/factory/base_controller_test.go @@ -0,0 +1,363 @@ +package factory + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" +) + +// mockInformer is a mock implementation of Informer for testing +type mockInformer struct { + handlers []cache.ResourceEventHandler + synced bool +} + +func (m *mockInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) { + m.handlers = append(m.handlers, handler) + return nil, nil +} + +func (m *mockInformer) HasSynced() bool { + return m.synced +} + +// TestBaseControllerRun tests the basic Run functionality of baseController +func TestBaseControllerRun(t *testing.T) { + tests := []struct { + name string + workers int + cacheSynced bool + expectSync bool + resyncInterval time.Duration + addToQueue bool + }{ + { + name: "controller runs with synced caches", + workers: 1, + cacheSynced: true, + expectSync: true, + addToQueue: true, + }, + { + name: "controller runs with multiple workers", + workers: 3, + cacheSynced: true, + expectSync: true, + addToQueue: true, + }, + { + name: "controller with periodic resync", + workers: 1, + cacheSynced: true, + expectSync: true, + resyncInterval: 100 * time.Millisecond, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := klog.NewContext(context.Background(), klog.Background()) + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + + syncCalled := &sync.WaitGroup{} + if tt.expectSync { + syncCalled.Add(1) + } + + syncFunc := func(ctx context.Context, syncCtx SyncContext, key string) error { + syncCalled.Done() + return nil + } + + mockInf := &mockInformer{synced: tt.cacheSynced} + controller := &baseController{ + name: "test-controller", + sync: syncFunc, + syncContext: NewSyncContext("test"), + resyncEvery: tt.resyncInterval, + cachesToSync: []cache.InformerSynced{mockInf.HasSynced}, + cacheSyncTimeout: 1 * time.Second, + } + + // Add item to queue if needed + if tt.addToQueue { + controller.syncContext.Queue().Add("test-key") + } + + // Run controller in background + go controller.Run(ctx, tt.workers) + + // Wait for sync to be called or timeout + done := make(chan struct{}) + go func() { + syncCalled.Wait() + close(done) + }() + + select { + case <-done: + // Success - sync was called + case <-time.After(1500 * time.Millisecond): + if tt.expectSync { + t.Error("timeout waiting for sync to be called") + } + } + }) + } +} + +// TestBaseControllerProcessNextWorkItem tests the processNextWorkItem function +func TestBaseControllerProcessNextWorkItem(t *testing.T) { + tests := []struct { + name string + queueKey string + syncError error + expectRequeue bool + }{ + { + name: "successful sync with default queue key", + queueKey: DefaultQueueKey, + syncError: nil, + expectRequeue: false, + }, + { + name: "successful sync with custom queue key", + queueKey: "custom-key", + syncError: nil, + expectRequeue: false, + }, + { + name: "failed sync with default queue key", + queueKey: DefaultQueueKey, + syncError: errors.New("sync failed"), + expectRequeue: true, + }, + { + name: "failed sync with custom queue key", + queueKey: "namespace/name", + syncError: errors.New("sync failed"), + expectRequeue: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := klog.NewContext(context.Background(), klog.Background()) + + syncCalled := false + syncFunc := func(ctx context.Context, syncCtx SyncContext, key string) error { + syncCalled = true + if key != tt.queueKey { + t.Errorf("expected key %s, got %s", tt.queueKey, key) + } + return tt.syncError + } + + controller := &baseController{ + name: "test-controller", + sync: syncFunc, + syncContext: NewSyncContext("test"), + } + + // Add item to queue + controller.syncContext.Queue().Add(tt.queueKey) + + // Process the item + controller.processNextWorkItem(ctx) + + if !syncCalled { + t.Error("sync function was not called") + } + + // For rate limited items, we need to wait a bit + if tt.expectRequeue { + time.Sleep(10 * time.Millisecond) + } + + // Check if item was requeued on error + queueLen := controller.syncContext.Queue().Len() + if tt.expectRequeue && queueLen == 0 { + t.Error("expected item to be requeued but queue is empty") + } + if !tt.expectRequeue && queueLen > 0 { + t.Errorf("expected queue to be empty but has %d items", queueLen) + } + + // Cleanup + controller.syncContext.Queue().ShutDown() + }) + } +} + +// TestBaseControllerDefaultQueueKey tests the error logging behavior with DefaultQueueKey +func TestBaseControllerDefaultQueueKey(t *testing.T) { + ctx := klog.NewContext(context.Background(), klog.Background()) + + syncError := errors.New("test error") + syncFunc := func(ctx context.Context, syncCtx SyncContext, key string) error { + return syncError + } + + controller := &baseController{ + name: "test-controller", + sync: syncFunc, + syncContext: NewSyncContext("test"), + } + + // Test with DefaultQueueKey + controller.syncContext.Queue().Add(DefaultQueueKey) + controller.processNextWorkItem(ctx) + + // Wait for rate limited item + time.Sleep(10 * time.Millisecond) + + // Verify item was requeued + if controller.syncContext.Queue().Len() == 0 { + t.Error("expected item to be requeued") + } + + // Test with custom key + controller.syncContext.Queue().Add("custom/key") + controller.processNextWorkItem(ctx) + + // Wait for rate limited item + time.Sleep(10 * time.Millisecond) + + // Verify item was requeued + if controller.syncContext.Queue().Len() == 0 { + t.Error("expected item to be requeued") + } + + // Cleanup + controller.syncContext.Queue().ShutDown() +} + +// TestBaseControllerName tests the Name() method +func TestBaseControllerName(t *testing.T) { + controller := &baseController{ + name: "test-controller-name", + } + + if controller.Name() != "test-controller-name" { + t.Errorf("expected name 'test-controller-name', got '%s'", controller.Name()) + } +} + +// TestBaseControllerSyncContext tests the SyncContext() method +func TestBaseControllerSyncContext(t *testing.T) { + syncCtx := NewSyncContext("test") + controller := &baseController{ + syncContext: syncCtx, + } + + if controller.SyncContext() != syncCtx { + t.Error("SyncContext() returned different context than expected") + } +} + +// TestBaseControllerSync tests the Sync() method +func TestBaseControllerSync(t *testing.T) { + ctx := klog.NewContext(context.Background(), klog.Background()) + + syncCalled := false + expectedKey := "test-key" + syncFunc := func(ctx context.Context, syncCtx SyncContext, key string) error { + syncCalled = true + if key != expectedKey { + t.Errorf("expected key %s, got %s", expectedKey, key) + } + return nil + } + + syncCtx := NewSyncContext("test") + controller := &baseController{ + sync: syncFunc, + syncContext: syncCtx, + } + + err := controller.Sync(ctx, syncCtx, expectedKey) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if !syncCalled { + t.Error("sync function was not called") + } +} + +// TestBaseControllerRunPeriodicalResync tests periodic resync functionality +func TestBaseControllerRunPeriodicalResync(t *testing.T) { + ctx := klog.NewContext(context.Background(), klog.Background()) + ctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) + defer cancel() + + syncCtx := NewSyncContext("test") + controller := &baseController{ + syncContext: syncCtx, + resyncEvery: 50 * time.Millisecond, + } + + // Run periodic resync + go controller.runPeriodicalResync(ctx, controller.resyncEvery) + + // Wait for items to be added to queue + time.Sleep(400 * time.Millisecond) + + // Check that queue has items (should have at least 1-2 items added due to timing variations) + queueLen := controller.syncContext.Queue().Len() + if queueLen < 1 { + t.Errorf("expected at least 1 item in queue from periodic resync, got %d", queueLen) + } + + // Verify the items are DefaultQueueKey + for i := 0; i < queueLen; i++ { + item, _ := controller.syncContext.Queue().Get() + if item != DefaultQueueKey { + t.Errorf("expected DefaultQueueKey, got %v", item) + } + controller.syncContext.Queue().Done(item) + } + + // Cleanup + controller.syncContext.Queue().ShutDown() +} + +// TestDefaultQueueKeysFunc tests the DefaultQueueKeysFunc function +func TestDefaultQueueKeysFunc(t *testing.T) { + tests := []struct { + name string + obj runtime.Object + expect []string + }{ + { + name: "nil object", + obj: nil, + expect: []string{DefaultQueueKey}, + }, + { + name: "non-nil object", + obj: &runtime.Unknown{}, + expect: []string{DefaultQueueKey}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := DefaultQueueKeysFunc(tt.obj) + if len(result) != len(tt.expect) { + t.Errorf("expected %d keys, got %d", len(tt.expect), len(result)) + } + for i, key := range result { + if key != tt.expect[i] { + t.Errorf("expected key %s at index %d, got %s", tt.expect[i], i, key) + } + } + }) + } +} diff --git a/pkg/basecontroller/factory/factory.go b/pkg/basecontroller/factory/factory.go index 5a318476..7aad4717 100644 --- a/pkg/basecontroller/factory/factory.go +++ b/pkg/basecontroller/factory/factory.go @@ -10,7 +10,7 @@ import ( ) // DefaultQueueKey is the queue key used for string trigger based controllers. -const DefaultQueueKey = "key" +const DefaultQueueKey = "basecontroller-default-key" // DefaultQueueKeysFunc returns a slice with a single element - the DefaultQueueKey func DefaultQueueKeysFunc(_ runtime.Object) []string { diff --git a/pkg/cloudevents/clients/work/agent/codec/manifestbundle.go b/pkg/cloudevents/clients/work/agent/codec/manifestbundle.go index 62c2bdc2..d4df2fa3 100644 --- a/pkg/cloudevents/clients/work/agent/codec/manifestbundle.go +++ b/pkg/cloudevents/clients/work/agent/codec/manifestbundle.go @@ -6,6 +6,7 @@ import ( "github.com/bwmarrin/snowflake" cloudevents "github.com/cloudevents/sdk-go/v2" cloudeventstypes "github.com/cloudevents/sdk-go/v2/types" + "open-cluster-management.io/sdk-go/pkg/logging" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kubetypes "k8s.io/apimachinery/pkg/types" @@ -68,6 +69,11 @@ func (c *ManifestBundleCodec) Encode(source string, eventType types.CloudEventsT evt.SetExtension(types.ExtensionStatusHash, statusHash) + // Add log tracing extension + if err := logging.LogTracingFromObjectToEvent(work, &evt); err != nil { + return nil, err + } + // set the work's meta data to its cloud event metaJson, err := json.Marshal(work.ObjectMeta) if err != nil { @@ -148,6 +154,11 @@ func (c *ManifestBundleCodec) Decode(evt *cloudevents.Event) (*workv1.ManifestWo } metaObj.Labels[common.CloudEventsOriginalSourceLabelKey] = evt.Source() + // Add log tracing annotation + if err := logging.LogTracingFromEventToObject(evt, &metaObj); err != nil { + return nil, err + } + // Use the event's resource version as the current work's generation and resource version. // In the event case, the event's resource version should correspond to its spec change. // We can use the resource version to determine the spec of a work whether changed. diff --git a/pkg/cloudevents/clients/work/agent/codec/manifestbundle_test.go b/pkg/cloudevents/clients/work/agent/codec/manifestbundle_test.go index ecfbc7ae..ba865108 100644 --- a/pkg/cloudevents/clients/work/agent/codec/manifestbundle_test.go +++ b/pkg/cloudevents/clients/work/agent/codec/manifestbundle_test.go @@ -455,3 +455,270 @@ func toConfigMap(t *testing.T) []byte { return data } + +func TestManifestBundleAgentLogTracing(t *testing.T) { + codec := NewManifestBundleCodec() + + t.Run("encode preserves log tracing annotations to event", func(t *testing.T) { + work := &workv1.ManifestWork{ + ObjectMeta: metav1.ObjectMeta{ + UID: "test-uid", + ResourceVersion: "2", + Namespace: "cluster1", + Name: "test-work", + Generation: 2, + Labels: map[string]string{ + "cloudevents.open-cluster-management.io/originalsource": "source1", + }, + Annotations: map[string]string{ + "logging.open-cluster-management.io/agent-id": "agent-456", + "logging.open-cluster-management.io/cluster": "cluster1", + "non-tracing-annotation": "value", + }, + }, + Status: workv1.ManifestWorkStatus{ + Conditions: []metav1.Condition{ + { + Type: "Applied", + Status: metav1.ConditionTrue, + }, + }, + }, + } + + eventType := types.CloudEventsType{ + CloudEventsDataType: payload.ManifestBundleEventDataType, + SubResource: types.SubResourceStatus, + Action: "test_update", + } + + evt, err := codec.Encode("test-agent", eventType, work) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Verify that log tracing annotations are transferred to event extensions + extensions := evt.Extensions() + if logTracingExt, ok := extensions["logtracing"]; ok { + logTracingJSON, ok := logTracingExt.(string) + if !ok { + t.Errorf("logtracing extension is not a string") + } + + var tracingMap map[string]string + if err := json.Unmarshal([]byte(logTracingJSON), &tracingMap); err != nil { + t.Errorf("failed to unmarshal logtracing: %v", err) + } + + if tracingMap["logging.open-cluster-management.io/agent-id"] != "agent-456" { + t.Errorf("expected agent-id in logtracing, got %v", tracingMap) + } + if tracingMap["logging.open-cluster-management.io/cluster"] != "cluster1" { + t.Errorf("expected cluster in logtracing, got %v", tracingMap) + } + if _, ok := tracingMap["non-tracing-annotation"]; ok { + t.Errorf("non-tracing annotation should not be in logtracing") + } + } else { + t.Errorf("expected logtracing extension to be set") + } + }) + + t.Run("decode transfers log tracing from event to work annotations", func(t *testing.T) { + evt := cloudevents.NewEvent() + evt.SetSource("source1") + evt.SetType("io.open-cluster-management.works.v1alpha1.manifestbundles.spec.create_request") + evt.SetExtension("resourceid", "test-uid") + evt.SetExtension("resourceversion", "3") + evt.SetExtension("clustername", "cluster1") + + // Add log tracing extension + tracingMap := map[string]string{ + "logging.open-cluster-management.io/request-id": "req-789", + "logging.open-cluster-management.io/source-id": "src-123", + } + tracingJSON, _ := json.Marshal(tracingMap) + evt.SetExtension("logtracing", string(tracingJSON)) + + if err := evt.SetData(cloudevents.ApplicationJSON, &payload.ManifestBundle{ + Manifests: []workv1.Manifest{ + { + RawExtension: runtime.RawExtension{ + Raw: toConfigMap(t), + }, + }, + }, + }); err != nil { + t.Fatalf("failed to set event data: %v", err) + } + + work, err := codec.Decode(&evt) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Verify that log tracing was transferred to work annotations + if work.Annotations["logging.open-cluster-management.io/request-id"] != "req-789" { + t.Errorf("expected request-id annotation, got %v", work.Annotations) + } + if work.Annotations["logging.open-cluster-management.io/source-id"] != "src-123" { + t.Errorf("expected source-id annotation, got %v", work.Annotations) + } + }) + + t.Run("encode with no tracing annotations", func(t *testing.T) { + work := &workv1.ManifestWork{ + ObjectMeta: metav1.ObjectMeta{ + UID: "test-uid", + Namespace: "cluster1", + Generation: 1, + Labels: map[string]string{ + "cloudevents.open-cluster-management.io/originalsource": "source1", + }, + Annotations: map[string]string{ + "regular-annotation": "regular-value", + }, + }, + Status: workv1.ManifestWorkStatus{}, + } + + eventType := types.CloudEventsType{ + CloudEventsDataType: payload.ManifestBundleEventDataType, + SubResource: types.SubResourceStatus, + Action: "test_update", + } + + evt, err := codec.Encode("test-agent", eventType, work) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Verify that logtracing extension is either not set or is empty + if logTracingExt, ok := evt.Extensions()["logtracing"]; ok { + logTracingJSON, ok := logTracingExt.(string) + if !ok { + t.Errorf("logtracing extension is not a string") + } + var tracingMap map[string]string + if err := json.Unmarshal([]byte(logTracingJSON), &tracingMap); err != nil { + t.Errorf("failed to unmarshal logtracing: %v", err) + } + if len(tracingMap) != 0 { + t.Errorf("expected empty logtracing map, got %v", tracingMap) + } + } + }) + + t.Run("decode with no tracing extension", func(t *testing.T) { + evt := cloudevents.NewEvent() + evt.SetSource("source1") + evt.SetType("io.open-cluster-management.works.v1alpha1.manifestbundles.spec.create_request") + evt.SetExtension("resourceid", "test-uid") + evt.SetExtension("resourceversion", "1") + evt.SetExtension("clustername", "cluster1") + + if err := evt.SetData(cloudevents.ApplicationJSON, &payload.ManifestBundle{ + Manifests: []workv1.Manifest{ + { + RawExtension: runtime.RawExtension{ + Raw: toConfigMap(t), + }, + }, + }, + }); err != nil { + t.Fatalf("failed to set event data: %v", err) + } + + work, err := codec.Decode(&evt) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Verify that no tracing annotations are present (except system annotations) + foundTracingAnnotation := false + for key := range work.Annotations { + if key != "cloudevents.open-cluster-management.io/datatype" { + foundTracingAnnotation = true + break + } + } + if foundTracingAnnotation { + t.Errorf("unexpected tracing annotations found: %v", work.Annotations) + } + }) + + t.Run("round trip encode and decode preserves log tracing", func(t *testing.T) { + // First create a work via decode (simulating receiving from source) + evt1 := cloudevents.NewEvent() + evt1.SetSource("source1") + evt1.SetType("io.open-cluster-management.works.v1alpha1.manifestbundles.spec.create_request") + evt1.SetExtension("resourceid", "test-uid") + evt1.SetExtension("resourceversion", "5") + evt1.SetExtension("clustername", "cluster1") + + tracingMap1 := map[string]string{ + "logging.open-cluster-management.io/original-request-id": "orig-123", + } + tracingJSON1, _ := json.Marshal(tracingMap1) + evt1.SetExtension("logtracing", string(tracingJSON1)) + + if err := evt1.SetData(cloudevents.ApplicationJSON, &payload.ManifestBundle{ + Manifests: []workv1.Manifest{ + { + RawExtension: runtime.RawExtension{ + Raw: toConfigMap(t), + }, + }, + }, + }); err != nil { + t.Fatalf("failed to set event data: %v", err) + } + + work, err := codec.Decode(&evt1) + if err != nil { + t.Fatalf("decode error: %v", err) + } + + // Verify the tracing was transferred + if work.Annotations["logging.open-cluster-management.io/original-request-id"] != "orig-123" { + t.Errorf("expected original-request-id annotation, got %v", work.Annotations) + } + + // Now encode the work back (simulating status update) + work.Labels = map[string]string{ + "cloudevents.open-cluster-management.io/originalsource": "source1", + } + work.Status = workv1.ManifestWorkStatus{ + Conditions: []metav1.Condition{ + { + Type: "Applied", + Status: metav1.ConditionTrue, + }, + }, + } + + eventType := types.CloudEventsType{ + CloudEventsDataType: payload.ManifestBundleEventDataType, + SubResource: types.SubResourceStatus, + Action: "test_update", + } + + evt2, err := codec.Encode("test-agent", eventType, work) + if err != nil { + t.Fatalf("encode error: %v", err) + } + + // Verify the tracing was preserved in the encoded event + if logTracingExt, ok := evt2.Extensions()["logtracing"]; ok { + var tracingMap2 map[string]string + if err := json.Unmarshal([]byte(logTracingExt.(string)), &tracingMap2); err != nil { + t.Errorf("failed to unmarshal logtracing: %v", err) + } + if tracingMap2["logging.open-cluster-management.io/original-request-id"] != "orig-123" { + t.Errorf("expected original-request-id to be preserved, got %v", tracingMap2) + } + } else { + t.Errorf("expected logtracing extension in status update event") + } + }) +} diff --git a/pkg/cloudevents/clients/work/source/client/manifestwork.go b/pkg/cloudevents/clients/work/source/client/manifestwork.go index b3f7787e..2ef3b89c 100644 --- a/pkg/cloudevents/clients/work/source/client/manifestwork.go +++ b/pkg/cloudevents/clients/work/source/client/manifestwork.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "open-cluster-management.io/sdk-go/pkg/logging" "strconv" "k8s.io/apimachinery/pkg/api/errors" @@ -109,6 +110,9 @@ func (c *ManifestWorkSourceClient) Create(ctx context.Context, manifestWork *wor return nil, returnErr } + // Add logging tracing annotation + logging.SetLogTracingFromContext(ctx, newWork) + if errs := utils.ValidateWork(newWork); len(errs) != 0 { returnErr = errors.NewInvalid(common.ManifestWorkGK, manifestWork.Name, errs) return nil, returnErr @@ -159,6 +163,9 @@ func (c *ManifestWorkSourceClient) Delete(ctx context.Context, name string, opts now := metav1.Now() deletingWork.DeletionTimestamp = &now + // Add logging tracing annotation + logging.SetLogTracingFromContext(ctx, deletingWork) + if err := c.cloudEventsClient.Publish(ctx, eventType, deletingWork); err != nil { returnErr := cloudeventserrors.ToStatusError(common.ManifestWorkGR, name, err) metrics.IncreaseWorkProcessedCounter("delete", string(returnErr.ErrStatus.Reason)) @@ -296,6 +303,9 @@ func (c *ManifestWorkSourceClient) Patch(ctx context.Context, name string, pt ku newWork.Generation = generation newWork.ResourceVersion = rv + // Add logging tracing annotation + logging.SetLogTracingFromContext(ctx, newWork) + if errs := utils.ValidateWork(newWork); len(errs) != 0 { returnErr = errors.NewInvalid(common.ManifestWorkGK, name, errs) return nil, returnErr diff --git a/pkg/cloudevents/clients/work/source/codec/manifestbundle.go b/pkg/cloudevents/clients/work/source/codec/manifestbundle.go index 04eaea58..1486d243 100644 --- a/pkg/cloudevents/clients/work/source/codec/manifestbundle.go +++ b/pkg/cloudevents/clients/work/source/codec/manifestbundle.go @@ -5,6 +5,7 @@ import ( "fmt" cloudevents "github.com/cloudevents/sdk-go/v2" cloudeventstypes "github.com/cloudevents/sdk-go/v2/types" + "open-cluster-management.io/sdk-go/pkg/logging" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kubetypes "k8s.io/apimachinery/pkg/types" @@ -40,13 +41,18 @@ func (c *ManifestBundleCodec) Encode(source string, eventType types.CloudEventsT WithResourceVersion(work.Generation). NewEvent() - // set the work's meta data to its cloud event + // set the work's metadata to its cloud event metaJson, err := json.Marshal(work.ObjectMeta) if err != nil { return nil, err } evt.SetExtension(types.ExtensionWorkMeta, string(metaJson)) + // Add log tracing extension + if err := logging.LogTracingFromObjectToEvent(work, &evt); err != nil { + return nil, err + } + if !work.DeletionTimestamp.IsZero() { evt.SetExtension(types.ExtensionDeletionTimestamp, work.DeletionTimestamp.Time) return &evt, nil @@ -119,6 +125,11 @@ func (c *ManifestBundleCodec) Decode(evt *cloudevents.Event) (*workv1.ManifestWo } metaObj.Annotations[common.CloudEventsSequenceIDAnnotationKey] = sequenceID + // Add log tracing annotation + if err := logging.LogTracingFromEventToObject(evt, &metaObj); err != nil { + return nil, err + } + work := &workv1.ManifestWork{ TypeMeta: metav1.TypeMeta{}, ObjectMeta: metaObj, diff --git a/pkg/cloudevents/clients/work/source/codec/manifestbundle_test.go b/pkg/cloudevents/clients/work/source/codec/manifestbundle_test.go index b90e0a20..309a3c72 100644 --- a/pkg/cloudevents/clients/work/source/codec/manifestbundle_test.go +++ b/pkg/cloudevents/clients/work/source/codec/manifestbundle_test.go @@ -370,3 +370,173 @@ func TestManifestBundleDecode(t *testing.T) { }) } } + +func TestManifestBundleLogTracing(t *testing.T) { + codec := NewManifestBundleCodec() + + t.Run("encode preserves log tracing annotations to event", func(t *testing.T) { + work := &workv1.ManifestWork{ + ObjectMeta: metav1.ObjectMeta{ + UID: "test-uid", + ResourceVersion: "1", + Namespace: "cluster1", + Name: "test-work", + Generation: 1, + Annotations: map[string]string{ + "logging.open-cluster-management.io/trace-id": "trace-123", + "logging.open-cluster-management.io/user": "test-user", + "other-annotation": "other-value", + }, + }, + Spec: workv1.ManifestWorkSpec{}, + } + + eventType := types.CloudEventsType{ + CloudEventsDataType: payload.ManifestBundleEventDataType, + SubResource: types.SubResourceSpec, + Action: "test_create", + } + + evt, err := codec.Encode("test-source", eventType, work) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Verify that log tracing annotations are transferred to event extensions + extensions := evt.Extensions() + if logTracingExt, ok := extensions["logtracing"]; ok { + logTracingJSON, ok := logTracingExt.(string) + if !ok { + t.Errorf("logtracing extension is not a string") + } + + var tracingMap map[string]string + if err := json.Unmarshal([]byte(logTracingJSON), &tracingMap); err != nil { + t.Errorf("failed to unmarshal logtracing: %v", err) + } + + if tracingMap["logging.open-cluster-management.io/trace-id"] != "trace-123" { + t.Errorf("expected trace-id in logtracing, got %v", tracingMap) + } + if tracingMap["logging.open-cluster-management.io/user"] != "test-user" { + t.Errorf("expected user in logtracing, got %v", tracingMap) + } + if _, ok := tracingMap["other-annotation"]; ok { + t.Errorf("non-tracing annotation should not be in logtracing") + } + } else { + t.Errorf("expected logtracing extension to be set") + } + }) + + t.Run("decode transfers log tracing from event to work annotations", func(t *testing.T) { + evt := cloudevents.NewEvent() + evt.SetSource("source1") + evt.SetType("io.open-cluster-management.works.v1alpha1.manifestbundles.status.test") + evt.SetExtension("resourceid", "test-uid") + evt.SetExtension("resourceversion", "5") + evt.SetExtension("sequenceid", "1834773391719010304") + + // Add log tracing extension + tracingMap := map[string]string{ + "logging.open-cluster-management.io/request-id": "req-456", + "logging.open-cluster-management.io/session": "sess-789", + } + tracingJSON, _ := json.Marshal(tracingMap) + evt.SetExtension("logtracing", string(tracingJSON)) + + if err := evt.SetData(cloudevents.ApplicationJSON, &payload.ManifestBundleStatus{ + Conditions: []metav1.Condition{ + { + Type: "Applied", + Status: metav1.ConditionTrue, + }, + }, + }); err != nil { + t.Fatalf("failed to set event data: %v", err) + } + + work, err := codec.Decode(&evt) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Verify that log tracing was transferred to work annotations + if work.Annotations["logging.open-cluster-management.io/request-id"] != "req-456" { + t.Errorf("expected request-id annotation, got %v", work.Annotations) + } + if work.Annotations["logging.open-cluster-management.io/session"] != "sess-789" { + t.Errorf("expected session annotation, got %v", work.Annotations) + } + }) + + t.Run("encode with no tracing annotations", func(t *testing.T) { + work := &workv1.ManifestWork{ + ObjectMeta: metav1.ObjectMeta{ + UID: "test-uid", + Namespace: "cluster1", + Generation: 1, + Annotations: map[string]string{ + "regular-annotation": "regular-value", + }, + }, + Spec: workv1.ManifestWorkSpec{}, + } + + eventType := types.CloudEventsType{ + CloudEventsDataType: payload.ManifestBundleEventDataType, + SubResource: types.SubResourceSpec, + Action: "test_create", + } + + evt, err := codec.Encode("test-source", eventType, work) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Verify that logtracing extension is either not set or is empty + if logTracingExt, ok := evt.Extensions()["logtracing"]; ok { + logTracingJSON, ok := logTracingExt.(string) + if !ok { + t.Errorf("logtracing extension is not a string") + } + var tracingMap map[string]string + if err := json.Unmarshal([]byte(logTracingJSON), &tracingMap); err != nil { + t.Errorf("failed to unmarshal logtracing: %v", err) + } + if len(tracingMap) != 0 { + t.Errorf("expected empty logtracing map, got %v", tracingMap) + } + } + }) + + t.Run("decode with no tracing extension", func(t *testing.T) { + evt := cloudevents.NewEvent() + evt.SetSource("source1") + evt.SetType("io.open-cluster-management.works.v1alpha1.manifestbundles.status.test") + evt.SetExtension("resourceid", "test-uid") + evt.SetExtension("resourceversion", "3") + evt.SetExtension("sequenceid", "1834773391719010304") + + if err := evt.SetData(cloudevents.ApplicationJSON, &payload.ManifestBundleStatus{}); err != nil { + t.Fatalf("failed to set event data: %v", err) + } + + work, err := codec.Decode(&evt) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Verify that only sequenceid annotation is present (no tracing annotations) + foundTracingAnnotation := false + for key := range work.Annotations { + if key != "cloudevents.open-cluster-management.io/sequenceid" { + foundTracingAnnotation = true + break + } + } + if foundTracingAnnotation { + t.Errorf("unexpected tracing annotations found: %v", work.Annotations) + } + }) +} diff --git a/pkg/cloudevents/generic/clients/baseclient.go b/pkg/cloudevents/generic/clients/baseclient.go index c8c166c3..618ec246 100644 --- a/pkg/cloudevents/generic/clients/baseclient.go +++ b/pkg/cloudevents/generic/clients/baseclient.go @@ -3,6 +3,7 @@ package clients import ( "context" "fmt" + "open-cluster-management.io/sdk-go/pkg/logging" "sync" "time" @@ -119,7 +120,7 @@ func (c *baseClient) connect(ctx context.Context) error { } func (c *baseClient) publish(ctx context.Context, evt cloudevents.Event) error { - logger := klog.FromContext(ctx) + logger := logging.SetLogTracingByCloudEvent(klog.FromContext(ctx), &evt) now := time.Now() if err := c.cloudEventsRateLimiter.Wait(ctx); err != nil { @@ -201,14 +202,15 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) { if startReceiving { go func() { if err := c.transport.Receive(receiverCtx, func(ctx context.Context, evt cloudevents.Event) { - logger := klog.FromContext(ctx) - logger.V(2).Info("Received event", "event", evt.Context) - if logger.V(5).Enabled() { - logger.V(5).Info("Received event", "event", evt.String()) + receiveLogger := logging.SetLogTracingByCloudEvent(klog.FromContext(ctx), &evt) + ctx = klog.NewContext(ctx, receiveLogger) + receiveLogger.V(2).Info("Received event", "event", evt.Context) + if receiveLogger.V(5).Enabled() { + receiveLogger.V(5).Info("Received event", "event", evt.String()) } receive(ctx, evt) }); err != nil { - runtime.HandleError(fmt.Errorf("failed to receive cloudevents, %v", err)) + runtime.HandleErrorWithContext(ctx, err, "failed to receive cloudevents") } }() startReceiving = false diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go new file mode 100644 index 00000000..8eff6cea --- /dev/null +++ b/pkg/logging/logging.go @@ -0,0 +1,160 @@ +package logging + +import ( + "context" + "encoding/json" + "fmt" + cloudevents "github.com/cloudevents/sdk-go/v2" + cloudeventstypes "github.com/cloudevents/sdk-go/v2/types" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + "strings" +) + +type ContextTracingKey string + +const ( + LogTracingPrefix = "logging.open-cluster-management.io/" + ExtensionLogTracing = "logtracing" + ContextTracingOPIDKey ContextTracingKey = "op-id" +) + +// DefaultContextTracingKeys is a global variable of interested keys in context used for log tracing +var DefaultContextTracingKeys = []ContextTracingKey{ContextTracingOPIDKey} + +func SetLogTracingByObject(logger klog.Logger, object metav1.Object) klog.Logger { + if object == nil { + return logger + } + annotations := object.GetAnnotations() + for key, value := range annotations { + if !strings.HasPrefix(key, LogTracingPrefix) { + continue + } + tracingKey := strings.TrimPrefix(key, LogTracingPrefix) + logger = logger.WithValues(tracingKey, value) + } + return logger +} + +func getTracingMapFromExtension(evt *cloudevents.Event) (map[string]string, error) { + logTracingMap := make(map[string]string) + + logTracingValue, ok := evt.Extensions()[ExtensionLogTracing] + if !ok { + return logTracingMap, nil + } + + logTracingValueString, err := cloudeventstypes.ToString(logTracingValue) + if err != nil { + return logTracingMap, err + } + + err = json.Unmarshal([]byte(logTracingValueString), &logTracingMap) + if err != nil { + return logTracingMap, err + } + + return logTracingMap, nil +} + +func setTracingMapToExtension(evt *cloudevents.Event, tracingMap map[string]string) error { + tracingValueRaw, err := json.Marshal(tracingMap) + if err != nil { + return err + } + evt.SetExtension(ExtensionLogTracing, string(tracingValueRaw)) + return nil +} + +func SetLogTracingByCloudEvent(logger klog.Logger, evt *cloudevents.Event) klog.Logger { + if evt == nil { + return logger + } + + logTracingMap, err := getTracingMapFromExtension(evt) + if err != nil { + logger.Error(err, "Failed to get log tracing map") + return logger + } + + for key, value := range logTracingMap { + if !strings.HasPrefix(key, LogTracingPrefix) { + continue + } + tracingKey := strings.TrimPrefix(key, LogTracingPrefix) + logger = logger.WithValues(tracingKey, value) + } + return logger +} + +// SetLogTracingFromContext is used in cloudevent work source client to set up the +// log tracing annotation for the manifestwork. +func SetLogTracingFromContext(ctx context.Context, object metav1.Object) { + annotations := object.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + for _, key := range DefaultContextTracingKeys { + value := ctx.Value(key) + if value == nil { + continue + } + annotations[LogTracingPrefix+string(key)] = fmt.Sprintf("%v", value) + } + + if len(annotations) != 0 { + object.SetAnnotations(annotations) + } +} + +func LogTracingFromEventToObject(evt *cloudevents.Event, obj metav1.Object) error { + if obj == nil { + return nil + } + if evt == nil { + return nil + } + + logTracingMap, err := getTracingMapFromExtension(evt) + if err != nil { + return err + } + + annotations := obj.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + + for key, value := range logTracingMap { + if !strings.HasPrefix(key, LogTracingPrefix) { + continue + } + annotations[key] = value + } + + if len(annotations) != 0 { + obj.SetAnnotations(annotations) + } + + return nil +} + +func LogTracingFromObjectToEvent(obj metav1.Object, evt *cloudevents.Event) error { + if obj == nil { + return nil + } + if evt == nil { + return nil + } + + tracingMap := make(map[string]string) + for key, value := range obj.GetAnnotations() { + if !strings.HasPrefix(key, LogTracingPrefix) { + continue + } + tracingMap[key] = value + } + + return setTracingMapToExtension(evt, tracingMap) +} diff --git a/pkg/logging/logging_test.go b/pkg/logging/logging_test.go new file mode 100644 index 00000000..f0e8142c --- /dev/null +++ b/pkg/logging/logging_test.go @@ -0,0 +1,566 @@ +package logging + +import ( + "context" + "encoding/json" + "testing" + + cloudevents "github.com/cloudevents/sdk-go/v2" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" +) + +func TestSetLogTracingByObject(t *testing.T) { + tests := []struct { + name string + object metav1.Object + }{ + { + name: "nil object returns logger unchanged", + object: nil, + }, + { + name: "object with no tracing annotations", + object: &metav1.ObjectMeta{ + Name: "test", + Annotations: map[string]string{ + "key1": "value1", + }, + }, + }, + { + name: "object with tracing annotations adds values to logger", + object: &metav1.ObjectMeta{ + Name: "test", + Annotations: map[string]string{ + "key1": "value1", + LogTracingPrefix + "trace-id": "12345", + LogTracingPrefix + "user": "testuser", + }, + }, + }, + { + name: "object with no annotations", + object: &metav1.ObjectMeta{ + Name: "test", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logger := klog.Background() + result := SetLogTracingByObject(logger, tt.object) + if result.GetSink() == nil { + t.Errorf("expected logger to be returned") + } + // Note: We can't easily verify the logger's internal values, + // but we verify that it returns a valid logger + }) + } +} + +func TestSetLogTracingByCloudEvent(t *testing.T) { + tests := []struct { + name string + event *cloudevents.Event + }{ + { + name: "nil event returns logger unchanged", + event: nil, + }, + { + name: "event with no tracing extensions", + event: func() *cloudevents.Event { + e := cloudevents.NewEvent() + e.SetID("test") + e.SetSource("test") + e.SetType("test") + return &e + }(), + }, + { + name: "event with tracing extension adds values to logger", + event: func() *cloudevents.Event { + e := cloudevents.NewEvent() + e.SetID("test") + e.SetSource("test") + e.SetType("test") + tracingMap := map[string]string{ + LogTracingPrefix + "trace-id": "12345", + LogTracingPrefix + "user": "testuser", + } + tracingJSON, _ := json.Marshal(tracingMap) + e.SetExtension(ExtensionLogTracing, string(tracingJSON)) + return &e + }(), + }, + { + name: "event with non-tracing extensions", + event: func() *cloudevents.Event { + e := cloudevents.NewEvent() + e.SetID("test") + e.SetSource("test") + e.SetType("test") + e.SetExtension("other-key", "other-value") + return &e + }(), + }, + { + name: "event with invalid JSON in tracing extension logs error but returns logger", + event: func() *cloudevents.Event { + e := cloudevents.NewEvent() + e.SetID("test") + e.SetSource("test") + e.SetType("test") + e.SetExtension(ExtensionLogTracing, "invalid-json") + return &e + }(), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logger := klog.Background() + result := SetLogTracingByCloudEvent(logger, tt.event) + if result.GetSink() == nil { + t.Errorf("expected logger to be returned") + } + // Note: We can't easily verify the logger's internal values, + // but we verify that it always returns a valid logger. + // Errors are now logged internally rather than returned. + }) + } +} + +func TestSetLogTracingFromContext(t *testing.T) { + t.Run("adds annotations from context with default keys", func(t *testing.T) { + ctx := context.Background() + ctx = context.WithValue(ctx, ContextTracingOPIDKey, "operation-123") + + obj := &metav1.ObjectMeta{ + Name: "test-object", + } + + SetLogTracingFromContext(ctx, obj) + + annotations := obj.GetAnnotations() + if annotations[LogTracingPrefix+"op-id"] != "operation-123" { + t.Errorf("expected op-id annotation to be set, got %v", annotations) + } + }) + + t.Run("adds annotations with existing annotations", func(t *testing.T) { + ctx := context.Background() + ctx = context.WithValue(ctx, ContextTracingOPIDKey, "operation-456") + + obj := &metav1.ObjectMeta{ + Name: "test-object", + Annotations: map[string]string{ + "existing-key": "existing-value", + }, + } + + SetLogTracingFromContext(ctx, obj) + + annotations := obj.GetAnnotations() + if annotations[LogTracingPrefix+"op-id"] != "operation-456" { + t.Errorf("expected op-id annotation to be set, got %v", annotations) + } + if annotations["existing-key"] != "existing-value" { + t.Errorf("expected existing annotation to be preserved, got %v", annotations) + } + }) + + t.Run("handles missing context values", func(t *testing.T) { + ctx := context.Background() + + obj := &metav1.ObjectMeta{ + Name: "test-object", + } + + SetLogTracingFromContext(ctx, obj) + + annotations := obj.GetAnnotations() + if len(annotations) != 0 { + t.Errorf("expected no annotations when context has no values, got %v", annotations) + } + }) + + t.Run("handles multiple values in default keys", func(t *testing.T) { + // Temporarily modify DefaultContextTracingKeys for this test + originalKeys := DefaultContextTracingKeys + DefaultContextTracingKeys = []ContextTracingKey{"op-id", "request-id", "session-id"} + defer func() { DefaultContextTracingKeys = originalKeys }() + + ctx := context.Background() + ctx = context.WithValue(ctx, ContextTracingOPIDKey, "op-789") + ctx = context.WithValue(ctx, ContextTracingKey("request-id"), "req-101") + ctx = context.WithValue(ctx, ContextTracingKey("session-id"), "sess-202") + + obj := &metav1.ObjectMeta{ + Name: "test-object", + } + + SetLogTracingFromContext(ctx, obj) + + annotations := obj.GetAnnotations() + if annotations[LogTracingPrefix+"op-id"] != "op-789" { + t.Errorf("expected op-id annotation, got %v", annotations) + } + if annotations[LogTracingPrefix+"request-id"] != "req-101" { + t.Errorf("expected request-id annotation, got %v", annotations) + } + if annotations[LogTracingPrefix+"session-id"] != "sess-202" { + t.Errorf("expected session-id annotation, got %v", annotations) + } + }) + + t.Run("handles partial context values", func(t *testing.T) { + // Temporarily modify DefaultContextTracingKeys for this test + originalKeys := DefaultContextTracingKeys + DefaultContextTracingKeys = []ContextTracingKey{ContextTracingOPIDKey, "missing-key"} + defer func() { DefaultContextTracingKeys = originalKeys }() + + ctx := context.Background() + ctx = context.WithValue(ctx, ContextTracingOPIDKey, "op-999") + // "missing-key" is not set in context + + obj := &metav1.ObjectMeta{ + Name: "test-object", + } + + SetLogTracingFromContext(ctx, obj) + + annotations := obj.GetAnnotations() + if annotations[LogTracingPrefix+"op-id"] != "op-999" { + t.Errorf("expected op-id annotation, got %v", annotations) + } + if _, ok := annotations["missing-key"]; ok { + t.Errorf("expected missing-key to not be set, got %v", annotations) + } + }) +} + +func TestLogTracingFromEventToObject(t *testing.T) { + t.Run("nil event", func(t *testing.T) { + obj := &metav1.ObjectMeta{Name: "test"} + err := LogTracingFromEventToObject(nil, obj) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + }) + + t.Run("nil object", func(t *testing.T) { + e := cloudevents.NewEvent() + err := LogTracingFromEventToObject(&e, nil) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + }) + + t.Run("event with tracing extensions to object with existing annotations", func(t *testing.T) { + e := cloudevents.NewEvent() + e.SetID("test-id") + e.SetSource("test-source") + e.SetType("test-type") + + tracingMap := map[string]string{ + LogTracingPrefix + "trace-id": "12345", + } + tracingJSON, _ := json.Marshal(tracingMap) + e.SetExtension(ExtensionLogTracing, string(tracingJSON)) + + obj := &metav1.ObjectMeta{ + Name: "test", + Annotations: map[string]string{ + "existing-key": "existing-value", + }, + } + + err := LogTracingFromEventToObject(&e, obj) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + annotations := obj.GetAnnotations() + if annotations[LogTracingPrefix+"trace-id"] != "12345" { + t.Errorf("expected tracing annotation to be set, got %v", annotations) + } + if annotations["existing-key"] != "existing-value" { + t.Errorf("expected existing annotation to be preserved, got %v", annotations) + } + }) + + t.Run("event with tracing extensions to object with no annotations", func(t *testing.T) { + e := cloudevents.NewEvent() + e.SetID("test-id") + e.SetSource("test-source") + e.SetType("test-type") + + tracingMap := map[string]string{ + LogTracingPrefix + "trace-id": "67890", + LogTracingPrefix + "user": "testuser", + } + tracingJSON, _ := json.Marshal(tracingMap) + e.SetExtension(ExtensionLogTracing, string(tracingJSON)) + + obj := &metav1.ObjectMeta{ + Name: "test", + } + + err := LogTracingFromEventToObject(&e, obj) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + annotations := obj.GetAnnotations() + if len(annotations) == 0 { + t.Errorf("expected annotations to be set") + } + if annotations[LogTracingPrefix+"trace-id"] != "67890" { + t.Errorf("expected trace-id annotation, got %v", annotations) + } + if annotations[LogTracingPrefix+"user"] != "testuser" { + t.Errorf("expected user annotation, got %v", annotations) + } + }) + + t.Run("event with non-tracing extensions", func(t *testing.T) { + e := cloudevents.NewEvent() + e.SetID("test-id") + e.SetSource("test-source") + e.SetType("test-type") + + tracingMap := map[string]string{ + "other-key": "other-value", + } + tracingJSON, _ := json.Marshal(tracingMap) + e.SetExtension(ExtensionLogTracing, string(tracingJSON)) + + obj := &metav1.ObjectMeta{ + Name: "test", + Annotations: map[string]string{ + "existing-key": "existing-value", + }, + } + + err := LogTracingFromEventToObject(&e, obj) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + annotations := obj.GetAnnotations() + if _, ok := annotations["other-key"]; ok { + t.Errorf("non-tracing key should not be copied to annotations") + } + }) + + t.Run("event with no tracing extension", func(t *testing.T) { + e := cloudevents.NewEvent() + e.SetID("test-id") + e.SetSource("test-source") + e.SetType("test-type") + + obj := &metav1.ObjectMeta{ + Name: "test", + } + + err := LogTracingFromEventToObject(&e, obj) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + annotations := obj.GetAnnotations() + if len(annotations) != 0 { + t.Errorf("expected no annotations to be set when no tracing exists, got %v", annotations) + } + }) +} + +func TestLogTracingFromObjectToEvent(t *testing.T) { + t.Run("nil object", func(t *testing.T) { + e := cloudevents.NewEvent() + err := LogTracingFromObjectToEvent(nil, &e) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + }) + + t.Run("nil event", func(t *testing.T) { + obj := &metav1.ObjectMeta{Name: "test"} + err := LogTracingFromObjectToEvent(obj, nil) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + }) + + t.Run("object with tracing annotations", func(t *testing.T) { + obj := &metav1.ObjectMeta{ + Name: "test", + Annotations: map[string]string{ + LogTracingPrefix + "trace-id": "12345", + LogTracingPrefix + "user": "testuser", + }, + } + + e := cloudevents.NewEvent() + e.SetID("test-id") + e.SetSource("test-source") + e.SetType("test-type") + + err := LogTracingFromObjectToEvent(obj, &e) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + tracingMap, err := getTracingMapFromExtension(&e) + if err != nil { + t.Errorf("failed to get tracing map: %v", err) + } + + if tracingMap[LogTracingPrefix+"trace-id"] != "12345" { + t.Errorf("expected trace-id extension, got %v", tracingMap) + } + if tracingMap[LogTracingPrefix+"user"] != "testuser" { + t.Errorf("expected user extension, got %v", tracingMap) + } + }) + + t.Run("object with non-tracing annotations", func(t *testing.T) { + obj := &metav1.ObjectMeta{ + Name: "test", + Annotations: map[string]string{ + "other-key": "other-value", + }, + } + + e := cloudevents.NewEvent() + e.SetID("test-id") + e.SetSource("test-source") + e.SetType("test-type") + + err := LogTracingFromObjectToEvent(obj, &e) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + tracingMap, err := getTracingMapFromExtension(&e) + if err != nil { + t.Errorf("failed to get tracing map: %v", err) + } + + if len(tracingMap) != 0 { + t.Errorf("expected empty tracing map, got %v", tracingMap) + } + }) + + t.Run("object with no annotations", func(t *testing.T) { + obj := &metav1.ObjectMeta{ + Name: "test", + } + + e := cloudevents.NewEvent() + e.SetID("test-id") + e.SetSource("test-source") + e.SetType("test-type") + + err := LogTracingFromObjectToEvent(obj, &e) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + tracingMap, err := getTracingMapFromExtension(&e) + if err != nil { + t.Errorf("failed to get tracing map: %v", err) + } + + if len(tracingMap) != 0 { + t.Errorf("expected empty tracing map, got %v", tracingMap) + } + }) +} + +func TestGetTracingMapFromExtension(t *testing.T) { + t.Run("event with valid JSON tracing extension", func(t *testing.T) { + e := cloudevents.NewEvent() + tracingMap := map[string]string{ + LogTracingPrefix + "key1": "value1", + LogTracingPrefix + "key2": "value2", + } + tracingJSON, _ := json.Marshal(tracingMap) + e.SetExtension(ExtensionLogTracing, string(tracingJSON)) + + result, err := getTracingMapFromExtension(&e) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(result) != 2 { + t.Errorf("expected 2 items in tracing map, got %d", len(result)) + } + if result[LogTracingPrefix+"key1"] != "value1" { + t.Errorf("expected key1 value, got %v", result) + } + }) + + t.Run("event with no tracing extension", func(t *testing.T) { + e := cloudevents.NewEvent() + result, err := getTracingMapFromExtension(&e) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(result) != 0 { + t.Errorf("expected empty map, got %v", result) + } + }) + + t.Run("event with invalid JSON tracing extension", func(t *testing.T) { + e := cloudevents.NewEvent() + e.SetExtension(ExtensionLogTracing, "not-valid-json") + _, err := getTracingMapFromExtension(&e) + if err == nil { + t.Errorf("expected error for invalid JSON") + } + }) +} + +func TestSetTracingMapToExtension(t *testing.T) { + t.Run("sets valid tracing map", func(t *testing.T) { + e := cloudevents.NewEvent() + tracingMap := map[string]string{ + LogTracingPrefix + "key1": "value1", + LogTracingPrefix + "key2": "value2", + } + + err := setTracingMapToExtension(&e, tracingMap) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + result, err := getTracingMapFromExtension(&e) + if err != nil { + t.Errorf("failed to get tracing map: %v", err) + } + if len(result) != 2 { + t.Errorf("expected 2 items, got %d", len(result)) + } + }) + + t.Run("sets empty tracing map", func(t *testing.T) { + e := cloudevents.NewEvent() + tracingMap := map[string]string{} + + err := setTracingMapToExtension(&e, tracingMap) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + result, err := getTracingMapFromExtension(&e) + if err != nil { + t.Errorf("failed to get tracing map: %v", err) + } + if len(result) != 0 { + t.Errorf("expected empty map, got %v", result) + } + }) +}