Skip to content
Open
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
ef7e645
feat: cache plans
SkArchon Jan 9, 2026
419cabf
fix: cleanup
SkArchon Jan 9, 2026
e65cdf2
fix: updates
SkArchon Jan 13, 2026
2f89643
fix: updates
SkArchon Jan 13, 2026
cf6e9df
fix: updates
SkArchon Jan 13, 2026
0d2e2db
fix: updates
SkArchon Jan 13, 2026
52aa474
fix: updates
SkArchon Jan 13, 2026
8cd21fe
fix: updates
SkArchon Jan 13, 2026
822a4a4
fix: config updates
SkArchon Jan 13, 2026
3159713
fix: config reloading
SkArchon Jan 14, 2026
d576e03
Merge branch 'main' into milinda/eng-8701-implement-cache-warmer-from…
SkArchon Jan 14, 2026
2bdf426
fix: update router-tests
SkArchon Jan 14, 2026
0e92ca1
fix: cleanup
SkArchon Jan 14, 2026
78fc67d
fix: use assert
SkArchon Jan 14, 2026
f4850f6
fix: review comments
SkArchon Jan 14, 2026
7ec23f1
fix: linting
SkArchon Jan 14, 2026
42bbcbd
fix: linting
SkArchon Jan 14, 2026
480967d
fix: go modules
SkArchon Jan 18, 2026
d542eec
fix: updates
SkArchon Jan 18, 2026
3564f0d
fix: updates
SkArchon Jan 19, 2026
eb69a26
fix: make in memory switcher default when cache warmer is not enabled
SkArchon Jan 19, 2026
98608c7
fix: review comments
SkArchon Jan 19, 2026
a7adf3d
fix: updates
SkArchon Jan 20, 2026
05026d3
Merge remote-tracking branch 'origin/main' into milinda/eng-8701-impl…
SkArchon Jan 20, 2026
6d23b7e
fix: go mod
SkArchon Jan 20, 2026
ceabb75
fix: update enabled
SkArchon Jan 20, 2026
ae920ef
fix: update enabled
SkArchon Jan 20, 2026
3cfca20
fix: review comments
SkArchon Jan 20, 2026
d4ad399
fix: review comments
SkArchon Jan 20, 2026
5d79a9e
fix: feature flags didnt pass in cosmo flag status
SkArchon Jan 20, 2026
7f099f5
fix: nil pointer
SkArchon Jan 20, 2026
f382e46
fix: review comments
SkArchon Jan 20, 2026
9a34515
fix: use the new ristretto version
SkArchon Jan 22, 2026
42c8c81
fix: formatting
SkArchon Jan 22, 2026
ef33a6c
fix: review comments
SkArchon Jan 22, 2026
e0734f5
fix: review comments
SkArchon Jan 22, 2026
11158f6
fix: review comments
SkArchon Jan 22, 2026
54f24c1
fix: tests
SkArchon Jan 22, 2026
40548ad
Merge branch 'main' into milinda/eng-8701-implement-cache-warmer-from…
SkArchon Jan 22, 2026
fd68357
fix: review comments
SkArchon Jan 22, 2026
4fa6007
fix: review comments
SkArchon Jan 26, 2026
d26cb4e
fix: review comments
SkArchon Jan 26, 2026
e371592
fix: review comments
SkArchon Jan 26, 2026
335c0f1
fix: review comments
SkArchon Jan 26, 2026
2fd4793
fix: review comments
SkArchon Jan 26, 2026
9308bdd
fix: review comments
SkArchon Jan 26, 2026
f96e1f1
fix: review comments
SkArchon Jan 26, 2026
ab75a5a
fix: review comments
SkArchon Jan 26, 2026
172df66
fix: review comments
SkArchon Jan 26, 2026
44875f2
Merge branch 'main' into milinda/eng-8701-implement-cache-warmer-from…
SkArchon Jan 26, 2026
c3d2336
fix: update comments
SkArchon Jan 26, 2026
0f98b1d
fix: review comments
SkArchon Jan 29, 2026
e511bef
fix: review comments
SkArchon Jan 29, 2026
91b9bfc
fix: revert cdn
SkArchon Jan 30, 2026
c10b5bf
fix: review comments
SkArchon Jan 30, 2026
4ef640c
fix: review comments
SkArchon Jan 31, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,439 changes: 725 additions & 714 deletions connect-go/gen/proto/wg/cosmo/node/v1/node.pb.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions connect/src/wg/cosmo/node/v1/node_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,11 @@ export class RegistrationInfo extends Message<RegistrationInfo> {
*/
graphPublicKey = "";

/**
* @generated from field: bool is_cache_warmer_enabled = 3;
*/
isCacheWarmerEnabled = false;

constructor(data?: PartialMessage<RegistrationInfo>) {
super();
proto3.util.initPartial(data, this);
Expand All @@ -553,6 +558,7 @@ export class RegistrationInfo extends Message<RegistrationInfo> {
static readonly fields: FieldList = proto3.util.newFieldList(() => [
{ no: 1, name: "account_limits", kind: "message", T: AccountLimits },
{ no: 2, name: "graph_public_key", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 3, name: "is_cache_warmer_enabled", kind: "scalar", T: 8 /* ScalarType.BOOL */ },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): RegistrationInfo {
Expand Down
1 change: 1 addition & 0 deletions controlplane/src/core/bufservices/NodeService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export default function (opts: RouterOptions): Partial<ServiceImpl<typeof NodeSe
traceSamplingRate: (features['trace-sampling-rate'] as number) ?? 0.1,
},
graphPublicKey: publicKey,
isCacheWarmerEnabled: !!features['cache-warmer'],
};

registrationInfoCache.set(authContext.federatedGraphId, registrationInfo);
Expand Down
1 change: 1 addition & 0 deletions proto/wg/cosmo/node/v1/node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ message ResponseStatus {
message RegistrationInfo {
AccountLimits account_limits = 1;
string graph_public_key = 2;
bool is_cache_warmer_enabled = 3;
}

message AccountLimits {
Expand Down
261 changes: 249 additions & 12 deletions router-tests/cache_warmup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,25 @@ package integration
import (
"context"
"net/http"
"os"
"path/filepath"
"syscall"
"testing"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"

nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1"

"go.opentelemetry.io/otel/sdk/metric/metricdata"

"github.com/wundergraph/cosmo/router/pkg/otel"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/wundergraph/cosmo/router-tests/testenv"
"github.com/wundergraph/cosmo/router/core"
nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1"
"github.com/wundergraph/cosmo/router/pkg/config"
"github.com/wundergraph/cosmo/router/pkg/controlplane/configpoller"
"github.com/wundergraph/cosmo/router/pkg/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
"go.uber.org/zap"
)

func TestCacheWarmup(t *testing.T) {
Expand Down Expand Up @@ -914,6 +914,243 @@ func TestCacheWarmup(t *testing.T) {
})
}

func TestInMemorySwitchoverCaching(t *testing.T) {
t.Parallel()

t.Run("Verify the plan is cached on config restart when in memory switchover is enabled", func(t *testing.T) {
t.Parallel()

pm := ConfigPollerMock{
ready: make(chan struct{}),
}

testenv.Run(t, &testenv.Config{
RouterOptions: []core.Option{
core.WithCacheWarmupConfig(&config.CacheWarmupConfiguration{
Enabled: true,
InMemorySwitchoverFallback: true,
}),
core.WithConfigVersionHeader(true),
},
RouterConfig: &testenv.RouterConfig{
ConfigPollerFactory: func(config *nodev1.RouterConfig) configpoller.ConfigPoller {
pm.initConfig = config
return &pm
},
},
}, func(t *testing.T, xEnv *testenv.Environment) {
res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `{ employees { id } }`,
})
require.Equal(t, 200, res.Response.StatusCode)
require.Equal(t, xEnv.RouterConfigVersionMain(), res.Response.Header.Get("X-Router-Config-Version"))
require.JSONEq(t, employeesIDData, res.Body)
require.Equal(t, "MISS", res.Response.Header.Get("x-wg-execution-plan-cache"))

// Wait for the config poller to be ready
<-pm.ready

pm.initConfig.Version = "updated"
require.NoError(t, pm.updateConfig(pm.initConfig, "old-1"))

res = xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `{ employees { id } }`,
})
require.Equal(t, 200, res.Response.StatusCode)
require.Equal(t, "updated", res.Response.Header.Get("X-Router-Config-Version"))
require.JSONEq(t, employeesIDData, res.Body)
require.Equal(t, "HIT", res.Response.Header.Get("x-wg-execution-plan-cache"))

})
})

t.Run("Verify the plan is not cached on config restart when in cache warmer is disabled", func(t *testing.T) {
t.Parallel()

pm := ConfigPollerMock{
ready: make(chan struct{}),
}

testenv.Run(t, &testenv.Config{
RouterOptions: []core.Option{
core.WithCacheWarmupConfig(&config.CacheWarmupConfiguration{
Enabled: false,
}),
core.WithConfigVersionHeader(true),
},
RouterConfig: &testenv.RouterConfig{
ConfigPollerFactory: func(config *nodev1.RouterConfig) configpoller.ConfigPoller {
pm.initConfig = config
return &pm
},
},
}, func(t *testing.T, xEnv *testenv.Environment) {
res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `{ employees { id } }`,
})
require.Equal(t, 200, res.Response.StatusCode)
require.Equal(t, xEnv.RouterConfigVersionMain(), res.Response.Header.Get("X-Router-Config-Version"))
require.Equal(t, "MISS", res.Response.Header.Get("x-wg-execution-plan-cache"))

// Wait for the config poller to be ready
<-pm.ready

pm.initConfig.Version = "updated"
require.NoError(t, pm.updateConfig(pm.initConfig, "old-1"))

res = xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `{ employees { id } }`,
})
require.Equal(t, 200, res.Response.StatusCode)
require.Equal(t, "updated", res.Response.Header.Get("X-Router-Config-Version"))
require.Equal(t, "MISS", res.Response.Header.Get("x-wg-execution-plan-cache"))
})
})

