Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions router-tests/modules/start_subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,4 +711,76 @@ func TestStartSubscriptionHook(t *testing.T) {
assert.Equal(t, int32(1), customModule.HookCallCount.Load())
})
})

t.Run("Test StartSubscription hook can access field arguments", func(t *testing.T) {
	t.Parallel()

	// This test verifies that the subscription start hook can access GraphQL
	// field arguments via ctx.Operation().Arguments().
	//
	// The hook callback runs on a router goroutine, concurrently with this
	// test goroutine, so the captured value must be stored atomically to keep
	// the test race-free under `go test -race`.
	var capturedEmployeeID atomic.Int64

	customModule := &start_subscription.StartSubscriptionModule{
		HookCallCount: &atomic.Int32{},
		Callback: func(ctx core.SubscriptionOnStartHandlerContext) error {
			// Look up the employeeID argument of the root subscription field.
			if args := ctx.Operation().Arguments(); args != nil {
				if employeeIDArg := args.Get("employeeUpdatedMyKafka", "employeeID"); employeeIDArg != nil {
					capturedEmployeeID.Store(int64(employeeIDArg.GetInt()))
				}
			}
			return nil
		},
	}

	cfg := config.Config{
		Graph: config.Graph{},
		Modules: map[string]interface{}{
			"startSubscriptionModule": customModule,
		},
	}

	testenv.Run(t, &testenv.Config{
		RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate,
		EnableKafka:              true,
		RouterOptions: []core.Option{
			core.WithModulesConfig(cfg.Modules),
			core.WithCustomModules(&start_subscription.StartSubscriptionModule{}),
		},
	}, func(t *testing.T, xEnv *testenv.Environment) {
		var subscriptionOne struct {
			employeeUpdatedMyKafka struct {
				ID float64 `graphql:"id"`
			} `graphql:"employeeUpdatedMyKafka(employeeID: $employeeID)"`
		}

		surl := xEnv.GraphQLWebSocketSubscriptionURL()
		client := graphql.NewSubscriptionClient(surl)

		// The argument value arrives via an operation variable, exercising
		// argument access for variable-backed arguments.
		vars := map[string]interface{}{
			"employeeID": 7,
		}
		subscriptionOneID, err := client.Subscribe(&subscriptionOne, vars, func(dataValue []byte, errValue error) error {
			return nil
		})
		require.NoError(t, err)
		require.NotEmpty(t, subscriptionOneID)

		clientRunCh := make(chan error)
		go func() {
			clientRunCh <- client.Run()
		}()

		xEnv.WaitForSubscriptionCount(1, time.Second*10)

		require.NoError(t, client.Close())
		testenv.AwaitChannelWithT(t, time.Second*10, clientRunCh, func(t *testing.T, err error) {
			require.NoError(t, err)
		}, "unable to close client before timeout")

		assert.Equal(t, int32(1), customModule.HookCallCount.Load())
		assert.Equal(t, int64(7), capturedEmployeeID.Load(), "expected to capture employeeID argument value")
	})
})
}
48 changes: 48 additions & 0 deletions router-tests/modules/stream_publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,4 +358,52 @@ func TestPublishHook(t *testing.T) {
require.Equal(t, []byte("3"), header.Value)
})
})

t.Run("Test Publish hook can access field arguments", func(t *testing.T) {
	t.Parallel()

	// This test verifies that the publish hook can access GraphQL field
	// arguments via ctx.Operation().Arguments().
	//
	// The hook callback runs on a router goroutine, so the captured value is
	// stored atomically to avoid a data race with the assertion below.
	var capturedEmployeeID atomic.Int64

	customModule := stream_publish.PublishModule{
		HookCallCount: &atomic.Int32{},
		Callback: func(ctx core.StreamPublishEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) {
			// Look up the employeeID argument of the root mutation field.
			if args := ctx.Operation().Arguments(); args != nil {
				if employeeIDArg := args.Get("updateEmployeeMyKafka", "employeeID"); employeeIDArg != nil {
					capturedEmployeeID.Store(int64(employeeIDArg.GetInt()))
				}
			}
			return events, nil
		},
	}

	cfg := config.Config{
		Graph: config.Graph{},
		Modules: map[string]interface{}{
			"publishModule": customModule,
		},
	}

	testenv.Run(t, &testenv.Config{
		RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate,
		EnableKafka:              true,
		RouterOptions: []core.Option{
			core.WithModulesConfig(cfg.Modules),
			core.WithCustomModules(&stream_publish.PublishModule{}),
		},
	}, func(t *testing.T, xEnv *testenv.Environment) {
		events.KafkaEnsureTopicExists(t, xEnv, time.Second, "employeeUpdated")
		resOne := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
			Query: `mutation { updateEmployeeMyKafka(employeeID: 5, update: {name: "test"}) { success } }`,
		})
		require.JSONEq(t, `{"data":{"updateEmployeeMyKafka":{"success":true}}}`, resOne.Body)

		assert.Equal(t, int32(1), customModule.HookCallCount.Load())
		assert.Equal(t, int64(5), capturedEmployeeID.Load(), "expected to capture employeeID argument value")
	})
})
}
87 changes: 87 additions & 0 deletions router-tests/modules/stream_receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,4 +963,91 @@ func TestReceiveHook(t *testing.T) {
assert.Equal(t, int32(3), customModule.HookCallCount.Load())
})
})

