Skip to content

Commit da3ccfb

Browse files
authored
event trace propagation (#802)
1 parent 0b57536 commit da3ccfb

File tree

7 files changed

+82
-19
lines changed

7 files changed

+82
-19
lines changed

libs/hwes/event.go

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,31 @@ import (
1616
)
1717

1818
type Event struct {
19-
EventID uuid.UUID
20-
EventType string
21-
AggregateID uuid.UUID
22-
AggregateType AggregateType
23-
Data []byte
24-
Timestamp time.Time
25-
Version uint64
19+
// identifier of this event
20+
EventID uuid.UUID
21+
// constant which can be used to route this event to its handler(s)
22+
EventType string
23+
// identifier of the aggregate that this event has an effect on
24+
AggregateID uuid.UUID
25+
// type of the aggregate that this event has an effect on
26+
AggregateType AggregateType
27+
// payload of the event, must be json
28+
Data []byte
29+
// time of event creation
30+
Timestamp time.Time
31+
// event's revision number (event's can, but should not be modified)
32+
Version uint64
33+
// user responsible for this event
2634
CommitterUserID *uuid.UUID
35+
// w3c trace context
36+
TraceParent string
2737
}
2838

2939
type metadata struct {
3040
// CommitterUserID represents an optional UUID that identifies the user that is directly responsible for this event
3141
CommitterUserID string `json:"committer_user_id"`
42+
// w3c trace context
43+
TraceParent string `json:"trace_parent"`
3244
// The Timestamp represents the time when the event was created. Using the built-in eventstoreDB timestamp is discouraged.
3345
Timestamp time.Time `json:"timestamp"`
3446
}
@@ -39,6 +51,7 @@ type EventOption func(*Event) error
3951
// WithContext applies SetCommitterFromCtx after construction
4052
func WithContext(ctx context.Context) EventOption {
4153
return func(event *Event) error {
54+
event.SetTracingContextFromCtx(ctx)
4255
return event.SetCommitterFromCtx(ctx)
4356
}
4457
}
@@ -129,6 +142,7 @@ func NewEventFromRecordedEvent(esdbEvent *esdb.RecordedEvent) (Event, error) {
129142
Timestamp: md.Timestamp,
130143
Version: esdbEvent.EventNumber,
131144
CommitterUserID: nil,
145+
TraceParent: md.TraceParent,
132146
}
133147

134148
eventCommitterUserID, err := uuid.Parse(md.CommitterUserID)
@@ -163,6 +177,7 @@ func (e *Event) GetVersion() uint64 {
163177

164178
func (e *Event) ToEventData() (esdb.EventData, error) {
165179
md := metadata{
180+
TraceParent: e.TraceParent,
166181
Timestamp: e.Timestamp,
167182
}
168183
if e.CommitterUserID != nil {
@@ -221,8 +236,9 @@ func (e *Event) SetCommitterFromCtx(ctx context.Context) error {
221236

222237
userID, err := common.GetUserID(ctx)
223238
if err != nil {
224-
return err
239+
return nil // don't set a user, if no user is available
225240
}
241+
226242
e.CommitterUserID = &userID
227243

228244
// Just to make sure we are actually dealing with a valid UUID
@@ -234,16 +250,25 @@ func (e *Event) SetCommitterFromCtx(ctx context.Context) error {
234250
return nil
235251
}
236252

253+
// SetTracingContextFromCtx propagates the currently active span in the context
254+
func (e *Event) SetTracingContextFromCtx(ctx context.Context) {
255+
ctx, span, _ := telemetry.StartSpan(ctx, "SetTracingContextFromCtx")
256+
e.TraceParent = telemetry.TraceParent(ctx)
257+
span.End()
258+
}
259+
237260
// GetZerologDict to enrich some logs
238261
//
239262
// Example:
240263
//
241264
// zerolog.Ctx(ctx).Debug().Dict("event", event.GetZerologDict()).Msg("process event")
242265
func (e *Event) GetZerologDict() *zerolog.Event {
266+
243267
dict := zerolog.Dict().
244268
Str("eventId", e.EventID.String()).
245269
Str("eventType", e.EventType).
246-
Uint64("eventVersion", e.Version)
270+
Uint64("eventVersion", e.Version).
271+
Str("traceParent", e.TraceParent)
247272

248273
if e.CommitterUserID != nil {
249274
dict.Str("committerUserID", e.CommitterUserID.String())

libs/hwes/eventstoredb/projections/custom/custom.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,6 @@ func (p *CustomProjection) Subscribe(ctx context.Context) error {
165165
// and calls the according event handler based on the received event
166166
// This function blocks the thread until the passed context gets canceled
167167
func (p *CustomProjection) processReceivedEventFromStream(ctx context.Context, stream *esdb.PersistentSubscription, esdbEvent *esdb.PersistentSubscriptionEvent) error {
168-
// TODO: Connect with source trace?
169168
ctx, span, log := telemetry.StartSpan(ctx, "custom_projection.processReceivedEventFromStream")
170169
defer span.End()
171170

@@ -193,6 +192,27 @@ func (p *CustomProjection) processReceivedEventFromStream(ctx context.Context, s
193192
return nil
194193
}
195194

195+
//
196+
// change tracing spans
197+
//
198+
if event.TraceParent != "" {
199+
ctx = telemetry.FromTraceParent(ctx, event.TraceParent)
200+
201+
// end old span, start new one
202+
span.End()
203+
ctx, span, log = telemetry.StartSpan(ctx, "custom_projection.processReceivedEventFromStream")
204+
defer span.End()
205+
206+
// set attributes
207+
log = log.With().
208+
Str("subscription_group_name", p.subscriptionGroupName).
209+
Str("esdbEventID", event.EventID.String()).
210+
Logger()
211+
212+
telemetry.SetSpanStr(ctx, "subscription_group_name", p.subscriptionGroupName)
213+
telemetry.SetSpanStr(ctx, "esdbEventID", event.EventID.String())
214+
}
215+
196216
log.Debug().Dict("event", event.GetZerologDict()).Msg("process event")
197217

198218
err, nackAction := p.HandleEvent(ctx, event)

libs/telemetry/otel.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"go.opentelemetry.io/otel"
88
"go.opentelemetry.io/otel/attribute"
99
"go.opentelemetry.io/otel/codes"
10+
"go.opentelemetry.io/otel/propagation"
1011
"go.opentelemetry.io/otel/trace"
1112
"reflect"
1213
)
@@ -56,6 +57,21 @@ func SetSpanBool(ctx context.Context, key string, value bool) {
5657
SetSpanAttributes(ctx, attribute.Bool(key, value))
5758
}
5859

60+
// TraceParent returns the w3c Trace Context traceparent header for a SpanContext, and if the span context is valid
61+
func TraceParent(ctx context.Context) string {
62+
propagator := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})
63+
carrier := propagation.MapCarrier{}
64+
propagator.Inject(ctx, carrier)
65+
return carrier["traceparent"]
66+
}
67+
68+
// FromTraceParent yields a new context using a propagated w3c traceparent header
69+
func FromTraceParent(ctx context.Context, traceparent string) context.Context {
70+
propagator := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})
71+
carrier := propagation.MapCarrier{"traceparent": traceparent}
72+
return propagator.Extract(ctx, carrier)
73+
}
74+
5975
// zerologTraceHook calls addSpanIdToLogEvent and TODO for log events
6076
func zerologTraceHook() zerolog.HookFunc {
6177
return func(event *zerolog.Event, level zerolog.Level, message string) {

services/property-svc/internal/property-set/aggregate/actions.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
func (a *PropertySetAggregate) CreatePropertySet(ctx context.Context, name string) error {
99
id := a.GetID()
1010

11-
event, err := propertyEventsV1.NewPropertySetCreatedEvent(a, id, name)
11+
event, err := propertyEventsV1.NewPropertySetCreatedEvent(ctx, a, id, name)
1212
if err != nil {
1313
return err
1414
}

services/property-svc/internal/property-set/events/v1/events.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package v1
22

33
import (
4+
"context"
45
"github.com/google/uuid"
56
"hwes"
67
)
@@ -14,10 +15,10 @@ type PropertySetCreatedEvent struct {
1415
Name string `json:"name"`
1516
}
1617

17-
func NewPropertySetCreatedEvent(a hwes.Aggregate, id uuid.UUID, name string) (hwes.Event, error) {
18+
func NewPropertySetCreatedEvent(ctx context.Context, a hwes.Aggregate, id uuid.UUID, name string) (hwes.Event, error) {
1819
payload := PropertySetCreatedEvent{
1920
ID: id.String(),
2021
Name: name,
2122
}
22-
return hwes.NewEvent(a, PropertySetCreated, hwes.WithData(payload))
23+
return hwes.NewEvent(a, PropertySetCreated, hwes.WithData(payload), hwes.WithContext(ctx))
2324
}

services/property-svc/internal/property-value/aggregate/actions.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,15 @@ import (
99
func (a *PropertyValueAggregate) CreatePropertyValue(ctx context.Context, propertyID uuid.UUID, value interface{}, subjectID uuid.UUID) error {
1010
id := a.GetID()
1111

12-
event, err := propertyEventsV1.NewPropertyValueCreatedEvent(a, id, propertyID, value, subjectID)
12+
event, err := propertyEventsV1.NewPropertyValueCreatedEvent(ctx, a, id, propertyID, value, subjectID)
1313
if err != nil {
1414
return err
1515
}
1616
return a.Apply(event)
1717
}
1818

1919
func (a *PropertyValueAggregate) UpdatePropertyValue(ctx context.Context, value interface{}) error {
20-
event, err := propertyEventsV1.NewPropertyValueUpdatedEvent(a, value)
20+
event, err := propertyEventsV1.NewPropertyValueUpdatedEvent(ctx, a, value)
2121
if err != nil {
2222
return err
2323
}

services/property-svc/internal/property-value/events/v1/events.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package v1
22

33
import (
4+
"context"
45
"github.com/google/uuid"
56
"hwes"
67
)
@@ -17,23 +18,23 @@ type PropertyValueCreatedEvent struct {
1718
SubjectID string `json:"subject_id"`
1819
}
1920

20-
func NewPropertyValueCreatedEvent(a hwes.Aggregate, id uuid.UUID, propertyID uuid.UUID, value interface{}, subjectID uuid.UUID) (hwes.Event, error) {
21+
func NewPropertyValueCreatedEvent(ctx context.Context, a hwes.Aggregate, id uuid.UUID, propertyID uuid.UUID, value interface{}, subjectID uuid.UUID) (hwes.Event, error) {
2122
payload := PropertyValueCreatedEvent{
2223
ID: id.String(),
2324
PropertyID: propertyID.String(),
2425
Value: value,
2526
SubjectID: subjectID.String(),
2627
}
27-
return hwes.NewEvent(a, PropertyValueCreated, hwes.WithData(payload))
28+
return hwes.NewEvent(a, PropertyValueCreated, hwes.WithData(payload), hwes.WithContext(ctx))
2829
}
2930

3031
type PropertyValueUpdatedEvent struct {
3132
Value interface{} `json:"value"`
3233
}
3334

34-
func NewPropertyValueUpdatedEvent(a hwes.Aggregate, value interface{}) (hwes.Event, error) {
35+
func NewPropertyValueUpdatedEvent(ctx context.Context, a hwes.Aggregate, value interface{}) (hwes.Event, error) {
3536
payload := PropertyValueUpdatedEvent{
3637
Value: value,
3738
}
38-
return hwes.NewEvent(a, PropertyValueUpdated, hwes.WithData(payload))
39+
return hwes.NewEvent(a, PropertyValueUpdated, hwes.WithData(payload), hwes.WithContext(ctx))
3940
}

0 commit comments

Comments
 (0)