From 8d2b16c62fae5103fedcb4f59e305271c630fb50 Mon Sep 17 00:00:00 2001 From: sagiahrac Date: Wed, 13 Aug 2025 10:47:50 +0300 Subject: [PATCH 1/8] fix Lookup in index interface Signed-off-by: sagiahrac Signed-off-by: Sage Ahrac --- pkg/kvcache/kvblock/index.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/kvcache/kvblock/index.go b/pkg/kvcache/kvblock/index.go index 03c390a..7930667 100644 --- a/pkg/kvcache/kvblock/index.go +++ b/pkg/kvcache/kvblock/index.go @@ -107,10 +107,9 @@ type Index interface { // If the podIdentifierSet is empty, all pods are returned. // // It returns: - // 1. A slice of the hit keys. - // 2. A map where the keys are those in (1) and the values are pod-identifiers. - // 3. An error if any occurred during the operation. - Lookup(ctx context.Context, keys []Key, podIdentifierSet sets.Set[string]) ([]Key, map[Key][]string, error) + // 1. A map from each hit key to the pod-identifiers found for it. + // 2. An error if any occurred during the operation. + Lookup(ctx context.Context, keys []Key, podIdentifierSet sets.Set[string]) (map[Key][]string, error) // Add adds a set of keys and their associated pod entries to the index backend. Add(ctx context.Context, keys []Key, entries []PodEntry) error // Evict removes a key and its associated pod entries from the index backend. From a3f61a05ac0ad571b9efe08ee657600ad21a01b9 Mon Sep 17 00:00:00 2001 From: sagiahrac Date: Wed, 13 Aug 2025 10:51:20 +0300 Subject: [PATCH 2/8] fix GetPodScores Signed-off-by: sagiahrac Signed-off-by: Sage Ahrac --- pkg/kvcache/indexer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/kvcache/indexer.go b/pkg/kvcache/indexer.go index ebe1e86..0eec0e8 100644 --- a/pkg/kvcache/indexer.go +++ b/pkg/kvcache/indexer.go @@ -133,7 +133,7 @@ func (k *Indexer) GetPodScores(ctx context.Context, prompt, modelName string, traceLogger.Info("found tokens", "tokens", tokens, "block-keys", blockKeys) // 3. 
query kvblock indexer for pods - strBlockKeys, keyToPods, err := k.kvBlockIndex.Lookup(ctx, blockKeys, sets.New(podIdentifiers...)) + keyToPods, err := k.kvBlockIndex.Lookup(ctx, blockKeys, sets.New(podIdentifiers...)) if err != nil { return nil, fmt.Errorf("failed to query kvblock indexer: %w", err) } @@ -141,7 +141,7 @@ func (k *Indexer) GetPodScores(ctx context.Context, prompt, modelName string, "pods", podsPerKeyPrintHelper(keyToPods)) // 4. score pods - podScores, err := k.kvBlockScorer.Score(strBlockKeys, keyToPods) + podScores, err := k.kvBlockScorer.Score(blockKeys, keyToPods) if err != nil { return nil, fmt.Errorf("failed to query kvblock scorer: %w", err) } From 4656cfe76b52ad56f5a0ee427bb3b7da5d80ed58 Mon Sep 17 00:00:00 2001 From: sagiahrac Date: Wed, 13 Aug 2025 10:54:34 +0300 Subject: [PATCH 3/8] fix InMemoryIndex Lookup Signed-off-by: sagiahrac Signed-off-by: Sage Ahrac --- pkg/kvcache/kvblock/in_memory.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/pkg/kvcache/kvblock/in_memory.go b/pkg/kvcache/kvblock/in_memory.go index b8508b9..4e0f7d5 100644 --- a/pkg/kvcache/kvblock/in_memory.go +++ b/pkg/kvcache/kvblock/in_memory.go @@ -89,14 +89,13 @@ type PodCache struct { // If the podIdentifierSet is empty, all pods are returned. // // It returns: -// 1. A slice of the hit keys. -// 2. A map where the keys are those in (1) and the values are pod-identifiers. -// 3. An error if any occurred during the operation. +// 1. A map from each hit key to the pod-identifiers found for it. +// 2. An error if any occurred during the operation. 
func (m *InMemoryIndex) Lookup(ctx context.Context, keys []Key, podIdentifierSet sets.Set[string], -) ([]Key, map[Key][]string, error) { +) (map[Key][]string, error) { if len(keys) == 0 { - return nil, nil, fmt.Errorf("no keys provided for lookup") + return nil, fmt.Errorf("no keys provided for lookup") } traceLogger := klog.FromContext(ctx).V(logging.TRACE).WithName("kvblock.InMemoryIndex.Lookup") @@ -108,7 +107,7 @@ func (m *InMemoryIndex) Lookup(ctx context.Context, keys []Key, if pods, found := m.data.Get(key); found { //nolint:nestif // TODO: can this be optimized? if pods == nil || pods.cache.Len() == 0 { traceLogger.Info("no pods found for key, cutting search", "key", key) - return keys[:idx], podsPerKey, nil // early stop since prefix-chain breaks here + return podsPerKey, nil // early stop since prefix-chain breaks here } highestHitIdx = idx @@ -135,7 +134,7 @@ func (m *InMemoryIndex) Lookup(ctx context.Context, keys []Key, traceLogger.Info("lookup completed", "highest-hit-index", highestHitIdx, "pods-per-key", podsPerKeyPrintHelper(podsPerKey)) - return keys[:highestHitIdx+1], podsPerKey, nil + return podsPerKey, nil } // Add adds a set of keys and their associated pod entries to the index backend. 
From 8cf325e830998a97e6f36126733c38302e0e0590 Mon Sep 17 00:00:00 2001 From: sagiahrac Date: Wed, 13 Aug 2025 10:56:25 +0300 Subject: [PATCH 4/8] fix instrumentedIndex Lookup Signed-off-by: sagiahrac Signed-off-by: Sage Ahrac --- pkg/kvcache/kvblock/instrumented_index.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/kvcache/kvblock/instrumented_index.go b/pkg/kvcache/kvblock/instrumented_index.go index 064a9a3..37e09c3 100644 --- a/pkg/kvcache/kvblock/instrumented_index.go +++ b/pkg/kvcache/kvblock/instrumented_index.go @@ -34,14 +34,13 @@ func (m *instrumentedIndex) Lookup( ctx context.Context, keys []Key, podIdentifierSet sets.Set[string], -) ([]Key, map[Key][]string, error) { +) (map[Key][]string, error) { timer := prometheus.NewTimer(metrics.LookupLatency) defer timer.ObserveDuration() metrics.LookupRequests.Inc() - hitKeys, pods, err := m.next.Lookup(ctx, keys, podIdentifierSet) - metrics.LookupHits.Add(float64(len(hitKeys))) + pods, err := m.next.Lookup(ctx, keys, podIdentifierSet) - return hitKeys, pods, err + return pods, err } From dcfe10848de12e8249c5d6c9029dfed90fb06098 Mon Sep 17 00:00:00 2001 From: sagiahrac Date: Wed, 13 Aug 2025 11:18:39 +0300 Subject: [PATCH 5/8] update RedisIndex.Lookup Signed-off-by: sagiahrac Signed-off-by: Sage Ahrac --- pkg/kvcache/kvblock/redis.go | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/pkg/kvcache/kvblock/redis.go b/pkg/kvcache/kvblock/redis.go index 144957f..24258ab 100644 --- a/pkg/kvcache/kvblock/redis.go +++ b/pkg/kvcache/kvblock/redis.go @@ -80,14 +80,13 @@ var _ Index = &RedisIndex{} // If the podIdentifierSet is empty, all pods are returned. // // It returns: -// 1. A slice of the hit keys. -// 2. A map where the keys are those in (1) and the values are pod-identifiers. -// 3. An error if any occurred during the operation. +// 1. A map from each hit key to the pod-identifiers found for it. +// 2. 
An error if any occurred during the operation. func (r *RedisIndex) Lookup(ctx context.Context, keys []Key, podIdentifierSet sets.Set[string], -) ([]Key, map[Key][]string, error) { +) (map[Key][]string, error) { if len(keys) == 0 { - return nil, nil, nil + return nil, nil } logger := klog.FromContext(ctx).WithName("kvblock.RedisIndex.Lookup") @@ -105,11 +104,10 @@ func (r *RedisIndex) Lookup(ctx context.Context, keys []Key, _, execErr := pipe.Exec(ctx) if execErr != nil { - return nil, nil, fmt.Errorf("redis pipeline execution failed: %w", execErr) + return nil, fmt.Errorf("redis pipeline execution failed: %w", execErr) } filterPods := len(podIdentifierSet) > 0 // predicate for filtering - highestHitIdx := 0 for idx, cmd := range results { key := keys[idx] @@ -121,7 +119,7 @@ func (r *RedisIndex) Lookup(ctx context.Context, keys []Key, logger.Error(cmdErr, "failed to get pods for key", "key", key) } - return keys[:idx], podsPerKey, nil // early stop since prefix-chain breaks here + return podsPerKey, nil // early stop since prefix-chain breaks here } var filteredPods []string @@ -134,14 +132,13 @@ func (r *RedisIndex) Lookup(ctx context.Context, keys []Key, if len(filteredPods) == 0 { logger.Info("no pods found for key, cutting search", "key", key) - return keys[:idx], podsPerKey, nil // early stop since prefix-chain breaks here + return podsPerKey, nil // early stop since prefix-chain breaks here } - highestHitIdx = idx podsPerKey[key] = filteredPods } - return keys[:highestHitIdx+1], podsPerKey, nil + return podsPerKey, nil } // Add adds a set of keys and their associated pod entries to the index backend. 
From ad825bc75f5054d82975574b672d059868c656d3 Mon Sep 17 00:00:00 2001 From: sagiahrac Date: Wed, 13 Aug 2025 11:41:57 +0300 Subject: [PATCH 6/8] Migrate common test to new interface Signed-off-by: sagiahrac Signed-off-by: Sage Ahrac --- pkg/kvcache/kvblock/common_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/kvcache/kvblock/common_test.go b/pkg/kvcache/kvblock/common_test.go index b4aa110..28488ed 100644 --- a/pkg/kvcache/kvblock/common_test.go +++ b/pkg/kvcache/kvblock/common_test.go @@ -39,9 +39,9 @@ func testAddBasic(t *testing.T, index kvblock.Index) { assert.NoError(t, err) // Lookup after add - hitKeys, podsPerKey, err := index.Lookup(t.Context(), []kvblock.Key{key}, sets.Set[string]{}) + podsPerKey, err := index.Lookup(t.Context(), []kvblock.Key{key}, sets.Set[string]{}) assert.NoError(t, err) - assert.Len(t, hitKeys, 1) - assert.Equal(t, key, hitKeys[0]) + assert.Len(t, podsPerKey, 1) + assert.Contains(t, podsPerKey, key) assert.Equal(t, podsPerKey[key], []string{"10.0.0.1", "10.0.0.2"}) } From ee8d0c85f8b30e8240e53ae1370959f0c67bf05b Mon Sep 17 00:00:00 2001 From: sagiahrac Date: Wed, 13 Aug 2025 11:42:11 +0300 Subject: [PATCH 7/8] lint redis index Signed-off-by: sagiahrac Signed-off-by: Sage Ahrac --- pkg/kvcache/kvblock/redis.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/kvcache/kvblock/redis.go b/pkg/kvcache/kvblock/redis.go index 24258ab..41689c6 100644 --- a/pkg/kvcache/kvblock/redis.go +++ b/pkg/kvcache/kvblock/redis.go @@ -86,7 +86,7 @@ func (r *RedisIndex) Lookup(ctx context.Context, keys []Key, podIdentifierSet sets.Set[string], ) (map[Key][]string, error) { if len(keys) == 0 { - return nil, nil + return make(map[Key][]string), nil } logger := klog.FromContext(ctx).WithName("kvblock.RedisIndex.Lookup") From 16b61a93137c2fd7610ad5d050bff2a333d0a165 Mon Sep 17 00:00:00 2001 From: sagiahrac Date: Wed, 13 Aug 2025 12:16:25 +0300 Subject: [PATCH 8/8] Add unit tests for InMemoryIndex 
Signed-off-by: sagiahrac Signed-off-by: Sage Ahrac --- pkg/kvcache/kvblock/in_memory_test.go | 70 +++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/pkg/kvcache/kvblock/in_memory_test.go b/pkg/kvcache/kvblock/in_memory_test.go index 6f38db1..dcc3282 100644 --- a/pkg/kvcache/kvblock/in_memory_test.go +++ b/pkg/kvcache/kvblock/in_memory_test.go @@ -29,3 +29,73 @@ func TestInMemoryAddBasic(t *testing.T) { assert.NoError(t, err) testAddBasic(t, index) } + +func TestInMemoryIndexSize(t *testing.T) { + // Test with small size to verify eviction + cfg := &kvblock.InMemoryIndexConfig{ + Size: 2, // Only 2 keys max + PodCacheSize: 1, // Pod cache size doesn't matter for this test + } + + index, err := kvblock.NewInMemoryIndex(cfg) + assert.NoError(t, err) + + ctx := t.Context() + + // Add first key + key1 := kvblock.Key{ModelName: "test-model", ChunkHash: 111} + err = index.Add(ctx, []kvblock.Key{key1}, []kvblock.PodEntry{{PodIdentifier: "pod1", DeviceTier: "gpu"}}) + assert.NoError(t, err) + + // Add second key + key2 := kvblock.Key{ModelName: "test-model", ChunkHash: 222} + err = index.Add(ctx, []kvblock.Key{key2}, []kvblock.PodEntry{{PodIdentifier: "pod2", DeviceTier: "gpu"}}) + assert.NoError(t, err) + + // Add third key - should evict the first one due to LRU + key3 := kvblock.Key{ModelName: "test-model", ChunkHash: 333} + err = index.Add(ctx, []kvblock.Key{key3}, []kvblock.PodEntry{{PodIdentifier: "pod3", DeviceTier: "cpu"}}) + assert.NoError(t, err) + + // Lookup should only return the last two keys + podsPerKey, err := index.Lookup(ctx, []kvblock.Key{key1, key2, key3}, nil) + assert.NoError(t, err) + + assert.Len(t, podsPerKey, 2) // Only key2 and key3 should be present + assert.Len(t, podsPerKey[key2], 1) + assert.Len(t, podsPerKey[key3], 1) + assert.Contains(t, podsPerKey[key2], "pod2") + assert.Contains(t, podsPerKey[key3], "pod3") +} + +func TestInMemoryIndexPodCacheSize(t *testing.T) { + // Test with small limits to verify enforcement + 
cfg := &kvblock.InMemoryIndexConfig{ + Size: 1, // Only 1 key max + PodCacheSize: 2, // Only 2 pods per key + } + + index, err := kvblock.NewInMemoryIndex(cfg) + assert.NoError(t, err) + + // Test PodCacheSize limit: add more pods than the limit for one key + key := kvblock.Key{ModelName: "test-model", ChunkHash: 111} + pods := []kvblock.PodEntry{ + {PodIdentifier: "pod1", DeviceTier: "gpu"}, + {PodIdentifier: "pod2", DeviceTier: "gpu"}, + {PodIdentifier: "pod3", DeviceTier: "cpu"}, // This should evict pod1 due to LRU + } + + ctx := t.Context() + + err = index.Add(ctx, []kvblock.Key{key}, pods) + assert.NoError(t, err) + + // Lookup should only return 2 pods (pod2 and pod3), pod1 should be evicted + podsPerKey, err := index.Lookup(ctx, []kvblock.Key{key}, nil) + assert.NoError(t, err) + assert.Len(t, podsPerKey, 1) + assert.Len(t, podsPerKey[key], 2, "Should only have 2 pods due to PodCacheSize limit") + assert.Contains(t, podsPerKey[key], "pod2") + assert.Contains(t, podsPerKey[key], "pod3") +}