Skip to content

Commit b2e6ba7

Browse files
rarguelloF and darccio authored
chore(llmobs): add telemetry to track llmobs usage (#4088)
Co-authored-by: darccio <[email protected]>
1 parent 5ef8e9b commit b2e6ba7

File tree

8 files changed

+316
-23
lines changed

8 files changed

+316
-23
lines changed

instrumentation/testutils/testutils.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/DataDog/dd-trace-go/v2/internal/globalconfig"
1717
"github.com/DataDog/dd-trace-go/v2/internal/normalizer"
1818
"github.com/DataDog/dd-trace-go/v2/internal/statsdtest"
19+
"github.com/DataDog/dd-trace-go/v2/internal/telemetry"
1920
)
2021

2122
func SetGlobalServiceName(t *testing.T, val string) {
@@ -116,3 +117,10 @@ func SetPropagatingTag(t testing.TB, ctx *tracer.SpanContext, k, v string) {
116117
cc := (*cookieCutter)(*(*unsafe.Pointer)(unsafe.Pointer(&ptr)))
117118
cc.trace.propagatingTags[k] = v
118119
}
120+
121+
// FlushTelemetry flushes any pending telemetry data through the global telemetry client; it is a no-op when telemetry.GlobalClient() returns nil.
122+
func FlushTelemetry() {
123+
if client := telemetry.GlobalClient(); client != nil {
124+
client.Flush()
125+
}
126+
}

internal/llmobs/llmobs.go

Lines changed: 64 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@ var (
3737
errAgentlessRequiresAPIKey = errors.New("LLMOBs agentless mode requires a valid API key - set the DD_API_KEY env variable to configure one")
3838
errMLAppRequired = errors.New("ML App is required for sending LLM Observability data")
3939
errAgentModeNotSupported = errors.New("DD_LLMOBS_AGENTLESS_ENABLED has been configured to false but the agent is not available or does not support LLMObs")
40+
errInvalidMetricLabel = errors.New("label is required for evaluation metrics")
41+
errFinishedSpan = errors.New("span is already finished")
42+
errEvalJoinBothPresent = errors.New("provide either span/trace IDs or tag key/value, not both")
43+
errEvalJoinNonePresent = errors.New("must provide either span/trace IDs or tag key/value for joining")
44+
errInvalidSpanJoin = errors.New("both span and trace IDs are required for span-based joining")
45+
errInvalidTagJoin = errors.New("both tag key and value are required for tag-based joining")
4046
)
4147

4248
const (
@@ -182,7 +188,11 @@ func newLLMObs(cfg *config.Config, tracer Tracer) (*LLMObs, error) {
182188

183189
// Start starts the global LLMObs instance with the given configuration and tracer.
184190
// Returns an error if LLMObs is already running or if configuration is invalid.
185-
func Start(cfg config.Config, tracer Tracer) error {
191+
func Start(cfg config.Config, tracer Tracer) (err error) {
192+
startTime := time.Now()
193+
defer func() {
194+
trackLLMObsStart(startTime, err, cfg)
195+
}()
186196
mu.Lock()
187197
defer mu.Unlock()
188198

@@ -224,6 +234,7 @@ func ActiveLLMObs() (*LLMObs, error) {
224234
func Flush() {
225235
if activeLLMObs != nil {
226236
activeLLMObs.Flush()
237+
// trackUserFlush runs only when an active instance exists, and only after its Flush call has returned.
// NOTE(review): presumably this records a user-triggered flush in telemetry — confirm against the telemetry helpers.
trackUserFlush()
227238
}
228239
}
229240

@@ -383,6 +394,7 @@ func (l *LLMObs) batchSend(params batchSendParams) {
383394
}
384395
if err := l.Transport.PushSpanEvents(ctx, events); err != nil {
385396
log.Error("llmobs: failed to push span events: %v", err.Error())
397+
trackDroppedPayload(len(events), telemetryMetricDroppedSpanEvents, "transport_error")
386398
} else {
387399
log.Debug("llmobs: push span events success")
388400
}
@@ -403,6 +415,7 @@ func (l *LLMObs) batchSend(params batchSendParams) {
403415
}
404416
if err := l.Transport.PushEvalMetrics(ctx, metrics); err != nil {
405417
log.Error("llmobs: failed to push eval metrics: %v", err.Error())
418+
trackDroppedPayload(len(metrics), telemetryMetricDroppedEvalEvents, "transport_error")
406419
} else {
407420
log.Debug("llmobs: push eval metrics success")
408421
}
@@ -583,20 +596,34 @@ func (l *LLMObs) llmobsSpanEvent(span *Span) *transport.LLMObsSpanEvent {
583596
Scope: span.scope,
584597
}
585598
if b, err := json.Marshal(ev); err == nil {
586-
if len(b) > sizeLimitEVPEvent {
599+
rawSize := len(b)
600+
trackSpanEventRawSize(ev, rawSize)
601+
602+
truncated := false
603+
if rawSize > sizeLimitEVPEvent {
587604
log.Warn(
588605
"llmobs: dropping llmobs span event input/output because its size (%s) exceeds the event size limit (5MB)",
589-
readableBytes(len(b)),
606+
readableBytes(rawSize),
590607
)
591-
dropSpanEventIO(ev)
608+
truncated = dropSpanEventIO(ev)
609+
if !truncated {
610+
log.Debug("llmobs: attempted to drop span event IO but it was not present")
611+
}
612+
}
613+
actualSize := rawSize
614+
if truncated {
615+
if b, err := json.Marshal(ev); err == nil {
616+
actualSize = len(b)
617+
}
592618
}
619+
trackSpanEventSize(ev, actualSize, truncated)
593620
}
594621
return ev
595622
}
596623

597-
func dropSpanEventIO(ev *transport.LLMObsSpanEvent) {
624+
func dropSpanEventIO(ev *transport.LLMObsSpanEvent) bool {
598625
if ev == nil {
599-
return
626+
return false
600627
}
601628
droppedIO := false
602629
if _, ok := ev.Meta["input"]; ok {
@@ -612,11 +639,14 @@ func dropSpanEventIO(ev *transport.LLMObsSpanEvent) {
612639
} else {
613640
log.Debug("llmobs: attempted to drop span event IO but it was not present")
614641
}
642+
return droppedIO
615643
}
616644

617645
// StartSpan starts a new LLMObs span with the given kind, name, and configuration.
618646
// Returns the created span and a context containing the span.
619647
func (l *LLMObs) StartSpan(ctx context.Context, kind SpanKind, name string, cfg StartSpanConfig) (*Span, context.Context) {
648+
defer trackSpanStarted()
649+
620650
spanName := name
621651
if spanName == "" {
622652
spanName = string(kind)
@@ -692,20 +722,38 @@ func (l *LLMObs) StartExperimentSpan(ctx context.Context, name string, experimen
692722

693723
// SubmitEvaluation submits an evaluation metric for a span.
694724
// The span can be identified either by span/trace IDs or by tag key-value pairs.
695-
func (l *LLMObs) SubmitEvaluation(cfg EvaluationConfig) error {
696-
// Validate exactly one join method is provided
697-
hasSpanJoin := cfg.SpanID != "" && cfg.TraceID != ""
698-
hasTagJoin := cfg.TagKey != "" && cfg.TagValue != ""
725+
func (l *LLMObs) SubmitEvaluation(cfg EvaluationConfig) (err error) {
726+
var metric *transport.LLMObsMetric
727+
defer func() {
728+
trackSubmitEvaluationMetric(metric, err)
729+
}()
699730

731+
if cfg.Label == "" {
732+
return errInvalidMetricLabel
733+
}
734+
var (
735+
hasTagJoin bool
736+
hasSpanJoin bool
737+
)
738+
if cfg.SpanID != "" || cfg.TraceID != "" {
739+
if !(cfg.SpanID != "" && cfg.TraceID != "") {
740+
return errInvalidSpanJoin
741+
}
742+
hasSpanJoin = true
743+
}
744+
if cfg.TagKey != "" || cfg.TagValue != "" {
745+
if !(cfg.TagKey != "" && cfg.TagValue != "") {
746+
return errInvalidTagJoin
747+
}
748+
hasTagJoin = true
749+
}
700750
if hasSpanJoin && hasTagJoin {
701-
return errors.New("provide either span/trace IDs or tag key/value, not both")
751+
return errEvalJoinBothPresent
702752
}
703753
if !hasSpanJoin && !hasTagJoin {
704-
return errors.New("must provide either span/trace IDs or tag key/value for joining")
705-
}
706-
if cfg.Label == "" {
707-
return errors.New("label is required for evaluation metrics")
754+
return errEvalJoinNonePresent
708755
}
756+
709757
numValues := 0
710758
if cfg.CategoricalValue != nil {
711759
numValues++
@@ -751,7 +799,7 @@ func (l *LLMObs) SubmitEvaluation(cfg EvaluationConfig) error {
751799
}
752800
tags = append(tags, fmt.Sprintf("ddtrace.version:%s", version.Tag))
753801

754-
metric := &transport.LLMObsMetric{
802+
metric = &transport.LLMObsMetric{
755803
JoinOn: joinOn,
756804
Label: cfg.Label,
757805
MLApp: mlApp,

internal/llmobs/llmobs_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1704,7 +1704,7 @@ func TestLLMObsLifecycle(t *testing.T) {
17041704
})
17051705
}
17061706

1707-
func BenchmarkStartSpan(b *testing.B) {
1707+
func BenchmarkLLMObsStartSpan(b *testing.B) {
17081708
run := func(b *testing.B, ll *llmobs.LLMObs, tt *testtracer.TestTracer, done chan struct{}) {
17091709
b.Log("starting benchmark")
17101710

internal/llmobs/span.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,9 @@ func (s *Span) Finish(cfg FinishSpanConfig) {
283283
log.Debug("llmobs: attempted to finish an already finished span")
284284
return
285285
}
286+
defer func() {
287+
trackSpanFinished(s)
288+
}()
286289

287290
if cfg.FinishTime.IsZero() {
288291
cfg.FinishTime = time.Now()
@@ -303,17 +306,23 @@ func (s *Span) Finish(cfg FinishSpanConfig) {
303306
}
304307
l.submitLLMObsSpan(s)
305308
s.finished = true
306-
307-
//TODO: telemetry.record_span_created(span)
308309
}
309310

310311
// Annotate adds annotations to the span using the provided SpanAnnotations.
311312
func (s *Span) Annotate(a SpanAnnotations) {
312313
s.mu.Lock()
313314
defer s.mu.Unlock()
314315

316+
var err error
317+
defer func() {
318+
if err != nil {
319+
log.Warn("llmobs: failed to annotate span: %v", err.Error())
320+
}
321+
trackSpanAnnotations(s, err)
322+
}()
323+
315324
if s.finished {
316-
log.Warn("llmobs: cannot annotate a finished span")
325+
err = errFinishedSpan
317326
return
318327
}
319328

0 commit comments

Comments
 (0)