Skip to content

Commit 18ddf39

Browse files
Merge pull request #269 from XavierGeerinck/issue_267
Add req.body to the subscribe response as well
2 parents c3eebe0 + 3c1bf01 commit 18ddf39

File tree

3 files changed

+42
-2
lines changed

3 files changed

+42
-2
lines changed

src/implementation/Server/HTTPServer/pubsub.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,19 @@ export default class HTTPServerPubSub implements IServerPubSub {
3232
await this.server.getServerImpl().registerPubSubSubscriptionRoute(pubsubName, topic, route);
3333

3434
this.server.getServer().post(`/${route}`, async (req, res) => {
35-
// Process our callback
3635
// @ts-ignore
37-
await cb(req?.body?.data);
36+
// Parse the data of the body, we prioritize fetching the data key in body if possible
37+
// i.e. Redis returns { data: {} } and other services return {}
38+
// @todo: This will be deprecated in an upcoming major version and only req.body will be returned
39+
const data = req?.body?.data || req?.body;
40+
41+
// Process our callback
42+
try {
43+
await cb(data);
44+
} catch (e) {
45+
console.error(e);
46+
return res.send({ success: false });
47+
}
3848

3949
// Let Dapr know that the message was processed correctly
4050
// console.log(`[Dapr API][PubSub][route-${topic}] Ack'ing the message`);

test/e2e/main.grpc.test.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ describe('grpc/main', () => {
2626
let client: DaprClient;
2727
const mockBindingReceive = jest.fn(async (_data: object) => console.log('mockBindingReceive'));
2828
const mockPubSubSubscribe = jest.fn(async (_data: object) => console.log('mockPubSubSubscribe'));
29+
const mockPubSubSubscribeError = jest.fn(async (_data: object) => { throw new Error("mockPubSubSubscribeError") });
2930

3031
// We need to start listening on some endpoints already
3132
// this because Dapr is not dynamic and registers endpoints on boot
@@ -38,6 +39,7 @@ describe('grpc/main', () => {
3839
// Test with:
3940
// dapr publish --publish-app-id test-suite --pubsub pubsub-redis --topic test-topic --data '{ "hello": "world" }'
4041
await server.pubsub.subscribe('pubsub-redis', 'test-topic', mockPubSubSubscribe);
42+
await server.pubsub.subscribe('pubsub-redis', 'test-topic-error', mockPubSubSubscribeError);
4143

4244
// Start server
4345
await server.start();
@@ -109,6 +111,19 @@ describe('grpc/main', () => {
109111
const res = await client.pubsub.publish('pubsub-redis', 'test-topic', { hello: 'world' });
110112
expect(res).toEqual(true);
111113
});
114+
115+
it('should not crash if the callback throws an error', async () => {
116+
await client.pubsub.publish('pubsub-redis', 'test-topic-error', { hello: 'world' });
117+
118+
// Delay a bit for event to arrive
119+
await new Promise((resolve, _reject) => setTimeout(resolve, 250));
120+
121+
expect(mockPubSubSubscribeError.mock.calls.length).toBe(1);
122+
123+
// Also test for receiving data
124+
// @ts-ignore
125+
expect(mockPubSubSubscribeError.mock.calls[0][0]['hello']).toEqual('world');
126+
});
112127
});
113128

114129
describe('invoker', () => {

test/e2e/main.http.test.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ describe('http/main', () => {
2424
let client: DaprClient;
2525
const mockBindingReceive = jest.fn(async (_data: object) => console.log('mockBindingReceive'));
2626
const mockPubSubSubscribe = jest.fn(async (_data: object) => console.log('mockPubSubSubscribe'));
27+
const mockPubSubSubscribeError = jest.fn(async (_data: object) => { throw new Error("mockPubSubSubscribeError") });
2728

2829
// We need to start listening on some endpoints already
2930
// this because Dapr is not dynamic and registers endpoints on boot
@@ -40,6 +41,7 @@ describe('http/main', () => {
4041
// Test with:
4142
// dapr publish --publish-app-id test-suite --pubsub pubsub-redis --topic test-topic --data '{ "hello": "world" }'
4243
await server.pubsub.subscribe('pubsub-redis', 'test-topic', mockPubSubSubscribe);
44+
await server.pubsub.subscribe('pubsub-redis', 'test-topic-error', mockPubSubSubscribeError);
4345

4446
// Start server
4547
await server.start();
@@ -108,6 +110,19 @@ describe('http/main', () => {
108110
const res = await client.pubsub.publish('pubsub-redis', 'test-topic', { hello: 'world' });
109111
expect(res).toEqual(true);
110112
});
113+
114+
it('should not crash if the callback throws an error', async () => {
115+
await client.pubsub.publish('pubsub-redis', 'test-topic-error', { hello: 'world' });
116+
117+
// Delay a bit for event to arrive
118+
await new Promise((resolve, _reject) => setTimeout(resolve, 250));
119+
120+
expect(mockPubSubSubscribeError.mock.calls.length).toBe(1);
121+
122+
// Also test for receiving data
123+
// @ts-ignore
124+
expect(mockPubSubSubscribeError.mock.calls[0][0]['hello']).toEqual('world');
125+
});
111126
});
112127

113128
describe('invoker', () => {

0 commit comments

Comments
 (0)