t.Run("Verify the plan is not cached on config restart when using default cache warmer", func(t *testing.T) {
t.Parallel()

pm := ConfigPollerMock{
ready: make(chan struct{}),
}

testenv.Run(t, &testenv.Config{
RouterOptions: []core.Option{
core.WithCacheWarmupConfig(&config.CacheWarmupConfiguration{
Enabled: true,
InMemorySwitchoverFallback: false,
}),
core.WithConfigVersionHeader(true),
},
RouterConfig: &testenv.RouterConfig{
ConfigPollerFactory: func(config *nodev1.RouterConfig) configpoller.ConfigPoller {
pm.initConfig = config
return &pm
},
},
}, func(t *testing.T, xEnv *testenv.Environment) {
res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `{ employees { id customDetails: details { forename } } }`,
})
require.Equal(t, 200, res.Response.StatusCode)
require.Equal(t, xEnv.RouterConfigVersionMain(), res.Response.Header.Get("X-Router-Config-Version"))
require.Equal(t, "MISS", res.Response.Header.Get("x-wg-execution-plan-cache"))

// Wait for the config poller to be ready
<-pm.ready

pm.initConfig.Version = "updated"
require.NoError(t, pm.updateConfig(pm.initConfig, "old-1"))

res = xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `{ employees { id customDetails: details { forename } } }`,
})
require.Equal(t, 200, res.Response.StatusCode)
require.Equal(t, "updated", res.Response.Header.Get("X-Router-Config-Version"))
require.Equal(t, "MISS", res.Response.Header.Get("x-wg-execution-plan-cache"))
})
})

