Skip to content

Commit c3f6347

Browse files
authored
feat: Add metadata option to PubSub publish and subscribe (#329)
* Add metadata option to publish Signed-off-by: Shubham Sharma <[email protected]> * WIP backup Signed-off-by: Shubham Sharma <[email protected]> * Fix HTTP test Signed-off-by: Shubham Sharma <[email protected]> * Add another HTTP test Signed-off-by: Shubham Sharma <[email protected]> * WIP gRPC test Signed-off-by: Shubham Sharma <[email protected]> * Add metadata utils Signed-off-by: Shubham Sharma <[email protected]> * Use the new utils Signed-off-by: Shubham Sharma <[email protected]> * Update tests Signed-off-by: Shubham Sharma <[email protected]> * Remove console.log Signed-off-by: Shubham Sharma <[email protected]> * Add tests with hash character Signed-off-by: Shubham Sharma <[email protected]> * Review comments address Signed-off-by: Shubham Sharma <[email protected]> * Review comments address Signed-off-by: Shubham Sharma <[email protected]> * Some more review comments address Signed-off-by: Shubham Sharma <[email protected]> Signed-off-by: Shubham Sharma <[email protected]>
1 parent fd35781 commit c3f6347

File tree

14 files changed

+311
-53
lines changed

14 files changed

+311
-53
lines changed

src/implementation/Client/GRPCClient/pubsub.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import { PublishEventRequest } from "../../../proto/dapr/proto/runtime/v1/dapr_p
1616
import IClientPubSub from "../../../interfaces/Client/IClientPubSub";
1717
import { Logger } from '../../../logger/Logger';
1818
import * as SerializerUtil from "../../../utils/Serializer.util";
19+
import { KeyValueType } from '../../../types/KeyValue.type';
20+
import { createGRPCMetadata } from '../../../utils/Client.util';
1921

2022
// https://docs.dapr.io/reference/api/pubsub_api/
2123
export default class GRPCClientPubSub implements IClientPubSub {
@@ -29,16 +31,17 @@ export default class GRPCClientPubSub implements IClientPubSub {
2931
}
3032

3133
// @todo: should return a specific typed Promise<TypePubSubPublishResponse> instead of Promise<any>
32-
async publish(pubSubName: string, topic: string, data: object = {}): Promise<boolean> {
34+
async publish(pubSubName: string, topic: string, data: object = {}, metadata?: KeyValueType): Promise<boolean> {
3335
const msgService = new PublishEventRequest();
3436
msgService.setPubsubName(pubSubName);
3537
msgService.setTopic(topic);
3638
msgService.setData(SerializerUtil.serializeGrpc(data).serializedData);
3739

3840
const client = await this.client.getClient();
41+
const grpcMetadata = createGRPCMetadata(metadata);
3942

4043
return new Promise((resolve, reject) => {
41-
client.publishEvent(msgService, (err, _res) => {
44+
client.publishEvent(msgService, grpcMetadata, (err, _res) => {
4245
if (err) {
4346
this.logger.error(`publish failed: ${err}`);
4447
return reject(false);

src/implementation/Client/HTTPClient/pubsub.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ limitations under the License.
1414
import HTTPClient from './HTTPClient';
1515
import IClientPubSub from '../../../interfaces/Client/IClientPubSub';
1616
import { Logger } from '../../../logger/Logger';
17+
import { KeyValueType } from '../../../types/KeyValue.type';
18+
import { createHTTPMetadataQueryParam } from '../../../utils/Client.util';
1719

1820
// https://docs.dapr.io/reference/api/pubsub_api/
1921
export default class HTTPClientPubSub implements IClientPubSub {
@@ -25,9 +27,11 @@ export default class HTTPClientPubSub implements IClientPubSub {
2527
this.logger = new Logger("HTTPClient", "PubSub", client.getOptions().logger);
2628
}
2729

28-
async publish(pubSubName: string, topic: string, data: object = {}): Promise<boolean> {
30+
async publish(pubSubName: string, topic: string, data: object = {}, metadata?: KeyValueType): Promise<boolean> {
31+
const queryParams = createHTTPMetadataQueryParam(metadata);
32+
2933
try {
30-
await this.client.execute(`/publish/${pubSubName}/${topic}`, {
34+
await this.client.execute(`/publish/${pubSubName}/${topic}?${queryParams}`, {
3135
method: 'POST',
3236
headers: {
3337
'Content-Type': 'application/json',

src/implementation/Server/GRPCServer/GRPCServerImpl.ts

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import { TypeDaprBindingCallback } from "../../../types/DaprBindingCallback.type
2424
import { TypeDaprPubSubCallback } from "../../../types/DaprPubSubCallback.type";
2525
import { Logger } from "../../../logger/Logger";
2626
import { LoggerOptions } from "../../../types/logger/LoggerOptions";
27+
import { KeyValueType } from "../../../types/KeyValue.type";
2728

2829

2930
// https://github.com/badsyntax/grpc-js-typescript/issues/1#issuecomment-705419742
@@ -33,13 +34,13 @@ export default class GRPCServerImpl implements IAppCallbackServer {
3334

3435
handlersInvoke: { [key: string]: TypeDaprInvokerCallback };
3536
handlersBindings: { [key: string]: TypeDaprBindingCallback };
36-
handlersTopics: { [key: string]: TypeDaprPubSubCallback };
37+
registrationsTopics: { [key: string]: { cb: TypeDaprPubSubCallback, metadata: KeyValueType } };
3738

3839
constructor(loggerOptions?: LoggerOptions) {
3940
this.logger = new Logger("GRPCServer", "GRPCServerImpl", loggerOptions);
4041
this.handlersInvoke = {};
4142
this.handlersBindings = {};
42-
this.handlersTopics = {};
43+
this.registrationsTopics = {};
4344
}
4445

4546
createPubSubSubscriptionHandlerKey(pubSubName: string, topicName: string): string {
@@ -59,9 +60,9 @@ export default class GRPCServerImpl implements IAppCallbackServer {
5960
this.handlersInvoke[handlerKey] = cb;
6061
}
6162

62-
registerPubSubSubscriptionHandler(pubSubName: string, topicName: string, cb: TypeDaprInvokerCallback): void {
63+
registerPubSubSubscriptionHandler(pubSubName: string, topicName: string, cb: TypeDaprInvokerCallback, metadata: KeyValueType = {}): void {
6364
const handlerKey = this.createPubSubSubscriptionHandlerKey(pubSubName, topicName);
64-
this.handlersTopics[handlerKey] = cb;
65+
this.registrationsTopics[handlerKey] = { cb: cb, metadata };
6566
}
6667

6768
registerInputBindingHandler(bindingName: string, cb: TypeDaprInvokerCallback): void {
@@ -144,8 +145,8 @@ export default class GRPCServerImpl implements IAppCallbackServer {
144145
const req = call.request;
145146
const handlerKey = this.createPubSubSubscriptionHandlerKey(req.getPubsubName(), req.getTopic());
146147

147-
if (!this.handlersTopics[handlerKey]) {
148-
this.logger.warn(`Event from topic: "${handlerKey}" was not handled`);
148+
if (!this.registrationsTopics[handlerKey]) {
149+
this.logger.warn(`Topic "${handlerKey}" unregistered, event was not handled.`);
149150
return;
150151
}
151152

@@ -161,7 +162,7 @@ export default class GRPCServerImpl implements IAppCallbackServer {
161162
const res = new TopicEventResponse();
162163

163164
try {
164-
await this.handlersTopics[handlerKey](dataParsed);
165+
await (this.registrationsTopics[handlerKey]).cb(dataParsed);
165166
res.setStatus(TopicEventResponse.TopicEventResponseStatus.SUCCESS);
166167
} catch (e) {
167168
// @todo: for now we drop, maybe we should allow retrying as well more easily?
@@ -175,19 +176,22 @@ export default class GRPCServerImpl implements IAppCallbackServer {
175176
// @todo: WIP
176177
async listTopicSubscriptions(call: grpc.ServerUnaryCall<Empty, ListTopicSubscriptionsResponse>, callback: grpc.sendUnaryData<ListTopicSubscriptionsResponse>): Promise<void> {
177178
const res = new ListTopicSubscriptionsResponse();
179+
const topicSubscriptions: TopicSubscription[] = [];
178180

179-
const values = Object.keys(this.handlersTopics).map((i) => {
180-
const handlerTopic = i.split("|");
181-
181+
for (const [key, value] of Object.entries(this.registrationsTopics)) {
182+
const [pubSubName, topicName] = key.split("|");
182183
const topicSubscription = new TopicSubscription();
183-
topicSubscription.setPubsubName(handlerTopic[0]);
184-
topicSubscription.setTopic(handlerTopic[1]);
185184

186-
return topicSubscription;
187-
});
185+
topicSubscription.setPubsubName(pubSubName);
186+
topicSubscription.setTopic(topicName);
187+
for (const [mKey, mValue] of Object.entries(value.metadata)) {
188+
topicSubscription.getMetadataMap().set(mKey, mValue);
189+
}
188190

189-
res.setSubscriptionsList(values);
191+
topicSubscriptions.push(topicSubscription);
192+
}
190193

194+
res.setSubscriptionsList(topicSubscriptions);
191195
return callback(null, res);
192196
}
193197

@@ -197,4 +201,4 @@ export default class GRPCServerImpl implements IAppCallbackServer {
197201
res.setBindingsList(Object.keys(this.handlersBindings));
198202
return callback(null, res);
199203
}
200-
}
204+
}

src/implementation/Server/GRPCServer/pubsub.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import GRPCServer from "./GRPCServer";
1515
import { TypeDaprPubSubCallback } from "../../../types/DaprPubSubCallback.type";
1616
import IServerPubSub from "../../../interfaces/Server/IServerPubSub";
1717
import { Logger } from "../../../logger/Logger";
18+
import { KeyValueType } from "../../../types/KeyValue.type";
1819

1920
// https://docs.dapr.io/reference/api/pubsub_api/
2021
export default class DaprPubSub implements IServerPubSub {
@@ -26,8 +27,8 @@ export default class DaprPubSub implements IServerPubSub {
2627
this.logger = new Logger("GRPCServer", "PubSub", server.client.options.logger);
2728
}
2829

29-
async subscribe(pubSubName: string, topic: string, cb: TypeDaprPubSubCallback): Promise<void> {
30-
this.logger.info(`Registering onTopicEvent Handler: PubSub = ${pubSubName}; Topic = ${topic}`)
31-
this.server.getServerImpl().registerPubSubSubscriptionHandler(pubSubName, topic, cb);
30+
async subscribe(pubSubName: string, topic: string, cb: TypeDaprPubSubCallback, _route?: "", metadata?: KeyValueType): Promise<void> {
31+
this.logger.info(`Registering onTopicEvent Handler: PubSub = ${pubSubName}; Topic = ${topic}; Metadata: ${metadata}`)
32+
this.server.getServerImpl().registerPubSubSubscriptionHandler(pubSubName, topic, cb, metadata);
3233
}
3334
}

src/implementation/Server/HTTPServer/HTTPServer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ export default class HTTPServer implements IServer {
109109

110110
// Add PubSub Routes
111111
this.logger.info(`Registering ${this.serverImpl.pubSubSubscriptionRoutes.length} PubSub Subscriptions`);
112-
this.server.get('/dapr/subscribe', (req, res) => {
112+
this.server.get('/dapr/subscribe', (_req, res) => {
113113
res.send(this.serverImpl.pubSubSubscriptionRoutes);
114114
this.logger.info(`Registered ${this.serverImpl.pubSubSubscriptionRoutes.length} PubSub Subscriptions`);
115115
});

src/implementation/Server/HTTPServer/HTTPServerImpl.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,13 @@ See the License for the specific language governing permissions and
1111
limitations under the License.
1212
*/
1313

14+
import { KeyValueType } from "../../../types/KeyValue.type";
15+
1416
interface PubSubSubscriptionRoute {
1517
pubsubname: string;
1618
topic: string;
1719
route: string;
20+
metadata: { [key: string]: string };
1821
}
1922

2023
export default class HTTPServerImpl {
@@ -24,9 +27,13 @@ export default class HTTPServerImpl {
2427
this.pubSubSubscriptionRoutes = [];
2528
}
2629

27-
registerPubSubSubscriptionRoute(pubSubName: string, topicName: string, route: string): void {
30+
registerPubSubSubscriptionRoute(pubSubName: string, topicName: string, route: string, metadata: KeyValueType= {}): void {
31+
const httpMetadata: { [key: string]: string } = {};
32+
for (const [key, value] of Object.entries(metadata)) {
33+
httpMetadata[key] = JSON.stringify(value);
34+
}
2835
this.pubSubSubscriptionRoutes.push({
29-
pubsubname: pubSubName, topic: topicName, route
36+
pubsubname: pubSubName, topic: topicName, route: route, metadata: httpMetadata
3037
});
3138
}
3239
}

src/implementation/Server/HTTPServer/pubsub.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import IServerPubSub from '../../../interfaces/Server/IServerPubSub';
1616
import HTTPServer from './HTTPServer';
1717
import { Logger } from '../../../logger/Logger';
1818
import SubscribedMessageHttpResponse from '../../../enum/SubscribedMessageHttpResponse.enum';
19+
import { KeyValueType } from '../../../types/KeyValue.type';
1920

2021
// https://docs.dapr.io/reference/api/pubsub_api/
2122
export default class HTTPServerPubSub implements IServerPubSub {
@@ -27,13 +28,13 @@ export default class HTTPServerPubSub implements IServerPubSub {
2728
this.logger = new Logger("HTTPServer", "PubSub", server.client.options.logger);
2829
}
2930

30-
async subscribe(pubsubName: string, topic: string, cb: TypeDaprPubSubCallback, route = "") {
31+
async subscribe(pubsubName: string, topic: string, cb: TypeDaprPubSubCallback, route = "", metadata?: KeyValueType) {
3132
if (!route) {
3233
route = `route-${pubsubName}-${topic}`;
3334
}
3435

3536
// Register the handler
36-
await this.server.getServerImpl().registerPubSubSubscriptionRoute(pubsubName, topic, route);
37+
await this.server.getServerImpl().registerPubSubSubscriptionRoute(pubsubName, topic, route, metadata);
3738

3839
this.server.getServer().post(`/${route}`, async (req, res) => {
3940
// @ts-ignore
@@ -51,7 +52,7 @@ export default class HTTPServerPubSub implements IServerPubSub {
5152
}
5253

5354
// Let Dapr know that the message was processed correctly
54-
this.logger.debug(`[route-${topic}] Ack'ing the message`);
55+
this.logger.debug(`[route-${topic}] Acknowledging the message`);
5556
return res.send({ status: SubscribedMessageHttpResponse.SUCCESS });
5657
});
5758
}

src/interfaces/Client/IClientPubSub.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,15 @@ See the License for the specific language governing permissions and
1111
limitations under the License.
1212
*/
1313

14+
import { KeyValueType } from "../../types/KeyValue.type";
15+
1416
export default interface IClientPubSub {
15-
publish(pubSubName: string, topic: string, data?: object): Promise<boolean>
17+
/**
18+
* Publish a message to a topic.
19+
* @param pubSubName name of the pubsub
20+
* @param topic name of the topic
21+
* @param data data to publish
22+
* @param metadata metadata for the message
23+
*/
24+
publish(pubSubName: string, topic: string, data?: object, metadata?: KeyValueType): Promise<boolean>
1625
}

src/interfaces/Server/IServerPubSub.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,17 @@ limitations under the License.
1212
*/
1313

1414
import { TypeDaprPubSubCallback } from "../../types/DaprPubSubCallback.type";
15+
import { KeyValueType } from "../../types/KeyValue.type";
1516

1617
export default interface IServerPubSub {
17-
subscribe(pubSubName: string, topic: string, cb: TypeDaprPubSubCallback, route?: string): Promise<void>;
18+
/**
19+
* Subscribe to a topic.
20+
* @param pubSubName name of the pubsub
21+
* @param topic name of the topic
22+
* @param cb callback function to handle messages
23+
* @param route The HTTP route override to register for the event subscription.
24+
* Default value is `/route-${pubsubName}-${topic}`. Ignored if gRPC is used.
25+
* @param metadata metadata for the subscription
26+
*/
27+
subscribe(pubSubName: string, topic: string, cb: TypeDaprPubSubCallback, route?: string, metadata?: KeyValueType): Promise<void>;
1828
}

src/utils/Client.util.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
Copyright 2022 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
import * as grpc from "@grpc/grpc-js";
15+
import { KeyValueType } from "../types/KeyValue.type";
16+
17+
/**
18+
* Converts a KeyValueType to a grpc.Metadata object.
19+
* @param metadata key value pair of metadata
20+
* @returns grpc.Metadata object
21+
*/
22+
export function createGRPCMetadata(metadata: KeyValueType = {}): grpc.Metadata {
23+
const grpcMetadata = new grpc.Metadata();
24+
for (const [key, value] of Object.entries(metadata)) {
25+
grpcMetadata.set(key, value);
26+
}
27+
return grpcMetadata;
28+
}
29+
30+
/**
31+
* Converts a KeyValueType to a HTTP query parameters.
32+
* The query parameters are separated by "&", and the key value pair is separated by "=".
33+
* Each metadata key is prefixed with "metadata.".
34+
*
35+
* Example, if the metadata is { "key1": "value1", "key2": "value2" }, the query parameter will be:
36+
* "metadata.key1=value1&metadata.key2=value2"
37+
*
38+
* Note, the returned value does not contain the "?" prefix.
39+
*
40+
* @param metadata key value pair of metadata
41+
* @returns HTTP query parameter string
42+
*/
43+
export function createHTTPMetadataQueryParam(metadata: KeyValueType = {}): string {
44+
let queryParam = "";
45+
for (const [key, value] of Object.entries(metadata)) {
46+
queryParam += "&" + "metadata." + encodeURIComponent(key) + "=" + encodeURIComponent(value);
47+
}
48+
// strip the first "&" if it exists
49+
queryParam = queryParam.substring(1);
50+
return queryParam;
51+
}

0 commit comments

Comments
 (0)