@@ -28,11 +28,12 @@ import (
2828
2929// EmbeddingsProcessorFactory returns a factory method to instantiate the embeddings processor.
3030func EmbeddingsProcessorFactory (em metrics.EmbeddingsMetrics ) ProcessorFactory {
31- return func (config * processorConfig , requestHeaders map [string ]string , logger * slog.Logger , _ tracing.Tracing , isUpstreamFilter bool ) (Processor , error ) {
31+ return func (config * processorConfig , requestHeaders map [string ]string , logger * slog.Logger , tracing tracing.Tracing , isUpstreamFilter bool ) (Processor , error ) {
3232 logger = logger .With ("processor" , "embeddings" , "isUpstreamFilter" , fmt .Sprintf ("%v" , isUpstreamFilter ))
3333 if ! isUpstreamFilter {
3434 return & embeddingsProcessorRouterFilter {
3535 config : config ,
36+ tracer : tracing .EmbeddingsTracer (),
3637 requestHeaders : requestHeaders ,
3738 logger : logger ,
3839 }, nil
@@ -67,6 +68,10 @@ type embeddingsProcessorRouterFilter struct {
6768 // when the request is retried.
6869 originalRequestBody * openai.EmbeddingRequest
6970 originalRequestBodyRaw []byte
71+ // tracer is the tracer used for requests.
72+ tracer tracing.EmbeddingsTracer
73+ // span is the tracing span for this request, created in ProcessRequestBody.
74+ span tracing.EmbeddingsSpan
7075 // upstreamFilterCount is the number of upstream filters that have been processed.
7176 // This is used to determine if the request is a retry request.
7277 upstreamFilterCount int
@@ -93,7 +98,7 @@ func (e *embeddingsProcessorRouterFilter) ProcessResponseBody(ctx context.Contex
9398}
9499
95100// ProcessRequestBody implements [Processor.ProcessRequestBody].
96- func (e * embeddingsProcessorRouterFilter ) ProcessRequestBody (_ context.Context , rawBody * extprocv3.HttpBody ) (* extprocv3.ProcessingResponse , error ) {
101+ func (e * embeddingsProcessorRouterFilter ) ProcessRequestBody (ctx context.Context , rawBody * extprocv3.HttpBody ) (* extprocv3.ProcessingResponse , error ) {
97102 originalModel , body , err := parseOpenAIEmbeddingBody (rawBody )
98103 if err != nil {
99104 return nil , fmt .Errorf ("failed to parse request body: %w" , err )
@@ -110,13 +115,24 @@ func (e *embeddingsProcessorRouterFilter) ProcessRequestBody(_ context.Context,
110115 })
111116 e .originalRequestBody = body
112117 e .originalRequestBodyRaw = rawBody .Body
118+
119+ // Tracing may need to inject headers, so create a header mutation here.
120+ headerMutation := & extprocv3.HeaderMutation {
121+ SetHeaders : additionalHeaders ,
122+ }
123+ e .span = e .tracer .StartSpanAndInjectHeaders (
124+ ctx ,
125+ e .requestHeaders ,
126+ headerMutation ,
127+ body ,
128+ rawBody .Body ,
129+ )
130+
113131 return & extprocv3.ProcessingResponse {
114132 Response : & extprocv3.ProcessingResponse_RequestBody {
115133 RequestBody : & extprocv3.BodyResponse {
116134 Response : & extprocv3.CommonResponse {
117- HeaderMutation : & extprocv3.HeaderMutation {
118- SetHeaders : additionalHeaders ,
119- },
135+ HeaderMutation : headerMutation ,
120136 ClearRouteCache : true ,
121137 },
122138 },
@@ -146,13 +162,15 @@ type embeddingsProcessorUpstreamFilter struct {
146162 costs translator.LLMTokenUsage
147163 // metrics tracking.
148164 metrics metrics.EmbeddingsMetrics
165+ // span is the tracing span for this request, inherited from the router filter.
166+ span tracing.EmbeddingsSpan
149167}
150168
151169// selectTranslator selects the translator based on the output schema.
152170func (e * embeddingsProcessorUpstreamFilter ) selectTranslator (out filterapi.VersionedAPISchema ) error {
153171 switch out .Name {
154172 case filterapi .APISchemaOpenAI :
155- e .translator = translator .NewEmbeddingOpenAIToOpenAITranslator (out .Version , e .modelNameOverride )
173+ e .translator = translator .NewEmbeddingOpenAIToOpenAITranslator (out .Version , e .modelNameOverride , e . span )
156174 default :
157175 return fmt .Errorf ("unsupported API schema: backend=%s" , out )
158176 }
@@ -278,6 +296,13 @@ func (e *embeddingsProcessorUpstreamFilter) ProcessResponseBody(ctx context.Cont
278296 if err != nil {
279297 return nil , fmt .Errorf ("failed to transform response error: %w" , err )
280298 }
299+ if e .span != nil {
300+ b := bodyMutation .GetBody ()
301+ if b == nil {
302+ b = body .Body
303+ }
304+ e .span .EndSpanOnError (code , b )
305+ }
281306 // Mark so the deferred handler records failure.
282307 recordRequestCompletionErr = true
283308 return & extprocv3.ProcessingResponse {
@@ -327,6 +352,9 @@ func (e *embeddingsProcessorUpstreamFilter) ProcessResponseBody(ctx context.Cont
327352 }
328353 }
329354
355+ if body .EndOfStream && e .span != nil {
356+ e .span .EndSpan ()
357+ }
330358 return resp , nil
331359}
332360
@@ -345,6 +373,10 @@ func (e *embeddingsProcessorUpstreamFilter) SetBackend(ctx context.Context, b *f
345373 e .metrics .SetBackend (b )
346374 e .modelNameOverride = b .ModelNameOverride
347375 e .backendName = b .Name
376+ e .originalRequestBody = rp .originalRequestBody
377+ e .originalRequestBodyRaw = rp .originalRequestBodyRaw
378+ e .onRetry = rp .upstreamFilterCount > 1
379+ e .span = rp .span
348380 if err = e .selectTranslator (b .Schema ); err != nil {
349381 return fmt .Errorf ("failed to select translator: %w" , err )
350382 }
@@ -356,9 +388,6 @@ func (e *embeddingsProcessorUpstreamFilter) SetBackend(ctx context.Context, b *f
356388 // Update metrics with the overridden model
357389 e .metrics .SetRequestModel (e .modelNameOverride )
358390 }
359- e .originalRequestBody = rp .originalRequestBody
360- e .originalRequestBodyRaw = rp .originalRequestBodyRaw
361- e .onRetry = rp .upstreamFilterCount > 1
362391 rp .upstreamFilter = e
363392 return
364393}
0 commit comments