Skip to content

Commit 5ec2c8d

Browse files
feat: Add retry logic for PubSub and Implement Deserialization generalized (#453)
* Add Deserializer Signed-off-by: Xavier Geerinck <[email protected]> * Convert Serializer to unified code base as well Signed-off-by: Xavier Geerinck <[email protected]> * Add Uint8Array deserialization Signed-off-by: Xavier Geerinck <[email protected]> * Add Uint8Array deserialization fix Signed-off-by: Xavier Geerinck <[email protected]> * Add implementation for PubSub retrial logic Signed-off-by: Xavier Geerinck <[email protected]> * Prettier Signed-off-by: Xavier Geerinck <[email protected]> * Cover linting issues Signed-off-by: Xavier Geerinck <[email protected]> * Changes Signed-off-by: Xavier Geerinck <[email protected]> * WIP Signed-off-by: Xavier Geerinck <[email protected]> * Add code with resilience Signed-off-by: Xavier Geerinck <[email protected]> * PRettier Signed-off-by: Xavier Geerinck <[email protected]> * Prettier, my best friend Signed-off-by: Xavier Geerinck <[email protected]> * Remove old comment block Signed-off-by: Xavier Geerinck <[email protected]> --------- Signed-off-by: Xavier Geerinck <[email protected]>
1 parent 75859d8 commit 5ec2c8d

File tree

12 files changed

+604
-176
lines changed

12 files changed

+604
-176
lines changed

daprdocs/content/en/js-sdk-docs/js-server/_index.md

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,77 @@ async function start() {
228228

229229
> For a full list of state operations visit [How-To: Publish & subscribe]({{< ref howto-publish-subscribe.md >}}).
230230
231+
#### Subscribe with SUCCESS/RETRY/DROP status
232+
233+
Dapr supports [status codes for retry logic](https://docs.dapr.io/reference/api/pubsub_api/#expected-http-response) to specify what should happen after a message gets processed.
234+
235+
> ⚠️ The JS SDK allows multiple callbacks on the same topic, we handle priority of status on `RETRY` > `DROP` > `SUCCESS` and default to `SUCCESS`
236+
237+
> ⚠️ Make sure to [configure resiliency](https://docs.dapr.io/operations/resiliency/resiliency-overview/) in your application to handle `RETRY` messages
238+
239+
In the JS SDK we support these messages through the `DaprPubSubStatusEnum` enum. To ensure Dapr will retry we configure a Resiliency policy as well.
240+
241+
\*\*components/resiliency.yaml`
242+
243+
```yaml
244+
apiVersion: dapr.io/v1alpha1
245+
kind: Resiliency
246+
metadata:
247+
name: myresiliency
248+
spec:
249+
policies:
250+
retries:
251+
# Global Retry Policy for Inbound Component operations
252+
DefaultComponentInboundRetryPolicy:
253+
policy: constant
254+
duration: 500ms
255+
maxRetries: 10
256+
targets:
257+
components:
258+
messagebus:
259+
inbound:
260+
retry: DefaultComponentInboundRetryPolicy
261+
```
262+
263+
**src/index.ts**
264+
265+
```typescript
266+
import { DaprServer, DaprPubSubStatusEnum } from "@dapr/dapr";
267+
268+
const daprHost = "127.0.0.1"; // Dapr Sidecar Host
269+
const daprPort = "3500"; // Dapr Sidecar Port of this Example Server
270+
const serverHost = "127.0.0.1"; // App Host of this Example Server
271+
const serverPort = "50051"; // App Port of this Example Server "
272+
273+
async function start() {
274+
const server = new DaprServer(serverHost, serverPort, daprHost, daprPort);
275+
276+
const pubSubName = "my-pubsub-name";
277+
const topic = "topic-a";
278+
279+
// Process a message successfully
280+
await server.pubsub.subscribe(pubSubName, topic, async (data: any, headers: object) => {
281+
return DaprPubSubStatusEnum.SUCCESS;
282+
});
283+
284+
// Retry a message
285+
// Note: this example will keep on retrying to deliver the message
286+
// Note 2: each component can have their own retry configuration
287+
// e.g., https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-redis-pubsub/
288+
await server.pubsub.subscribe(pubSubName, topic, async (data: any, headers: object) => {
289+
return DaprPubSubStatusEnum.RETRY;
290+
});
291+
292+
// Drop a message
293+
await server.pubsub.subscribe(pubSubName, topic, async (data: any, headers: object) => {
294+
return DaprPubSubStatusEnum.DROP;
295+
});
296+
297+
// Start the server
298+
await server.start();
299+
}
300+
```
301+
231302
#### Subscribe to messages rule based
232303

233304
Dapr [supports routing messages](https://docs.dapr.io/developing-applications/building-blocks/pubsub/howto-route-messages/) to different handlers (routes) based on rules.

src/enum/DaprPubSubStatus.enum.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
Copyright 2022 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
export enum DaprPubSubStatusEnum {
15+
SUCCESS = "SUCCESS", // Message is processed successfully
16+
RETRY = "RETRY", // Message to be retried by Dapr
17+
DROP = "DROP", // Warning is logged and message is dropped
18+
}
19+
20+
export default DaprPubSubStatusEnum;

src/implementation/Server/GRPCServer/GRPCServerImpl.ts

Lines changed: 56 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ import { PubSubSubscriptionsType } from "../../../types/pubsub/PubSubSubscriptio
3939
import { DaprPubSubType } from "../../../types/pubsub/DaprPubSub.type";
4040
import { PubSubSubscriptionTopicRoutesType } from "../../../types/pubsub/PubSubSubscriptionTopicRoutes.type";
4141
import { DaprInvokerCallbackFunction } from "../../../types/DaprInvokerCallback.type";
42+
import { PubSubSubscriptionTopicRouteType } from "../../../types/pubsub/PubSubSubscriptionTopicRoute.type";
43+
import DaprPubSubStatusEnum from "../../../enum/DaprPubSubStatus.enum";
44+
import { deserializeGrpc } from "../../../utils/Deserializer.util";
4245

4346
// https://github.com/badsyntax/grpc-js-typescript/issues/1#issuecomment-705419742
4447
// @ts-ignore
@@ -417,13 +420,7 @@ export default class GRPCServerImpl implements IAppCallbackServer {
417420
return;
418421
}
419422

420-
const data = Buffer.from(req.getData()).toString();
421-
let dataParsed: any;
422-
try {
423-
dataParsed = JSON.parse(data);
424-
} catch (_) {
425-
dataParsed = data;
426-
}
423+
const data = deserializeGrpc(req.getDataContentType(), req.getData());
427424

428425
const res = new TopicEventResponse();
429426

@@ -436,19 +433,63 @@ export default class GRPCServerImpl implements IAppCallbackServer {
436433
}
437434
}
438435

439-
try {
440-
const eventHandlers = this.pubSubSubscriptions[pubsubName][topic].routes[route].eventHandlers;
441-
await Promise.all(eventHandlers.map((cb) => cb(dataParsed, headers)));
442-
res.setStatus(TopicEventResponse.TopicEventResponseStatus.SUCCESS);
443-
} catch (e) {
444-
// @todo: for now we drop, maybe we should allow retrying as well more easily?
445-
this.logger.error(`Error handling topic event: ${e}`);
446-
res.setStatus(TopicEventResponse.TopicEventResponseStatus.DROP);
436+
// Process the callbacks
437+
// we handle priority of status on `RETRY` > `DROP` > `SUCCESS` and default to `SUCCESS`
438+
const routeObj = this.pubSubSubscriptions[pubsubName][topic].routes[route];
439+
const status = await this.processPubSubCallbacks(routeObj, data, headers);
440+
441+
switch (status) {
442+
case DaprPubSubStatusEnum.RETRY:
443+
res.setStatus(TopicEventResponse.TopicEventResponseStatus.RETRY);
444+
break;
445+
case DaprPubSubStatusEnum.DROP:
446+
res.setStatus(TopicEventResponse.TopicEventResponseStatus.DROP);
447+
break;
448+
case DaprPubSubStatusEnum.SUCCESS:
449+
default:
450+
res.setStatus(TopicEventResponse.TopicEventResponseStatus.SUCCESS);
451+
break;
447452
}
448453

449454
return callback(null, res);
450455
}
451456

457+
async processPubSubCallbacks(
458+
routeObj: PubSubSubscriptionTopicRouteType,
459+
data: any,
460+
headers: { [key: string]: string },
461+
): Promise<DaprPubSubStatusEnum> {
462+
const eventHandlers = routeObj.eventHandlers;
463+
const statuses = [];
464+
465+
// Process the callbacks (default: SUCCESS)
466+
for (const cb of eventHandlers) {
467+
let status = DaprPubSubStatusEnum.SUCCESS;
468+
469+
try {
470+
status = await cb(data, headers);
471+
} catch (e) {
472+
// We catch and log an error, but we don't do anything with it as the statuses should define that
473+
this.logger.error(`[route-${routeObj.path}] Message processing failed, ${e}`);
474+
}
475+
476+
statuses.push(status ?? DaprPubSubStatusEnum.SUCCESS);
477+
}
478+
479+
// Look at the statuses and return the highest priority
480+
// we handle priority of status on `RETRY` > `DROP` > `SUCCESS`
481+
if (statuses.includes(DaprPubSubStatusEnum.RETRY)) {
482+
this.logger.debug(`[route-${routeObj.path}] Retrying message`);
483+
return DaprPubSubStatusEnum.RETRY;
484+
} else if (statuses.includes(DaprPubSubStatusEnum.DROP)) {
485+
this.logger.debug(`[route-${routeObj.path}] Dropping message`);
486+
return DaprPubSubStatusEnum.DROP;
487+
} else {
488+
this.logger.debug(`[route-${routeObj.path}] Acknowledging message`);
489+
return DaprPubSubStatusEnum.SUCCESS;
490+
}
491+
}
492+
452493
// Dapr will call this on startup to see which topics it is subscribed to
453494
async listTopicSubscriptions(
454495
call: grpc.ServerUnaryCall<Empty, ListTopicSubscriptionsResponse>,

src/implementation/Server/HTTPServer/HTTPServerImpl.ts

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ See the License for the specific language governing permissions and
1111
limitations under the License.
1212
*/
1313

14-
import SubscribedMessageHttpResponse from "../../../enum/SubscribedMessageHttpResponse.enum";
1514
import { Logger } from "../../../logger/Logger";
1615
import { LoggerOptions } from "../../../types/logger/LoggerOptions";
1716
import { DaprPubSubType } from "../../../types/pubsub/DaprPubSub.type";
@@ -20,6 +19,9 @@ import { PubSubSubscriptionOptionsType } from "../../../types/pubsub/PubSubSubsc
2019
import { PubSubSubscriptionTopicRoutesType } from "../../../types/pubsub/PubSubSubscriptionTopicRoutes.type";
2120
import { IServerType } from "./HTTPServer";
2221
import { TypeDaprPubSubCallback } from "../../../types/DaprPubSubCallback.type";
22+
import DaprPubSubStatusEnum from "../../../enum/DaprPubSubStatus.enum";
23+
import { IncomingHttpHeaders } from "http";
24+
import { PubSubSubscriptionTopicRouteType } from "../../../types/pubsub/PubSubSubscriptionTopicRoute.type";
2325

2426
export default class HTTPServerImpl {
2527
private readonly PUBSUB_DEFAULT_ROUTE_NAME = "default";
@@ -121,21 +123,12 @@ export default class HTTPServerImpl {
121123
// Add a server POST handler
122124
this.server.post(`/${routeObj.path}`, async (req, res) => {
123125
const headers = req.headers;
124-
125126
const data = this.extractDataFromSubscribeRequest(req);
126127

127-
// Process the callback
128-
try {
129-
const eventHandlers = routeObj.eventHandlers;
130-
await Promise.all(eventHandlers.map((cb) => cb(data, headers)));
131-
} catch (e) {
132-
this.logger.error(`[route-${routeObj.path}] Message processing failed, dropping: ${e}`);
133-
return res.send({ status: SubscribedMessageHttpResponse.DROP });
134-
}
135-
136-
// Let Dapr know that the message was processed correctly
137-
this.logger.debug(`[route-${routeObj.path}] Acknowledging message`);
138-
return res.send({ status: SubscribedMessageHttpResponse.SUCCESS });
128+
// Process the callbacks
129+
// we handle priority of status on `RETRY` > `DROP` > `SUCCESS` and default to `SUCCESS`
130+
const status = await this.processPubSubCallbacks(routeObj, data, headers);
131+
return res.send({ status });
139132
});
140133
}
141134

@@ -146,6 +139,42 @@ export default class HTTPServerImpl {
146139
);
147140
}
148141

142+
async processPubSubCallbacks(
143+
routeObj: PubSubSubscriptionTopicRouteType,
144+
data: any,
145+
headers: IncomingHttpHeaders,
146+
): Promise<DaprPubSubStatusEnum> {
147+
const eventHandlers = routeObj.eventHandlers;
148+
const statuses = [];
149+
150+
// Process the callbacks (default: SUCCESS)
151+
for (const cb of eventHandlers) {
152+
let status = DaprPubSubStatusEnum.SUCCESS;
153+
154+
try {
155+
status = await cb(data, headers);
156+
} catch (e) {
157+
// We catch and log an error, but we don't do anything with it as the statuses should define that
158+
this.logger.error(`[route-${routeObj.path}] Message processing failed, ${e}`);
159+
}
160+
161+
statuses.push(status ?? DaprPubSubStatusEnum.SUCCESS);
162+
}
163+
164+
// Look at the statuses and return the highest priority
165+
// we handle priority of status on `RETRY` > `DROP` > `SUCCESS`
166+
if (statuses.includes(DaprPubSubStatusEnum.RETRY)) {
167+
this.logger.debug(`[route-${routeObj.path}] Retrying message`);
168+
return DaprPubSubStatusEnum.RETRY;
169+
} else if (statuses.includes(DaprPubSubStatusEnum.DROP)) {
170+
this.logger.debug(`[route-${routeObj.path}] Dropping message`);
171+
return DaprPubSubStatusEnum.DROP;
172+
} else {
173+
this.logger.debug(`[route-${routeObj.path}] Acknowledging message`);
174+
return DaprPubSubStatusEnum.SUCCESS;
175+
}
176+
}
177+
149178
registerPubSubSubscriptionEventHandler(
150179
pubsubName: string,
151180
topic: string,

src/index.ts

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,25 +11,32 @@ See the License for the specific language governing permissions and
1111
limitations under the License.
1212
*/
1313

14-
import HttpMethod from "./enum/HttpMethod.enum";
15-
import CommunicationProtocolEnum from "./enum/CommunicationProtocol.enum";
16-
import DaprClient from "./implementation/Client/DaprClient";
17-
import DaprServer from "./implementation/Server/DaprServer";
18-
import AbstractActor from "./actors/runtime/AbstractActor";
1914
import { Temporal } from "@js-temporal/polyfill";
20-
import ActorProxyBuilder from "./actors/client/ActorProxyBuilder";
15+
16+
import DaprServer from "./implementation/Server/DaprServer";
17+
import DaprClient from "./implementation/Client/DaprClient";
18+
import GRPCClient from "./implementation/Client/GRPCClient/GRPCClient";
19+
import HTTPClient from "./implementation/Client/HTTPClient/HTTPClient";
20+
2121
import ActorId from "./actors/ActorId";
22+
import ActorProxyBuilder from "./actors/client/ActorProxyBuilder";
23+
import AbstractActor from "./actors/runtime/AbstractActor";
24+
25+
import { ConsoleLoggerService } from "./logger/ConsoleLoggerService";
26+
2227
import { DaprClientOptions } from "./types/DaprClientOptions";
28+
import { InvokerOptions } from "./types/InvokerOptions.type";
29+
import { DaprInvokerCallbackContent, DaprInvokerCallbackFunction } from "./types/DaprInvokerCallback.type";
2330
import { LoggerOptions } from "./types/logger/LoggerOptions";
2431
import { LoggerService } from "./types/logger/LoggerService";
25-
import { ConsoleLoggerService } from "./logger/ConsoleLoggerService";
2632
import { LogLevel } from "./types/logger/LogLevel";
27-
import GRPCClient from "./implementation/Client/GRPCClient/GRPCClient";
28-
import HTTPClient from "./implementation/Client/HTTPClient/HTTPClient";
29-
import { InvokerOptions } from "./types/InvokerOptions.type";
30-
import { DaprInvokerCallbackContent, DaprInvokerCallbackFunction } from "./types/DaprInvokerCallback.type";
3133
import { PubSubBulkPublishResponse } from "./types/pubsub/PubSubBulkPublishResponse.type";
3234
import { PubSubBulkPublishMessage } from "./types/pubsub/PubSubBulkPublishMessage.type";
35+
36+
import HttpMethod from "./enum/HttpMethod.enum";
37+
import CommunicationProtocolEnum from "./enum/CommunicationProtocol.enum";
38+
import DaprPubSubStatusEnum from "./enum/DaprPubSubStatus.enum";
39+
3340
export {
3441
DaprClient,
3542
DaprServer,
@@ -39,7 +46,6 @@ export {
3946
AbstractActor,
4047
ActorId,
4148
ActorProxyBuilder,
42-
CommunicationProtocolEnum,
4349
Temporal,
4450
DaprClientOptions,
4551
LogLevel,
@@ -49,6 +55,8 @@ export {
4955
InvokerOptions,
5056
DaprInvokerCallbackFunction as TypeDaprInvokerCallback,
5157
DaprInvokerCallbackContent,
58+
CommunicationProtocolEnum,
59+
DaprPubSubStatusEnum,
5260
PubSubBulkPublishMessage,
5361
PubSubBulkPublishResponse,
5462
};

src/utils/Client.util.ts

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,16 @@ See the License for the specific language governing permissions and
1111
limitations under the License.
1212
*/
1313

14+
import { randomUUID } from "crypto";
15+
import { Map } from "google-protobuf";
16+
17+
import { ConfigurationItem as ConfigurationItemProto } from "../proto/dapr/proto/common/v1/common_pb";
18+
import { isCloudEvent } from "./CloudEvent.util";
19+
1420
import { KeyValueType } from "../types/KeyValue.type";
1521
import { ConfigurationType } from "../types/configuration/Configuration.type";
1622
import { ConfigurationItem } from "../types/configuration/ConfigurationItem";
17-
import { ConfigurationItem as ConfigurationItemProto } from "../proto/dapr/proto/common/v1/common_pb";
18-
import { Map } from "google-protobuf";
1923
import { PubSubBulkPublishEntry } from "../types/pubsub/PubSubBulkPublishEntry.type";
20-
import { randomUUID } from "crypto";
2124
import { PubSubBulkPublishResponse } from "../types/pubsub/PubSubBulkPublishResponse.type";
2225
import { PubSubBulkPublishMessage } from "../types/pubsub/PubSubBulkPublishMessage.type";
2326
import { PubSubBulkPublishApiResponse } from "../types/pubsub/PubSubBulkPublishApiResponse.type";
@@ -82,24 +85,6 @@ export function createConfigurationType(configDict: Map<string, ConfigurationIte
8285
return configMap;
8386
}
8487

85-
/**
86-
* Checks if the input object is a valid Cloud Event.
87-
* A valid Cloud Event is a JSON object that contains id, source, type, and specversion.
88-
* See https://github.com/cloudevents/spec/blob/v1.0/spec.md#required-attributes
89-
* @param str input object
90-
* @returns true if the object is a valid Cloud Event
91-
*/
92-
function isCloudEvent(obj: object): boolean {
93-
const requiredAttributes = ["id", "source", "type", "specversion"];
94-
return (
95-
typeof obj === "object" &&
96-
obj !== null &&
97-
requiredAttributes.every((attr) => {
98-
return Object.prototype.hasOwnProperty.call(obj, attr);
99-
})
100-
);
101-
}
102-
10388
/**
10489
* Gets the Content-Type for the input data.
10590
*

0 commit comments

Comments
 (0)