t.Run("Verify plan is cached when static execution config is reloaded", func(t *testing.T) {
t.Parallel()

// Create a temporary file for the router config
configFile := t.TempDir() + "/config.json"

// Initial config with just the employees subgraph
writeTestConfig(t, "initial", configFile)

testenv.Run(t, &testenv.Config{
RouterOptions: []core.Option{
core.WithConfigVersionHeader(true),
core.WithExecutionConfig(&core.ExecutionConfig{
Path: configFile,
Watch: true,
WatchInterval: 100 * time.Millisecond,
}),
core.WithCacheWarmupConfig(&config.CacheWarmupConfiguration{
Enabled: true,
InMemorySwitchoverFallback: true,
}),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `query { hello }`,
})
require.Equal(t, 200, res.Response.StatusCode)
require.Equal(t, "initial", res.Response.Header.Get("X-Router-Config-Version"))
require.Equal(t, "MISS", res.Response.Header.Get("x-wg-execution-plan-cache"))

writeTestConfig(t, "updated", configFile)

require.EventuallyWithT(t, func(t *assert.CollectT) {
res = xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `query { hello }`,
})
assert.Equal(t, "updated", res.Response.Header.Get("X-Router-Config-Version"))
assert.Equal(t, "HIT", res.Response.Header.Get("x-wg-execution-plan-cache"))
}, 2*time.Second, 100*time.Millisecond)
})
})

