diff --git a/pkg/kvcache/indexer.go b/pkg/kvcache/indexer.go index ebe1e86..0eec0e8 100644 --- a/pkg/kvcache/indexer.go +++ b/pkg/kvcache/indexer.go @@ -133,7 +133,7 @@ func (k *Indexer) GetPodScores(ctx context.Context, prompt, modelName string, traceLogger.Info("found tokens", "tokens", tokens, "block-keys", blockKeys) // 3. query kvblock indexer for pods - strBlockKeys, keyToPods, err := k.kvBlockIndex.Lookup(ctx, blockKeys, sets.New(podIdentifiers...)) + keyToPods, err := k.kvBlockIndex.Lookup(ctx, blockKeys, sets.New(podIdentifiers...)) if err != nil { return nil, fmt.Errorf("failed to query kvblock indexer: %w", err) } @@ -141,7 +141,7 @@ func (k *Indexer) GetPodScores(ctx context.Context, prompt, modelName string, "pods", podsPerKeyPrintHelper(keyToPods)) // 4. score pods - podScores, err := k.kvBlockScorer.Score(strBlockKeys, keyToPods) + podScores, err := k.kvBlockScorer.Score(blockKeys, keyToPods) if err != nil { return nil, fmt.Errorf("failed to query kvblock scorer: %w", err) } diff --git a/pkg/kvcache/kvblock/common_test.go b/pkg/kvcache/kvblock/common_test.go index b4aa110..28488ed 100644 --- a/pkg/kvcache/kvblock/common_test.go +++ b/pkg/kvcache/kvblock/common_test.go @@ -39,9 +39,9 @@ func testAddBasic(t *testing.T, index kvblock.Index) { assert.NoError(t, err) // Lookup after add - hitKeys, podsPerKey, err := index.Lookup(t.Context(), []kvblock.Key{key}, sets.Set[string]{}) + podsPerKey, err := index.Lookup(t.Context(), []kvblock.Key{key}, sets.Set[string]{}) assert.NoError(t, err) - assert.Len(t, hitKeys, 1) - assert.Equal(t, key, hitKeys[0]) + assert.Len(t, podsPerKey, 1) + assert.Contains(t, podsPerKey, key) assert.Equal(t, podsPerKey[key], []string{"10.0.0.1", "10.0.0.2"}) } diff --git a/pkg/kvcache/kvblock/in_memory.go b/pkg/kvcache/kvblock/in_memory.go index b8508b9..4e0f7d5 100644 --- a/pkg/kvcache/kvblock/in_memory.go +++ b/pkg/kvcache/kvblock/in_memory.go @@ -89,14 +89,13 @@ type PodCache struct { // If the podIdentifierSet is empty, all pods are returned. // // It returns: -// 1. A slice of the hit keys. -// 2. A map where the keys are those in (1) and the values are pod-identifiers. -// 3. An error if any occurred during the operation. +// 1. A map where the keys are those in (1) and the values are pod-identifiers. +// 2. An error if any occurred during the operation. func (m *InMemoryIndex) Lookup(ctx context.Context, keys []Key, podIdentifierSet sets.Set[string], -) ([]Key, map[Key][]string, error) { +) (map[Key][]string, error) { if len(keys) == 0 { - return nil, nil, fmt.Errorf("no keys provided for lookup") + return nil, fmt.Errorf("no keys provided for lookup") } traceLogger := klog.FromContext(ctx).V(logging.TRACE).WithName("kvblock.InMemoryIndex.Lookup") @@ -108,7 +107,7 @@ func (m *InMemoryIndex) Lookup(ctx context.Context, keys []Key, if pods, found := m.data.Get(key); found { //nolint:nestif // TODO: can this be optimized? if pods == nil || pods.cache.Len() == 0 { traceLogger.Info("no pods found for key, cutting search", "key", key) - return keys[:idx], podsPerKey, nil // early stop since prefix-chain breaks here + return podsPerKey, nil // early stop since prefix-chain breaks here } highestHitIdx = idx @@ -135,7 +134,7 @@ func (m *InMemoryIndex) Lookup(ctx context.Context, keys []Key, traceLogger.Info("lookup completed", "highest-hit-index", highestHitIdx, "pods-per-key", podsPerKeyPrintHelper(podsPerKey)) - return keys[:highestHitIdx+1], podsPerKey, nil + return podsPerKey, nil } // Add adds a set of keys and their associated pod entries to the index backend. diff --git a/pkg/kvcache/kvblock/index.go b/pkg/kvcache/kvblock/index.go index 03c390a..7930667 100644 --- a/pkg/kvcache/kvblock/index.go +++ b/pkg/kvcache/kvblock/index.go @@ -107,10 +107,9 @@ type Index interface { // If the podIdentifierSet is empty, all pods are returned. // // It returns: - // 1. A slice of the hit keys. - // 2. A map where the keys are those in (1) and the values are pod-identifiers. - // 3. An error if any occurred during the operation. - Lookup(ctx context.Context, keys []Key, podIdentifierSet sets.Set[string]) ([]Key, map[Key][]string, error) + // 1. A map where the keys are those in (1) and the values are pod-identifiers. + // 2. An error if any occurred during the operation. + Lookup(ctx context.Context, keys []Key, podIdentifierSet sets.Set[string]) (map[Key][]string, error) // Add adds a set of keys and their associated pod entries to the index backend. Add(ctx context.Context, keys []Key, entries []PodEntry) error // Evict removes a key and its associated pod entries from the index backend. diff --git a/pkg/kvcache/kvblock/instrumented_index.go b/pkg/kvcache/kvblock/instrumented_index.go index 064a9a3..37e09c3 100644 --- a/pkg/kvcache/kvblock/instrumented_index.go +++ b/pkg/kvcache/kvblock/instrumented_index.go @@ -34,14 +34,13 @@ func (m *instrumentedIndex) Lookup( ctx context.Context, keys []Key, podIdentifierSet sets.Set[string], -) ([]Key, map[Key][]string, error) { +) (map[Key][]string, error) { timer := prometheus.NewTimer(metrics.LookupLatency) defer timer.ObserveDuration() metrics.LookupRequests.Inc() - hitKeys, pods, err := m.next.Lookup(ctx, keys, podIdentifierSet) - metrics.LookupHits.Add(float64(len(hitKeys))) + pods, err := m.next.Lookup(ctx, keys, podIdentifierSet) - return hitKeys, pods, err + return pods, err } diff --git a/pkg/kvcache/kvblock/instrumented_index_test.go b/pkg/kvcache/kvblock/instrumented_index_test.go new file mode 100644 index 0000000..01baa2a --- /dev/null +++ b/pkg/kvcache/kvblock/instrumented_index_test.go @@ -0,0 +1,47 @@ +/* +Copyright 2025 The llm-d Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kvblock_test + +import ( + "testing" + + "github.com/llm-d/llm-d-kv-cache-manager/pkg/kvcache/kvblock" + "github.com/stretchr/testify/assert" +) + +func TestNewInstrumentedIndex(t *testing.T) { + // Create base index + baseIndex, err := kvblock.NewInMemoryIndex(nil) + assert.NoError(t, err) + + // Wrap with instrumentation + instrumented := kvblock.NewInstrumentedIndex(baseIndex) + assert.NotNil(t, instrumented) + + // Verify it implements Index interface + assert.Implements(t, (*kvblock.Index)(nil), instrumented) +} + +func TestInstrumentedIndexBasicFunctionality(t *testing.T) { + // Create instrumented index + baseIndex, err := kvblock.NewInMemoryIndex(nil) + assert.NoError(t, err) + instrumented := kvblock.NewInstrumentedIndex(baseIndex) + + // Test that basic functionality still works through the wrapper + testAddBasic(t, instrumented) +} diff --git a/pkg/kvcache/kvblock/redis.go b/pkg/kvcache/kvblock/redis.go index 144957f..41689c6 100644 --- a/pkg/kvcache/kvblock/redis.go +++ b/pkg/kvcache/kvblock/redis.go @@ -80,14 +80,13 @@ var _ Index = &RedisIndex{} // If the podIdentifierSet is empty, all pods are returned. // // It returns: -// 1. A slice of the hit keys. -// 2. A map where the keys are those in (1) and the values are pod-identifiers. -// 3. An error if any occurred during the operation. +// 1. A map where the keys are those in (1) and the values are pod-identifiers. +// 2. An error if any occurred during the operation. func (r *RedisIndex) Lookup(ctx context.Context, keys []Key, podIdentifierSet sets.Set[string], -) ([]Key, map[Key][]string, error) { +) (map[Key][]string, error) { if len(keys) == 0 { - return nil, nil, nil + return make(map[Key][]string), nil } logger := klog.FromContext(ctx).WithName("kvblock.RedisIndex.Lookup") @@ -105,11 +104,10 @@ func (r *RedisIndex) Lookup(ctx context.Context, keys []Key, _, execErr := pipe.Exec(ctx) if execErr != nil { - return nil, nil, fmt.Errorf("redis pipeline execution failed: %w", execErr) + return nil, fmt.Errorf("redis pipeline execution failed: %w", execErr) } filterPods := len(podIdentifierSet) > 0 // predicate for filtering - highestHitIdx := 0 for idx, cmd := range results { key := keys[idx] @@ -121,7 +119,7 @@ func (r *RedisIndex) Lookup(ctx context.Context, keys []Key, logger.Error(cmdErr, "failed to get pods for key", "key", key) } - return keys[:idx], podsPerKey, nil // early stop since prefix-chain breaks here + return podsPerKey, nil // early stop since prefix-chain breaks here } var filteredPods []string @@ -134,14 +132,13 @@ func (r *RedisIndex) Lookup(ctx context.Context, keys []Key, if len(filteredPods) == 0 { logger.Info("no pods found for key, cutting search", "key", key) - return keys[:idx], podsPerKey, nil // early stop since prefix-chain breaks here + return podsPerKey, nil // early stop since prefix-chain breaks here } - highestHitIdx = idx podsPerKey[key] = filteredPods } - return keys[:highestHitIdx+1], podsPerKey, nil + return podsPerKey, nil } // Add adds a set of keys and their associated pod entries to the index backend.