Skip to content

Commit 79d7e79

Browse files
alepane21StarpTechdkorittki
authored
feat: allows hook in the subscriptions (#1309)
This PR introduces hooks inside the subscription lifecycle. We also decided to remove old pubsub implementation that is already deprecated and the router is not using anymore. This change is primarily needed for Cosmo Streams support in the router. It allows to run a hook during the initialization of a subscription client. The changes are meant to be backwards compatible to the router, so the router can use an engine version with these changes without adjustments. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Added startup hooks for GraphQL subscriptions to run custom initialization when a subscription starts. * Improved subscription lifecycle with per-subscription update and close signaling, plus startup error propagation to clients. * **Revert** * Removed the Pub/Sub datasource implementation — all NATS and Kafka integrations, configurations, managers, and related tests were deleted. <sub>✏️ Tip: You can customize this high-level summary in your review settings.</sub> <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: StarpTech <[email protected]> Co-authored-by: Dominik Korittki <[email protected]>
1 parent 0009898 commit 79d7e79

File tree

12 files changed

+771
-1533
lines changed

12 files changed

+771
-1533
lines changed

v2/pkg/engine/datasource/graphql_datasource/configuration.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/wundergraph/graphql-go-tools/v2/pkg/astparser"
1010
"github.com/wundergraph/graphql-go-tools/v2/pkg/asttransform"
1111
grpcdatasource "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/grpc_datasource"
12+
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
1213
"github.com/wundergraph/graphql-go-tools/v2/pkg/federation"
1314
"github.com/wundergraph/graphql-go-tools/v2/pkg/operationreport"
1415
)
@@ -103,6 +104,11 @@ type SingleTypeField struct {
103104
FieldName string
104105
}
105106

