Skip to content

Commit af9ed88

Browse files
DeepanshuADeepanshu Agarwal
andauthored
feat: Add Bulk Subscribe support (#437)
* Add Bulk Subscribe changes Signed-off-by: Deepanshu Agarwal <[email protected]> * Update Bulk Subscribe Support Signed-off-by: Deepanshu Agarwal <[email protected]> * Add e2e test for bulkSub Signed-off-by: Deepanshu Agarwal <[email protected]> * Fix build issue Signed-off-by: Deepanshu Agarwal <[email protected]> * Fix time Signed-off-by: Deepanshu Agarwal <[email protected]> * Use latest dapr Signed-off-by: Deepanshu Agarwal <[email protected]> * Add docs Signed-off-by: Deepanshu Agarwal <[email protected]> * Fix docs Signed-off-by: Deepanshu Agarwal <[email protected]> * Fix pretty Signed-off-by: Deepanshu Agarwal <[email protected]> * incorporate review comments Signed-off-by: Deepanshu Agarwal <[email protected]> * Pretty Signed-off-by: Deepanshu Agarwal <[email protected]> * Modify language Signed-off-by: Deepanshu Agarwal <[email protected]> * Introduce BulkSubscribeOptions Signed-off-by: Deepanshu Agarwal <[email protected]> * Correct docs Signed-off-by: Deepanshu Agarwal <[email protected]> * Pretty Signed-off-by: Deepanshu Agarwal <[email protected]> * Fix destructure Signed-off-by: Deepanshu Agarwal <[email protected]> * Use retry logic Signed-off-by: Deepanshu Agarwal <[email protected]> * Fix assignment Signed-off-by: Deepanshu Agarwal <[email protected]> * Test names Signed-off-by: Deepanshu Agarwal <[email protected]> * remove todo Signed-off-by: Deepanshu Agarwal <[email protected]> * Incorporate review comments Signed-off-by: Deepanshu Agarwal <[email protected]> * pretty Signed-off-by: Deepanshu Agarwal <[email protected]> * nitties Signed-off-by: Deepanshu Agarwal <[email protected]> * Comment improve Signed-off-by: Deepanshu Agarwal <[email protected]> * Comment improve Signed-off-by: Deepanshu Agarwal <[email protected]> * Correct proto Signed-off-by: Deepanshu Agarwal <[email protected]> * Comment Space Signed-off-by: Deepanshu Agarwal <[email protected]> --------- Signed-off-by: Deepanshu Agarwal <[email protected]> Signed-off-by: Deepanshu Agarwal <[email protected]> Co-authored-by: Deepanshu Agarwal <[email protected]>
1 parent 7a494f2 commit af9ed88

File tree

16 files changed

+624
-2
lines changed

16 files changed

+624
-2
lines changed

daprdocs/content/en/js-sdk-docs/js-server/_index.md

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,51 @@ async function start() {
351351
}
352352
```
353353

354+
#### Bulk Subscribe to messages
355+
356+
Bulk Subscription is supported and is available through following API:
357+
358+
- Bulk subscription through the `subscribeBulk` method: `maxMessagesCount` and `maxAwaitDurationMs` are optional; and if not provided, default values for related components will be used.
359+
360+
While listening for messages, the application receives messages from Dapr in bulk. However, like regular subscribe, the callback function receives a single message at a time, and the user can choose to return a `DaprPubSubStatusEnum` value to acknowledge successfully, retry, or drop the message. The default behavior is to return a success response.
361+
362+
Please refer [this document](https://v1-10.docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-bulk/) for more details.
363+
364+
```typescript
365+
import { DaprServer } from "@dapr/dapr";
366+
367+
const pubSubName = "orderPubSub";
368+
const topic = "topicbulk";
369+
370+
const DAPR_HOST = process.env.DAPR_HOST || "127.0.0.1";
371+
const DAPR_HTTP_PORT = process.env.DAPR_HTTP_PORT || "3502";
372+
const SERVER_HOST = process.env.SERVER_HOST || "127.0.0.1";
373+
const SERVER_PORT = process.env.APP_PORT || 5001;
374+
375+
async function start() {
376+
const server = new DaprServer(SERVER_HOST, SERVER_PORT, DAPR_HOST, DAPR_HTTP_PORT);
377+
378+
// Publish multiple messages to a topic with default config.
379+
await client.pubsub.subscribeBulk(pubSubName, topic, (data) =>
380+
console.log("Subscriber received: " + JSON.stringify(data)),
381+
);
382+
383+
// Publish multiple messages to a topic with specific maxMessagesCount and maxAwaitDurationMs.
384+
await client.pubsub.subscribeBulk(
385+
pubSubName,
386+
topic,
387+
(data) => {
388+
console.log("Subscriber received: " + JSON.stringify(data));
389+
return DaprPubSubStatusEnum.SUCCESS; // If App doesn't return anything, the default is SUCCESS. App can also return RETRY or DROP based on the incoming message.
390+
},
391+
{
392+
maxMessagesCount: 100,
393+
maxAwaitDurationMs: 40,
394+
},
395+
);
396+
}
397+
```
398+
354399
#### Dead Letter Topics
355400

356401
Dapr supports [dead letter topic](https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-deadletter/). This means that when a message fails to be processed, it gets sent to a dead letter queue. E.g., when a message fails to be handled on `/my-queue` it will be sent to `/my-queue-failed`.

examples/pubsub/src/index.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,27 @@ async function start() {
2929
// The library parses JSON when possible.
3030
console.log(`[Dapr-JS][Example] Received on subscription: ${JSON.stringify(data)}`);
3131
});
32+
33+
// Publish multiple messages to a topic with default config.
34+
await client.pubsub.subscribeBulk("my-pubsub-component", "my-topic", async (data: Record<string, any>) => {
35+
// The library parses JSON when possible.
36+
console.log(`[Dapr-JS][Example] Received on subscription: ${JSON.stringify(data)}`);
37+
});
38+
39+
// Publish multiple messages to a topic with specific maxMessagesCount and maxAwaitDurationMs.
40+
await client.pubsub.subscribeBulk(
41+
"my-pubsub-component",
42+
"my-topic",
43+
async (data: Record<string, any>) => {
44+
// The library parses JSON when possible.
45+
console.log(`[Dapr-JS][Example] Received on subscription: ${JSON.stringify(data)}`);
46+
},
47+
{
48+
maxMessagesCount: 100,
49+
maxAwaitDurationMs: 40,
50+
},
51+
);
52+
3253
await server.start();
3354

3455
// Wait for 1 second to allow the server to start.

scripts/fetch-proto.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ OS=$(echo `uname`|tr '[:upper:]' '[:lower:]')
33
ARCH=$(uname -m)
44
ORG_NAME="dapr"
55
REPO_NAME="dapr"
6-
BRANCH_NAME="v1.10.0"
6+
BRANCH_NAME="v1.10.2"
77

88
# Path to store output
99
PATH_ROOT=$(pwd)

src/implementation/Server/GRPCServer/GRPCServer.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ limitations under the License.
1313

1414
import * as grpc from "@grpc/grpc-js";
1515
import GRPCServerImpl from "./GRPCServerImpl";
16-
import { AppCallbackService } from "../../../proto/dapr/proto/runtime/v1/appcallback_grpc_pb";
16+
import { AppCallbackService, AppCallbackAlphaService } from "../../../proto/dapr/proto/runtime/v1/appcallback_grpc_pb";
1717
import IServer from "../../../interfaces/Server/IServer";
1818
import { DaprClient } from "../../..";
1919
import { Logger } from "../../../logger/Logger";
@@ -52,6 +52,10 @@ export default class GRPCServer implements IServer {
5252
this.logger.info("Adding Service Implementation - AppCallbackService");
5353
// @ts-ignore
5454
this.server.addService(AppCallbackService, this.serverImpl);
55+
// Add our implementation
56+
this.logger.info("Adding Service Implementation - AppCallbackAlphaService");
57+
// @ts-ignore
58+
this.server.addService(AppCallbackAlphaService, this.serverImpl);
5559
}
5660

5761
// See: https://cs.github.com/nestjs/nest/blob/f4e9ac6208f3e7ee7ad44c3de713c9086f657977/packages/microservices/external/grpc-options.interface.ts

src/implementation/Server/GRPCServer/GRPCServerImpl.ts

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,18 @@ import { HTTPExtension, InvokeRequest, InvokeResponse } from "../../../proto/dap
2020
import {
2121
BindingEventRequest,
2222
BindingEventResponse,
23+
BulkSubscribeConfig,
2324
ListInputBindingsResponse,
2425
ListTopicSubscriptionsResponse,
2526
TopicEventRequest,
2627
TopicEventResponse,
2728
TopicRoutes,
2829
TopicRule,
2930
TopicSubscription,
31+
TopicEventBulkRequest,
32+
TopicEventBulkResponse,
33+
TopicEventBulkResponseEntry,
34+
TopicEventCERequest,
3035
} from "../../../proto/dapr/proto/runtime/v1/appcallback_pb";
3136
import * as HttpVerbUtil from "../../../utils/HttpVerb.util";
3237
import { TypeDaprBindingCallback } from "../../../types/DaprBindingCallback.type";
@@ -249,6 +254,7 @@ export default class GRPCServerImpl implements IAppCallbackServer {
249254
metadata: metadata,
250255
route: this.generateDaprSubscriptionRoute(pubsubName, topic),
251256
deadLetterTopic: options.deadLetterTopic,
257+
bulkSubscribe: options.bulkSubscribe,
252258
};
253259
} else if (typeof options.route === "string") {
254260
return {
@@ -257,6 +263,7 @@ export default class GRPCServerImpl implements IAppCallbackServer {
257263
metadata: metadata,
258264
route: this.generateDaprSubscriptionRoute(pubsubName, topic, options.route),
259265
deadLetterTopic: options.deadLetterTopic,
266+
bulkSubscribe: options.bulkSubscribe,
260267
};
261268
} else {
262269
return {
@@ -271,6 +278,7 @@ export default class GRPCServerImpl implements IAppCallbackServer {
271278
})),
272279
},
273280
deadLetterTopic: options.deadLetterTopic,
281+
bulkSubscribe: options.bulkSubscribe,
274282
};
275283
}
276284
}
@@ -454,6 +462,82 @@ export default class GRPCServerImpl implements IAppCallbackServer {
454462
return callback(null, res);
455463
}
456464

465+
async onBulkTopicEventAlpha1(
466+
call: grpc.ServerUnaryCall<TopicEventBulkRequest, TopicEventBulkResponse>,
467+
callback: grpc.sendUnaryData<TopicEventBulkResponse>,
468+
): Promise<void> {
469+
const req = call.request;
470+
const pubsubName = req.getPubsubName();
471+
const topic = req.getTopic();
472+
473+
// Route is unique to pubsub and topic and has format pubsub--topic--route so we strip it since else we can't find the route
474+
const route = this.generatePubSubSubscriptionTopicRouteName(req.getPath().replace(`${pubsubName}--${topic}--`, ""));
475+
476+
if (
477+
!this.pubSubSubscriptions[pubsubName] ||
478+
!this.pubSubSubscriptions[pubsubName][topic] ||
479+
!this.pubSubSubscriptions[pubsubName][topic].routes[route]
480+
) {
481+
this.logger.warn(
482+
`The topic '${topic}' is not being subscribed to on PubSub '${pubsubName}' for route '${route}'.`,
483+
);
484+
return;
485+
}
486+
487+
const resArr: TopicEventBulkResponseEntry[] = [];
488+
const entries = req.getEntriesList();
489+
490+
for (const ind in entries) {
491+
const event = entries[ind];
492+
let data: any;
493+
if (event.hasBytes()) {
494+
data = deserializeGrpc(event.getContentType(), event.getBytes());
495+
} else if (event.hasCloudEvent()) {
496+
const cloudEvent = event.getCloudEvent();
497+
if (cloudEvent instanceof TopicEventCERequest) {
498+
data = deserializeGrpc(cloudEvent.getDataContentType(), cloudEvent.getData());
499+
}
500+
}
501+
502+
const res = new TopicEventBulkResponseEntry();
503+
504+
// Get the headers
505+
const headers: { [key: string]: string } = {};
506+
507+
for (const [key, value] of Object.entries(call.metadata.toHttp2Headers())) {
508+
if (value) {
509+
headers[key] = value.toString();
510+
}
511+
}
512+
513+
// Process the callbacks
514+
// we handle priority of status on `RETRY` > `DROP` > `SUCCESS` and default to `SUCCESS`
515+
const routeObj = this.pubSubSubscriptions[pubsubName][topic].routes[route];
516+
const status = await this.processPubSubCallbacks(routeObj, data, headers);
517+
518+
switch (status) {
519+
case DaprPubSubStatusEnum.RETRY:
520+
res.setStatus(TopicEventResponse.TopicEventResponseStatus.RETRY);
521+
break;
522+
case DaprPubSubStatusEnum.DROP:
523+
res.setStatus(TopicEventResponse.TopicEventResponseStatus.DROP);
524+
break;
525+
case DaprPubSubStatusEnum.SUCCESS:
526+
default:
527+
res.setStatus(TopicEventResponse.TopicEventResponseStatus.SUCCESS);
528+
break;
529+
}
530+
531+
res.setEntryId(event.getEntryId());
532+
resArr.push(res);
533+
}
534+
535+
const totalRes = new TopicEventBulkResponse();
536+
totalRes.setStatusesList(resArr);
537+
538+
return callback(null, totalRes);
539+
}
540+
457541
async processPubSubCallbacks(
458542
routeObj: PubSubSubscriptionTopicRouteType,
459543
data: any,
@@ -512,6 +596,21 @@ export default class GRPCServerImpl implements IAppCallbackServer {
512596
topicSubscription.setDeadLetterTopic(daprConfig.deadLetterTopic);
513597
}
514598

599+
if (daprConfig?.bulkSubscribe) {
600+
const bulkSubscribe = new BulkSubscribeConfig();
601+
bulkSubscribe.setEnabled(daprConfig.bulkSubscribe.enabled);
602+
603+
if (daprConfig?.bulkSubscribe?.maxMessagesCount) {
604+
bulkSubscribe.setMaxMessagesCount(daprConfig.bulkSubscribe.maxMessagesCount);
605+
}
606+
607+
if (daprConfig?.bulkSubscribe?.maxAwaitDurationMs) {
608+
bulkSubscribe.setMaxAwaitDurationMs(daprConfig.bulkSubscribe.maxAwaitDurationMs);
609+
}
610+
611+
topicSubscription.setBulkSubscribe(bulkSubscribe);
612+
}
613+
515614
if (daprConfig?.metadata) {
516615
for (const [mKey, mValue] of Object.entries(daprConfig.metadata)) {
517616
topicSubscription.getMetadataMap().set(mKey, mValue);

src/implementation/Server/GRPCServer/pubsub.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import { PubSubSubscriptionOptionsType } from "../../../types/pubsub/PubSubSubsc
1919
import { DaprPubSubRouteType } from "../../../types/pubsub/DaprPubSubRouteType.type";
2020
import { PubSubSubscriptionsType } from "../../../types/pubsub/PubSubSubscriptions.type";
2121
import { KeyValueType } from "../../../types/KeyValue.type";
22+
import { BulkSubscribeConfig } from "../../../types/pubsub/BulkSubscribeConfig.type";
23+
import { BulkSubscribeOptions } from "../../../types/pubsub/BulkSubscribeOptions.type";
2224

2325
// https://docs.dapr.io/reference/api/pubsub_api/
2426
export default class DaprPubSub implements IServerPubSub {
@@ -97,6 +99,35 @@ export default class DaprPubSub implements IServerPubSub {
9799
this.server.getServerImpl().registerPubSubSubscriptionEventHandler(pubsubName, topic, route, cb);
98100
}
99101

102+
async subscribeBulk(
103+
pubsubName: string,
104+
topic: string,
105+
cb: TypeDaprPubSubCallback,
106+
bulkSubscribeOptions: BulkSubscribeOptions,
107+
): Promise<void> {
108+
const bulkSubscribe: BulkSubscribeConfig = {
109+
enabled: true,
110+
};
111+
112+
const { route, metadata, maxMessagesCount, maxAwaitDurationMs } = bulkSubscribeOptions || {};
113+
114+
if (maxMessagesCount != undefined) {
115+
bulkSubscribe.maxMessagesCount = maxMessagesCount;
116+
}
117+
118+
if (maxAwaitDurationMs != undefined) {
119+
bulkSubscribe.maxAwaitDurationMs = maxAwaitDurationMs;
120+
}
121+
122+
this.server
123+
.getServerImpl()
124+
.registerPubsubSubscription(pubsubName, topic, { route, metadata, bulkSubscribe: bulkSubscribe });
125+
126+
// Add the callback to the event handlers manually
127+
// @todo: we will deprecate this way of working? and require subscribeToRoute?
128+
this.subscribeToRoute(pubsubName, topic, route, cb);
129+
}
130+
100131
getSubscriptions(): PubSubSubscriptionsType {
101132
return this.server.getServerImpl().pubSubSubscriptions;
102133
}

0 commit comments

Comments
 (0)