Skip to content

Commit fd273a9

Browse files
feat: Implement PubSub Deadletter & Single/Multi Routing (#342)
* Initial implementation for Pub/Sub with Single and Multi routing + DeadLetter support - WIP Signed-off-by: Xavier Geerinck <[email protected]> * Implement event handlers and routes Signed-off-by: Xavier Geerinck <[email protected]> * Re-enable all server test Signed-off-by: Xavier Geerinck <[email protected]> * Add gRPC tests (disabled for now) Signed-off-by: Xavier Geerinck <[email protected]> * Recode for clarity Signed-off-by: Xavier Geerinck <[email protected]> * Fix tests Signed-off-by: Xavier Geerinck <[email protected]> * Add gRPC Signed-off-by: Xavier Geerinck <[email protected]> * Add deadletter support Signed-off-by: Xavier Geerinck <[email protected]> * Stabilize pubsub more, add more examples and tests and docs Signed-off-by: Xavier Geerinck <[email protected]> * Fix linting and fix build Signed-off-by: Xavier Geerinck <[email protected]> * Resolve comments Signed-off-by: Xavier Geerinck <[email protected]> * Minor nit corrections Signed-off-by: Xavier Geerinck <[email protected]> Signed-off-by: Xavier Geerinck <[email protected]>
1 parent c3f6347 commit fd273a9

31 files changed

+1572
-760
lines changed

daprdocs/content/en/js-sdk-docs/js-client/_index.md

Lines changed: 115 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,17 @@ start().catch((e) => {
277277
});
278278
```
279279

280-
##### Subscribe to messages
280+
#### Subscribe to messages
281+
282+
Subscribing to messages can be done in several ways to offer flexibility of receiving messages on your topics:
283+
284+
* Direct subscription through the `subscribe` method
285+
* Direct susbcription with options through the `subscribeWithOptions` method
286+
* Subscription afterwards through the `susbcribeOnEvent` method
287+
288+
> Dapr requires subscriptions to be set up on startup, but in the JS SDK we allow event handlers to be added afterwards as well, providing you the flexibility of programming.
289+
290+
An example is provided below
281291

282292
```javascript
283293
import { DaprServer } from "@dapr/dapr";
@@ -294,14 +304,117 @@ async function start() {
294304
const topic = "topic-a";
295305

296306
// Configure Subscriber for a Topic
297-
await server.pubsub.subscribe(pubSubName, topic, async (data: any) => console.log(`Got Data: ${JSON.stringify(data)}`));
307+
// Method 1: Direct subscription through the `subscribe` method
308+
await server.pubsub.subscribe(pubSubName, topic, async (data: any) => console.log(`Received Data: ${JSON.stringify(data)}`));
298309

310+
// Method 2: Direct susbcription with options through the `subscribeWithOptions` method
311+
await server.pubsub.subscribeWithOptions(pubSubName, topic, {
312+
callback: async (data: any) => console.log(`Received Data: ${JSON.stringify(data)}`)
313+
});
314+
315+
// Method 3: Subscription afterwards through the `susbcribeOnEvent` method
316+
// Note: we use default, since if no route was passed (empty options) we utilize "default" as the route name
317+
await server.pubsub.subscribeWithOptions('pubsub-redis', 'topic-options-1', {});
318+
server.pubsub.subscribeToRoute("pubsub-redis", "topic-options-1", "default", async (data) => { console.log(`Received Data: ${JSON.stringify(data)}`) });
319+
320+
// Start the server
299321
await server.start();
300322
}
301323
```
302324

303325
> For a full list of state operations visit [How-To: Publish & subscribe]({{< ref howto-publish-subscribe.md >}}).
304326
327+
#### Subscribe to messages rule based
328+
329+
Dapr [supports routing messages](https://docs.dapr.io/developing-applications/building-blocks/pubsub/howto-route-messages/
330+
) to different handlers (routes) based on rules.
331+
332+
> E.g., you are writing an application that needs to handle messages depending on their "type" with Dapr, you can send them to different routes `handlerType1` and `handlerType2` with the default route being `handlerDefault`
333+
334+
```javascript
335+
import { DaprServer } from "@dapr/dapr";
336+
337+
const daprHost = "127.0.0.1"; // Dapr Sidecar Host
338+
const daprPort = "3500"; // Dapr Sidecar Port of this Example Server
339+
const serverHost = "127.0.0.1"; // App Host of this Example Server
340+
const serverPort = "50051"; // App Port of this Example Server "
341+
342+
async function start() {
343+
const server = new DaprServer(serverHost, serverPort, daprHost, daprPort);
344+
345+
const pubSubName = "my-pubsub-name";
346+
const topic = "topic-a";
347+
348+
// Configure Subscriber for a Topic with rule set
349+
// Note: the default route and match patterns are optional
350+
await server.pubsub.subscribe('pubsub-redis', 'topic-1', {
351+
default: "/default",
352+
rules: [
353+
{
354+
match: `event.type == "my-type-1"`,
355+
path: "/type-1"
356+
},
357+
{
358+
match: `event.type == "my-type-2"`,
359+
path: "/type-2"
360+
}
361+
]
362+
});
363+
364+
// Add handlers for each route
365+
server.pubsub.subscribeToRoute("pubsub-redis", "topic-1", "default", async (data) => { console.log(`Handling Default`) });
366+
server.pubsub.subscribeToRoute("pubsub-redis", "topic-1", "type-1", async (data) => { console.log(`Handling Type 1`) });
367+
server.pubsub.subscribeToRoute("pubsub-redis", "topic-1", "type-2", async (data) => { console.log(`Handling Type 2`) });
368+
369+
// Start the server
370+
await server.start();
371+
}
372+
```
373+
374+
#### Dead Letter Topics
375+
376+
Dapr supports [dead letter topic](https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-deadletter/). This means that when a message fails to be processed, it gets sent to a dead letter queue. E.g., when a message fails to be handled on `/my-queue` it will be sent to `/my-queue-failed`.
377+
E.g., when a message fails to be handled on `/my-queue` it will be sent to `/my-queue-failed`.
378+
379+
You can use the following options with `subscribeWithOptions` method:
380+
* `deadletterTopic`: Specify a deadletter topic name (note: if none is provided we create one named `deadletter`)
381+
* `deadletterCallback`: The method to trigger as handler for our deadletter
382+
383+
Implementing Deadletter support in the JS SDK can be done by either
384+
* Passing the `deadletterCallback` as an option
385+
* By subscribing to route manually with `subscribeToRoute`
386+
387+
An example is provided below
388+
389+
```javascript
390+
import { DaprServer } from "@dapr/dapr";
391+
392+
const daprHost = "127.0.0.1"; // Dapr Sidecar Host
393+
const daprPort = "3500"; // Dapr Sidecar Port of this Example Server
394+
const serverHost = "127.0.0.1"; // App Host of this Example Server
395+
const serverPort = "50051"; // App Port of this Example Server "
396+
397+
async function start() {
398+
const server = new DaprServer(serverHost, serverPort, daprHost, daprPort);
399+
400+
const pubSubName = "my-pubsub-name";
401+
402+
// Method 1 (direct subscribing through subscribeWithOptions)
403+
await server.pubsub.subscribeWithOptions('pubsub-redis', 'topic-options-5', {
404+
callback: async (data: any) => { throw new Error("Triggering Deadletter") },
405+
deadLetterCallback: async (data: any) => { console.log("Handling Deadletter message") }
406+
});
407+
408+
// Method 2 (subscribe afterwards)
409+
await server.pubsub.subscribeWithOptions('pubsub-redis', 'topic-options-1', { deadletterTopic: "my-deadletter-topic" });
410+
server.pubsub.subscribeToRoute("pubsub-redis", "topic-options-1", "default", async () => { throw new Error("Triggering Deadletter") });
411+
server.pubsub.subscribeToRoute("pubsub-redis", "topic-options-1", "my-deadletter-topic", async () => { console.log("Handling Deadletter message") });
412+
413+
// Start server
414+
await server.start();
415+
}
416+
```
417+
305418
### Bindings API
306419

307420
#### Invoke Output Binding

daprdocs/content/en/js-sdk-docs/js-server/_index.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,8 @@ async function start() {
141141
const pubSubName = "my-pubsub-name";
142142
const topic = "topic-a";
143143

144-
// Configure Subscriber for a Topic
145-
await server.pubsub.subscribe(pubSubName, topic, async (data: any) => console.log(`Got Data: ${JSON.stringify(data)}`));
144+
// Method 1: Direct subscription through the `subscribe` method
145+
await server.pubsub.subscribe(pubSubName, topic, async (data: any) => console.log(`Received Data: ${JSON.stringify(data)}`));
146146

147147
await server.start();
148148
}

examples/configuration/package-lock.json

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/grpc/hello-world-distributed/client/package-lock.json

Lines changed: 0 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/grpc/hello-world-distributed/server/package-lock.json

Lines changed: 0 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/grpc/proxying/client/package-lock.json

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/http/actor-parking-sensor/package-lock.json

Lines changed: 0 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/http/actor/package-lock.json

Lines changed: 0 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/invocation/package-lock.json

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/pubsub/package-lock.json

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)