Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions router-tests/events/kafka_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,23 @@ func TestKafkaEvents(t *testing.T) {
})
})

t.Run("mutate returns correct typename", func(t *testing.T) {
t.Parallel()

topics := []string{"employeeUpdated"}

testenv.Run(t, &testenv.Config{
RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate,
EnableKafka: true,
}, func(t *testing.T, xEnv *testenv.Environment) {
events.KafkaEnsureTopicExists(t, xEnv, KafkaWaitTimeout, topics...)
resOne := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `mutation { updateEmployeeMyKafka(employeeID: 3, update: {name: "name test"}) { __typename success } }`,
})
require.JSONEq(t, `{"data":{"updateEmployeeMyKafka":{"__typename":"edfs__PublishResult","success":true}}}`, resOne.Body)
})
})

t.Run("kafka startup and shutdown with wrong broker should not stop router from starting indefinitely", func(t *testing.T) {
t.Parallel()

Expand Down
27 changes: 26 additions & 1 deletion router-tests/events/nats_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,7 @@ func TestNatsEvents(t *testing.T) {
})
})

t.Run("publish", func(t *testing.T) {
t.Run("mutate", func(t *testing.T) {
t.Parallel()

testenv.Run(t, &testenv.Config{
Expand Down Expand Up @@ -1064,6 +1064,31 @@ func TestNatsEvents(t *testing.T) {
})
})

t.Run("mutate returns correct typename", func(t *testing.T) {
t.Parallel()

testenv.Run(t, &testenv.Config{
RouterConfigJSONTemplate: testenv.ConfigWithEdfsNatsJSONTemplate,
EnableNats: true,
}, func(t *testing.T, xEnv *testenv.Environment) {
sub, err := xEnv.NatsConnectionDefault.SubscribeSync(xEnv.GetPubSubName("employeeUpdatedMyNats.3"))
require.NoError(t, err)
require.NoError(t, xEnv.NatsConnectionDefault.Flush())

t.Cleanup(func() { _ = sub.Unsubscribe() })

resOne := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `mutation UpdateEmployeeNats($update: UpdateEmployeeInput!) {
updateEmployeeMyNats(id: 3, update: $update) {__typename success}
}`,
Variables: json.RawMessage(`{"update":{"name":"Stefan Avramovic","email":"avramovic@wundergraph.com"}}`),
})

// Send a query to receive the response from the NATS message
require.Equal(t, `{"data":{"updateEmployeeMyNats":{"__typename":"edfs__PublishResult","success":true}}}`, resOne.Body)
})
})

t.Run("subscribe with stream and consumer", func(t *testing.T) {
t.Parallel()

Expand Down
16 changes: 16 additions & 0 deletions router-tests/events/redis_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,22 @@ func TestRedisEvents(t *testing.T) {
}
})
})

t.Run("mutate returns correct typename", func(t *testing.T) {
t.Parallel()

testenv.Run(t, &testenv.Config{
RouterConfigJSONTemplate: testenv.ConfigWithEdfsRedisJSONTemplate,
EnableRedis: true,
NoRetryClient: true,
}, func(t *testing.T, xEnv *testenv.Environment) {
// send a mutation to trigger the first subscription
resOne := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `mutation { updateEmployeeMyRedis(id: 3, update: {name: "name test"}) { __typename success } }`,
})
require.JSONEq(t, `{"data":{"updateEmployeeMyRedis":{"__typename":"edfs__PublishResult","success":true}}}`, resOne.Body)
})
})
}

func TestRedisClusterEvents(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions router-tests/modules/stream_publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ func TestPublishHook(t *testing.T) {
}, func(t *testing.T, xEnv *testenv.Environment) {
events.KafkaEnsureTopicExists(t, xEnv, time.Second, "employeeUpdated")
resOne := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: `mutation { updateEmployeeMyKafka(employeeID: 3, update: {name: "name test"}) { success } }`,
Query: `mutation { updateEmployeeMyKafka(employeeID: 3, update: {name: "name test"}) { __typename success } }`,
})
require.JSONEq(t, `{"data":{"updateEmployeeMyKafka":{"success":true}}}`, resOne.Body)
require.JSONEq(t, `{"data":{"updateEmployeeMyKafka":{"__typename": "edfs__PublishResult", "success":true}}}`, resOne.Body)

