Skip to content

Commit 932f436

Browse files
authored
feat(cache): deduplicate persisted operations and avoid planning twice (#2701)
1 parent d5e14cc commit 932f436

File tree

17 files changed

+664
-53
lines changed

17 files changed

+664
-53
lines changed

demo-router.fly.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ CORS_ALLOW_CREDENTIALS="true"
1414
CLUSTER_NAME="fly-lax"
1515
CACHE_WARMUP_ENABLED="true"
1616
PLUGINS_ENABLED="true"
17+
PERSISTED_OPERATIONS_MANIFEST_ENABLED="true"
18+
# PERSISTED_OPERATIONS_LOG_UNKNOWN="true"
1719
# ENGINE_DEBUG_REPORT_WEBSOCKET_CONNECTIONS="true"
1820

1921
[[vm]]

router-tests/operations/pql_manifest_test.go

Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package integration
22

33
import (
4+
"context"
45
"encoding/json"
56
"io"
67
"net/http"
@@ -12,8 +13,11 @@ import (
1213

1314
"github.com/stretchr/testify/require"
1415
"github.com/wundergraph/cosmo/router-tests/testenv"
16+
"github.com/wundergraph/cosmo/router-tests/testutils"
1517
"github.com/wundergraph/cosmo/router/core"
1618
"github.com/wundergraph/cosmo/router/pkg/config"
19+
"go.opentelemetry.io/otel/sdk/metric"
20+
"go.opentelemetry.io/otel/sdk/metric/metricdata"
1721
"go.uber.org/zap/zapcore"
1822
)
1923

@@ -546,6 +550,9 @@ func TestPQLManifest(t *testing.T) {
546550
RouterOptions: []core.Option{
547551
core.WithPersistedOperationsConfig(manifestConfigWithWarmup),
548552
},
553+
// No AssertCacheMetrics here: Workers=4 with no rate limit means the 2 employees
554+
// variants (same normalized form) race for validation/plan caches, making exact
555+
// counts non-deterministic. Cache correctness is verified via response headers below.
549556
}, func(t *testing.T, xEnv *testenv.Environment) {
550557
header := make(http.Header)
551558
header.Add("graphql-client-name", "my-client")
@@ -601,6 +608,23 @@ func TestPQLManifest(t *testing.T) {
601608
})
602609
})
603610

611+
t.Run("APQ GET request with operation query parameter and manifest-known operation hits cache", func(t *testing.T) {
612+
t.Parallel()
613+
testenv.Run(t, &testenv.Config{
614+
RouterOptions: []core.Option{
615+
core.WithPersistedOperationsConfig(manifestConfigWithWarmup),
616+
},
617+
}, func(t *testing.T, xEnv *testenv.Environment) {
618+
res, err := xEnv.MakeGraphQLRequestOverGET(testenv.GraphQLRequest{
619+
Query: "{__typename}",
620+
Extensions: []byte(`{"persistedQuery":{"version":1,"sha256Hash":"ecf4edb46db40b5132295c0291d62fb65d6759a9eedfa4d5d612dd5ec54a6b38"}}`),
621+
})
622+
require.NoError(t, err)
623+
require.Equal(t, `{"data":{"__typename":"Query"}}`, res.Body)
624+
require.Equal(t, "HIT", res.Response.Header.Get(core.PersistedOperationCacheHeader))
625+
})
626+
})
627+
604628
t.Run("disabled persisted operations suppresses manifest", func(t *testing.T) {
605629
t.Parallel()
606630
testenv.Run(t, &testenv.Config{
@@ -675,6 +699,16 @@ func TestPQLManifest(t *testing.T) {
675699
},
676700
}),
677701
},
702+
AssertCacheMetrics: &testenv.CacheMetricsAssertions{
703+
BaseGraphAssertions: testenv.CacheMetricsAssertion{
704+
// No warmup → all caches cold on first request.
705+
// 2 persisted normalization misses: loadPersistedOperationFromCache checks
706+
// once without operation name, once with (because OperationName is set).
707+
PersistedQueryNormalizationMisses: 2,
708+
ValidationMisses: 1,
709+
PlanMisses: 1,
710+
},
711+
},
678712
}, func(t *testing.T, xEnv *testenv.Environment) {
679713
header := make(http.Header)
680714
header.Add("graphql-client-name", "my-client")
@@ -712,6 +746,17 @@ func TestPQLManifest(t *testing.T) {
712746
},
713747
}),
714748
},
749+
AssertCacheMetrics: &testenv.CacheMetricsAssertions{
750+
BaseGraphAssertions: testenv.CacheMetricsAssertion{
751+
// Custom warmup config (Workers=2, ItemsPerSecond=100) still warms all caches.
752+
// 3 manifest ops → 2 unique plans during warmup, 1 hit from the request.
753+
PersistedQueryNormalizationHits: 1,
754+
ValidationMisses: 2,
755+
ValidationHits: 2,
756+
PlanMisses: 2,
757+
PlanHits: 2,
758+
},
759+
},
715760
}, func(t *testing.T, xEnv *testenv.Environment) {
716761
header := make(http.Header)
717762
header.Add("graphql-client-name", "my-client")
@@ -730,6 +775,174 @@ func TestPQLManifest(t *testing.T) {
730775
})
731776
})
732777