107+
// SubscriptionOnStartFn defines a hook function that is called when a subscription starts.
108+
// It receives the resolve context and the input of the subscription.
109+
// The function can return an error.
110+
type SubscriptionOnStartFn func(ctx resolve.StartupHookContext, input []byte) (err error)
111+
106112
type SubscriptionConfiguration struct {
107113
URL string
108114
Header http.Header
@@ -119,6 +125,8 @@ type SubscriptionConfiguration struct {
119125
// these headers by itself.
120126
ForwardedClientHeaderRegularExpressions []RegularExpression
121127
WsSubProtocol string
128+
// StartupHooks contains the method called when a subscription is started
129+
StartupHooks []SubscriptionOnStartFn
122130
}
123131

124132
type FetchConfiguration struct {

v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,8 @@ func (p *Planner[T]) ConfigureSubscription() plan.SubscriptionConfiguration {
449449
return plan.SubscriptionConfiguration{
450450
Input: string(input),
451451
DataSource: &SubscriptionSource{
452-
client: p.subscriptionClient,
452+
client: p.subscriptionClient,
453+
subscriptionOnStartFns: p.config.subscription.StartupHooks,
453454
},
454455
Variables: p.variables,
455456
PostProcessing: DefaultPostProcessingConfiguration,
@@ -1953,7 +1954,8 @@ type RegularExpression struct {
19531954
}
19541955

19551956
type SubscriptionSource struct {
1956-
client GraphQLSubscriptionClient
1957+
client GraphQLSubscriptionClient
1958+
subscriptionOnStartFns []SubscriptionOnStartFn
19571959
}
19581960

19591961
func (s *SubscriptionSource) AsyncStart(ctx *resolve.Context, id uint64, input []byte, updater resolve.SubscriptionUpdater) error {
@@ -2003,3 +2005,16 @@ func (s *SubscriptionSource) UniqueRequestID(ctx *resolve.Context, input []byte,
20032005
}
20042006
return s.client.UniqueRequestID(ctx, options, xxh)
20052007
}
2008+
2009+
// SubscriptionOnStart is called when a subscription is started.
2010+
// Hooks are invoked sequentially, short-circuiting on the first error.
2011+
func (s *SubscriptionSource) SubscriptionOnStart(ctx resolve.StartupHookContext, input []byte) error {
2012+
for _, fn := range s.subscriptionOnStartFns {
2013+
err := fn(ctx, input)
2014+
if err != nil {
2015+
return err
2016+
}
2017+
}
2018+
2019+
return nil
2020+
}

v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4006,7 +4006,7 @@ func TestGraphQLDataSource(t *testing.T) {
40064006
Trigger: resolve.GraphQLSubscriptionTrigger{
40074007
Input: []byte(`{"url":"wss://swapi.com/graphql","body":{"query":"subscription{remainingJedis}"}}`),
40084008
Source: &SubscriptionSource{
4009-
NewGraphQLSubscriptionClient(http.DefaultClient, http.DefaultClient, ctx),
4009+
client: NewGraphQLSubscriptionClient(http.DefaultClient, http.DefaultClient, ctx),
40104010
},
40114011
PostProcessing: DefaultPostProcessingConfiguration,
40124012
},
@@ -8239,7 +8239,6 @@ func (f *FailingSubscriptionClient) SubscribeAsync(ctx *resolve.Context, id uint
82398239
}
82408240

82418241
func (f *FailingSubscriptionClient) Unsubscribe(id uint64) {
8242-
82438242
}
82448243

82458244
func (f *FailingSubscriptionClient) Subscribe(ctx *resolve.Context, options GraphQLSubscriptionOptions, updater resolve.SubscriptionUpdater) error {
@@ -8272,6 +8271,19 @@ func (t *testSubscriptionUpdaterChan) Update(data []byte) {
82728271
t.updates <- string(data)
82738272
}
82748273

8274+
// empty method to satisfy the interface, not used in this tests
8275+
func (t *testSubscriptionUpdaterChan) UpdateSubscription(id resolve.SubscriptionIdentifier, data []byte) {
8276+
}
8277+
8278+
// empty method to satisfy the interface, not used in this tests
8279+
func (t *testSubscriptionUpdaterChan) CloseSubscription(kind resolve.SubscriptionCloseKind, id resolve.SubscriptionIdentifier) {
8280+
}
8281+
8282+
// empty method to satisfy the interface, not used in this tests
8283+
func (t *testSubscriptionUpdaterChan) Subscriptions() map[context.Context]resolve.SubscriptionIdentifier {
8284+
return make(map[context.Context]resolve.SubscriptionIdentifier)
8285+
}
8286+
82758287
func (t *testSubscriptionUpdaterChan) Complete() {
82768288
close(t.complete)
82778289
}
@@ -8397,6 +8409,19 @@ func (t *testSubscriptionUpdater) Close(kind resolve.SubscriptionCloseKind) {
83978409
t.closed = true
83988410
}
83998411

8412+
// empty method to satisfy the interface, not used in this tests
8413+
func (t *testSubscriptionUpdater) CloseSubscription(kind resolve.SubscriptionCloseKind, id resolve.SubscriptionIdentifier) {
8414+
}
8415+
8416+
// empty method to satisfy the interface, not used in this tests
8417+
func (t *testSubscriptionUpdater) Subscriptions() map[context.Context]resolve.SubscriptionIdentifier {
8418+
return make(map[context.Context]resolve.SubscriptionIdentifier)
8419+
}
8420+
8421+
// empty method to satisfy the interface, not used in this tests
8422+
func (t *testSubscriptionUpdater) UpdateSubscription(id resolve.SubscriptionIdentifier, data []byte) {
8423+
}
8424+
84008425
func TestSubscriptionSource_Start(t *testing.T) {
84018426
chatServer := httptest.NewServer(subscriptiontesting.ChatGraphQLEndpointHandler())
84028427
defer chatServer.Close()
@@ -8970,6 +8995,60 @@ func TestSanitizeKey(t *testing.T) {
89708995
}
89718996
}
89728997

8998+
func TestSubscriptionSource_SubscriptionOnStart(t *testing.T) {
8999+
9000+
t.Run("SubscriptionOnStart calls subscriptionOnStartFns", func(t *testing.T) {
9001+
ctx := resolve.StartupHookContext{
9002+
Context: context.Background(),
9003+
Updater: func(data []byte) {},
9004+
}
9005+
9006+
type fnData struct {
9007+
ctx resolve.StartupHookContext
9008+
input []byte
9009+
}
9010+
9011+
startFnCalled := make(chan fnData, 1)
9012+
subscriptionSource := SubscriptionSource{
9013+
subscriptionOnStartFns: []SubscriptionOnStartFn{
9014+
func(ctx resolve.StartupHookContext, input []byte) error {
9015+
startFnCalled <- fnData{ctx, input}
9016+
return nil
9017+
},
9018+
},
9019+
}
9020+
9021+
err := subscriptionSource.SubscriptionOnStart(ctx, []byte(`{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`))
9022+
require.NoError(t, err)
9023+
var called fnData
9024+
select {
9025+
case called = <-startFnCalled:
9026+
case <-time.After(1 * time.Second):
9027+
t.Fatal("SubscriptionOnStartFn was not called")
9028+
}
9029+
assert.Equal(t, []byte(`{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`), called.input)
9030+
})
9031+
9032+
t.Run("SubscriptionOnStart calls subscriptionOnStartFns and returns error if one of the functions returns an error", func(t *testing.T) {
9033+
ctx := resolve.StartupHookContext{
9034+
Context: context.Background(),
9035+
Updater: func(data []byte) {},
9036+
}
9037+
9038+
subscriptionSource := SubscriptionSource{
9039+
subscriptionOnStartFns: []SubscriptionOnStartFn{
9040+
func(ctx resolve.StartupHookContext, input []byte) error {
9041+
return errors.New("test error")
9042+
},
9043+
},
9044+
}
9045+
9046+
err := subscriptionSource.SubscriptionOnStart(ctx, []byte(`{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`))
9047+
require.Error(t, err)
9048+
assert.ErrorContains(t, err, "test error")
9049+
})
9050+
}
9051+
89739052
const interfaceSelectionSchema = `
89749053
89759054
scalar String

v2/pkg/engine/datasource/pubsub_datasource/kafka_event_manager.go

Lines changed: 0 additions & 72 deletions
This file was deleted.

0 commit comments

Comments
 (0)