Skip to content

Commit 21794b6

Browse files
committed
add more spans
Signed-off-by: sallyom <[email protected]>
1 parent 447de3b commit 21794b6

File tree

1 file changed

+52
-0
lines changed

1 file changed

+52
-0
lines changed

pkg/kvcache/indexer.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,19 @@ func NewKVCacheIndexer(ctx context.Context, config *Config) (*Indexer, error) {
101101

102102
// Run starts the indexer.
103103
func (k *Indexer) Run(ctx context.Context) {
104+
// Create a span for indexer startup - this will always be generated
105+
tracer := otel.GetTracerProvider().Tracer("llm-d-kv-cache-manager")
106+
ctx, span := tracer.Start(ctx, "kv_cache_manager.indexer_startup")
107+
defer span.End()
108+
109+
span.SetAttributes(
110+
attribute.String("component", "llm-d-kv-cache-manager"),
111+
attribute.String("operation", "indexer_run"),
112+
attribute.String("service.name", "llm-d-kv-cache-manager"),
113+
)
114+
104115
k.tokenizersPool.Run(ctx)
116+
span.SetAttributes(attribute.String("operation", "tokenizers_pool_started"))
105117
}
106118

107119
// KVBlockIndex returns the kvblock.Index used by the Indexer.
@@ -128,38 +140,76 @@ func (k *Indexer) GetPodScores(ctx context.Context, prompt, modelName string,
128140
attribute.String("operation", "get_pod_scores"),
129141
attribute.String("gen_ai.request.model", modelName),
130142
attribute.Int("llm_d.kv_cache.pod_count", len(podIdentifiers)),
143+
attribute.Int("llm_d.kv_cache.prompt_length", len(prompt)),
131144
)
132145

133146
traceLogger := klog.FromContext(ctx).V(logging.TRACE).WithName("kvcache.GetPodScores")
134147
// 0. add to tokenizers pool
135148
k.tokenizersPool.AddTask(prompt, modelName)
136149

137150
// 1. get available tokens of longest prefix
151+
_, tokenSpan := tracer.Start(ctx, "kv_cache_manager.find_tokens")
152+
tokenSpan.SetAttributes(
153+
attribute.String("component", "llm-d-kv-cache-manager"),
154+
attribute.String("operation", "find_longest_contained_tokens"),
155+
)
138156
tokens := k.tokensIndexer.FindLongestContainedTokens(prompt, modelName)
139157
if len(tokens) == 0 {
158+
tokenSpan.SetAttributes(attribute.Int("tokens.found_count", 0))
159+
tokenSpan.End()
140160
//nolint:nilnil // no need to return an error
141161
return nil, nil
142162
}
163+
tokenSpan.SetAttributes(attribute.Int("tokens.found_count", len(tokens)))
164+
tokenSpan.End()
143165

144166
// 2. get block keys
167+
_, blockSpan := tracer.Start(ctx, "kv_cache_manager.tokens_to_block_keys")
168+
blockSpan.SetAttributes(
169+
attribute.String("component", "llm-d-kv-cache-manager"),
170+
attribute.String("operation", "tokens_to_kv_block_keys"),
171+
attribute.Int("tokens.input_count", len(tokens)),
172+
)
145173
blockKeys := k.tokensProcessor.TokensToKVBlockKeys(tokens, modelName)
174+
blockSpan.SetAttributes(attribute.Int("block_keys.generated_count", len(blockKeys)))
175+
blockSpan.End()
146176
traceLogger.Info("found tokens", "tokens", tokens, "block-keys", blockKeys)
147177

148178
// 3. query kvblock indexer for pods
179+
_, lookupSpan := tracer.Start(ctx, "kv_cache_manager.lookup_pods")
180+
lookupSpan.SetAttributes(
181+
attribute.String("component", "llm-d-kv-cache-manager"),
182+
attribute.String("operation", "kvblock_index_lookup"),
183+
attribute.Int("block_keys.count", len(blockKeys)),
184+
)
149185
keyToPods, err := k.kvBlockIndex.Lookup(ctx, blockKeys, sets.New(podIdentifiers...))
150186
if err != nil {
187+
lookupSpan.SetAttributes(attribute.String("error", "lookup_failed"))
188+
lookupSpan.End()
151189
span.RecordError(err)
152190
return nil, fmt.Errorf("failed to query kvblock indexer: %w", err)
153191
}
192+
lookupSpan.SetAttributes(attribute.Int("lookup.result_keys_count", len(keyToPods)))
193+
lookupSpan.End()
154194
traceLogger.Info("found block keys", "block-keys", blockKeys,
155195
"pods", podsPerKeyPrintHelper(keyToPods))
156196

157197
// 4. score pods
198+
_, scoreSpan := tracer.Start(ctx, "kv_cache_manager.score_pods")
199+
scoreSpan.SetAttributes(
200+
attribute.String("component", "llm-d-kv-cache-manager"),
201+
attribute.String("operation", "kvblock_scorer_score"),
202+
attribute.Int("block_keys.count", len(blockKeys)),
203+
)
158204
podScores, err := k.kvBlockScorer.Score(blockKeys, keyToPods)
159205
if err != nil {
206+
scoreSpan.SetAttributes(attribute.String("error", "scoring_failed"))
207+
scoreSpan.End()
160208
span.RecordError(err)
161209
return nil, fmt.Errorf("failed to query kvblock scorer: %w", err)
162210
}
211+
scoreSpan.SetAttributes(attribute.Int("scoring.pod_scores_count", len(podScores)))
212+
scoreSpan.End()
163213
traceLogger.Info("found pod scores", "pod-scores", podScores)
164214

165215
// Calculate hit ratio for observability
@@ -176,6 +226,8 @@ func (k *Indexer) GetPodScores(ctx context.Context, prompt, modelName string,
176226

177227
span.SetAttributes(
178228
attribute.Float64("llm_d.kv_cache.hit_ratio", hitRatio),
229+
attribute.String("operation.outcome", "success"),
230+
attribute.Int("llm_d.kv_cache.scoring_results_count", len(podScores)),
179231
)
180232

181233
return podScores, nil

0 commit comments

Comments
 (0)