t.Run("Successfully persists cache across config change restarts", func(t *testing.T) {
t.Parallel()

updateConfig := func(t *testing.T, xEnv *testenv.Environment, ctx context.Context, listenString string, config string) {
f, err := os.Create(filepath.Join(xEnv.GetRouterProcessCwd(), "config.yaml"))
require.NoError(t, err)

_, err = f.WriteString(config)
require.NoError(t, err)
require.NoError(t, f.Close())

err = xEnv.SignalRouterProcess(syscall.SIGHUP)
require.NoError(t, err)
require.NoError(t, xEnv.WaitForServer(ctx, xEnv.RouterURL+"/"+listenString, 600, 60), "healthcheck post-reload failed")
}

getConfigString := func(listenString string) string {
return `
version: "1"

readiness_check_path: "/` + listenString + `"

cache_warmup:
enabled: true
in_memory_switchover_fallback: true

engine:
debug:
enable_cache_response_headers: true
`
}

err := testenv.RunRouterBinary(t, &testenv.Config{
DemoMode: true,
}, testenv.RunRouterBinConfigOptions{}, func(t *testing.T, xEnv *testenv.Environment) {
// Verify initial start
t.Logf("running router binary, cwd: %s", xEnv.GetRouterProcessCwd())
ctx := context.Background()
require.NoError(t, xEnv.WaitForServer(ctx, xEnv.RouterURL+"/health/ready", 600, 60), "healthcheck pre-reload failed")

// Enable cache response headers first
listenString1 := "after1"
updateConfig(t, xEnv, ctx, listenString1, getConfigString(listenString1))
res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{Query: `query { hello }`})
require.Equal(t, "MISS", res.Response.Header.Get("x-wg-execution-plan-cache"))

// Verify cache persisted on restart
listenString2 := "after2"
updateConfig(t, xEnv, ctx, listenString2, getConfigString(listenString2))
res = xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{Query: `query { hello }`})
require.Equal(t, "HIT", res.Response.Header.Get("x-wg-execution-plan-cache"))
})

require.NoError(t, err)
})

}

// findDataPoint finds a data point in a slice of histogram data points by matching
// the value of WgEnginePlanCacheHit attribute
func findDataPoint(t *testing.T, dataPoints []metricdata.HistogramDataPoint[float64], cacheHit bool) metricdata.HistogramDataPoint[float64] {
Expand Down
2 changes: 1 addition & 1 deletion router-tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ require (
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.7 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgraph-io/ristretto/v2 v2.1.0 // indirect
github.com/dgraph-io/ristretto/v2 v2.4.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/docker/cli v28.2.2+incompatible // indirect
github.com/docker/distribution v2.8.3+incompatible // indirect
Expand Down
8 changes: 4 additions & 4 deletions router-tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/ristretto/v2 v2.1.0 h1:59LjpOJLNDULHh8MC4UaegN52lC4JnO2dITsie/Pa8I=
github.com/dgraph-io/ristretto/v2 v2.1.0/go.mod h1:uejeqfYXpUomfse0+lO+13ATz4TypQYLJZzBSAemuB4=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgraph-io/ristretto/v2 v2.4.0 h1:I/w09yLjhdcVD2QV192UJcq8dPBaAJb9pOuMyNy0XlU=
github.com/dgraph-io/ristretto/v2 v2.4.0/go.mod h1:0KsrXtXvnv0EqnzyowllbVJB8yBonswa2lTCK2gGo9E=
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da h1:aIftn67I1fkbMa512G+w+Pxci9hJPB8oMnkcP3iZF38=
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dgryski/trifles v0.0.0-20230903005119-f50d829f2e54 h1:SG7nF6SRlWhcT7cNTs5R6Hk4V2lcmLz2NsG2VnInyNo=
Expand Down
3 changes: 2 additions & 1 deletion router/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ func Main() {
}

rs, err := core.NewRouterSupervisor(&core.RouterSupervisorOpts{
BaseLogger: baseLogger,
BaseLogger: baseLogger,
SwitchoverConfig: core.NewSwitchoverConfig(baseLogger),
ConfigFactory: func() (*config.Config, error) {
result, err := config.LoadConfig(*configPathFlag)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions router/core/cache_warmup_cdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ import (
"context"
"errors"
"fmt"
nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1"
"google.golang.org/protobuf/encoding/protojson"
"io"
"net/http"
"net/url"

nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1"
"github.com/wundergraph/cosmo/router/internal/httpclient"
"github.com/wundergraph/cosmo/router/internal/jwt"
"go.opentelemetry.io/otel/codes"
semconv12 "go.opentelemetry.io/otel/semconv/v1.12.0"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/protobuf/encoding/protojson"
)

var _ CacheWarmupSource = (*CDNSource)(nil)
Expand Down
Loading
Loading