diff --git a/docs-website/concepts/cache-warmer.mdx b/docs-website/concepts/cache-warmer.mdx index 985e5b40a3..c3f4f35ed4 100644 --- a/docs-website/concepts/cache-warmer.mdx +++ b/docs-website/concepts/cache-warmer.mdx @@ -84,19 +84,16 @@ Users can manually recompute slow queries from the Cosmo Studio. Currently, reco ## In-Memory Fallback Cache Warming -The in-memory fallback cache warming feature preserves the planner cache across hot config reloads and schema changes, allowing it to be rewarmed automatically and reducing latency spikes during restarts. +The in-memory fallback cache warming feature uses the **[slow plan cache](#slow-plan-cache)** to preserve query plans across hot config reloads and schema changes, reducing latency spikes during restarts. ### How It Works -After the router has started, the router can be reloaded for two reasons: either a config change or a schema change. Due to the structure of the router internals, we have two slight variations on how we handle the in-memory switchover cache warming: +The in-memory fallback relies on the slow plan cache — a secondary, bounded cache that tracks queries whose planning time exceeds a configurable threshold (`slow_plan_cache_threshold`, default 100ms). During normal operation, this cache is populated in two ways: -1. **Before Reload**: In case of config changes (from hot config reloading), the router extracts all queries from the current plan cache, preserving the queries that were in the planner cache before the cache is cleared for reloading. +1. **On first plan**: When a query is planned and its planning duration exceeds the threshold, the plan is stored in both the main cache and the slow plan cache. +2. **On eviction**: If the main TinyLFU cache evicts a plan that is in the slow plan cache, the query plan won't be recomputed and would simply be served from the slow plan cache. -2. **During Reload**: The router with the updated config receives the queries from the previous plan cache that existed before reloading, and uses them to warm up its current plan cache before serving traffic. - -3. **Result**: The updated router reloads with a fully warmed cache, eliminating latency spikes that would normally occur during cold starts. - -**Important Limitation:** When using the in-memory fallback, the first start will still experience a cold start, as there is no prior populated planner cache. *Only subsequent reloads* will benefit from the in-memory fallback. This is why it works best when combined with CDN cache warming (the default configuration). +When the router reloads, the slow plan cache contents are used to rewarm the cache. ### When to Use the In-Memory Fallback @@ -106,7 +103,7 @@ When the in-memory fallback is used with the Cosmo Cloud CDN cache warmer, the f * Getting the list of operations from the CDN fails * The request to the CDN succeeds but does not return a list of operations (either no operations are cached or the manifest has not been created yet) -In these cases, the router will use the fallback and load the list of operations from the in-memory fallback (if any operations exist). +In these cases, the router will use the fallback and load the list of operations from the slow plan cache (if any operations exist). The in-memory fallback cannot be used as a fallback for sources other than the Cosmo Cloud CDN cache warmer. @@ -115,13 +112,11 @@ In these cases, the router will use the fallback and load the list of operations ### Key Characteristics of In-Memory Fallback **Advantages:** -- **Comprehensive coverage**: After the initial start, all queries that have been executed are preserved and warmed on reload, including both slow and fast queries. This provides broader coverage than CDN cache warming. -- **Eliminates reload spikes**: You won't experience query planning spikes after configuration or schema reloads, as the cache persists across these changes. -- **Built-in feature**: No enterprise plan required; it's available to all users and enabled by default. +- **Coverage of expensive queries**: By default, queries with planning times above the threshold (100ms) are preserved and warmed on reload, protecting slow-to-plan queries from cold-start latency. Users can lower the threshold to any positive duration (e.g., `slow_plan_cache_threshold: 100ms`) to capture all queries. Users can also set the duration to 1 nanosecond (`slow_plan_cache_threshold: 1ns`), this would ensure that all queries are cached in the fallback, and thus would be available to rewarm the cache upon reloads. +- **Eliminates reload spikes for expensive queries**: You won't experience query planning spikes for queries above the threshold after configuration or schema reloads. Users can tune the threshold to cover more or fewer queries. **Tradeoffs:** - **Cold start on first start**: The first router start will experience normal cache warming latency, as there's no existing cache to preserve. -- **Cache can accumulate stale entries**: Without a full restart, the planner cache can eventually fill up with query plans for outdated or rarely-used queries. However, the cache uses a LFU (Least Frequently Used) eviction policy, ensuring that older, less-used items are removed when the cache reaches capacity. ### Configuration @@ -157,3 +152,57 @@ cache_warmup: cdn: enabled: false ``` + +## Slow Plan Cache + +When in-memory fallback is enabled, the cache the in memory fallback uses is the **Slow Plan Cache**. This is different from the main query plan cache which uses a TinyLFU (Least Frequently Used) eviction policy, which is optimized for frequently accessed items. However, this can cause problems for queries that are slow to plan but infrequently accessed — the LFU policy may evict them in favor of cheaper, more frequent queries. When an expensive query is evicted and re-requested, the router must re-plan it from scratch, causing a latency spike. + +The slow plan cache is a secondary cache that protects these slow-to-plan queries from eviction. It is automatically enabled when `in_memory_fallback` is set to `true`. + +### How It Works + +1. When a query is planned for the first time, its planning duration is measured. +2. If the planning duration exceeds the configured threshold (`slow_plan_cache_threshold`, default 100ms), the query plan is stored in both the main cache and the slow plan cache. +3. If the main cache later evicts this plan (due to LFU pressure from more frequent queries), the OnEvict hook pushes it to the slow plan cache (if it meets the threshold). +4. On subsequent requests, if the plan is not found in the main cache, the router checks the slow plan cache before re-planning. If found, the plan is served immediately and re-inserted into the main cache. +5. During config reloads, slow plan cache entries are used as the warmup source, ensuring slow queries survive cache rebuilds. + +### Cache Size and Eviction + +The slow plan cache has a configurable maximum size (`slow_plan_cache_size`, default 300). When the cache is full and a new expensive query needs to be added: + +- The new query's planning duration is compared to the shortest duration in the cache. +- If the new query is more expensive (took longer to plan), it replaces the least expensive entry. +- If the new query is cheaper or equal, it is not added. This ensures the cache always contains the most expensive queries. + + +Whenever an existing item in the cache is attempted to be added to the cache while full, we will not remove the entry and will only update it's plan time duration if it was higher than the previous duration it took to plan. This way we only consider the worst case planning duration. + + +### Configuration + +The slow plan cache is configured through the engine configuration: + +```yaml +engine: + slow_plan_cache_size: 300 # Maximum entries (default: 300) + slow_plan_cache_threshold: 100ms # Minimum planning time to qualify (default: 100ms) + +cache_warmup: + enabled: true + in_memory_fallback: true # Required to enable the slow plan cache +``` + +For the full list of engine configuration options, see [Router Engine Configuration](/router/configuration#router-engine-configuration). + +### Tuning + +You can tune the threshold and cache size to control warmup coverage: + +- **Lower threshold → more queries protected**: Setting `slow_plan_cache_threshold: 1ns` captures all queries regardless of planning time. This gives you full "carry forward everything" behaviour similar to preserving the entire plan cache. +- **Higher cache size → more entries held**: Increase `slow_plan_cache_size` to hold more entries. For full coverage, set it to match or exceed `execution_plan_cache_size`. +- **Tradeoff**: Lower thresholds and larger cache sizes increase memory usage but provide broader warmup coverage. + +### Observability + +Slow plan cache hits are counted as regular plan cache hits — the `wg.engine.plan_cache_hit` attribute is set to `true` for hits from either the main cache or the slow plan cache. There is no separate observability signal for slow plan cache hits. diff --git a/docs-website/router/configuration.mdx b/docs-website/router/configuration.mdx index 7cd14b7d78..b0df8644f7 100644 --- a/docs-website/router/configuration.mdx +++ b/docs-website/router/configuration.mdx @@ -1768,6 +1768,8 @@ Configure the GraphQL Execution Engine of the Router. | ENGINE_WEBSOCKET_CLIENT_PING_TIMEOUT | websocket_client_ping_timeout | | The Websocket client ping timeout to the subgraph. Defines how long the router will wait for a ping response from the subgraph. The timeout is specified as a string with a number and a unit, e.g. 10ms, 1s, 1m, 1h. The supported units are 'ms', 's', 'm', 'h'. | 30s | | ENGINE_WEBSOCKET_CLIENT_FRAME_TIMEOUT | websocket_client_frame_timeout | | The Websocket client frame timeout to the subgraph. Defines how long the router will wait for a frame response from the subgraph. The timeout is specified as a string with a number and a unit, e.g. 10ms, 1s, 1m, 1h. The supported units are 'ms', 's', 'm', 'h'. | 100ms | | ENGINE_EXECUTION_PLAN_CACHE_SIZE | execution_plan_cache_size | | Define how many GraphQL Operations should be stored in the execution plan cache. A low number will lead to more frequent cache misses, which will lead to increased latency. | 1024 | +| ENGINE_SLOW_PLAN_CACHE_SIZE | slow_plan_cache_size | | The maximum number of entries in the slow plan cache. This cache protects slow-to-plan queries from being evicted by the main plan cache's LFU policy. Only used when `in_memory_fallback` is enabled. See [Slow Plan Cache](/concepts/cache-warmer#slow-plan-cache). | 300 | +| ENGINE_SLOW_PLAN_CACHE_THRESHOLD | slow_plan_cache_threshold | | The minimum planning duration for a query to be promoted into the slow plan cache. Queries that take longer than this threshold to plan are considered expensive and protected from eviction. The period is specified as a string with a number and a unit, e.g. 10ms, 1s, 5s. The supported units are 'ms', 's', 'm', 'h'. | 100ms | | ENGINE_MINIFY_SUBGRAPH_OPERATIONS | minify_subgraph_operations | | Minify the subgraph operations. If the value is true, GraphQL Operations get minified after planning. This reduces the amount of GraphQL AST nodes the Subgraph has to parse, which ultimately saves CPU time and memory, resulting in faster response times. | false | | ENGINE_ENABLE_PERSISTED_OPERATIONS_CACHE | enable_persisted_operations_cache | | Enable the persisted operations cache. The persisted operations cache is used to cache normalized persisted operations to improve performance. | true | | ENGINE_ENABLE_NORMALIZATION_CACHE | enable_normalization_cache | | Enable the normalization cache. The normalization cache is used to cache normalized operations to improve performance. | true | @@ -1802,6 +1804,8 @@ engine: websocket_client_ping_timeout: "30s" websocket_client_frame_timeout: "100ms" execution_plan_cache_size: 10000 + slow_plan_cache_size: 300 + slow_plan_cache_threshold: 100ms minify_subgraph_operations: true enable_persisted_operations_cache: true enable_normalization_cache: true diff --git a/docs-website/router/metrics-and-monitoring.mdx b/docs-website/router/metrics-and-monitoring.mdx index a5da03ec5b..4f574d22ea 100644 --- a/docs-website/router/metrics-and-monitoring.mdx +++ b/docs-website/router/metrics-and-monitoring.mdx @@ -69,7 +69,7 @@ All the below mentioned metrics have the `wg.subgraph.name` dimensions. Do note #### GraphQL specific metrics -* `router.graphql.operation.planning_time`: Time taken to plan the operation. An additional attribute `wg.engine.plan_cache_hit` indicates if the plan was served from the cache. +* `router.graphql.operation.planning_time`: Time taken to plan the operation. An additional attribute `wg.engine.plan_cache_hit` indicates if the plan was served from the main execution plan cache or the plan fallback cache. #### Cost Control metrics diff --git a/router-tests/operations/cache_warmup_test.go b/router-tests/operations/cache_warmup_test.go index e07c27999d..2f9c662abd 100644 --- a/router-tests/operations/cache_warmup_test.go +++ b/router-tests/operations/cache_warmup_test.go @@ -979,6 +979,9 @@ func TestInMemoryPlanCacheFallback(t *testing.T) { } testenv.Run(t, &testenv.Config{ + ModifyEngineExecutionConfiguration: func(cfg *config.EngineExecutionConfiguration) { + cfg.SlowPlanCacheSize = 100 + }, RouterOptions: []core.Option{ core.WithCacheWarmupConfig(&config.CacheWarmupConfiguration{ Enabled: true, @@ -990,6 +993,9 @@ func TestInMemoryPlanCacheFallback(t *testing.T) { }, }), core.WithConfigVersionHeader(true), + core.WithPlanningDurationOverride(func(_ string) time.Duration { + return 10 * time.Second + }), }, RouterConfig: &testenv.RouterConfig{ ConfigPollerFactory: func(config *nodev1.RouterConfig) configpoller.ConfigPoller { @@ -1125,6 +1131,9 @@ func TestInMemoryPlanCacheFallback(t *testing.T) { writeTestConfig(t, "initial", configFile) testenv.Run(t, &testenv.Config{ + ModifyEngineExecutionConfiguration: func(cfg *config.EngineExecutionConfiguration) { + cfg.SlowPlanCacheSize = 100 + }, RouterOptions: []core.Option{ core.WithConfigVersionHeader(true), core.WithExecutionConfig(&core.ExecutionConfig{ @@ -1136,6 +1145,9 @@ func TestInMemoryPlanCacheFallback(t *testing.T) { Enabled: true, InMemoryFallback: true, }), + core.WithPlanningDurationOverride(func(_ string) time.Duration { + return 10 * time.Second + }), }, }, func(t *testing.T, xEnv *testenv.Environment) { res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ @@ -1169,6 +1181,9 @@ func TestInMemoryPlanCacheFallback(t *testing.T) { var impl *fakeSelfRegister = nil testenv.Run(t, &testenv.Config{ + ModifyEngineExecutionConfiguration: func(cfg *config.EngineExecutionConfiguration) { + cfg.SlowPlanCacheSize = 100 + }, CdnSever: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNotFound) })), @@ -1189,6 +1204,9 @@ func TestInMemoryPlanCacheFallback(t *testing.T) { }, }, }), + core.WithPlanningDurationOverride(func(_ string) time.Duration { + return 10 * time.Second + }), }, }, func(t *testing.T, xEnv *testenv.Environment) { res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ @@ -1222,6 +1240,9 @@ func TestInMemoryPlanCacheFallback(t *testing.T) { var impl *fakeSelfRegister = nil testenv.Run(t, &testenv.Config{ + ModifyEngineExecutionConfiguration: func(cfg *config.EngineExecutionConfiguration) { + cfg.SlowPlanCacheSize = 100 + }, CdnSever: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusUnauthorized) })), @@ -1242,6 +1263,9 @@ func TestInMemoryPlanCacheFallback(t *testing.T) { }, }, }), + core.WithPlanningDurationOverride(func(_ string) time.Duration { + return 10 * time.Second + }), }, }, func(t *testing.T, xEnv *testenv.Environment) { res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ @@ -1292,7 +1316,9 @@ cache_warmup: cdn: enabled: false -engine: +engine: + slow_plan_cache_threshold: "1ns" + slow_plan_cache_size: 100 debug: enable_cache_response_headers: true ` @@ -1352,23 +1378,27 @@ func writeTestConfig(t *testing.T, version string, path string) { RootNodes: []*nodev1.TypeField{ { TypeName: "Query", - FieldNames: []string{"hello"}, + FieldNames: []string{"hello", "world"}, }, }, CustomStatic: &nodev1.DataSourceCustom_Static{ Data: &nodev1.ConfigurationVariable{ - StaticVariableContent: `{"hello": "Hello!"}`, + StaticVariableContent: `{"hello": "Hello!", "world": "World!"}`, }, }, Id: "0", }, }, - GraphqlSchema: "schema {\n query: Query\n}\ntype Query {\n hello: String\n}", + GraphqlSchema: "schema {\n query: Query\n}\ntype Query {\n hello: String\n world: String\n}", FieldConfigurations: []*nodev1.FieldConfiguration{ { TypeName: "Query", FieldName: "hello", }, + { + TypeName: "Query", + FieldName: "world", + }, }, }, } diff --git a/router-tests/operations/plan_fallback_cache_test.go b/router-tests/operations/plan_fallback_cache_test.go new file mode 100644 index 0000000000..1b00ce0181 --- /dev/null +++ b/router-tests/operations/plan_fallback_cache_test.go @@ -0,0 +1,465 @@ +package integration + +import ( + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/wundergraph/cosmo/router-tests/testenv" + "github.com/wundergraph/cosmo/router/core" + nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1" + "github.com/wundergraph/cosmo/router/pkg/config" + "github.com/wundergraph/cosmo/router/pkg/controlplane/configpoller" +) + +func TestPlanFallbackCache(t *testing.T) { + t.Parallel() + + // slowQueries are queries whose planning duration is overridden to exceed the threshold. + slowQueries := []testenv.GraphQLRequest{ + {Query: `{ employees { id } }`}, + {Query: `query { employees { id details { forename } } }`}, + } + + // fastQueries are queries whose planning duration stays below the threshold. + fastQueries := []testenv.GraphQLRequest{ + {Query: `query { employees { id details { forename surname } } }`}, + {Query: `query m($id: Int!){ employee(id: $id) { id details { forename surname } } }`, Variables: []byte(`{"id": 1}`)}, + } + + allQueries := make([]testenv.GraphQLRequest, 0, len(slowQueries)+len(fastQueries)) + allQueries = append(allQueries, slowQueries...) + allQueries = append(allQueries, fastQueries...) + + fallbackThreshold := 1 * time.Second + + // The override function receives the normalized (minified) query content. + // Both slow queries lack "surname", while all fast queries contain it. + planningDurationOverride := core.WithPlanningDurationOverride(func(content string) time.Duration { + if !strings.Contains(content, "surname") { + return 10 * time.Second + } + return 0 + }) + + // waitForPlanCacheHits sends all queries, retrying until each one + // is served from the plan cache (which includes fallback cache promotions). + waitForPlanCacheHits := func(t *testing.T, xEnv *testenv.Environment, queries []testenv.GraphQLRequest, extraChecks ...func(*assert.CollectT, *testenv.TestResponse)) { + t.Helper() + + for _, q := range queries { + require.EventuallyWithT(t, func(ct *assert.CollectT) { + res := xEnv.MakeGraphQLRequestOK(q) + assert.Equal(ct, 200, res.Response.StatusCode) + assert.Equal(ct, "HIT", res.Response.Header.Get("x-wg-execution-plan-cache"), + "expected plan to be served from cache") + for _, check := range extraChecks { + check(ct, res) + } + }, 2*time.Second, 100*time.Millisecond) + } + } + + t.Run("fallback cache serves evicted plans from small main cache", func(t *testing.T) { + t.Parallel() + + testenv.Run(t, &testenv.Config{ + ModifyEngineExecutionConfiguration: func(cfg *config.EngineExecutionConfiguration) { + cfg.ExecutionPlanCacheSize = 1 + cfg.SlowPlanCacheThreshold = fallbackThreshold + cfg.SlowPlanCacheSize = 100 + }, + RouterOptions: []core.Option{ + core.WithCacheWarmupConfig(&config.CacheWarmupConfiguration{ + Enabled: true, + InMemoryFallback: true, + }), + planningDurationOverride, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + // Send all queries — each is a MISS and gets planned via singleflight. + for _, q := range allQueries { + res := xEnv.MakeGraphQLRequestOK(q) + require.Equal(t, 200, res.Response.StatusCode) + require.Equal(t, "MISS", res.Response.Header.Get("x-wg-execution-plan-cache")) + } + + // Slow queries should be served from cache (via fallback promotion) + waitForPlanCacheHits(t, xEnv, slowQueries) + }) + }) + + t.Run("fast queries do not enter fallback cache", func(t *testing.T) { + t.Parallel() + + testenv.Run(t, &testenv.Config{ + ModifyEngineExecutionConfiguration: func(cfg *config.EngineExecutionConfiguration) { + cfg.ExecutionPlanCacheSize = 1 + cfg.SlowPlanCacheThreshold = fallbackThreshold + cfg.SlowPlanCacheSize = 100 + }, + RouterOptions: []core.Option{ + core.WithCacheWarmupConfig(&config.CacheWarmupConfiguration{ + Enabled: true, + InMemoryFallback: true, + }), + planningDurationOverride, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + // Send all queries + for _, q := range allQueries { + res := xEnv.MakeGraphQLRequestOK(q) + require.Equal(t, 200, res.Response.StatusCode) + } + + // Wait for Ristretto eviction + time.Sleep(200 * time.Millisecond) + + // Fast queries should not be cached after eviction from the tiny main cache + for _, q := range fastQueries { + res := xEnv.MakeGraphQLRequestOK(q) + require.Equal(t, "MISS", res.Response.Header.Get("x-wg-execution-plan-cache"), + "fast query should not be in cache after eviction") + } + }) + }) + + t.Run("evicted plans survive config reload via fallback cache with small main cache", func(t *testing.T) { + t.Parallel() + + pm := ConfigPollerMock{ + ready: make(chan struct{}), + } + + testenv.Run(t, &testenv.Config{ + ModifyEngineExecutionConfiguration: func(cfg *config.EngineExecutionConfiguration) { + cfg.ExecutionPlanCacheSize = 1 + cfg.SlowPlanCacheThreshold = fallbackThreshold + cfg.SlowPlanCacheSize = 100 + }, + RouterOptions: []core.Option{ + core.WithCacheWarmupConfig(&config.CacheWarmupConfiguration{ + Enabled: true, + InMemoryFallback: true, + Source: config.CacheWarmupSource{ + CdnSource: config.CacheWarmupCDNSource{ + Enabled: true, + }, + }, + }), + core.WithConfigVersionHeader(true), + planningDurationOverride, + }, + RouterConfig: &testenv.RouterConfig{ + ConfigPollerFactory: func(config *nodev1.RouterConfig) configpoller.ConfigPoller { + pm.initConfig = config + return &pm + }, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + // Populate caches with slow queries + for _, q := range slowQueries { + res := xEnv.MakeGraphQLRequestOK(q) + require.Equal(t, 200, res.Response.StatusCode) + require.Equal(t, "MISS", res.Response.Header.Get("x-wg-execution-plan-cache")) + } + + // Trigger config reload — new Ristretto cache is created (size 1). + <-pm.ready + pm.initConfig.Version = "updated" + require.NoError(t, pm.updateConfig(pm.initConfig, "old-1")) + + // After reload, slow queries should still be available via fallback cache. + waitForPlanCacheHits(t, xEnv, slowQueries, func(ct *assert.CollectT, res *testenv.TestResponse) { + assert.Equal(ct, "updated", res.Response.Header.Get("X-Router-Config-Version")) + }) + }) + }) + + t.Run("only slow queries persist across config reload, fast queries do not", func(t *testing.T) { + t.Parallel() + + pm := ConfigPollerMock{ + ready: make(chan struct{}), + } + + testenv.Run(t, &testenv.Config{ + ModifyEngineExecutionConfiguration: func(cfg *config.EngineExecutionConfiguration) { + // Large enough to hold all queries — no evictions before reload + cfg.ExecutionPlanCacheSize = 1024 + cfg.SlowPlanCacheThreshold = fallbackThreshold + cfg.SlowPlanCacheSize = 100 + }, + RouterOptions: []core.Option{ + core.WithCacheWarmupConfig(&config.CacheWarmupConfiguration{ + Enabled: true, + InMemoryFallback: true, + Source: config.CacheWarmupSource{ + CdnSource: config.CacheWarmupCDNSource{ + Enabled: true, + }, + }, + }), + core.WithConfigVersionHeader(true), + planningDurationOverride, + }, + RouterConfig: &testenv.RouterConfig{ + ConfigPollerFactory: func(config *nodev1.RouterConfig) configpoller.ConfigPoller { + pm.initConfig = config + return &pm + }, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + // Populate caches with both slow and fast queries + for _, q := range allQueries { + res := xEnv.MakeGraphQLRequestOK(q) + require.Equal(t, 200, res.Response.StatusCode) + require.Equal(t, "MISS", res.Response.Header.Get("x-wg-execution-plan-cache")) + } + + // Verify all queries are cached in the main plan cache before reload + for _, q := range allQueries { + require.EventuallyWithT(t, func(ct *assert.CollectT) { + res := xEnv.MakeGraphQLRequestOK(q) + assert.Equal(ct, "HIT", res.Response.Header.Get("x-wg-execution-plan-cache")) + }, 2*time.Second, 100*time.Millisecond) + } + + // Trigger config reload — main plan cache is reset. + <-pm.ready + pm.initConfig.Version = "updated" + require.NoError(t, pm.updateConfig(pm.initConfig, "old-1")) + + // Wait for reload to complete by checking a slow query (which will be + // served from the fallback cache, confirming the new server is active). + require.EventuallyWithT(t, func(ct *assert.CollectT) { + res := xEnv.MakeGraphQLRequestOK(slowQueries[0]) + assert.Equal(ct, "updated", res.Response.Header.Get("X-Router-Config-Version")) + }, 2*time.Second, 100*time.Millisecond) + + // After reload, fast queries must not be persisted anywhere — the first + // request on the new server should be a MISS on both caches. + for _, q := range fastQueries { + res := xEnv.MakeGraphQLRequestOK(q) + require.Equal(t, "updated", res.Response.Header.Get("X-Router-Config-Version")) + require.Equal(t, "MISS", res.Response.Header.Get("x-wg-execution-plan-cache"), + "fast query should not be in plan cache after config reload") + } + }) + }) + + t.Run("plans survive multiple config reloads with small main cache", func(t *testing.T) { + t.Parallel() + + pm := ConfigPollerMock{ + ready: make(chan struct{}), + } + + testenv.Run(t, &testenv.Config{ + ModifyEngineExecutionConfiguration: func(cfg *config.EngineExecutionConfiguration) { + cfg.ExecutionPlanCacheSize = 1 + cfg.SlowPlanCacheThreshold = fallbackThreshold + cfg.SlowPlanCacheSize = 100 + }, + RouterOptions: []core.Option{ + core.WithCacheWarmupConfig(&config.CacheWarmupConfiguration{ + Enabled: true, + InMemoryFallback: true, + Source: config.CacheWarmupSource{ + CdnSource: config.CacheWarmupCDNSource{ + Enabled: true, + }, + }, + }), + core.WithConfigVersionHeader(true), + planningDurationOverride, + }, + RouterConfig: &testenv.RouterConfig{ + ConfigPollerFactory: func(config *nodev1.RouterConfig) configpoller.ConfigPoller { + pm.initConfig = config + return &pm + }, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + // Warm up with slow queries + for _, q := range slowQueries { + res := xEnv.MakeGraphQLRequestOK(q) + require.Equal(t, "MISS", res.Response.Header.Get("x-wg-execution-plan-cache")) + } + + <-pm.ready + + // First reload + pm.initConfig.Version = "v2" + require.NoError(t, pm.updateConfig(pm.initConfig, "old-1")) + + waitForPlanCacheHits(t, xEnv, slowQueries, func(ct *assert.CollectT, res *testenv.TestResponse) { + assert.Equal(ct, "v2", res.Response.Header.Get("X-Router-Config-Version")) + }) + + // Second reload + pm.initConfig.Version = "v3" + require.NoError(t, pm.updateConfig(pm.initConfig, "v2")) + + waitForPlanCacheHits(t, xEnv, slowQueries, func(ct *assert.CollectT, res *testenv.TestResponse) { + assert.Equal(ct, "v3", res.Response.Header.Get("X-Router-Config-Version")) + }) + }) + }) + + t.Run("fallback cache works without config reload", func(t *testing.T) { + t.Parallel() + + testenv.Run(t, &testenv.Config{ + ModifyEngineExecutionConfiguration: func(cfg *config.EngineExecutionConfiguration) { + cfg.ExecutionPlanCacheSize = 1 + cfg.SlowPlanCacheThreshold = fallbackThreshold + cfg.SlowPlanCacheSize = 10 + }, + RouterOptions: []core.Option{ + core.WithCacheWarmupConfig(&config.CacheWarmupConfiguration{ + Enabled: true, + InMemoryFallback: true, + }), + planningDurationOverride, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + // Send slow queries to overflow the tiny main cache + for _, q := range slowQueries { + res := xEnv.MakeGraphQLRequestOK(q) + require.Equal(t, 200, res.Response.StatusCode) + require.Equal(t, "MISS", res.Response.Header.Get("x-wg-execution-plan-cache")) + } + + waitForPlanCacheHits(t, xEnv, slowQueries) + }) + }) + + t.Run("router shuts down cleanly with fallback cache enabled", func(t *testing.T) { + t.Parallel() + + testenv.Run(t, &testenv.Config{ + ModifyEngineExecutionConfiguration: func(cfg *config.EngineExecutionConfiguration) { + cfg.ExecutionPlanCacheSize = 1 + cfg.SlowPlanCacheThreshold = fallbackThreshold + cfg.SlowPlanCacheSize = 50 + }, + RouterOptions: []core.Option{ + core.WithCacheWarmupConfig(&config.CacheWarmupConfiguration{ + Enabled: true, + InMemoryFallback: true, + }), + planningDurationOverride, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + // Make some requests to populate both caches + for _, q := range allQueries { + res := xEnv.MakeGraphQLRequestOK(q) + require.Equal(t, 200, res.Response.StatusCode) + } + // testenv.Run handles shutdown — test verifies no panic or hang + }) + }) + + t.Run("fallback cache entries survive static execution config reload", func(t *testing.T) { + t.Parallel() + + configFile := t.TempDir() + "/config.json" + writeTestConfig(t, "initial", configFile) + + testenv.Run(t, &testenv.Config{ + ModifyEngineExecutionConfiguration: func(cfg *config.EngineExecutionConfiguration) { + cfg.ExecutionPlanCacheSize = 1024 + cfg.SlowPlanCacheThreshold = fallbackThreshold + cfg.SlowPlanCacheSize = 100 + }, + RouterOptions: []core.Option{ + core.WithConfigVersionHeader(true), + core.WithExecutionConfig(&core.ExecutionConfig{ + Path: configFile, + Watch: true, + WatchInterval: 100 * time.Millisecond, + }), + core.WithCacheWarmupConfig(&config.CacheWarmupConfiguration{ + Enabled: true, + InMemoryFallback: true, + }), + // "hello" is slow (enters fallback cache), "world" is fast (does not) + core.WithPlanningDurationOverride(func(content string) time.Duration { + if strings.Contains(content, "hello") { + return 10 * time.Second + } + return 0 + }), + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + slowQ := testenv.GraphQLRequest{Query: `query { hello }`} + fastQ := testenv.GraphQLRequest{Query: `query { world }`} + + // Plan both queries + for _, q := range []testenv.GraphQLRequest{slowQ, fastQ} { + res := xEnv.MakeGraphQLRequestOK(q) + require.Equal(t, 200, res.Response.StatusCode) + require.Equal(t, "initial", res.Response.Header.Get("X-Router-Config-Version")) + } + + // Trigger schema reload + writeTestConfig(t, "updated", configFile) + + // Wait for reload to complete — slow query should survive via fallback cache + require.EventuallyWithT(t, func(ct *assert.CollectT) { + res := xEnv.MakeGraphQLRequestOK(slowQ) + assert.Equal(ct, "updated", res.Response.Header.Get("X-Router-Config-Version")) + assert.Equal(ct, "HIT", res.Response.Header.Get("x-wg-execution-plan-cache"), + "expected slow plan to survive schema reload") + }, 2*time.Second, 100*time.Millisecond) + + // Fast query must not be persisted anywhere after reload + res := xEnv.MakeGraphQLRequestOK(fastQ) + require.Equal(t, "updated", res.Response.Header.Get("X-Router-Config-Version")) + require.Equal(t, "MISS", res.Response.Header.Get("x-wg-execution-plan-cache"), + "fast query should not be in plan cache after schema reload") + }) + }) + + t.Run("high threshold prevents fast plans from entering fallback cache", func(t *testing.T) { + t.Parallel() + + testenv.Run(t, &testenv.Config{ + ModifyEngineExecutionConfiguration: func(cfg *config.EngineExecutionConfiguration) { + cfg.ExecutionPlanCacheSize = 1 + cfg.SlowPlanCacheThreshold = 1 * time.Hour + cfg.SlowPlanCacheSize = 100 + }, + RouterOptions: []core.Option{ + core.WithCacheWarmupConfig(&config.CacheWarmupConfiguration{ + Enabled: true, + InMemoryFallback: true, + }), + // No planning duration override — all plans are fast + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + // Populate — all plans are fast (well under 1h threshold) + for _, q := range allQueries { + res := xEnv.MakeGraphQLRequestOK(q) + require.Equal(t, 200, res.Response.StatusCode) + require.Equal(t, "MISS", res.Response.Header.Get("x-wg-execution-plan-cache")) + } + + // Wait for Ristretto eviction + time.Sleep(200 * time.Millisecond) + + // Re-query — with main cache size 1, most are evicted from Ristretto. + // Since no plan met the 1h threshold, the fallback cache is empty. + // These should be re-planned (MISS). + for _, q := range allQueries { + res := xEnv.MakeGraphQLRequestOK(q) + require.Equal(t, 200, res.Response.StatusCode) + require.Equal(t, "MISS", res.Response.Header.Get("x-wg-execution-plan-cache"), + "no plan should be cached with a 1h threshold") + } + }) + }) +} diff --git a/router-tests/protocol/config_hot_reload_test.go b/router-tests/protocol/config_hot_reload_test.go index 59feec3715..447fff851e 100644 --- a/router-tests/protocol/config_hot_reload_test.go +++ b/router-tests/protocol/config_hot_reload_test.go @@ -604,13 +604,32 @@ func writeTestConfig(t *testing.T, version string, path string) { }, Id: "0", }, + { + Kind: nodev1.DataSourceKind_STATIC, + RootNodes: []*nodev1.TypeField{ + { + TypeName: "Query", + FieldNames: []string{"world"}, + }, + }, + CustomStatic: &nodev1.DataSourceCustom_Static{ + Data: &nodev1.ConfigurationVariable{ + StaticVariableContent: `{"world": "World!"}`, + }, + }, + Id: "1", + }, }, - GraphqlSchema: "schema {\n query: Query\n}\ntype Query {\n hello: String\n}", + GraphqlSchema: "schema {\n query: Query\n}\ntype Query {\n hello: String\n world: String\n}", FieldConfigurations: []*nodev1.FieldConfiguration{ { TypeName: "Query", FieldName: "hello", }, + { + TypeName: "Query", + FieldName: "world", + }, }, }, } diff --git a/router/core/graph_server.go b/router/core/graph_server.go index 40bd0ca716..d9e73be51f 100644 --- a/router/core/graph_server.go +++ b/router/core/graph_server.go @@ -54,6 +54,7 @@ import ( rmetric "github.com/wundergraph/cosmo/router/pkg/metric" "github.com/wundergraph/cosmo/router/pkg/otel" "github.com/wundergraph/cosmo/router/pkg/pubsub/datasource" + "github.com/wundergraph/cosmo/router/pkg/slowplancache" "github.com/wundergraph/cosmo/router/pkg/statistics" rtrace "github.com/wundergraph/cosmo/router/pkg/trace" @@ -546,6 +547,7 @@ type graphMux struct { mux *chi.Mux planCache *ristretto.Cache[uint64, *planWithMetaData] + planFallbackCache *slowplancache.Cache[*planWithMetaData] persistedOperationCache *ristretto.Cache[uint64, NormalizationCacheEntry] normalizationCache *ristretto.Cache[uint64, NormalizationCacheEntry] complexityCalculationCache *ristretto.Cache[uint64, ComplexityCacheEntry] @@ -583,6 +585,14 @@ func (s *graphMux) buildOperationCaches(srv *graphServer) (computeSha256 bool, e IgnoreInternalCost: true, BufferItems: 64, } + if srv.cacheWarmup != nil && srv.cacheWarmup.Enabled && srv.cacheWarmup.InMemoryFallback { + planCacheConfig.OnEvict = func(item *ristretto.Item[*planWithMetaData]) { + // This could be called before planFallbackCache is set, but it's not a problem + // because there is a nil guard inside, as well as items should not really be evicted + // on startup + s.planFallbackCache.Set(item.Key, item.Value, item.Value.planningDuration) + } + } s.planCache, err = ristretto.NewCache[uint64, *planWithMetaData](planCacheConfig) if err != nil { return computeSha256, fmt.Errorf("failed to create planner cache: %w", err) @@ -788,6 +798,7 @@ func (s *graphMux) configureCacheMetrics(srv *graphServer, baseOtelAttributes [] func (s *graphMux) Shutdown(ctx context.Context) error { s.planCache.Close() + s.planFallbackCache.Close() s.persistedOperationCache.Close() s.normalizationCache.Close() s.variablesNormalizationCache.Close() @@ -1341,7 +1352,18 @@ func (s *graphServer) buildGraphMux( CostControl: s.securityConfiguration.CostControl, }) - operationPlanner := NewOperationPlanner(executor, gm.planCache, opts.ReloadPersistentState.inMemoryPlanCacheFallback.IsEnabled()) + if opts.ReloadPersistentState.inMemoryPlanCacheFallback.IsEnabled() { + var err error + gm.planFallbackCache, err = slowplancache.New[*planWithMetaData]( + int(s.engineExecutionConfiguration.SlowPlanCacheSize), + s.engineExecutionConfiguration.SlowPlanCacheThreshold, + ) + if err != nil { + return nil, fmt.Errorf("failed to create plan fallback cache: %w", err) + } + } + + operationPlanner := NewOperationPlanner(executor, gm.planCache, gm.planFallbackCache, s.planningDurationOverride) // We support the MCP only on the base graph. Feature flags are not supported yet. if opts.IsBaseGraph() && s.mcpServer != nil { @@ -1397,10 +1419,10 @@ func (s *graphServer) buildGraphMux( // - Using static execution config (not Cosmo): s.selfRegister == nil // - OR CDN cache warmer is explictly disabled case s.cacheWarmup.InMemoryFallback && (s.selfRegister == nil || !s.cacheWarmup.Source.CdnSource.Enabled): - // We first utilize the existing plan cache (if it was already set, i.e., not on the first start) to create a list of queries + // We first utilize the existing cache (if it was already set, i.e., not on the first start) to create a list of queries // and then reset the plan cache to the new plan cache for this start afterwards. warmupConfig.Source = NewPlanSource(opts.ReloadPersistentState.inMemoryPlanCacheFallback.getPlanCacheForFF(opts.FeatureFlagName)) - opts.ReloadPersistentState.inMemoryPlanCacheFallback.setPlanCacheForFF(opts.FeatureFlagName, gm.planCache) + opts.ReloadPersistentState.inMemoryPlanCacheFallback.setPlanCacheForFF(opts.FeatureFlagName, gm.planFallbackCache) case s.cacheWarmup.Source.CdnSource.Enabled: if s.graphApiToken == "" { return nil, fmt.Errorf("graph token is required for cache warmup in order to communicate with the CDN") @@ -1410,7 +1432,7 @@ func (s *graphServer) buildGraphMux( // This is useful for when an issue occurs with the CDN when retrieving the required manifest if s.cacheWarmup.InMemoryFallback { warmupConfig.FallbackSource = NewPlanSource(opts.ReloadPersistentState.inMemoryPlanCacheFallback.getPlanCacheForFF(opts.FeatureFlagName)) - opts.ReloadPersistentState.inMemoryPlanCacheFallback.setPlanCacheForFF(opts.FeatureFlagName, gm.planCache) + opts.ReloadPersistentState.inMemoryPlanCacheFallback.setPlanCacheForFF(opts.FeatureFlagName, gm.planFallbackCache) } cdnSource, err := NewCDNSource(s.cdnConfig.URL, s.graphApiToken, s.logger) if err != nil { diff --git a/router/core/operation_planner.go b/router/core/operation_planner.go index 496f9131b5..f9da57396f 100644 --- a/router/core/operation_planner.go +++ b/router/core/operation_planner.go @@ -3,12 +3,13 @@ package core import ( "errors" "strconv" + "time" "golang.org/x/sync/singleflight" graphqlmetricsv1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/graphqlmetrics/v1" "github.com/wundergraph/cosmo/router/pkg/graphqlschemausage" - + "github.com/wundergraph/cosmo/router/pkg/slowplancache" "github.com/wundergraph/graphql-go-tools/v2/pkg/ast" "github.com/wundergraph/graphql-go-tools/v2/pkg/astparser" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/plan" @@ -22,14 +23,20 @@ type planWithMetaData struct { typeFieldUsageInfo []*graphqlschemausage.TypeFieldUsageInfo argumentUsageInfo []*graphqlmetricsv1.ArgumentUsageInfo content string + operationName string + planningDuration time.Duration } type OperationPlanner struct { - sf singleflight.Group - planCache ExecutionPlanCache[uint64, *planWithMetaData] - executor *Executor - trackUsageInfo bool - operationContent bool + sf singleflight.Group + planCache ExecutionPlanCache[uint64, *planWithMetaData] + slowPlanCache *slowplancache.Cache[*planWithMetaData] + executor *Executor + trackUsageInfo bool + + // planningDurationOverride, when set, replaces the measured planning duration. + // This is used in tests to simulate slow queries. + planningDurationOverride func(content string) time.Duration } type operationPlannerOpts struct { @@ -47,17 +54,24 @@ type ExecutionPlanCache[K any, V any] interface { Close() } -func NewOperationPlanner(executor *Executor, planCache ExecutionPlanCache[uint64, *planWithMetaData], storeContent bool) *OperationPlanner { +func NewOperationPlanner( + executor *Executor, + planCache ExecutionPlanCache[uint64, *planWithMetaData], + fallbackCache *slowplancache.Cache[*planWithMetaData], + planningDurationOverride func(content string) time.Duration, +) *OperationPlanner { return &OperationPlanner{ - planCache: planCache, - executor: executor, - trackUsageInfo: executor.TrackUsageInfo, - operationContent: storeContent, + planCache: planCache, + executor: executor, + trackUsageInfo: executor.TrackUsageInfo, + slowPlanCache: fallbackCache, + planningDurationOverride: planningDurationOverride, } } -func (p *OperationPlanner) preparePlan(ctx *operationContext, opts operationPlannerOpts) (*planWithMetaData, error) { - doc, report := astparser.ParseGraphqlDocumentString(ctx.content) +// planOperation performs the core planning work: parse, plan, and postprocess. +func (p *OperationPlanner) planOperation(content string, name string, includeQueryPlan bool) (*planWithMetaData, error) { + doc, report := astparser.ParseGraphqlDocumentString(content) if report.HasErrors() { return nil, &reportError{report: &report} } @@ -67,16 +81,11 @@ func (p *OperationPlanner) preparePlan(ctx *operationContext, opts operationPlan return nil, err } - var ( - preparedPlan plan.Plan - ) - - // create and postprocess the plan - // planning uses the router schema - if ctx.executionOptions.IncludeQueryPlanInResponse { - preparedPlan = planner.Plan(&doc, p.executor.RouterSchema, ctx.name, &report, plan.IncludeQueryPlanInResponse()) + var preparedPlan plan.Plan + if includeQueryPlan { + preparedPlan = planner.Plan(&doc, p.executor.RouterSchema, name, &report, plan.IncludeQueryPlanInResponse()) } else { - preparedPlan = planner.Plan(&doc, p.executor.RouterSchema, ctx.name, &report) + preparedPlan = planner.Plan(&doc, p.executor.RouterSchema, name, &report) } if report.HasErrors() { return nil, &reportError{report: &report} @@ -84,19 +93,28 @@ func (p *OperationPlanner) preparePlan(ctx *operationContext, opts operationPlan post := postprocess.NewProcessor(postprocess.CollectDataSourceInfo()) post.Process(preparedPlan) - out := &planWithMetaData{ + return &planWithMetaData{ preparedPlan: preparedPlan, operationDocument: &doc, schemaDocument: p.executor.RouterSchema, + }, nil +} + +func (p *OperationPlanner) preparePlan(ctx *operationContext, opts operationPlannerOpts) (*planWithMetaData, error) { + out, err := p.planOperation(ctx.content, ctx.name, ctx.executionOptions.IncludeQueryPlanInResponse) + if err != nil { + return nil, err } + out.operationName = ctx.name + if opts.operationContent { out.content = ctx.Content() } if p.trackUsageInfo { - out.typeFieldUsageInfo = graphqlschemausage.GetTypeFieldUsageInfo(preparedPlan) - out.argumentUsageInfo, err = graphqlschemausage.GetArgumentUsageInfo(&doc, p.executor.RouterSchema, ctx.variables, preparedPlan, ctx.remapVariables) + out.typeFieldUsageInfo = graphqlschemausage.GetTypeFieldUsageInfo(out.preparedPlan) + out.argumentUsageInfo, err = graphqlschemausage.GetArgumentUsageInfo(out.operationDocument, p.executor.RouterSchema, ctx.variables, out.preparedPlan, ctx.remapVariables) if err != nil { return nil, err } @@ -116,6 +134,7 @@ func (p *OperationPlanner) plan(opContext *operationContext, options PlanOptions // if we have tracing enabled or want to include a query plan in the response we always prepare a new plan // this is because in case of tracing, we're writing trace data to the plan // in case of including the query plan, we don't want to cache this additional overhead + skipCache := options.TraceOptions.Enable || options.ExecutionOptions.IncludeQueryPlanInResponse // Store plan config regardless of cache to enable costs calculation. @@ -142,19 +161,40 @@ func (p *OperationPlanner) plan(opContext *operationContext, options PlanOptions // try to get a prepared plan for this operation ID from the cache cachedPlan, ok := p.planCache.Get(operationID) if ok && cachedPlan != nil { - // re-use a prepared plan + // re-use a prepared plan from the main cache opContext.preparedPlan = cachedPlan opContext.planCacheHit = true - } else { + } else if p.slowPlanCache != nil { + if cachedPlan, ok = p.slowPlanCache.Get(operationID); ok { + // found in the plan fallback cache — re-use and re-insert into main cache + opContext.preparedPlan = cachedPlan + opContext.planCacheHit = true + p.planCache.Set(operationID, cachedPlan, 1) + } + } + + if opContext.preparedPlan == nil { // prepare a new plan using single flight // this ensures that we only prepare the plan once for this operation ID operationIDStr := strconv.FormatUint(operationID, 10) sharedPreparedPlan, err, _ := p.sf.Do(operationIDStr, func() (interface{}, error) { - prepared, err := p.preparePlan(opContext, operationPlannerOpts{operationContent: p.operationContent}) + start := time.Now() + prepared, err := p.preparePlan(opContext, operationPlannerOpts{operationContent: p.slowPlanCache != nil}) if err != nil { return nil, err } + prepared.planningDuration = time.Since(start) + + // This is only used for test cases + if p.planningDurationOverride != nil { + prepared.planningDuration = p.planningDurationOverride(prepared.content) + } + + // Set into the main cache after planningDuration is finalized, + // because the OnEvict callback reads planningDuration concurrently. p.planCache.Set(operationID, prepared, 1) + p.slowPlanCache.Set(operationID, prepared, prepared.planningDuration) + return prepared, nil }) if err != nil { diff --git a/router/core/reload_persistent_state.go b/router/core/reload_persistent_state.go index d842b239a5..c8a5af6ecc 100644 --- a/router/core/reload_persistent_state.go +++ b/router/core/reload_persistent_state.go @@ -3,13 +3,11 @@ package core import ( "sync" - "github.com/dgraph-io/ristretto/v2" nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1" + "github.com/wundergraph/cosmo/router/pkg/slowplancache" "go.uber.org/zap" ) -type planCache = *ristretto.Cache[uint64, *planWithMetaData] - // ReloadPersistentState This file describes any configuration which should persist or be shared across router restarts type ReloadPersistentState struct { inMemoryPlanCacheFallback *InMemoryPlanCacheFallback @@ -34,6 +32,7 @@ func (s *ReloadPersistentState) CleanupFeatureFlags(routerCfg *nodev1.RouterConf s.inMemoryPlanCacheFallback.cleanupUnusedFeatureFlags(routerCfg) } +// This should always be called before graphMux.Shutdown() as ordering matters func (s *ReloadPersistentState) OnRouterConfigReload() { // For cases of router config changes (not execution config), we shut down before creating the // graph mux, because we need to initialize everything from the start @@ -45,7 +44,8 @@ func (s *ReloadPersistentState) OnRouterConfigReload() { s.inMemoryPlanCacheFallback.extractQueriesAndOverridePlanCache() } -// InMemoryPlanCacheFallback is a store that stores either queries or references to the planner cache for use with the cache warmer +// InMemoryPlanCacheFallback is a store that stores either queries or references to the planner cache for use with the cache warmer. +// Only expensive queries (planning duration >= threshold) are persisted. type InMemoryPlanCacheFallback struct { mu sync.RWMutex queriesForFeatureFlag map[string]any @@ -82,7 +82,8 @@ func (c *InMemoryPlanCacheFallback) IsEnabled() bool { return c.queriesForFeatureFlag != nil } -// getPlanCacheForFF gets the plan cache in the []*nodev1.Operation format for a specific feature flag key +// getPlanCacheForFF gets the plan cache in the []*nodev1.Operation format for a specific feature flag key. +// It handles both live expensive cache references and already-extracted operation snapshots. func (c *InMemoryPlanCacheFallback) getPlanCacheForFF(featureFlagKey string) []*nodev1.Operation { c.mu.RLock() defer c.mu.RUnlock() @@ -92,7 +93,7 @@ func (c *InMemoryPlanCacheFallback) getPlanCacheForFF(featureFlagKey string) []* } switch cache := c.queriesForFeatureFlag[featureFlagKey].(type) { - case planCache: + case *slowplancache.Cache[*planWithMetaData]: return convertToNodeOperation(cache) case []*nodev1.Operation: return cache @@ -107,7 +108,7 @@ func (c *InMemoryPlanCacheFallback) getPlanCacheForFF(featureFlagKey string) []* } // setPlanCacheForFF sets the plan cache for a specific feature flag key -func (c *InMemoryPlanCacheFallback) setPlanCacheForFF(featureFlagKey string, cache planCache) { +func (c *InMemoryPlanCacheFallback) setPlanCacheForFF(featureFlagKey string, cache *slowplancache.Cache[*planWithMetaData]) { c.mu.Lock() defer c.mu.Unlock() @@ -128,7 +129,7 @@ func (c *InMemoryPlanCacheFallback) extractQueriesAndOverridePlanCache() { fallbackMap := make(map[string]any) for k, v := range c.queriesForFeatureFlag { - if cache, ok := v.(planCache); ok { + if cache, ok := v.(*slowplancache.Cache[*planWithMetaData]); ok { fallbackMap[k] = convertToNodeOperation(cache) } } @@ -158,14 +159,13 @@ func (c *InMemoryPlanCacheFallback) cleanupUnusedFeatureFlags(routerCfg *nodev1. } } -func convertToNodeOperation(data planCache) []*nodev1.Operation { +func convertToNodeOperation(data *slowplancache.Cache[*planWithMetaData]) []*nodev1.Operation { items := make([]*nodev1.Operation, 0) - data.IterValues(func(v *planWithMetaData) (stop bool) { + for v := range data.Values() { items = append(items, &nodev1.Operation{ Request: &nodev1.OperationRequest{Query: v.content}, }) - return false - }) + } return items } diff --git a/router/core/reload_persistent_state_test.go b/router/core/reload_persistent_state_test.go index 4ec5e0638b..247091dba0 100644 --- a/router/core/reload_persistent_state_test.go +++ b/router/core/reload_persistent_state_test.go @@ -3,10 +3,10 @@ package core import ( "testing" - "github.com/dgraph-io/ristretto/v2" "github.com/stretchr/testify/require" nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1" "github.com/wundergraph/cosmo/router/pkg/config" + "github.com/wundergraph/cosmo/router/pkg/slowplancache" "go.uber.org/zap" ) @@ -33,7 +33,6 @@ func TestInMemoryPlanCacheFallback_UpdateInMemoryFallbackCacheForConfigChanges(t cache := &InMemoryPlanCacheFallback{ queriesForFeatureFlag: make(map[string]any), } - cache.queriesForFeatureFlag["test"] = nil cfg := &Config{ cacheWarmup: &config.CacheWarmupConfiguration{ @@ -48,11 +47,11 @@ func TestInMemoryPlanCacheFallback_UpdateInMemoryFallbackCacheForConfigChanges(t t.Run("update when already enabled keeps existing data", func(t *testing.T) { t.Parallel() - existingMap := make(map[string]any) - existingMap["test"] = nil + existing := make(map[string]any) + existing["test"] = (*slowplancache.Cache[*planWithMetaData])(nil) cache := &InMemoryPlanCacheFallback{ - queriesForFeatureFlag: existingMap, + queriesForFeatureFlag: existing, } cfg := &Config{ @@ -120,43 +119,8 @@ func TestInMemoryPlanCacheFallback_UpdateInMemoryFallbackCacheForConfigChanges(t func TestInMemoryPlanCacheFallback_GetPlanCacheForFF(t *testing.T) { t.Parallel() - t.Run("returns operations for existing feature flag when enabled with ristretto cache", func(t *testing.T) { - t.Parallel() - mockCache, err := ristretto.NewCache(&ristretto.Config[uint64, *planWithMetaData]{ - MaxCost: 10000, - NumCounters: 10000000, - IgnoreInternalCost: true, - BufferItems: 64, - }) - require.NoError(t, err) - - query1 := "query { test1 }" - query2 := "query { test2 }" - - mockCache.Set(1, &planWithMetaData{content: query1}, 1) - mockCache.Set(2, &planWithMetaData{content: query2}, 1) - mockCache.Wait() - - cache := &InMemoryPlanCacheFallback{ - queriesForFeatureFlag: make(map[string]any), - } - cache.queriesForFeatureFlag["test-ff"] = mockCache - - result := cache.getPlanCacheForFF("test-ff") - - require.NotNil(t, result) - require.IsType(t, []*nodev1.Operation{}, result) - require.Len(t, result, 2) - // Verify the operations contain the expected queries (order may vary) - queries := make([]string, len(result)) - for i, op := range result { - queries[i] = op.Request.Query - } - require.ElementsMatch(t, []string{query1, query2}, queries) - }) - - t.Run("returns operations for existing feature flag when enabled with operation slice", func(t *testing.T) { + t.Run("returns operations for existing feature flag from extracted ops", func(t *testing.T) { t.Parallel() expectedOps := []*nodev1.Operation{ {Request: &nodev1.OperationRequest{Query: "query { test1 }"}}, @@ -164,9 +128,10 @@ func TestInMemoryPlanCacheFallback_GetPlanCacheForFF(t *testing.T) { } cache := &InMemoryPlanCacheFallback{ - queriesForFeatureFlag: make(map[string]any), + queriesForFeatureFlag: map[string]any{ + "test-ff": expectedOps, + }, } - cache.queriesForFeatureFlag["test-ff"] = expectedOps result := cache.getPlanCacheForFF("test-ff") @@ -174,78 +139,47 @@ func TestInMemoryPlanCacheFallback_GetPlanCacheForFF(t *testing.T) { require.Equal(t, expectedOps, result) }) - t.Run("returns empty slice for non-existent feature flag", func(t *testing.T) { + t.Run("returns operations from live fallback cache reference", func(t *testing.T) { t.Parallel() - cache := &InMemoryPlanCacheFallback{ - logger: zap.NewNop(), - queriesForFeatureFlag: make(map[string]any), - } - result := cache.getPlanCacheForFF("non-existent") - require.Nil(t, result) - }) + fallbackCache, err := slowplancache.New[*planWithMetaData](100, 0) + require.NoError(t, err) + fallbackCache.Set(1, &planWithMetaData{content: "query { fromFallback }"}, 5*1e9) + fallbackCache.Wait() - t.Run("returns nil when cache is disabled", func(t *testing.T) { - t.Parallel() cache := &InMemoryPlanCacheFallback{ - queriesForFeatureFlag: nil, + queriesForFeatureFlag: map[string]any{ + "test-ff": fallbackCache, + }, } result := cache.getPlanCacheForFF("test-ff") - require.Nil(t, result) + require.NotNil(t, result) + require.Len(t, result, 1) + require.Equal(t, "query { fromFallback }", result[0].Request.Query) }) -} -func TestInMemoryPlanCacheFallback_SetPlanCacheForFF(t *testing.T) { - t.Parallel() - t.Run("sets cache for feature flag when enabled", func(t *testing.T) { + t.Run("returns nil for non-existent feature flag", func(t *testing.T) { t.Parallel() - mockCache, err := ristretto.NewCache(&ristretto.Config[uint64, *planWithMetaData]{ - MaxCost: 100, - NumCounters: 10000, - BufferItems: 64, - }) - require.NoError(t, err) - cache := &InMemoryPlanCacheFallback{ + logger: zap.NewNop(), queriesForFeatureFlag: make(map[string]any), } - cache.setPlanCacheForFF("test-ff", mockCache) - - require.Contains(t, cache.queriesForFeatureFlag, "test-ff") - // Verify it's the same cache by comparing the underlying pointer - require.Equal(t, cache.queriesForFeatureFlag["test-ff"], mockCache) + result := cache.getPlanCacheForFF("non-existent") + require.Nil(t, result) }) - t.Run("does not set cache when disabled", func(t *testing.T) { + t.Run("returns nil when cache is disabled", func(t *testing.T) { t.Parallel() - mockCache, err := ristretto.NewCache(&ristretto.Config[uint64, *planWithMetaData]{ - MaxCost: 100, - NumCounters: 10000, - BufferItems: 64, - }) - require.NoError(t, err) - cache := &InMemoryPlanCacheFallback{ queriesForFeatureFlag: nil, } - cache.setPlanCacheForFF("test-ff", mockCache) - - require.Nil(t, cache.queriesForFeatureFlag) - }) - - t.Run("does not set nil cache", func(t *testing.T) { - t.Parallel() - cache := &InMemoryPlanCacheFallback{ - queriesForFeatureFlag: make(map[string]any), - } - - cache.setPlanCacheForFF("test-ff", nil) + result := cache.getPlanCacheForFF("test-ff") - require.NotContains(t, cache.queriesForFeatureFlag, "test-ff") + require.Nil(t, result) }) } @@ -254,11 +188,12 @@ func TestInMemoryPlanCacheFallback_CleanupUnusedFeatureFlags(t *testing.T) { t.Run("removes unused feature flags", func(t *testing.T) { t.Parallel() cache := &InMemoryPlanCacheFallback{ - queriesForFeatureFlag: make(map[string]any), + queriesForFeatureFlag: map[string]any{ + "ff1": (*slowplancache.Cache[*planWithMetaData])(nil), + "ff2": (*slowplancache.Cache[*planWithMetaData])(nil), + "ff3": (*slowplancache.Cache[*planWithMetaData])(nil), + }, } - cache.queriesForFeatureFlag["ff1"] = nil - cache.queriesForFeatureFlag["ff2"] = nil - cache.queriesForFeatureFlag["ff3"] = nil routerCfg := &nodev1.RouterConfig{ FeatureFlagConfigs: &nodev1.FeatureFlagRouterExecutionConfigs{ @@ -280,10 +215,11 @@ func TestInMemoryPlanCacheFallback_CleanupUnusedFeatureFlags(t *testing.T) { t.Run("keeps empty string feature flag", func(t *testing.T) { t.Parallel() cache := &InMemoryPlanCacheFallback{ - queriesForFeatureFlag: make(map[string]any), + queriesForFeatureFlag: map[string]any{ + "": (*slowplancache.Cache[*planWithMetaData])(nil), + "ff1": (*slowplancache.Cache[*planWithMetaData])(nil), + }, } - cache.queriesForFeatureFlag[""] = nil - cache.queriesForFeatureFlag["ff1"] = nil routerCfg := &nodev1.RouterConfig{ FeatureFlagConfigs: &nodev1.FeatureFlagRouterExecutionConfigs{ @@ -319,14 +255,15 @@ func TestInMemoryPlanCacheFallback_CleanupUnusedFeatureFlags(t *testing.T) { t.Run("removes feature flags when not in ConfigByFeatureFlagName", func(t *testing.T) { t.Parallel() cache := &InMemoryPlanCacheFallback{ - queriesForFeatureFlag: make(map[string]any), + queriesForFeatureFlag: map[string]any{ + "": (*slowplancache.Cache[*planWithMetaData])(nil), // base should be kept + "ff1": (*slowplancache.Cache[*planWithMetaData])(nil), + "ff2": (*slowplancache.Cache[*planWithMetaData])(nil), + "ff3": (*slowplancache.Cache[*planWithMetaData])(nil), + "ff4": (*slowplancache.Cache[*planWithMetaData])(nil), + "ff5": (*slowplancache.Cache[*planWithMetaData])(nil), + }, } - cache.queriesForFeatureFlag[""] = nil // base should be kept - cache.queriesForFeatureFlag["ff1"] = nil - cache.queriesForFeatureFlag["ff2"] = nil - cache.queriesForFeatureFlag["ff3"] = nil - cache.queriesForFeatureFlag["ff4"] = nil - cache.queriesForFeatureFlag["ff5"] = nil routerCfg := &nodev1.RouterConfig{ FeatureFlagConfigs: nil, @@ -344,50 +281,40 @@ func TestInMemoryPlanCacheFallback_CleanupUnusedFeatureFlags(t *testing.T) { func TestInMemoryPlanCacheFallback_ProcessOnConfigChangeRestart(t *testing.T) { t.Parallel() - t.Run("converts ristretto caches to operation slices", func(t *testing.T) { + t.Run("extracts fallback cache entries to operations", func(t *testing.T) { t.Parallel() - mockCache1, err := ristretto.NewCache(&ristretto.Config[uint64, *planWithMetaData]{ - MaxCost: 10000, - NumCounters: 10000000, - IgnoreInternalCost: true, - BufferItems: 64, - }) - require.NoError(t, err) - - mockCache2, err := ristretto.NewCache(&ristretto.Config[uint64, *planWithMetaData]{ - MaxCost: 10000, - NumCounters: 10000000, - IgnoreInternalCost: true, - BufferItems: 64, - }) - require.NoError(t, err) query1 := "query { test1 }" query2 := "query { test2 }" - mockCache1.Set(1, &planWithMetaData{content: query1}, 1) - mockCache1.Wait() - mockCache2.Set(2, &planWithMetaData{content: query2}, 1) - mockCache2.Wait() + fallbackCache1, err := slowplancache.New[*planWithMetaData](100, 0) + require.NoError(t, err) + fallbackCache2, err := slowplancache.New[*planWithMetaData](100, 0) + require.NoError(t, err) + + fallbackCache1.Set(1, &planWithMetaData{content: query1}, 5*1e9) + fallbackCache1.Wait() + fallbackCache2.Set(2, &planWithMetaData{content: query2}, 5*1e9) + fallbackCache2.Wait() cache := &InMemoryPlanCacheFallback{ - queriesForFeatureFlag: make(map[string]any), + queriesForFeatureFlag: map[string]any{ + "ff1": fallbackCache1, + "ff2": fallbackCache2, + }, } - cache.queriesForFeatureFlag["ff1"] = mockCache1 - cache.queriesForFeatureFlag["ff2"] = mockCache2 cache.extractQueriesAndOverridePlanCache() - // Verify both caches have been converted to operation slices - require.IsType(t, []*nodev1.Operation{}, cache.queriesForFeatureFlag["ff1"]) - require.IsType(t, []*nodev1.Operation{}, cache.queriesForFeatureFlag["ff2"]) - - ff1Ops := cache.queriesForFeatureFlag["ff1"].([]*nodev1.Operation) - ff2Ops := cache.queriesForFeatureFlag["ff2"].([]*nodev1.Operation) - + // Verify both caches have been extracted to operations + ff1Ops, ok := cache.queriesForFeatureFlag["ff1"].([]*nodev1.Operation) + require.True(t, ok) require.Len(t, ff1Ops, 1) - require.Len(t, ff2Ops, 1) require.Equal(t, query1, ff1Ops[0].Request.Query) + + ff2Ops, ok := cache.queriesForFeatureFlag["ff2"].([]*nodev1.Operation) + require.True(t, ok) + require.Len(t, ff2Ops, 1) require.Equal(t, query2, ff2Ops[0].Request.Query) }) @@ -436,5 +363,4 @@ func TestInMemoryPlanCacheFallback_IsEnabled(t *testing.T) { require.False(t, cache.IsEnabled()) }) - } diff --git a/router/core/router.go b/router/core/router.go index eef8b8734d..34e619e0da 100644 --- a/router/core/router.go +++ b/router/core/router.go @@ -2246,6 +2246,14 @@ func WithCacheWarmupConfig(cfg *config.CacheWarmupConfiguration) Option { } } +// WithPlanningDurationOverride sets a function that overrides the measured planning duration. +// Used in tests to simulate slow queries that exceed the expensive query threshold. +func WithPlanningDurationOverride(fn func(content string) time.Duration) Option { + return func(r *Router) { + r.planningDurationOverride = fn + } +} + func WithMCP(cfg config.MCPConfiguration) Option { return func(r *Router) { r.mcp = cfg diff --git a/router/core/router_config.go b/router/core/router_config.go index bdb126614f..6580e1e639 100644 --- a/router/core/router_config.go +++ b/router/core/router_config.go @@ -139,6 +139,7 @@ type Config struct { subgraphErrorPropagation config.SubgraphErrorPropagationConfiguration clientHeader config.ClientHeader cacheWarmup *config.CacheWarmupConfiguration + planningDurationOverride func(content string) time.Duration subscriptionHeartbeatInterval time.Duration hostName string mcp config.MCPConfiguration diff --git a/router/pkg/config/config.go b/router/pkg/config/config.go index 5b31c1f932..f9715bfe97 100644 --- a/router/pkg/config/config.go +++ b/router/pkg/config/config.go @@ -439,6 +439,8 @@ type EngineExecutionConfiguration struct { WebSocketClientPingTimeout time.Duration `envDefault:"30s" env:"ENGINE_WEBSOCKET_CLIENT_PING_TIMEOUT" yaml:"websocket_client_ping_timeout,omitempty"` WebSocketClientFrameTimeout time.Duration `envDefault:"100ms" env:"ENGINE_WEBSOCKET_CLIENT_FRAME_TIMEOUT" yaml:"websocket_client_frame_timeout,omitempty"` ExecutionPlanCacheSize int64 `envDefault:"1024" env:"ENGINE_EXECUTION_PLAN_CACHE_SIZE" yaml:"execution_plan_cache_size,omitempty"` + SlowPlanCacheSize int64 `envDefault:"300" env:"ENGINE_SLOW_PLAN_CACHE_SIZE" yaml:"slow_plan_cache_size,omitempty"` + SlowPlanCacheThreshold time.Duration `envDefault:"100ms" env:"ENGINE_SLOW_PLAN_CACHE_THRESHOLD" yaml:"slow_plan_cache_threshold,omitempty"` MinifySubgraphOperations bool `envDefault:"true" env:"ENGINE_MINIFY_SUBGRAPH_OPERATIONS" yaml:"minify_subgraph_operations"` EnablePersistedOperationsCache bool `envDefault:"true" env:"ENGINE_ENABLE_PERSISTED_OPERATIONS_CACHE" yaml:"enable_persisted_operations_cache"` EnableNormalizationCache bool `envDefault:"true" env:"ENGINE_ENABLE_NORMALIZATION_CACHE" yaml:"enable_normalization_cache"` diff --git a/router/pkg/config/config.schema.json b/router/pkg/config/config.schema.json index d8e9bdacc3..c0b6b79350 100644 --- a/router/pkg/config/config.schema.json +++ b/router/pkg/config/config.schema.json @@ -3414,6 +3414,21 @@ "default": 1024, "description": "The size of the execution plan cache." }, + "slow_plan_cache_size": { + "type": "integer", + "minimum": 1, + "default": 300, + "description": "The maximum number of entries in the slow plan cache." + }, + "slow_plan_cache_threshold": { + "type": "string", + "format": "go-duration", + "description": "The minimum planning duration for a query plan to be stored in the slow plan cache.", + "default": "100ms", + "duration": { + "minimum": "0ns" + } + }, "operation_hash_cache_size": { "type": "integer", "default": 2048, diff --git a/router/pkg/config/testdata/config_defaults.json b/router/pkg/config/testdata/config_defaults.json index e5e86ea931..817b683454 100644 --- a/router/pkg/config/testdata/config_defaults.json +++ b/router/pkg/config/testdata/config_defaults.json @@ -434,6 +434,8 @@ "WebSocketClientPingTimeout": 30000000000, "WebSocketClientFrameTimeout": 100000000, "ExecutionPlanCacheSize": 1024, + "SlowPlanCacheSize": 300, + "SlowPlanCacheThreshold": 100000000, "MinifySubgraphOperations": true, "EnablePersistedOperationsCache": true, "EnableNormalizationCache": true, diff --git a/router/pkg/config/testdata/config_full.json b/router/pkg/config/testdata/config_full.json index 1ffc9837d1..bd307ab0c3 100644 --- a/router/pkg/config/testdata/config_full.json +++ b/router/pkg/config/testdata/config_full.json @@ -844,6 +844,8 @@ "WebSocketClientPingTimeout": 30000000000, "WebSocketClientFrameTimeout": 100000000, "ExecutionPlanCacheSize": 1024, + "SlowPlanCacheSize": 300, + "SlowPlanCacheThreshold": 100000000, "MinifySubgraphOperations": true, "EnablePersistedOperationsCache": true, "EnableNormalizationCache": true, diff --git a/router/pkg/slowplancache/slow_plan_cache.go b/router/pkg/slowplancache/slow_plan_cache.go new file mode 100644 index 0000000000..17fba9aa5f --- /dev/null +++ b/router/pkg/slowplancache/slow_plan_cache.go @@ -0,0 +1,226 @@ +package slowplancache + +import ( + "fmt" + "iter" + "sync" + "sync/atomic" + "time" +) + +// Entry holds a cached value and the duration it took to produce. +type Entry[V any] struct { + value V + duration time.Duration +} + +type setRequest[V any] struct { + key uint64 + value V + dur time.Duration + waitCh chan struct{} // if non-nil, will be closed after previous requests in the buffer are processed +} + +// Cache is a bounded map that holds expensive-to-compute values +// that should not be subject to TinyLFU eviction in the main cache. +// Writes are buffered through a channel and applied asynchronously by a +// background goroutine, making Set non-blocking. Reads use sync.Map for lock-free access. +// It tracks the minimum-duration entry so that rejection of cheaper entries is O(1). +type Cache[V any] struct { + entries sync.Map // map[uint64]*Entry[V] + size int64 + maxSize int64 + threshold time.Duration + minKey uint64 + minDur time.Duration + + writeCh chan setRequest[V] + stop chan struct{} + done chan struct{} + closeOnce sync.Once + closed atomic.Bool +} + +// We use the same value as ristretto (this would be the buffer size if we used ristretto as the backing cache) +const defaultWriteBufferSize = 32 * 1024 + +func New[V any](maxSize int, threshold time.Duration) (*Cache[V], error) { + if maxSize < 1 { + return nil, fmt.Errorf("slow plan cache size must be at least 1, got %d", maxSize) + } + c := &Cache[V]{ + maxSize: int64(maxSize), + threshold: threshold, + writeCh: make(chan setRequest[V], defaultWriteBufferSize), + stop: make(chan struct{}), + done: make(chan struct{}), + } + go c.processWrites() + return c, nil +} + +// processWrites drains the write channel and applies sets. +// It exits when the stop channel is closed. +func (c *Cache[V]) processWrites() { + defer close(c.done) + + for { + select { + case req := <-c.writeCh: + if req.waitCh != nil { + close(req.waitCh) + continue + } + c.applySet(req.key, req.value, req.dur) + case <-c.stop: + return + } + } +} + +func (c *Cache[V]) Get(key uint64) (V, bool) { + if c == nil || c.closed.Load() { + var zero V + return zero, false + } + + val, ok := c.entries.Load(key) + if !ok { + var zero V + return zero, false + } + + return val.(*Entry[V]).value, true +} + +// Set enqueues a write to the cache. The write is applied asynchronously. +// If the write buffer is full or the cache is closed, the entry is silently dropped. +func (c *Cache[V]) Set(key uint64, value V, duration time.Duration) { + if c == nil || c.closed.Load() { + return + } + + if duration < c.threshold { + return + } + + select { + case c.writeCh <- setRequest[V]{key: key, value: value, dur: duration}: + default: + } +} + +// Wait blocks until all pending writes in the buffer have been processed. +// Returns immediately if the cache is closed. +func (c *Cache[V]) Wait() { + if c == nil || c.closed.Load() { + return + } + + ch := make(chan struct{}) + + select { + case c.writeCh <- setRequest[V]{waitCh: ch}: + <-ch + case <-c.done: + } +} + +// applySet performs the actual cache mutation. Must only be called from processWrites. +func (c *Cache[V]) applySet(key uint64, value V, duration time.Duration) { + entry := &Entry[V]{value: value, duration: duration} + + // If key already exists, update it + if existing, ok := c.entries.Load(key); ok { + currEntry := existing.(*Entry[V]) + // Consider worst case, if the previous run was faster then increase + if currEntry.duration < duration { + c.entries.Store(key, entry) + + // If the minKey duration was increased, there can be a new minKey + if c.minKey == key { + c.refreshMin() + } + } + return + } + + // If not at capacity, just add and update min tracking + if c.size < c.maxSize { + c.entries.Store(key, entry) + c.size++ + if c.size == 1 || duration < c.minDur { + c.minKey = key + c.minDur = duration + } + return + } + + // At capacity: reject if new entry is not more expensive than the current minimum + if duration <= c.minDur { + return + } + + // When at max capacity + // Evict the minimum and insert the new entry + c.entries.Delete(c.minKey) + c.entries.Store(key, entry) + // size stays the same: deleted one, added one + c.refreshMin() +} + +// refreshMin rescans the entries to find the new minimum. Must only be called from processWrites. +func (c *Cache[V]) refreshMin() { + var ( + minKey uint64 + minDur time.Duration + first = true + ) + + c.entries.Range(func(k, v any) bool { + e := v.(*Entry[V]) + if first || e.duration < minDur { + minKey = k.(uint64) + minDur = e.duration + first = false + } + return true + }) + + if !first { + c.minKey = minKey + c.minDur = minDur + } +} + +// Values returns an iterator over all cached values. +func (c *Cache[V]) Values() iter.Seq[V] { + return func(yield func(V) bool) { + if c == nil || c.closed.Load() { + return + } + + c.entries.Range(func(_, v any) bool { + return yield(v.(*Entry[V]).value) + }) + } +} + +// Close stops the background goroutine and releases resources. +// Pending writes in the buffer may be dropped. Safe to call multiple times. +func (c *Cache[V]) Close() { + if c == nil || c.closed.Load() { + return + } + + c.closeOnce.Do(func() { + c.closed.Store(true) + + close(c.stop) + <-c.done + + // This downside is also there in ristretto (if set is called concurrently) + // it is even documented in the ristretto code as a comment + close(c.writeCh) + }) +} diff --git a/router/pkg/slowplancache/slow_plan_cache_test.go b/router/pkg/slowplancache/slow_plan_cache_test.go new file mode 100644 index 0000000000..69734bf771 --- /dev/null +++ b/router/pkg/slowplancache/slow_plan_cache_test.go @@ -0,0 +1,512 @@ +package slowplancache + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type testPlan struct { + content string +} + +func TestCache_GetSet(t *testing.T) { + t.Parallel() + c, err := New[*testPlan](10, 0) + require.NoError(t, err) + defer c.Close() + + plan1 := &testPlan{content: "query { a }"} + plan2 := &testPlan{content: "query { b }"} + + // Miss + _, ok := c.Get(1) + require.False(t, ok) + + // Set and get + c.Set(1, plan1, 10*time.Millisecond) + c.Wait() + got, ok := c.Get(1) + require.True(t, ok) + require.Equal(t, plan1, got) + + // Different key + c.Set(2, plan2, 20*time.Millisecond) + c.Wait() + got, ok = c.Get(2) + require.True(t, ok) + require.Equal(t, plan2, got) + + // Original still there + got, ok = c.Get(1) + require.True(t, ok) + require.Equal(t, plan1, got) +} + +func TestCache_BoundedSize(t *testing.T) { + t.Parallel() + c, err := New[*testPlan](3, 0) + require.NoError(t, err) + defer c.Close() + + c.Set(1, &testPlan{content: "q1"}, 10*time.Millisecond) + c.Set(2, &testPlan{content: "q2"}, 20*time.Millisecond) + c.Set(3, &testPlan{content: "q3"}, 30*time.Millisecond) + + // Cache is full (3/3). Adding a 4th with higher duration should evict the shortest (key=1, 10ms) + c.Set(4, &testPlan{content: "q4"}, 25*time.Millisecond) + c.Wait() + + // Key 1 should be evicted (it had the shortest duration: 10ms) + _, ok := c.Get(1) + require.False(t, ok, "key 1 should have been evicted") + + // Keys 2, 3, 4 should remain + _, ok = c.Get(2) + require.True(t, ok) + _, ok = c.Get(3) + require.True(t, ok) + _, ok = c.Get(4) + require.True(t, ok) +} + +func TestCache_BoundedSize_SkipsCheaper(t *testing.T) { + t.Parallel() + c, err := New[*testPlan](3, 0) + require.NoError(t, err) + defer c.Close() + + c.Set(1, &testPlan{content: "q1"}, 10*time.Second) + c.Set(2, &testPlan{content: "q2"}, 20*time.Second) + c.Set(3, &testPlan{content: "q3"}, 30*time.Second) + + // Try to add a cheaper entry (5s < 10s minimum) — should be rejected + c.Set(4, &testPlan{content: "q4"}, 5*time.Second) + c.Wait() + + _, ok := c.Get(4) + require.False(t, ok, "cheaper entry should not be added when cache is full") + + // All original entries should remain + _, ok = c.Get(1) + require.True(t, ok) + _, ok = c.Get(2) + require.True(t, ok) + _, ok = c.Get(3) + require.True(t, ok) +} + +func TestCache_UpdateExisting(t *testing.T) { + t.Parallel() + c, err := New[*testPlan](2, 0) + require.NoError(t, err) + defer c.Close() + + plan1 := &testPlan{content: "q1"} + plan1Updated := &testPlan{content: "q1-updated"} + + c.Set(1, plan1, 10*time.Millisecond) + c.Set(1, plan1Updated, 50*time.Millisecond) + c.Wait() + + got, ok := c.Get(1) + require.True(t, ok) + require.Equal(t, "q1-updated", got.content) + + // Updating an existing key should not increase the count + c.Set(2, &testPlan{content: "q2"}, 20*time.Millisecond) + c.Wait() + _, ok = c.Get(1) + require.True(t, ok, "key 1 should still exist after adding key 2 (capacity is 2)") + _, ok = c.Get(2) + require.True(t, ok) +} + +func TestCache_Values(t *testing.T) { + t.Parallel() + c, err := New[*testPlan](10, 0) + require.NoError(t, err) + defer c.Close() + + c.Set(1, &testPlan{content: "q1"}, 10*time.Millisecond) + c.Set(2, &testPlan{content: "q2"}, 20*time.Millisecond) + c.Set(3, &testPlan{content: "q3"}, 30*time.Millisecond) + c.Wait() + + var contents []string + for v := range c.Values() { + contents = append(contents, v.content) + } + require.Len(t, contents, 3) + require.ElementsMatch(t, []string{"q1", "q2", "q3"}, contents) +} + +func TestCache_Values_EarlyStop(t *testing.T) { + t.Parallel() + c, err := New[*testPlan](10, 0) + require.NoError(t, err) + defer c.Close() + + c.Set(1, &testPlan{content: "q1"}, 10*time.Millisecond) + c.Set(2, &testPlan{content: "q2"}, 20*time.Millisecond) + c.Set(3, &testPlan{content: "q3"}, 30*time.Millisecond) + c.Wait() + + count := 0 + for range c.Values() { + count++ + break // stop after first + } + require.Equal(t, 1, count) +} + +func TestCache_Close(t *testing.T) { + t.Parallel() + c, err := New[*testPlan](10, 0) + require.NoError(t, err) + c.Set(1, &testPlan{content: "q1"}, 10*time.Millisecond) + + c.Close() + + // After close, entries map should be nil + _, ok := c.Get(1) + require.False(t, ok) +} + +func TestCache_SetAfterClose(t *testing.T) { + t.Parallel() + c, err := New[*testPlan](10, 0) + require.NoError(t, err) + c.Close() + + // Set after Close should not panic — buffer drops silently + require.NotPanics(t, func() { + c.Set(1, &testPlan{content: "q1"}, 10*time.Millisecond) + }) + + _, ok := c.Get(1) + require.False(t, ok) +} + +func TestCache_ValuesEmpty(t *testing.T) { + t.Parallel() + c, err := New[*testPlan](10, 0) + require.NoError(t, err) + defer c.Close() + + count := 0 + for range c.Values() { + count++ + } + require.Equal(t, 0, count) +} + +func TestCache_ValuesAfterClose(t *testing.T) { + t.Parallel() + c, err := New[*testPlan](10, 0) + require.NoError(t, err) + c.Set(1, &testPlan{content: "q1"}, 10*time.Millisecond) + c.Close() + + count := 0 + for range c.Values() { + count++ + } + require.Equal(t, 0, count) +} + +func TestCache_EqualDurationNotEvicted(t *testing.T) { + t.Parallel() + c, err := New[*testPlan](2, 0) + require.NoError(t, err) + defer c.Close() + + c.Set(1, &testPlan{content: "q1"}, 10*time.Millisecond) + c.Set(2, &testPlan{content: "q2"}, 20*time.Millisecond) + + // Same duration as minimum (10ms) — should NOT evict (requires strictly greater) + c.Set(3, &testPlan{content: "q3"}, 10*time.Millisecond) + c.Wait() + + _, ok := c.Get(3) + require.False(t, ok, "entry with equal duration should not replace minimum") + _, ok = c.Get(1) + require.True(t, ok) + _, ok = c.Get(2) + require.True(t, ok) +} + +func TestCache_MaxSizeOne(t *testing.T) { + t.Parallel() + c, err := New[*testPlan](1, 0) + require.NoError(t, err) + defer c.Close() + + c.Set(1, &testPlan{content: "q1"}, 10*time.Millisecond) + c.Wait() + got, ok := c.Get(1) + require.True(t, ok) + require.Equal(t, "q1", got.content) + + // Adding a more expensive entry should evict the only entry + c.Set(2, &testPlan{content: "q2"}, 20*time.Millisecond) + c.Wait() + _, ok = c.Get(1) + require.False(t, ok) + got, ok = c.Get(2) + require.True(t, ok) + require.Equal(t, "q2", got.content) + + // Adding a cheaper entry should be rejected + c.Set(3, &testPlan{content: "q3"}, 5*time.Millisecond) + c.Wait() + _, ok = c.Get(3) + require.False(t, ok) + _, ok = c.Get(2) + require.True(t, ok) +} + +// runMixedOps exercises all cache operations deterministically based on the counter i. +// Operation distribution: ~29% writes, ~14% same-key writes, ~29% read hits, +// ~14% read misses, ~14% iteration (80% full, 20% early stop) + occasional Wait. +func runMixedOps(c *Cache[*testPlan], i int) { + plan := &testPlan{content: "q"} + op := i % 7 + key := uint64(i % 2000) + + switch { + case op < 2: + // ~29% writes with varying keys (triggers eviction when cache is full) + c.Set(key, plan, time.Duration(i%500+1)*time.Millisecond) + case op < 3: + // ~14% writes to same key (triggers update path and possible refreshMin) + c.Set(42, plan, time.Duration(i%500+1)*time.Millisecond) + case op < 5: + // ~29% reads that may hit + c.Get(uint64(i % 500)) + case op < 6: + // ~14% reads that will mostly miss (keys beyond cache capacity) + c.Get(key + 1000) + default: + // ~14% iteration + Wait + if i%5 == 0 { + for range c.Values() { + break + } + } else { + for range c.Values() { + } + } + if i%13 == 0 { + c.Wait() + } + } +} + +func TestCache_ConcurrentAccess(t *testing.T) { + t.Parallel() + c, err := New[*testPlan](100, 0) + require.NoError(t, err) + defer c.Close() + var wg sync.WaitGroup + + const ( + numGoroutines = 2000 + opsPerRoutine = 5000 + ) + + for g := range numGoroutines { + wg.Go(func() { + for j := range opsPerRoutine { + runMixedOps(c, g*opsPerRoutine+j) + } + }) + } + + wg.Wait() +} + +func BenchmarkCache_ConcurrentMixed(b *testing.B) { + c, err := New[*testPlan](1000, 0) + require.NoError(b, err) + defer c.Close() + + // Pre-populate half the key space so we get a mix of hits and misses + for i := range 500 { + c.Set(uint64(i), &testPlan{content: "q"}, time.Duration(i+1)*time.Millisecond) + } + c.Wait() + + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + runMixedOps(c, i) + i++ + } + }) + c.Wait() +} + +func TestCache_InvalidSize(t *testing.T) { + t.Parallel() + _, err := New[*testPlan](0, 0) + require.Error(t, err) + + _, err = New[*testPlan](-1, 0) + require.Error(t, err) +} + +func TestCache_ThresholdRejectsBelow(t *testing.T) { + t.Parallel() + c, err := New[*testPlan](10, 100*time.Millisecond) + require.NoError(t, err) + defer c.Close() + + // Below threshold — should be rejected + c.Set(1, &testPlan{content: "q1"}, 50*time.Millisecond) + c.Wait() + _, ok := c.Get(1) + require.False(t, ok, "entry below threshold should be rejected") + + // At threshold — should be accepted + c.Set(2, &testPlan{content: "q2"}, 100*time.Millisecond) + c.Wait() + _, ok = c.Get(2) + require.True(t, ok, "entry at threshold should be accepted") + + // Above threshold — should be accepted + c.Set(3, &testPlan{content: "q3"}, 200*time.Millisecond) + c.Wait() + _, ok = c.Get(3) + require.True(t, ok, "entry above threshold should be accepted") +} + +func TestCache_WaitAfterClose(t *testing.T) { + t.Parallel() + c, err := New[*testPlan](10, 0) + require.NoError(t, err) + + c.Set(1, &testPlan{content: "q1"}, 10*time.Millisecond) + c.Close() + + // Wait after Close should not deadlock or panic + require.NotPanics(t, func() { + c.Wait() + }) +} + +func TestCache_DoubleClose(t *testing.T) { + t.Parallel() + c, err := New[*testPlan](10, 0) + require.NoError(t, err) + + c.Set(1, &testPlan{content: "q1"}, 10*time.Millisecond) + + // Double Close should not panic + require.NotPanics(t, func() { + c.Close() + c.Close() + }) +} + +func BenchmarkCache_Set(b *testing.B) { + c, err := New[*testPlan](1000, 0) + require.NoError(b, err) + defer c.Close() + + plan := &testPlan{content: "query { benchmarkField }"} + + i := 0 + for b.Loop() { + c.Set(uint64(i), plan, time.Duration(i)*time.Millisecond) + i++ + } + c.Wait() +} + +func BenchmarkCache_Set_Eviction(b *testing.B) { + c, err := New[*testPlan](100, 0) + require.NoError(b, err) + defer c.Close() + + plan := &testPlan{content: "query { benchmarkField }"} + + i := 0 + for b.Loop() { + c.Set(uint64(i), plan, time.Duration(i)*time.Millisecond) + i++ + } + c.Wait() +} + +func BenchmarkCache_Get_Hit(b *testing.B) { + c, err := New[*testPlan](1000, 0) + require.NoError(b, err) + defer c.Close() + + for i := range 1000 { + c.Set(uint64(i), &testPlan{content: "q"}, time.Duration(i+1)*time.Millisecond) + } + c.Wait() + + i := 0 + for b.Loop() { + c.Get(uint64(i % 1000)) + i++ + } +} + +func BenchmarkCache_Get_Miss(b *testing.B) { + c, err := New[*testPlan](1000, 0) + require.NoError(b, err) + defer c.Close() + + i := 0 + for b.Loop() { + c.Get(uint64(i)) + i++ + } +} + +func BenchmarkCache_Set_SameKey(b *testing.B) { + c, err := New[*testPlan](1000, 0) + require.NoError(b, err) + defer c.Close() + + plan := &testPlan{content: "query { benchmarkField }"} + + // Pre-populate so the key exists + c.Set(42, plan, 10*time.Millisecond) + c.Wait() + + i := 0 + for b.Loop() { + c.Set(42, plan, time.Duration(i)*time.Millisecond) + i++ + } + c.Wait() +} + +// 19.22 ns/op | 21.75 ns/op | 18.95 ns/op : SyncMap +// 43.91 ns/op | 41.16 ns/op | 39.43 ns/op : Mutexes +func BenchmarkCache_Mixed(b *testing.B) { + c, err := New[*testPlan](1000, 0) + require.NoError(b, err) + defer c.Close() + + plan := &testPlan{content: "query { benchmarkField }"} + + i := 0 + for b.Loop() { + key := uint64(i % 2000) + if i%3 == 0 { + c.Set(key, plan, time.Duration(i)*time.Millisecond) + } else { + c.Get(key) + } + i++ + } + c.Wait() +}