|  | 
|  | 1 | +/* | 
|  | 2 | +Copyright 2025 The Flux authors | 
|  | 3 | +
 | 
|  | 4 | +Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | 5 | +you may not use this file except in compliance with the License. | 
|  | 6 | +You may obtain a copy of the License at | 
|  | 7 | +
 | 
|  | 8 | +    http://www.apache.org/licenses/LICENSE-2.0 | 
|  | 9 | +
 | 
|  | 10 | +Unless required by applicable law or agreed to in writing, software | 
|  | 11 | +distributed under the License is distributed on an "AS IS" BASIS, | 
|  | 12 | +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | 13 | +See the License for the specific language governing permissions and | 
|  | 14 | +limitations under the License. | 
|  | 15 | +*/ | 
|  | 16 | + | 
|  | 17 | +package notifier | 
|  | 18 | + | 
|  | 19 | +import ( | 
|  | 20 | +	"context" | 
|  | 21 | +	"crypto/sha256" | 
|  | 22 | +	"crypto/tls" | 
|  | 23 | +	"fmt" | 
|  | 24 | +	"net/http" | 
|  | 25 | +	"net/url" | 
|  | 26 | +	"slices" | 
|  | 27 | + | 
|  | 28 | +	"go.opentelemetry.io/otel/attribute" | 
|  | 29 | +	"go.opentelemetry.io/otel/codes" | 
|  | 30 | +	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" | 
|  | 31 | +	sdktrace "go.opentelemetry.io/otel/sdk/trace" | 
|  | 32 | +	"go.opentelemetry.io/otel/trace" | 
|  | 33 | +	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | 
|  | 34 | +	"sigs.k8s.io/controller-runtime/pkg/log" | 
|  | 35 | + | 
|  | 36 | +	eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" | 
|  | 37 | + | 
|  | 38 | +	apiv1beta3 "github.com/fluxcd/notification-controller/api/v1beta3" | 
|  | 39 | +) | 
|  | 40 | + | 
|  | 41 | +type AlertMetadataContextKey struct{} | 
|  | 42 | + | 
|  | 43 | +type OTLPTracer struct { | 
|  | 44 | +	tracerProvider *sdktrace.TracerProvider | 
|  | 45 | +	tracer         trace.Tracer | 
|  | 46 | +} | 
|  | 47 | + | 
|  | 48 | +func NewOTLPTracer(ctx context.Context, urlStr string, proxyURL string, headers map[string]string, tlsConfig *tls.Config) (*OTLPTracer, error) { | 
|  | 49 | +	// Set up OTLP exporter options | 
|  | 50 | +	httpOptions := []otlptracehttp.Option{ | 
|  | 51 | +		otlptracehttp.WithEndpointURL(urlStr), | 
|  | 52 | +	} | 
|  | 53 | + | 
|  | 54 | +	// Add headers if available | 
|  | 55 | +	if len(headers) > 0 { | 
|  | 56 | +		httpOptions = append(httpOptions, otlptracehttp.WithHeaders(headers)) | 
|  | 57 | +	} | 
|  | 58 | + | 
|  | 59 | +	// Add TLS config if available | 
|  | 60 | +	if tlsConfig != nil { | 
|  | 61 | +		httpOptions = append(httpOptions, otlptracehttp.WithTLSClientConfig(tlsConfig)) | 
|  | 62 | +	} | 
|  | 63 | + | 
|  | 64 | +	// Add proxy if available | 
|  | 65 | +	if proxyURL != "" { | 
|  | 66 | +		proxyURLparsed, err := url.Parse(proxyURL) | 
|  | 67 | +		if err != nil { | 
|  | 68 | +			return nil, fmt.Errorf("failed to proxy URL - %s: %w", proxyURL, err) | 
|  | 69 | +		} else { | 
|  | 70 | +			httpOptions = append(httpOptions, otlptracehttp.WithProxy(func(*http.Request) (*url.URL, error) { | 
|  | 71 | +				return proxyURLparsed, nil | 
|  | 72 | +			})) | 
|  | 73 | +		} | 
|  | 74 | +	} | 
|  | 75 | + | 
|  | 76 | +	exporter, err := otlptracehttp.New(ctx, httpOptions...) | 
|  | 77 | +	if err != nil { | 
|  | 78 | +		return nil, err | 
|  | 79 | +	} | 
|  | 80 | + | 
|  | 81 | +	// Create TracerProvider once | 
|  | 82 | +	tp := sdktrace.NewTracerProvider( | 
|  | 83 | +		sdktrace.WithBatcher(exporter), | 
|  | 84 | +	) | 
|  | 85 | + | 
|  | 86 | +	log.FromContext(ctx).Info("Successfully created OTEL tracer") | 
|  | 87 | +	return &OTLPTracer{ | 
|  | 88 | +		tracerProvider: tp, | 
|  | 89 | +		tracer:         tp.Tracer("flux:notification-controller"), | 
|  | 90 | +	}, nil | 
|  | 91 | +} | 
|  | 92 | + | 
|  | 93 | +// Post implements the notifier.Interface | 
|  | 94 | +func (t *OTLPTracer) Post(ctx context.Context, event eventv1.Event) error { | 
|  | 95 | +	logger := log.FromContext(ctx).WithValues( | 
|  | 96 | +		"event", event.Reason, | 
|  | 97 | +		"object", fmt.Sprintf("%s/%s/%s", event.InvolvedObject.Kind, event.InvolvedObject.Namespace, event.InvolvedObject.Name), | 
|  | 98 | +		"severity", event.Severity, | 
|  | 99 | +	) | 
|  | 100 | +	logger.Info("OTEL Post function called", "event", event.Reason) | 
|  | 101 | + | 
|  | 102 | +	alert, ok := ctx.Value(AlertMetadataContextKey{}).(metav1.ObjectMeta) | 
|  | 103 | +	if !ok { | 
|  | 104 | +		return fmt.Errorf("alert metadata not found in context") | 
|  | 105 | +	} | 
|  | 106 | + | 
|  | 107 | +	// Extract revision from event metadata | 
|  | 108 | +	revision := extractMetadata(event.Metadata, "revision") | 
|  | 109 | + | 
|  | 110 | +	// TraceID: <AlertUID>:<revisionID> | 
|  | 111 | +	logger.V(1).Info("Generating trace IDs", "alertUID", string(alert.UID), "revision", revision) | 
|  | 112 | +	traceIDStr := generateID(string(alert.UID), revision) | 
|  | 113 | +	spanIDStr := generateID(string(event.InvolvedObject.UID), | 
|  | 114 | +		fmt.Sprintf("%s/%s/%s", event.InvolvedObject.Kind, | 
|  | 115 | +			event.InvolvedObject.Namespace, event.InvolvedObject.Name)) | 
|  | 116 | + | 
|  | 117 | +	var traceID trace.TraceID | 
|  | 118 | +	var spanID trace.SpanID | 
|  | 119 | +	copy(traceID[:], traceIDStr[:16]) | 
|  | 120 | +	copy(spanID[:], spanIDStr[:8]) | 
|  | 121 | + | 
|  | 122 | +	// Determine span relationship based on Flux object hierarchy | 
|  | 123 | +	var spanCtx context.Context = t.createSpanContext(ctx, event, traceID, spanID) | 
|  | 124 | + | 
|  | 125 | +	// Create single span with proper attributes | 
|  | 126 | +	logger.Info("Processing OTEL notification", "alert", alert.Name) | 
|  | 127 | +	spanName := fmt.Sprintf("%s: %s/%s", event.InvolvedObject.Kind, event.InvolvedObject.Namespace, event.InvolvedObject.Name) | 
|  | 128 | +	_, span := t.tracer.Start(spanCtx, spanName, | 
|  | 129 | +		trace.WithAttributes( | 
|  | 130 | +			attribute.String("object.uid", string(event.InvolvedObject.UID)), | 
|  | 131 | +			attribute.String("object.kind", event.InvolvedObject.Kind), | 
|  | 132 | +			attribute.String("object.name", event.InvolvedObject.Name), | 
|  | 133 | +			attribute.String("object.namespace", event.InvolvedObject.Namespace), | 
|  | 134 | +		), | 
|  | 135 | +		trace.WithTimestamp(event.Timestamp.Time), | 
|  | 136 | +	) | 
|  | 137 | + | 
|  | 138 | +	// Add related events if they exist in metadata | 
|  | 139 | +	t.addEvents(span, event) | 
|  | 140 | + | 
|  | 141 | +	// Set status based on event severity | 
|  | 142 | +	if event.Severity == eventv1.EventSeverityError { | 
|  | 143 | +		span.SetStatus(codes.Error, event.Message) | 
|  | 144 | +	} else { | 
|  | 145 | +		span.SetStatus(codes.Ok, event.Message) | 
|  | 146 | +	} | 
|  | 147 | + | 
|  | 148 | +	defer span.End() | 
|  | 149 | + | 
|  | 150 | +	serviceName := fmt.Sprintf("%s: %s/%s", apiv1beta3.AlertKind, alert.Namespace, alert.Name) | 
|  | 151 | +	logger.Info("Successfully sent trace to OTLP endpoint", | 
|  | 152 | +		"alert", serviceName, | 
|  | 153 | +	) | 
|  | 154 | + | 
|  | 155 | +	return nil | 
|  | 156 | +} | 
|  | 157 | + | 
|  | 158 | +func (t *OTLPTracer) createSpanContext(ctx context.Context, event eventv1.Event, traceID trace.TraceID, spanID trace.SpanID) context.Context { | 
|  | 159 | +	kind := event.InvolvedObject.Kind | 
|  | 160 | + | 
|  | 161 | +	spanContext := trace.NewSpanContext(trace.SpanContextConfig{ | 
|  | 162 | +		TraceID:    traceID, | 
|  | 163 | +		SpanID:     spanID, | 
|  | 164 | +		TraceFlags: trace.FlagsSampled, | 
|  | 165 | +	}) | 
|  | 166 | + | 
|  | 167 | +	// Root spans: Sources that start the deployment flow | 
|  | 168 | +	if isSource(kind) { | 
|  | 169 | +		return trace.ContextWithSpanContext(context.Background(), | 
|  | 170 | +			spanContext.WithTraceFlags(spanContext.TraceFlags().WithSampled(true))) | 
|  | 171 | +	} | 
|  | 172 | + | 
|  | 173 | +	// Child spans: Everything else inherits from the same trace | 
|  | 174 | +	return trace.ContextWithSpanContext(ctx, | 
|  | 175 | +		spanContext.WithTraceFlags(spanContext.TraceFlags().WithSampled(true))) | 
|  | 176 | +} | 
|  | 177 | + | 
|  | 178 | +func (t *OTLPTracer) addEvents(span trace.Span, event eventv1.Event) { | 
|  | 179 | +	// Build event attributes including metadata | 
|  | 180 | +	eventAttrs := []attribute.KeyValue{ | 
|  | 181 | +		attribute.String("severity", event.Severity), | 
|  | 182 | +		attribute.String("message", event.Message), | 
|  | 183 | +	} | 
|  | 184 | + | 
|  | 185 | +	// Add metadata as event attributes | 
|  | 186 | +	for k, v := range event.Metadata { | 
|  | 187 | +		eventAttrs = append(eventAttrs, attribute.String(k, v)) | 
|  | 188 | +	} | 
|  | 189 | + | 
|  | 190 | +	span.AddEvent(event.Reason, trace.WithAttributes(eventAttrs...), trace.WithTimestamp(event.Timestamp.Time)) | 
|  | 191 | +} | 
|  | 192 | + | 
|  | 193 | +// Add cleanup method | 
|  | 194 | +func (t *OTLPTracer) Close(ctx context.Context) error { | 
|  | 195 | +	return t.tracerProvider.Shutdown(ctx) | 
|  | 196 | +} | 
|  | 197 | + | 
|  | 198 | +// Add this function to generate trace and span ID | 
|  | 199 | +func generateID(UID string, rest string) []byte { | 
|  | 200 | +	input := fmt.Sprintf("%s:%s", UID, rest) | 
|  | 201 | +	hash := sha256.Sum256([]byte(input)) | 
|  | 202 | +	return hash[:] | 
|  | 203 | +} | 
|  | 204 | + | 
|  | 205 | +func extractMetadata(metadata map[string]string, key string) string { | 
|  | 206 | +	if v, ok := metadata[key]; ok { | 
|  | 207 | +		return v | 
|  | 208 | +	} | 
|  | 209 | +	return "unknown" | 
|  | 210 | +} | 
|  | 211 | + | 
|  | 212 | +func isSource(kind string) bool { | 
|  | 213 | +	sourceKinds := []string{"GitRepository", "HelmRepository", "OCIRepository", "Bucket"} | 
|  | 214 | +	return slices.Contains(sourceKinds, kind) | 
|  | 215 | +} | 
0 commit comments