778+
t.Run("cache warmup and manifest warmup both warm overlapping operations", func(t *testing.T) {
779+
t.Parallel()
780+
testenv.Run(t, &testenv.Config{
781+
RouterOptions: []core.Option{
782+
core.WithCacheWarmupConfig(&config.CacheWarmupConfiguration{
783+
Enabled: true,
784+
Source: config.CacheWarmupSource{
785+
Filesystem: &config.CacheWarmupFileSystemSource{
786+
// Contains hash dc675... which also exists in the manifest.
787+
Path: "testdata/cache_warmup/json_po_manifest_overlap",
788+
},
789+
},
790+
}),
791+
core.WithPersistedOperationsConfig(config.PersistedOperationsConfig{
792+
Manifest: config.PQLManifestConfig{
793+
Enabled: true,
794+
PollInterval: 10 * time.Second,
795+
PollJitter: 5 * time.Second,
796+
Warmup: config.PQLManifestWarmupConfig{
797+
Enabled: true,
798+
Workers: 2,
799+
ItemsPerSecond: 100,
800+
Timeout: 30 * time.Second,
801+
},
802+
},
803+
}),
804+
},
805+
AssertCacheMetrics: &testenv.CacheMetricsAssertions{
806+
BaseGraphAssertions: testenv.CacheMetricsAssertion{
807+
// Cache warmup plans dc675... (1 plan+validation miss).
808+
// waitForCaches() flushes ristretto so all entries are visible.
809+
// Manifest warmup: dc675... hits plan cache, 33651... hits (same
810+
// normalized form), ecf4e... misses (unique query).
811+
// Request for dc675... hits all caches.
812+
// Total: 2 misses (dc675 warmup + ecf4e manifest), 3 hits (dc675+33651 manifest + request).
813+
PersistedQueryNormalizationHits: 1,
814+
ValidationMisses: 2,
815+
ValidationHits: 3,
816+
PlanMisses: 2,
817+
PlanHits: 3,
818+
},
819+
},
820+
}, func(t *testing.T, xEnv *testenv.Environment) {
821+
header := make(http.Header)
822+
header.Add("graphql-client-name", "my-client")
823+
824+
res, err := xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{
825+
OperationName: []byte(`"Employees"`),
826+
Extensions: []byte(`{"persistedQuery": {"version": 1, "sha256Hash": "dc67510fb4289672bea757e862d6b00e83db5d3cbbcfb15260601b6f29bb2b8f"}}`),
827+
Header: header,
828+
})
829+
require.NoError(t, err)
830+
require.Equal(t, expectedEmployeesBody, res.Body)
831+
require.Equal(t, "HIT", res.Response.Header.Get(core.PersistedOperationCacheHeader))
832+
})
833+
})
834+
835+
t.Run("in-memory APQ skips save for manifest-known operations", func(t *testing.T) {
836+
t.Parallel()
837+
testenv.Run(t, &testenv.Config{
838+
ApqConfig: config.AutomaticPersistedQueriesConfig{
839+
Enabled: true,
840+
Cache: config.AutomaticPersistedQueriesCacheConfig{
841+
Size: 1024 * 1024,
842+
TTL: 2,
843+
},
844+
},
845+
RouterOptions: []core.Option{
846+
core.WithPersistedOperationsConfig(manifestConfig),
847+
},
848+
}, func(t *testing.T, xEnv *testenv.Environment) {
849+
header := make(http.Header)
850+
header.Add("graphql-client-name", "my-client")
851+
852+
// Send query + hash for a manifest-known operation.
853+
// For in-memory APQ, this should NOT be saved to APQ — the manifest is authoritative.
854+
// sha256("{__typename}") = ecf4e... which is in the manifest.
855+
res, err := xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{
856+
Query: `{__typename}`,
857+
Extensions: []byte(`{"persistedQuery": {"version": 1, "sha256Hash": "ecf4edb46db40b5132295c0291d62fb65d6759a9eedfa4d5d612dd5ec54a6b38"}}`),
858+
Header: header,
859+
})
860+
require.NoError(t, err)
861+
require.Equal(t, `{"data":{"__typename":"Query"}}`, res.Body)
862+
863+
// Wait for APQ TTL to expire. If the operation was saved to APQ,
864+
// a hash-only request would fail after this.
865+
time.Sleep(3 * time.Second)
866+
867+
// Hash-only request must still succeed — served from manifest, not expired APQ.
868+
res, err = xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{
869+
Extensions: []byte(`{"persistedQuery": {"version": 1, "sha256Hash": "ecf4edb46db40b5132295c0291d62fb65d6759a9eedfa4d5d612dd5ec54a6b38"}}`),
870+
Header: header,
871+
})
872+
require.NoError(t, err)
873+
require.Equal(t, `{"data":{"__typename":"Query"}}`, res.Body)
874+
})
875+
})
876+
877+
t.Run("APQ works for non-manifest operations when both enabled", func(t *testing.T) {
878+
t.Parallel()
879+
testenv.Run(t, &testenv.Config{
880+
ApqConfig: config.AutomaticPersistedQueriesConfig{
881+
Enabled: true,
882+
Cache: config.AutomaticPersistedQueriesCacheConfig{
883+
Size: 1024 * 1024,
884+
},
885+
},
886+
RouterOptions: []core.Option{
887+
core.WithPersistedOperationsConfig(manifestConfig),
888+
},
889+
}, func(t *testing.T, xEnv *testenv.Environment) {
890+
header := make(http.Header)
891+
header.Add("graphql-client-name", "my-client")
892+
893+
// Use an operation NOT in the manifest. APQ should work normally.
894+
// sha256("query { employees { id details { forename } } }") = 6083e15e...
895+
nonManifestHash := "6083e15eded39dbd64279ae4cffbc6e3bee52b177f7003ebba9532a17e6231f2"
896+
res, err := xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{
897+
Query: `query { employees { id details { forename } } }`,
898+
Extensions: []byte(`{"persistedQuery": {"version": 1, "sha256Hash": "` + nonManifestHash + `"}}`),
899+
Header: header,
900+
})
901+
require.NoError(t, err)
902+
require.Contains(t, res.Body, `"data"`)
903+
904+
// Subsequent hash-only request should succeed — APQ saved the operation.
905+
res, err = xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{
906+
Extensions: []byte(`{"persistedQuery": {"version": 1, "sha256Hash": "` + nonManifestHash + `"}}`),
907+
Header: header,
908+
})
909+
require.NoError(t, err)
910+
require.Contains(t, res.Body, `"data"`)
911+
})
912+
})
913+
914+
t.Run("manifest warmup emits planning time metrics", func(t *testing.T) {
915+
t.Parallel()
916+
917+
metricReader := metric.NewManualReader()
918+
919+
testenv.Run(t, &testenv.Config{
920+
MetricReader: metricReader,
921+
RouterOptions: []core.Option{
922+
core.WithPersistedOperationsConfig(manifestConfigWithWarmup),
923+
},
924+
}, func(t *testing.T, xEnv *testenv.Environment) {
925+
// No requests — collect metrics emitted purely by manifest warmup at startup.
926+
rm := metricdata.ResourceMetrics{}
927+
err := metricReader.Collect(context.Background(), &rm)
928+
require.NoError(t, err)
929+
930+
metricScope := testutils.GetMetricScopeByName(rm.ScopeMetrics, "cosmo.router")
931+
require.NotNil(t, metricScope)
932+
933+
m := testutils.GetMetricByName(metricScope, "router.graphql.operation.planning_time")
934+
require.NotNil(t, m, "planning_time metric should be emitted during manifest warmup")
935+
936+
dataPoints := m.Data.(metricdata.Histogram[float64]).DataPoints
937+
require.NotEmpty(t, dataPoints)
938+
939+
// Find the warmup data point (cache miss during warmup planning)
940+
warmupDP := findDataPoint(t, dataPoints, false)
941+
require.Greater(t, warmupDP.Count, uint64(0), "manifest warmup should record planning time metrics")
942+
require.Greater(t, warmupDP.Sum, float64(0), "manifest warmup planning time should be non-zero")
943+
})
944+
})
945+
733946
t.Run("fails to start when initial CDN manifest fetch fails", func(t *testing.T) {
734947
t.Parallel()
735948

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{
2+
"operations": [
3+
{
4+
"hint": "This persisted operation also exists in the PQL manifest. When both cache warmup and manifest warmup are enabled, it should be processed only once.",
5+
"request": {
6+
"query": "query Employees {\n employees {\n id\n }\n}",
7+
"extensions": {
8+
"persistedQuery": {
9+
"version": 1,
10+
"sha256Hash": "dc67510fb4289672bea757e862d6b00e83db5d3cbbcfb15260601b6f29bb2b8f"
11+
}
12+
}
13+
},
14+
"client": {
15+
"name": "my-client"
16+
}
17+
}
18+
]
19+
}

router-tests/testenv/testdata/cdn/organization/graph/operations/manifest.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"generatedAt": "2024-01-01T00:00:00Z",
55
"operations": {
66
"dc67510fb4289672bea757e862d6b00e83db5d3cbbcfb15260601b6f29bb2b8f": "query Employees {\n employees {\n id\n }\n}",
7-
"33651da3d80e420709520fb900c7ab8ec4151555da56062feeee428cf7f3a5dd": "query Employees {\n employees {\n id\n }\n}",
8-
"9015ddfadd802bb378a14e48cea51e9bf9a07c7f8a71d85c56d7b104fea84937": "query Employees {\n employees {\n id\n }\n}"
7+
"33651da3d80e420709520fb900c7ab8ec4151555da56062feeee428cf7f3a5dd": "query Employees {\n employees {\n id\n }\n}",
8+
"ecf4edb46db40b5132295c0291d62fb65d6759a9eedfa4d5d612dd5ec54a6b38": "{__typename}"
99
}
1010
}

router/core/cache_warmup.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ func (w *cacheWarmup) run(ctx context.Context) (int, error) {
175175
)
176176
}
177177

178-
if err == nil && w.afterOperation != nil {
178+
if err == nil && res != nil && w.afterOperation != nil {
179179
w.afterOperation(res)
180180
}
181181

@@ -252,6 +252,7 @@ type CacheWarmupOperationPlanResult struct {
252252
ClientName string
253253
ClientVersion string
254254
PlanningTime time.Duration
255+
PlanCacheHit bool
255256
}
256257

257258
type CacheWarmupPlanningProcessor struct {
@@ -382,5 +383,6 @@ func (c *CacheWarmupPlanningProcessor) ProcessOperation(ctx context.Context, ope
382383
ClientName: item.Client.Name,
383384
ClientVersion: item.Client.Version,
384385
PlanningTime: time.Since(planningStart),
386+
PlanCacheHit: opContext.planCacheHit,
385387
}, nil
386388
}

0 commit comments

Comments
 (0)