assert.Equal(t, int32(1), customModule.HookCallCount.Load())
})
Expand Down
4 changes: 2 additions & 2 deletions router/pkg/pubsub/kafka/engine_datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,10 @@ func (s *PublishDataSource) Load(ctx context.Context, input []byte, out *bytes.B

if err := s.pubSub.Publish(ctx, publishData.PublishEventConfiguration(), []datasource.StreamEvent{&Event{&publishData.Event}}); err != nil {
// err will not be returned but only logged inside PubSubProvider.Publish to avoid a "unable to fetch from subgraph" error
_, errWrite := io.WriteString(out, `{"success": false}`)
_, errWrite := io.WriteString(out, `{"__typename": "edfs__PublishResult", "success": false}`)
return errWrite
}
_, errWrite := io.WriteString(out, `{"success": true}`)
_, errWrite := io.WriteString(out, `{"__typename": "edfs__PublishResult", "success": true}`)
if errWrite != nil {
return errWrite
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestEngineDataSourceFactoryWithMockAdapter(t *testing.T) {
out := &bytes.Buffer{}
err = ds.Load(context.Background(), []byte(input), out)
require.NoError(t, err)
require.Equal(t, `{"success": true}`, out.String())
require.Equal(t, `{"__typename": "edfs__PublishResult", "success": true}`, out.String())
}

// TestEngineDataSourceFactory_GetResolveDataSource_WrongType tests the EngineDataSourceFactory with a mocked adapter
Expand Down
4 changes: 2 additions & 2 deletions router/pkg/pubsub/kafka/engine_datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestKafkaPublishDataSource_Load(t *testing.T) {
})).Return(nil)
},
expectError: false,
expectedOutput: `{"success": true}`,
expectedOutput: `{"__typename": "edfs__PublishResult", "success": true}`,
expectPublished: true,
},
{
Expand All @@ -118,7 +118,7 @@ func TestKafkaPublishDataSource_Load(t *testing.T) {
m.On("Publish", mock.Anything, mock.Anything, mock.Anything).Return(errors.New("publish error"))
},
expectError: false, // The Load method doesn't return the publish error directly
expectedOutput: `{"success": false}`,
expectedOutput: `{"__typename": "edfs__PublishResult", "success": false}`,
expectPublished: true,
},
{
Expand Down
4 changes: 2 additions & 2 deletions router/pkg/pubsub/nats/engine_datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,10 @@ func (s *NatsPublishDataSource) Load(ctx context.Context, input []byte, out *byt

if err := s.pubSub.Publish(ctx, publishData.PublishEventConfiguration(), []datasource.StreamEvent{&Event{evt: &publishData.Event}}); err != nil {
// err will not be returned but only logged inside PubSubProvider.Publish to avoid a "unable to fetch from subgraph" error
_, errWrite := io.WriteString(out, `{"success": false}`)
_, errWrite := io.WriteString(out, `{"__typename": "edfs__PublishResult", "success": false}`)
return errWrite
}
_, err := io.WriteString(out, `{"success": true}`)
_, err := io.WriteString(out, `{"__typename": "edfs__PublishResult", "success": true}`)
return err
}

Expand Down
2 changes: 1 addition & 1 deletion router/pkg/pubsub/nats/engine_datasource_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestEngineDataSourceFactoryWithMockAdapter(t *testing.T) {
out := &bytes.Buffer{}
err = ds.Load(context.Background(), []byte(input), out)
require.NoError(t, err)
require.Equal(t, `{"success": true}`, out.String())
require.Equal(t, `{"__typename": "edfs__PublishResult", "success": true}`, out.String())
}

func TestEngineDataSourceFactory_GetResolveDataSource_WrongType(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions router/pkg/pubsub/nats/engine_datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestNatsPublishDataSource_Load(t *testing.T) {
})).Return(nil)
},
expectError: false,
expectedOutput: `{"success": true}`,
expectedOutput: `{"__typename": "edfs__PublishResult", "success": true}`,
expectPublished: true,
},
{
Expand All @@ -100,7 +100,7 @@ func TestNatsPublishDataSource_Load(t *testing.T) {
m.On("Publish", mock.Anything, mock.Anything, mock.Anything).Return(errors.New("publish error"))
},
expectError: false, // The Load method doesn't return the publish error directly
expectedOutput: `{"success": false}`,
expectedOutput: `{"__typename": "edfs__PublishResult", "success": false}`,
expectPublished: true,
},
{
Expand Down
4 changes: 2 additions & 2 deletions router/pkg/pubsub/redis/engine_datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,10 @@ func (s *PublishDataSource) Load(ctx context.Context, input []byte, out *bytes.B

if err := s.pubSub.Publish(ctx, publishData.PublishEventConfiguration(), []datasource.StreamEvent{&Event{evt: &publishData.Event}}); err != nil {
// err will not be returned but only logged inside PubSubProvider.Publish to avoid a "unable to fetch from subgraph" error
_, errWrite := io.WriteString(out, `{"success": false}`)
_, errWrite := io.WriteString(out, `{"__typename": "edfs__PublishResult", "success": false}`)
return errWrite
}
_, err := io.WriteString(out, `{"success": true}`)
_, err := io.WriteString(out, `{"__typename": "edfs__PublishResult", "success": true}`)
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestEngineDataSourceFactoryWithMockAdapter(t *testing.T) {
out := &bytes.Buffer{}
err = ds.Load(context.Background(), []byte(input), out)
require.NoError(t, err)
require.Equal(t, `{"success": true}`, out.String())
require.Equal(t, `{"__typename": "edfs__PublishResult", "success": true}`, out.String())
}

// TestEngineDataSourceFactory_GetResolveDataSource_WrongType tests the EngineDataSourceFactory with a mocked adapter
Expand Down
4 changes: 2 additions & 2 deletions router/pkg/pubsub/redis/engine_datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestRedisPublishDataSource_Load(t *testing.T) {
})).Return(nil)
},
expectError: false,
expectedOutput: `{"success": true}`,
expectedOutput: `{"__typename": "edfs__PublishResult", "success": true}`,
expectPublished: true,
},
{
Expand All @@ -98,7 +98,7 @@ func TestRedisPublishDataSource_Load(t *testing.T) {
m.On("Publish", mock.Anything, mock.Anything, mock.Anything).Return(errors.New("publish error"))
},
expectError: false, // The Load method doesn't return the publish error directly
expectedOutput: `{"success": false}`,
expectedOutput: `{"__typename": "edfs__PublishResult", "success": false}`,
expectPublished: true,
},
{
Expand Down
Loading