Skip to content

Commit 53976be

Browse files
committed
update OnProduceRecordBuffered
1 parent 8d675a1 commit 53976be

File tree

1 file changed

+2
-30
lines changed

1 file changed

+2
-30
lines changed

contrib/twmb/franz-go/kgo.go

Lines changed: 2 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -81,36 +81,8 @@ func (c *Client) PollRecords(ctx context.Context, maxPollRecords int) kgo.Fetche
8181
}
8282

8383
func (c *Client) OnProduceRecordBuffered(r *kgo.Record) {
84-
opts := []tracer.StartSpanOption{
85-
tracer.ServiceName("producer-service"), // TODO: from config in finished contrib
86-
tracer.ResourceName("Produce Topic " + r.Topic),
87-
tracer.SpanType(ext.SpanTypeMessageProducer),
88-
// tracer.Tag(ext.Component, componentName), // TODO: from const in dd-trace-go
89-
tracer.Tag(ext.SpanKind, ext.SpanKindProducer),
90-
tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka),
91-
tracer.Tag(ext.MessagingDestinationName, r.Topic),
92-
}
93-
94-
// if tr.kafkaCfg.BootstrapServers != "" {
95-
// opts = append(opts, tracer.Tag(ext.KafkaBootstrapServers, tr.kafkaCfg.BootstrapServers))
96-
// } // TODO: from config in finished contrib
97-
// if !math.IsNaN(tr.analyticsRate) {
98-
// opts = append(opts, tracer.Tag(ext.EventSampleRate, tr.analyticsRate))
99-
// } // TODO: from config in finished contrib
100-
101-
span, ctx := tracer.StartSpanFromContext(r.Context, "kafka.produce", opts...) // TODO: operation name from config
102-
103-
slog.Info("Injecting span context into carrier in writer")
104-
carrier := NewKafkaHeadersCarrier(r)
105-
if err := tracer.Inject(span.Context(), carrier); err != nil {
106-
slog.Error("Failed to inject span context into carrier in writer", "error", err)
107-
// instr.Logger().Debug("contrib/twmb/franz-go: Failed to inject span context into carrier in writer, %s", err.Error())
108-
}
109-
110-
// Store the span in the record's context so we can finish it later
111-
r.Context = ctx
112-
113-
slog.Info("OnProduceRecordBuffered done")
84+
span := c.tracer.StartProduceSpan(r.Context, wrapRecord(r))
85+
r.Context = tracer.ContextWithSpan(r.Context, span)
11486
}
11587

11688
// OnProduceRecordUnbuffered is called when a record has been sent and ack'd by the broker.

0 commit comments

Comments
 (0)