t.Run("Test Receive hook can access field arguments", func(t *testing.T) {
	t.Parallel()

	// This test verifies that the receive hook can access GraphQL field
	// arguments via ctx.Operation().Arguments().
	//
	// The hook callback runs on a router goroutine while events are delivered,
	// so the captured value is stored atomically to keep the test race-free
	// under `go test -race`.
	var capturedEmployeeID atomic.Int64

	customModule := stream_receive.StreamReceiveModule{
		HookCallCount: &atomic.Int32{},
		Callback: func(ctx core.StreamReceiveEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error) {
			// Look up the employeeID argument of the root subscription field.
			if args := ctx.Operation().Arguments(); args != nil {
				if employeeIDArg := args.Get("employeeUpdatedMyKafka", "employeeID"); employeeIDArg != nil {
					capturedEmployeeID.Store(int64(employeeIDArg.GetInt()))
				}
			}
			return events, nil
		},
	}

	cfg := config.Config{
		Graph: config.Graph{},
		Modules: map[string]interface{}{
			"streamReceiveModule": customModule,
		},
	}

	testenv.Run(t, &testenv.Config{
		RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate,
		EnableKafka:              true,
		RouterOptions: []core.Option{
			core.WithModulesConfig(cfg.Modules),
			core.WithCustomModules(&stream_receive.StreamReceiveModule{}),
		},
	}, func(t *testing.T, xEnv *testenv.Environment) {
		topics := []string{"employeeUpdated"}
		events.KafkaEnsureTopicExists(t, xEnv, time.Second, topics...)

		// The argument is an inline literal here (employeeID: 3), exercising
		// argument access for non-variable arguments.
		var subscriptionOne struct {
			employeeUpdatedMyKafka struct {
				ID float64 `graphql:"id"`
			} `graphql:"employeeUpdatedMyKafka(employeeID: 3)"`
		}

		surl := xEnv.GraphQLWebSocketSubscriptionURL()
		client := graphql.NewSubscriptionClient(surl)

		type kafkaSubscriptionArgs struct {
			dataValue []byte
			errValue  error
		}
		subscriptionArgsCh := make(chan kafkaSubscriptionArgs)
		subscriptionOneID, err := client.Subscribe(&subscriptionOne, nil, func(dataValue []byte, errValue error) error {
			subscriptionArgsCh <- kafkaSubscriptionArgs{
				dataValue: dataValue,
				errValue:  errValue,
			}
			return nil
		})
		require.NoError(t, err)
		require.NotEmpty(t, subscriptionOneID)

		clientRunCh := make(chan error)
		go func() {
			clientRunCh <- client.Run()
		}()

		xEnv.WaitForSubscriptionCount(1, Timeout)

		events.ProduceKafkaMessage(t, xEnv, Timeout, topics[0], `{"__typename":"Employee","id": 1,"update":{"name":"foo"}}`)

		testenv.AwaitChannelWithT(t, Timeout, subscriptionArgsCh, func(t *testing.T, args kafkaSubscriptionArgs) {
			require.NoError(t, args.errValue)
		})

		require.NoError(t, client.Close())
		testenv.AwaitChannelWithT(t, Timeout, clientRunCh, func(t *testing.T, err error) {
			require.NoError(t, err)
		}, "unable to close client before timeout")

		assert.Equal(t, int32(1), customModule.HookCallCount.Load())
		assert.Equal(t, int64(3), capturedEmployeeID.Load(), "expected to capture employeeID argument value")
	})
})
}
58 changes: 58 additions & 0 deletions router/core/arguments.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package core

import "github.com/wundergraph/astjson"

// fieldArgs is a collection of field arguments with their names
// as keys and their corresponding JSON values.
type fieldArgs map[string]*astjson.Value

// Arguments allow access to GraphQL field arguments used by clients.
// The zero value is usable: lookups on it simply return nil.
type Arguments struct {
	// data holds a map which contains all field arguments
	// for any given field of an operation, keyed by the field's
	// dot-separated response-key path (alias if present, else field name).
	data map[string]fieldArgs
}

