Skip to content

Add instrumentedIndex basic unit tests #87

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/kvcache/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,15 @@ func (k *Indexer) GetPodScores(ctx context.Context, prompt, modelName string,
traceLogger.Info("found tokens", "tokens", tokens, "block-keys", blockKeys)

// 3. query kvblock indexer for pods
strBlockKeys, keyToPods, err := k.kvBlockIndex.Lookup(ctx, blockKeys, sets.New(podIdentifiers...))
keyToPods, err := k.kvBlockIndex.Lookup(ctx, blockKeys, sets.New(podIdentifiers...))
if err != nil {
return nil, fmt.Errorf("failed to query kvblock indexer: %w", err)
}
traceLogger.Info("found block keys", "block-keys", blockKeys,
"pods", podsPerKeyPrintHelper(keyToPods))

// 4. score pods
podScores, err := k.kvBlockScorer.Score(strBlockKeys, keyToPods)
podScores, err := k.kvBlockScorer.Score(blockKeys, keyToPods)
if err != nil {
return nil, fmt.Errorf("failed to query kvblock scorer: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/kvcache/kvblock/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ func testAddBasic(t *testing.T, index kvblock.Index) {
assert.NoError(t, err)

// Lookup after add
hitKeys, podsPerKey, err := index.Lookup(t.Context(), []kvblock.Key{key}, sets.Set[string]{})
podsPerKey, err := index.Lookup(t.Context(), []kvblock.Key{key}, sets.Set[string]{})
assert.NoError(t, err)
assert.Len(t, hitKeys, 1)
assert.Equal(t, key, hitKeys[0])
assert.Len(t, podsPerKey, 1)
assert.Contains(t, podsPerKey, key)
assert.Equal(t, podsPerKey[key], []string{"10.0.0.1", "10.0.0.2"})
}
13 changes: 6 additions & 7 deletions pkg/kvcache/kvblock/in_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,13 @@ type PodCache struct {
// If the podIdentifierSet is empty, all pods are returned.
//
// It returns:
// 1. A slice of the hit keys.
// 2. A map where the keys are those in (1) and the values are pod-identifiers.
// 3. An error if any occurred during the operation.
// 1. A map where the keys are those in (1) and the values are pod-identifiers.
// 2. An error if any occurred during the operation.
func (m *InMemoryIndex) Lookup(ctx context.Context, keys []Key,
podIdentifierSet sets.Set[string],
) ([]Key, map[Key][]string, error) {
) (map[Key][]string, error) {
if len(keys) == 0 {
return nil, nil, fmt.Errorf("no keys provided for lookup")
return nil, fmt.Errorf("no keys provided for lookup")
}

traceLogger := klog.FromContext(ctx).V(logging.TRACE).WithName("kvblock.InMemoryIndex.Lookup")
Expand All @@ -108,7 +107,7 @@ func (m *InMemoryIndex) Lookup(ctx context.Context, keys []Key,
if pods, found := m.data.Get(key); found { //nolint:nestif // TODO: can this be optimized?
if pods == nil || pods.cache.Len() == 0 {
traceLogger.Info("no pods found for key, cutting search", "key", key)
return keys[:idx], podsPerKey, nil // early stop since prefix-chain breaks here
return podsPerKey, nil // early stop since prefix-chain breaks here
}

highestHitIdx = idx
Expand All @@ -135,7 +134,7 @@ func (m *InMemoryIndex) Lookup(ctx context.Context, keys []Key,
traceLogger.Info("lookup completed", "highest-hit-index", highestHitIdx,
"pods-per-key", podsPerKeyPrintHelper(podsPerKey))

return keys[:highestHitIdx+1], podsPerKey, nil
return podsPerKey, nil
}

// Add adds a set of keys and their associated pod entries to the index backend.
Expand Down
7 changes: 3 additions & 4 deletions pkg/kvcache/kvblock/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,9 @@ type Index interface {
// If the podIdentifierSet is empty, all pods are returned.
//
// It returns:
// 1. A slice of the hit keys.
// 2. A map where the keys are those in (1) and the values are pod-identifiers.
// 3. An error if any occurred during the operation.
Lookup(ctx context.Context, keys []Key, podIdentifierSet sets.Set[string]) ([]Key, map[Key][]string, error)
// 1. A map where the keys are those in (1) and the values are pod-identifiers.
// 2. An error if any occurred during the operation.
Lookup(ctx context.Context, keys []Key, podIdentifierSet sets.Set[string]) (map[Key][]string, error)
// Add adds a set of keys and their associated pod entries to the index backend.
Add(ctx context.Context, keys []Key, entries []PodEntry) error
// Evict removes a key and its associated pod entries from the index backend.
Expand Down
7 changes: 3 additions & 4 deletions pkg/kvcache/kvblock/instrumented_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,13 @@ func (m *instrumentedIndex) Lookup(
ctx context.Context,
keys []Key,
podIdentifierSet sets.Set[string],
) ([]Key, map[Key][]string, error) {
) (map[Key][]string, error) {
timer := prometheus.NewTimer(metrics.LookupLatency)
defer timer.ObserveDuration()

metrics.LookupRequests.Inc()

hitKeys, pods, err := m.next.Lookup(ctx, keys, podIdentifierSet)
metrics.LookupHits.Add(float64(len(hitKeys)))
pods, err := m.next.Lookup(ctx, keys, podIdentifierSet)

return hitKeys, pods, err
return pods, err
}
47 changes: 47 additions & 0 deletions pkg/kvcache/kvblock/instrumented_index_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
Copyright 2025 The llm-d Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kvblock_test

import (
"testing"

"github.com/llm-d/llm-d-kv-cache-manager/pkg/kvcache/kvblock"
"github.com/stretchr/testify/assert"
)

func TestNewInstrumentedIndex(t *testing.T) {
// Create base index
baseIndex, err := kvblock.NewInMemoryIndex(nil)
assert.NoError(t, err)

// Wrap with instrumentation
instrumented := kvblock.NewInstrumentedIndex(baseIndex)
assert.NotNil(t, instrumented)

// Verify it implements Index interface
assert.Implements(t, (*kvblock.Index)(nil), instrumented)
}

func TestInstrumentedIndexBasicFunctionality(t *testing.T) {
// Create instrumented index
baseIndex, err := kvblock.NewInMemoryIndex(nil)
assert.NoError(t, err)
instrumented := kvblock.NewInstrumentedIndex(baseIndex)

// Test that basic functionality still works through the wrapper
testAddBasic(t, instrumented)
}
19 changes: 8 additions & 11 deletions pkg/kvcache/kvblock/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,13 @@ var _ Index = &RedisIndex{}
// If the podIdentifierSet is empty, all pods are returned.
//
// It returns:
// 1. A slice of the hit keys.
// 2. A map where the keys are those in (1) and the values are pod-identifiers.
// 3. An error if any occurred during the operation.
// 1. A map where the keys are those in (1) and the values are pod-identifiers.
// 2. An error if any occurred during the operation.
func (r *RedisIndex) Lookup(ctx context.Context, keys []Key,
podIdentifierSet sets.Set[string],
) ([]Key, map[Key][]string, error) {
) (map[Key][]string, error) {
if len(keys) == 0 {
return nil, nil, nil
return make(map[Key][]string), nil
}

logger := klog.FromContext(ctx).WithName("kvblock.RedisIndex.Lookup")
Expand All @@ -105,11 +104,10 @@ func (r *RedisIndex) Lookup(ctx context.Context, keys []Key,

_, execErr := pipe.Exec(ctx)
if execErr != nil {
return nil, nil, fmt.Errorf("redis pipeline execution failed: %w", execErr)
return nil, fmt.Errorf("redis pipeline execution failed: %w", execErr)
}

filterPods := len(podIdentifierSet) > 0 // predicate for filtering
highestHitIdx := 0

for idx, cmd := range results {
key := keys[idx]
Expand All @@ -121,7 +119,7 @@ func (r *RedisIndex) Lookup(ctx context.Context, keys []Key,
logger.Error(cmdErr, "failed to get pods for key", "key", key)
}

return keys[:idx], podsPerKey, nil // early stop since prefix-chain breaks here
return podsPerKey, nil // early stop since prefix-chain breaks here
}

var filteredPods []string
Expand All @@ -134,14 +132,13 @@ func (r *RedisIndex) Lookup(ctx context.Context, keys []Key,

if len(filteredPods) == 0 {
logger.Info("no pods found for key, cutting search", "key", key)
return keys[:idx], podsPerKey, nil // early stop since prefix-chain breaks here
return podsPerKey, nil // early stop since prefix-chain breaks here
}

highestHitIdx = idx
podsPerKey[key] = filteredPods
}

return keys[:highestHitIdx+1], podsPerKey, nil
return podsPerKey, nil
}

// Add adds a set of keys and their associated pod entries to the index backend.
Expand Down