Skip to content

Commit ba8ac76

Browse files
authored
Add namespace labels to source metrics (#8892)
1 parent 223b43f commit ba8ac76

File tree

5 files changed

+48
-12
lines changed

5 files changed

+48
-12
lines changed

pkg/adapter/apiserver/adapter.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,11 @@ type apiServerAdapter struct {
5252

5353
config Config
5454

55-
discover discovery.DiscoveryInterface
56-
k8s dynamic.Interface
57-
source string // TODO: who dis?
58-
name string // TODO: who dis?
55+
discover discovery.DiscoveryInterface
56+
k8s dynamic.Interface
57+
source string // TODO: who dis?
58+
name string // TODO: who dis?
59+
namespace string
5960
}
6061

6162
type resourceWatchMatch struct {
@@ -199,6 +200,7 @@ func (a *apiServerAdapter) setupDelegate() cache.Store {
199200
logger: a.logger,
200201
ref: a.config.EventMode == v1.ReferenceMode,
201202
apiServerSourceName: a.name,
203+
apiServerSourceNS: a.namespace,
202204
filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(a.logger.Desugar(), a.config.Filters)...),
203205
}
204206
if a.config.ResourceOwner != nil {

pkg/adapter/apiserver/adapter_injection.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,13 @@ func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClie
6767
}
6868

6969
return &apiServerAdapter{
70-
discover: kubeclient.Get(ctx).Discovery(),
71-
k8s: dynamicclient.Get(ctx),
72-
ce: ceClient,
73-
source: Get(ctx),
74-
name: env.Name,
75-
config: config,
70+
discover: kubeclient.Get(ctx).Discovery(),
71+
k8s: dynamicclient.Get(ctx),
72+
ce: ceClient,
73+
source: Get(ctx),
74+
name: env.Name,
75+
namespace: env.GetNamespace(),
76+
config: config,
7677

7778
logger: logger,
7879
}

pkg/adapter/apiserver/delegate.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,21 @@ import (
2222
cloudevents "github.com/cloudevents/sdk-go/v2"
2323
"github.com/google/uuid"
2424
"go.uber.org/zap"
25+
"k8s.io/apimachinery/pkg/types"
2526
"k8s.io/client-go/tools/cache"
2627
"knative.dev/eventing/pkg/adapter/apiserver/events"
2728
"knative.dev/eventing/pkg/eventfilter"
29+
"knative.dev/eventing/pkg/observability"
2830
)
2931

3032
type resourceDelegate struct {
3133
ce cloudevents.Client
3234
source string
3335
ref bool
3436
apiServerSourceName string
37+
apiServerSourceNS string
3538
filter eventfilter.Filter
36-
37-
logger *zap.SugaredLogger
39+
logger *zap.SugaredLogger
3840
}
3941

4042
var _ cache.Store = (*resourceDelegate)(nil)
@@ -82,6 +84,14 @@ func (a *resourceDelegate) sendCloudEvent(ctx context.Context, event cloudevents
8284
subject := event.Context.GetSubject()
8385
a.logger.Debugf("sending cloudevent id: %s, source: %s, subject: %s", event.ID(), source, subject)
8486

87+
// Add labels to context so otelhttp picks them up for metrics
88+
ctx = observability.WithLabeler(ctx)
89+
ctx = observability.WithSourceLabels(ctx, types.NamespacedName{
90+
Name: a.apiServerSourceName,
91+
Namespace: a.apiServerSourceNS,
92+
})
93+
ctx = observability.WithMinimalEventLabels(ctx, &event)
94+
8595
if result := a.ce.Send(ctx, event); !cloudevents.IsACK(result) {
8696
a.logger.Errorw("failed to send cloudevent", zap.Error(result), zap.String("source", source),
8797
zap.String("subject", subject), zap.String("id", event.ID()))

pkg/adapter/mtping/runner.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/robfig/cron/v3"
2929
"go.opentelemetry.io/otel/trace"
3030
"go.uber.org/zap"
31+
"k8s.io/apimachinery/pkg/types"
3132
"k8s.io/client-go/kubernetes"
3233
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
3334
"k8s.io/client-go/tools/record"
@@ -152,6 +153,14 @@ func (a *cronJobsRunner) cronTick(ctx context.Context, client kncloudevents.Clie
152153

153154
a.Logger.Debugf("sending cloudevent id: %s, source: %s, target: %s", event.ID(), source, target)
154155

156+
// Add labels to context so otelhttp picks them up for metrics
157+
ctx = observability.WithLabeler(ctx)
158+
ctx = observability.WithSourceLabels(ctx, types.NamespacedName{
159+
Name: src.Name,
160+
Namespace: src.Namespace,
161+
})
162+
ctx = observability.WithMinimalEventLabels(ctx, &event)
163+
155164
if result := client.Send(ctx, event); !cloudevents.IsACK(result) {
156165
// Exhausted number of retries. Event is lost.
157166
a.Logger.Error("failed to send cloudevent result: ", zap.Any("result", result),

pkg/observability/newcontext.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,20 @@ func WithSinkLabels(ctx context.Context, sink types.NamespacedName, kind string)
168168
return ctx
169169
}
170170

171+
func WithSourceLabels(ctx context.Context, source types.NamespacedName) context.Context {
172+
labeler, ok := otelhttp.LabelerFromContext(ctx)
173+
if !ok {
174+
ctx = otelhttp.ContextWithLabeler(ctx, labeler)
175+
}
176+
177+
labeler.Add(
178+
SourceName.With(source.Name),
179+
SourceNamespace.With(source.Namespace),
180+
)
181+
182+
return ctx
183+
}
184+
171185
func WithHTTPStatusCodeLabel(ctx context.Context, statusCode int) context.Context {
172186
labeler, ok := otelhttp.LabelerFromContext(ctx)
173187
if !ok {

0 commit comments

Comments
 (0)