// Get will return the value of argument a from field f.
//
// To access an argument of a root level field, pass the response key of
// the field as the first argument and the name of the argument as the
// second, e.g. Get("rootfield_name", "argument_name").
//
// The response key is the alias if present, otherwise the field name.
// For aliased fields like "myAlias: user(id: 1)", use the alias "myAlias"
// in the path.
//
// Nested fields are addressed with dot notation. For example, arg1 on
// field2 of the operation
//
//	subscription {
//	    mySub(arg1: "val1", arg2: "val2") {
//	        field1
//	        field2(arg1: "val3", arg2: "val4")
//	    }
//	}
//
// is reached with Get("mySub.field2", "arg1").
//
// For aliased fields:
//
//	query {
//	    a: user(id: "1") { name }
//	    b: user(id: "2") { name }
//	}
//
// use Get("a", "id") or Get("b", "id") respectively.
//
// If fa is nil, or f or a cannot be found, nil is returned.
func (fa *Arguments) Get(f string, a string) *astjson.Value {
	if fa == nil {
		return nil
	}
	// Indexing a nil map is safe and yields the zero value, so a nil
	// fa.data falls through to the final return below.
	if args, ok := fa.data[f]; ok {
		return args[a]
	}
	return nil
}
Comment on lines +47 to +58
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some unit tests for this type. Check if you can provoke panics here.

16 changes: 12 additions & 4 deletions router/core/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,16 +482,16 @@ type OperationContext interface {
Hash() uint64
// Content is the content of the operation
Content() string
// Variables is the variables of the operation
// Arguments allow access to GraphQL operation field arguments.
Arguments() *Arguments
// Variables allow access to GraphQL operation variables.
Variables() *astjson.Value
// ClientInfo returns information about the client that initiated this operation
ClientInfo() ClientInfo

// Sha256Hash returns the SHA256 hash of the original operation
// It is important to note that this hash is not calculated just because this method has been called
// and is only calculated based on other existing logic (such as if sha256Hash is used in expressions)
Sha256Hash() string

// QueryPlanStats returns some statistics about the query plan for the operation
// if called too early in request chain, it may be inaccurate for modules, using
// in Middleware is recommended
Expand Down Expand Up @@ -524,7 +524,11 @@ type operationContext struct {
// RawContent is the raw content of the operation
rawContent string
// Content is the normalized content of the operation
content string
content string
// fieldArguments are the arguments of the operation.
// These are not mapped by default, only when certain custom modules require them.
fieldArguments Arguments
// variables are the variables of the operation
variables *astjson.Value
files []*httpclient.FileUpload
clientInfo *ClientInfo
Expand Down Expand Up @@ -560,6 +564,10 @@ func (o *operationContext) Variables() *astjson.Value {
return o.variables
}

// Arguments returns the field arguments mapped for this operation.
// The returned pointer is never nil; when no arguments were mapped
// (mapping only runs when certain custom modules require it), lookups
// via Get on the returned value simply yield nil.
func (o *operationContext) Arguments() *Arguments {
	return &o.fieldArguments
}
Comment on lines +567 to +569
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is a good idea. An operation can be huge and even if you only register EnterArgument the visitor will still visit every node in the document. It might be better to have some kind of lazy walking here to only request the arguments when you really need them and then store them in the field arguments instead of walking on every request.

Would that make sense?

Copy link
Contributor Author

@dkorittki dkorittki Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem I have with lazy walking the document (i.e. first time a hook accesses field arguments) is that I have to remember the AST, make the walk thread safe (prevent two hooks from walking simultaneously) and remember the result globally for all hooks of that subscription query. It introduces more complex mechanisms compared to the current implementation.

The upside of that is that the ast walking is only performed when really needed, and it's not done during query normalization. But that's only an advantage if walking field arguments is actually expensive. I'll get some big queries and see how it affects performance, so we can be sure lazy walking is a good tradeoff.

Copy link
Contributor Author

@dkorittki dkorittki Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I crunched some numbers. There is a benchmark in router/internal/planningbenchmark/benchmark_test.go, which contains one test BenchmarkPlanning which benchmarks the planning. I utilized this test as a basis and created another identical test but with field argument mapping included. This allowed me to measure the impact of this change compared to the planning step. As schema and operation input I used big production like operations (between 17KB and 37KB per operation). I then ran Go benchmarks for planning with and without field argument mapping.

sec/op was between 0.4 and 1.3ms slower with field argument mappings (1.2 to 2% slower).
B/s throughput was between 9.7 and 14.6KB lower (1.7 to 1.9% slower).
B/op allocated was between 0.3 and 0.6Mi higher (0.3 to 0.8% higher).
allocs/op was between 400 and 1400 more (0.4 to 1.4% higher).

The operations I tested did not contain many field arguments but lots of fields. This made the walker walk a lot of fields but it did not often enter my visitor to create the map for field arguments. I suppose on heavy operations with lots of field arguments we could see an increase in allocations and processing time. But in order to feel that it would involve lots of field arguments and the question is how realistic that is. I assume field arguments are only used for certain fields here and there.

So all in all the performance penalty is not too heavy I think. Given the fact that the field arguments walker only runs when Cosmo Streams Custom Modules are registered I think it's okay to let it stay as is? To top it off I will also introduce an option to disable field argument mapping explicitly via config, even if these Custom Modules are registered. What do you think @Noroth ?


// Files returns the file uploads attached to this operation, if any.
func (o *operationContext) Files() []*httpclient.FileUpload {
	return o.files
}
Expand Down
Loading
Loading