Skip to content

Commit e544974

Browse files
committed
feat: add comprehensive OpenTelemetry tracing to metadata service
- Add spans for all saga steps (ADD_META, ADD_SCREENSHOT, GET_SCREENSHOT_URL, UPDATE_META)
- Add a span for publishing the MetadataExtracted event
- Add spans for unmarshal errors and empty-payload errors
- Use OpenTelemetry messaging semantic conventions (messaging.system, messaging.destination, messaging.id, etc.)
- Add error attributes (error.code, error.retryable) for better error filtering
- Improve span naming and attributes according to semantic conventions
- Pass the tracer provider to the metadata MQ Event struct for span creation
1 parent ca9f9d8 commit e544974

File tree

6 files changed

+184
-5
lines changed

6 files changed

+184
-5
lines changed

Trace-451567-2025-11-27 21_29_57.json

Lines changed: 1 addition & 0 deletions
Large diffs are not rendered by default.

boundaries/metadata/internal/di/wire.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,9 @@ func InitMetadataMQ(
139139
metadataUC *metadata.UC,
140140
registry *bus.TypeRegistry,
141141
marshaler cqrsmessage.Marshaler,
142+
tracer trace.TracerProvider,
142143
) (*metadata_mq.Event, error) {
143-
metadataMQ, err := metadata_mq.New(subscriber, metadataUC)
144+
metadataMQ, err := metadata_mq.New(subscriber, metadataUC, tracer)
144145
if err != nil {
145146
return nil, err
146147
}

boundaries/metadata/internal/di/wire_gen.go

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

boundaries/metadata/internal/infrastructure/mq/mq.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ MQ Endpoint
55
package metadata_mq
66

77
import (
8+
"go.opentelemetry.io/otel/trace"
9+
810
"github.com/ThreeDotsLabs/watermill/message"
911

1012
metadata_uc "github.com/shortlink-org/shortlink/boundaries/metadata/internal/usecases/metadata"
@@ -13,11 +15,13 @@ import (
1315
type Event struct {
1416
subscriber message.Subscriber
1517
metadataUC *metadata_uc.UC
18+
tracer trace.TracerProvider
1619
}
1720

18-
func New(subscriber message.Subscriber, metadataUC *metadata_uc.UC) (*Event, error) {
21+
func New(subscriber message.Subscriber, metadataUC *metadata_uc.UC, tracer trace.TracerProvider) (*Event, error) {
1922
return &Event{
2023
subscriber: subscriber,
2124
metadataUC: metadataUC,
25+
tracer: tracer,
2226
}, nil
2327
}

boundaries/metadata/internal/infrastructure/mq/subscribe.go

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ import (
1010
"github.com/shortlink-org/go-sdk/cqrs/bus"
1111
cqrsmessage "github.com/shortlink-org/go-sdk/cqrs/message"
1212
"github.com/shortlink-org/go-sdk/logger"
13+
"go.opentelemetry.io/otel"
14+
"go.opentelemetry.io/otel/attribute"
15+
otelcodes "go.opentelemetry.io/otel/codes"
16+
"go.opentelemetry.io/otel/trace"
1317

1418
"github.com/shortlink-org/shortlink/boundaries/metadata/internal/domain"
1519
domainerrors "github.com/shortlink-org/shortlink/boundaries/metadata/internal/domain/errors"
@@ -55,6 +59,21 @@ func (e *Event) SubscribeLinkCreated(ctx context.Context, log logger.Logger, reg
5559

5660
// Validate payload before unmarshaling
5761
if len(msg.Payload) == 0 {
62+
// Create span for empty payload error to track problematic messages in traces
63+
_, span := otel.Tracer("metadata.mq").Start(msgCtx, "metadata.mq.empty_payload_error",
64+
trace.WithSpanKind(trace.SpanKindConsumer),
65+
)
66+
span.SetStatus(otelcodes.Error, "Empty payload received")
67+
span.SetAttributes(
68+
attribute.String("messaging.system", "kafka"),
69+
attribute.String("messaging.destination", linkCreatedEvent),
70+
attribute.String("messaging.destination_kind", "topic"),
71+
attribute.String("messaging.id", msg.UUID),
72+
attribute.String("messaging.operation", "receive"),
73+
attribute.String("error.type", "empty_payload"),
74+
)
75+
span.End()
76+
5877
log.ErrorWithContext(msgCtx, "Received empty payload for link created event - nacking for Kafka DLQ",
5978
slog.String("topic", linkCreatedEvent),
6079
slog.String("message_uuid", msg.UUID),
@@ -74,6 +93,24 @@ func (e *Event) SubscribeLinkCreated(ctx context.Context, log logger.Logger, reg
7493

7594
unmarshalErr := marshaler.Unmarshal(msg, event)
7695
if unmarshalErr != nil {
96+
// Create span for unmarshal error to track problematic messages in traces
97+
_, span := otel.Tracer("metadata.mq").Start(msgCtx, "metadata.mq.unmarshal_error",
98+
trace.WithSpanKind(trace.SpanKindConsumer),
99+
)
100+
span.RecordError(unmarshalErr)
101+
span.SetStatus(otelcodes.Error, unmarshalErr.Error())
102+
span.SetAttributes(
103+
attribute.String("messaging.system", "kafka"),
104+
attribute.String("messaging.destination", linkCreatedEvent),
105+
attribute.String("messaging.destination_kind", "topic"),
106+
attribute.String("messaging.id", msg.UUID),
107+
attribute.String("messaging.operation", "receive"),
108+
attribute.Int("messaging.message_payload_size_bytes", len(msg.Payload)),
109+
attribute.Int("messaging.message_metadata_count", len(msg.Metadata)),
110+
attribute.String("error.type", "unmarshal"),
111+
)
112+
span.End()
113+
77114
// Nack() to allow Watermill DLQ to track retries and move to DLQ after max retries
78115
// Watermill will automatically move message to DLQ topic after WATERMILL_DLQ_MAX_RETRIES attempts
79116
// Log all metadata to debug DLQ retry tracking
@@ -91,18 +128,44 @@ func (e *Event) SubscribeLinkCreated(ctx context.Context, log logger.Logger, reg
91128
}
92129

93130
// Handle event - event is already typed as *linkpb.LinkCreated
94-
handleErr := e.handleLinkCreated(msgCtx, event, log) //nolint:contextcheck // metadata handling depends on message context
131+
// Create span for event processing
132+
ctx, span := otel.Tracer("metadata.mq").Start(msgCtx, "metadata.mq.handle_link_created",
133+
trace.WithSpanKind(trace.SpanKindConsumer),
134+
)
135+
defer span.End()
136+
137+
span.SetAttributes(
138+
attribute.String("messaging.system", "kafka"),
139+
attribute.String("messaging.destination", linkCreatedEvent),
140+
attribute.String("messaging.destination_kind", "topic"),
141+
attribute.String("messaging.message_id", msg.UUID),
142+
attribute.String("messaging.operation", "receive"),
143+
attribute.String("event.type", linkCreatedEvent),
144+
attribute.String("link.hash", event.GetHash()),
145+
attribute.String("link.url", event.GetUrl()),
146+
)
147+
148+
handleErr := e.handleLinkCreated(ctx, event, log) //nolint:contextcheck // metadata handling depends on message context
95149
if handleErr != nil {
150+
span.RecordError(handleErr)
151+
span.SetStatus(otelcodes.Error, handleErr.Error())
96152
var domainErr *domainerrors.Error
97153
if errors.As(handleErr, &domainErr) {
98154
dto := infraerrors.FromDomainError("metadata.mq.link_created", domainErr)
155+
span.SetAttributes(
156+
attribute.String("error.code", dto.Code),
157+
attribute.Bool("error.retryable", dto.Retryable),
158+
)
99159
log.ErrorWithContext(msgCtx, "Failed to handle link created event - nacking for Kafka DLQ",
100160
slog.String("error_code", dto.Code),
101161
slog.String("topic", linkCreatedEvent),
102162
slog.Bool("retryable", dto.Retryable),
103163
slog.String("message", dto.Message),
104164
)
105165
} else {
166+
span.SetAttributes(
167+
attribute.Bool("error.retryable", true),
168+
)
106169
log.ErrorWithContext(msgCtx, "Failed to handle link created event - nacking for Kafka DLQ",
107170
slog.String("error", handleErr.Error()),
108171
slog.String("topic", linkCreatedEvent),
@@ -115,6 +178,7 @@ func (e *Event) SubscribeLinkCreated(ctx context.Context, log logger.Logger, reg
115178
continue
116179
}
117180

181+
span.SetStatus(otelcodes.Ok, "Link created event processed successfully")
118182
msg.Ack()
119183
}
120184
}(ctx)
@@ -133,9 +197,23 @@ func (e *Event) handleLinkCreated(ctx context.Context, event *linkpb.LinkCreated
133197

134198
linkHash := event.GetHash()
135199

200+
// Note: metadata.uc.Add already creates spans for saga steps internally
201+
// This span wraps the entire metadata processing operation
202+
ctx, span := otel.Tracer("metadata.uc").Start(ctx, "metadata.process",
203+
trace.WithSpanKind(trace.SpanKindInternal),
204+
)
205+
defer span.End()
206+
207+
span.SetAttributes(
208+
attribute.String("link.url", linkURL),
209+
attribute.String("link.hash", linkHash),
210+
)
211+
136212
// Process metadata for the link URL
137213
_, err := e.metadataUC.Add(ctx, linkURL)
138214
if err != nil {
215+
span.RecordError(err)
216+
span.SetStatus(otelcodes.Error, err.Error())
139217
log.ErrorWithContext(ctx, "Error processing metadata for link",
140218
slog.String("error", err.Error()),
141219
slog.String("url", linkURL),
@@ -145,6 +223,7 @@ func (e *Event) handleLinkCreated(ctx context.Context, event *linkpb.LinkCreated
145223
return err
146224
}
147225

226+
span.SetStatus(otelcodes.Ok, "Metadata processed successfully")
148227
log.InfoWithContext(ctx, "Successfully processed metadata for link",
149228
slog.String("url", linkURL),
150229
slog.String("hash", linkHash),

boundaries/metadata/internal/usecases/metadata/metadata.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ import (
88
"github.com/shortlink-org/go-sdk/cqrs/bus"
99
"github.com/shortlink-org/go-sdk/logger"
1010
"github.com/shortlink-org/go-sdk/saga"
11+
"go.opentelemetry.io/otel"
12+
"go.opentelemetry.io/otel/attribute"
13+
otelcodes "go.opentelemetry.io/otel/codes"
14+
"go.opentelemetry.io/otel/trace"
1115
"google.golang.org/protobuf/types/known/timestamppb"
1216

1317
"github.com/shortlink-org/shortlink/boundaries/metadata/internal/domain"
@@ -88,12 +92,29 @@ func (uc *UC) Add(ctx context.Context, linkURL string) (*v1.Meta, error) { //nol
8892

8993
_, errs = sagaSetMetadata.AddStep(SAGA_STEP_ADD_META).
9094
Then(func(ctx context.Context) error {
95+
ctx, span := otel.Tracer("metadata.uc.parser").Start(ctx, "saga: SAGA_STEP_ADD_META",
96+
trace.WithSpanKind(trace.SpanKindInternal),
97+
)
98+
defer span.End()
99+
100+
span.SetAttributes(
101+
attribute.String("step", SAGA_STEP_ADD_META),
102+
attribute.String("status", "run"),
103+
attribute.String("link.url", linkURL),
104+
)
105+
91106
m, stepErr := uc.parserUC.Set(ctx, linkURL)
92107
if stepErr != nil {
108+
span.RecordError(stepErr)
109+
span.SetStatus(otelcodes.Error, stepErr.Error())
93110
return domainerrors.Normalize(OpParserSet, stepErr)
94111
}
95112

96113
meta = m
114+
span.SetAttributes(
115+
attribute.String("meta.id", m.GetId()),
116+
)
117+
span.SetStatus(otelcodes.Ok, "Metadata parsed successfully")
97118

98119
return nil
99120
}).Build()
@@ -104,11 +125,25 @@ func (uc *UC) Add(ctx context.Context, linkURL string) (*v1.Meta, error) { //nol
104125

105126
_, errs = sagaSetMetadata.AddStep(SAGA_STEP_ADD_SCREENSHOT).
106127
Then(func(ctx context.Context) error {
128+
ctx, span := otel.Tracer("metadata.uc.screenshot").Start(ctx, "saga: SAGA_STEP_ADD_SCREENSHOT",
129+
trace.WithSpanKind(trace.SpanKindInternal),
130+
)
131+
defer span.End()
132+
133+
span.SetAttributes(
134+
attribute.String("step", SAGA_STEP_ADD_SCREENSHOT),
135+
attribute.String("status", "run"),
136+
attribute.String("link.url", linkURL),
137+
)
138+
107139
stepErr := uc.screenshotUC.Set(ctx, linkURL)
108140
if stepErr != nil {
141+
span.RecordError(stepErr)
142+
span.SetStatus(otelcodes.Error, stepErr.Error())
109143
return domainerrors.Normalize(OpScreenshotSet, stepErr)
110144
}
111145

146+
span.SetStatus(otelcodes.Ok, "Screenshot processing started successfully")
112147
return nil
113148
}).Build()
114149

@@ -119,19 +154,38 @@ func (uc *UC) Add(ctx context.Context, linkURL string) (*v1.Meta, error) { //nol
119154
_, errs = sagaSetMetadata.AddStep(SAGA_STEP_GET_SCREENSHOT_URL).
120155
Needs(SAGA_STEP_ADD_SCREENSHOT, SAGA_STEP_ADD_META).
121156
Then(func(ctx context.Context) error {
157+
ctx, span := otel.Tracer("metadata.uc.screenshot").Start(ctx, "saga: SAGA_STEP_GET_SCREENSHOT_URL",
158+
trace.WithSpanKind(trace.SpanKindInternal),
159+
)
160+
defer span.End()
161+
162+
span.SetAttributes(
163+
attribute.String("step", SAGA_STEP_GET_SCREENSHOT_URL),
164+
attribute.String("status", "run"),
165+
attribute.String("link.url", linkURL),
166+
)
167+
122168
// Try to get screenshot URL, but don't fail if screenshot is not available yet
123169
url, stepErr := uc.screenshotUC.Get(ctx, linkURL)
124170
if stepErr != nil {
125171
// Log warning but continue without screenshot URL
172+
span.AddEvent("Screenshot URL not available yet, continuing without it")
173+
span.SetAttributes(attribute.Bool("screenshot.available", false))
126174
uc.log.WarnWithContext(ctx, "Failed to get screenshot URL, continuing without it",
127175
slog.String("error", stepErr.Error()),
128176
slog.String("url", linkURL),
129177
)
130178

179+
span.SetStatus(otelcodes.Ok, "Continuing without screenshot URL")
131180
return nil // Continue saga execution even if screenshot URL is not available
132181
}
133182

134183
meta.ImageUrl = url.String()
184+
span.SetAttributes(
185+
attribute.String("screenshot.url", url.String()),
186+
attribute.Bool("screenshot.available", true),
187+
)
188+
span.SetStatus(otelcodes.Ok, "Screenshot URL retrieved successfully")
135189

136190
return nil
137191
}).Build()
@@ -143,10 +197,24 @@ func (uc *UC) Add(ctx context.Context, linkURL string) (*v1.Meta, error) { //nol
143197
_, errs = sagaSetMetadata.AddStep(SAGA_STEP_UPDATE_META).
144198
Needs(SAGA_STEP_GET_SCREENSHOT_URL).
145199
Then(func(ctx context.Context) error {
200+
ctx, span := otel.Tracer("metadata.uc.store").Start(ctx, "saga: SAGA_STEP_UPDATE_META",
201+
trace.WithSpanKind(trace.SpanKindInternal),
202+
)
203+
defer span.End()
204+
205+
span.SetAttributes(
206+
attribute.String("step", SAGA_STEP_UPDATE_META),
207+
attribute.String("status", "run"),
208+
attribute.String("link.url", linkURL),
209+
attribute.String("meta.id", meta.GetId()),
210+
)
211+
146212
// Update meta in store with ImageUrl after screenshot URL is retrieved (or without it if screenshot failed)
147213
// This ensures meta is always persisted with the latest state
148214
storeErr := uc.parserUC.MetaStore.Store.Add(ctx, meta)
149215
if storeErr != nil {
216+
span.RecordError(storeErr)
217+
span.SetStatus(otelcodes.Error, storeErr.Error())
150218
uc.log.ErrorWithContext(ctx, "Failed to update meta in store",
151219
slog.String("error", storeErr.Error()),
152220
slog.String("url", linkURL),
@@ -155,6 +223,10 @@ func (uc *UC) Add(ctx context.Context, linkURL string) (*v1.Meta, error) { //nol
155223
return domainerrors.Normalize(OpStoreUpdate, storeErr)
156224
}
157225

226+
span.SetAttributes(
227+
attribute.String("meta.image_url", meta.GetImageUrl()),
228+
)
229+
span.SetStatus(otelcodes.Ok, "Meta updated in store successfully")
158230
uc.log.InfoWithContext(ctx, "Meta updated in store",
159231
slog.String("url", linkURL),
160232
slog.String("image_url", meta.GetImageUrl()),
@@ -174,6 +246,21 @@ func (uc *UC) Add(ctx context.Context, linkURL string) (*v1.Meta, error) { //nol
174246

175247
// Publish MetadataExtracted event using EventBus (canonical name: metadata.metadata.extracted.v1)
176248
// Published after saga completion to ensure all enrichment (including screenshot) is complete
249+
ctx, span := otel.Tracer("metadata.uc.event").Start(ctx, "metadata.uc.publish_metadata_extracted",
250+
trace.WithSpanKind(trace.SpanKindProducer),
251+
)
252+
defer span.End()
253+
254+
span.SetAttributes(
255+
attribute.String("messaging.system", "kafka"),
256+
attribute.String("messaging.destination", domain.MetadataExtractedTopic),
257+
attribute.String("messaging.destination_kind", "topic"),
258+
attribute.String("messaging.operation", "publish"),
259+
attribute.String("event.type", domain.MetadataExtractedTopic),
260+
attribute.String("link.url", linkURL),
261+
attribute.String("meta.id", meta.GetId()),
262+
)
263+
177264
event := &v1.MetadataExtracted{
178265
Id: meta.GetId(),
179266
ImageUrl: meta.GetImageUrl(),
@@ -183,13 +270,19 @@ func (uc *UC) Add(ctx context.Context, linkURL string) (*v1.Meta, error) { //nol
183270
}
184271

185272
if err := uc.eventBus.Publish(ctx, event); err != nil {
273+
span.RecordError(err)
274+
span.SetStatus(otelcodes.Error, err.Error())
186275
uc.log.ErrorWithContext(ctx, "Failed to publish metadata extracted event",
187276
slog.String("error", err.Error()),
188277
slog.String("event_type", domain.MetadataExtractedTopic),
189278
slog.String("url", linkURL),
190279
)
191280
// Don't fail the operation if event publishing fails
192281
} else {
282+
span.SetAttributes(
283+
attribute.String("event_type", domain.MetadataExtractedTopic),
284+
)
285+
span.SetStatus(otelcodes.Ok, "Metadata extracted event published successfully")
193286
uc.log.InfoWithContext(ctx, "Metadata extracted event published successfully",
194287
slog.String("event_type", domain.MetadataExtractedTopic),
195288
slog.String("url", linkURL),

0 commit comments

Comments
 (0)