Skip to content

Commit 15c9fa8

Browse files
authored
Allow publish cloud events directly and publish response enhancements (#428)
* Initial changes for publish Signed-off-by: Shubham Sharma <[email protected]> * Update response, docs, and wip e2e Signed-off-by: Shubham Sharma <[email protected]> * Fix build Signed-off-by: Shubham Sharma <[email protected]> * Update package.json Signed-off-by: Shubham Sharma <[email protected]> * Update package.json Signed-off-by: Shubham Sharma <[email protected]> * Update test Signed-off-by: Shubham Sharma <[email protected]> * Update package.json Signed-off-by: Shubham Sharma <[email protected]> * Fix stupid issue Signed-off-by: Shubham Sharma <[email protected]> * Ok Signed-off-by: Shubham Sharma <[email protected]> * Fix e2e tests Signed-off-by: Shubham Sharma <[email protected]> * Remove unused vars Signed-off-by: Shubham Sharma <[email protected]> * x Signed-off-by: Shubham Sharma <[email protected]> * Remove failing test Signed-off-by: Shubham Sharma <[email protected]> * Remove unused vars Signed-off-by: Shubham Sharma <[email protected]> * Finally fix the tests Signed-off-by: Shubham Sharma <[email protected]> * Increase binding timeout Signed-off-by: Shubham Sharma <[email protected]> Signed-off-by: Shubham Sharma <[email protected]>
1 parent 8a0a204 commit 15c9fa8

File tree

17 files changed

+341
-55
lines changed

17 files changed

+341
-55
lines changed

daprdocs/content/en/js-sdk-docs/js-client/_index.md

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -272,10 +272,24 @@ async function start() {
272272

273273
const pubSubName = "my-pubsub-name";
274274
const topic = "topic-a";
275-
const message = { hello: "world" };
276275

277-
// Publish Message to Topic
278-
const response = await client.pubsub.publish(pubSubName, topic, message);
276+
// Publish message to topic as text/plain
277+
const response = await client.pubsub.publish(pubSubName, topic, "hello, world!");
278+
// If publish fails, response contains the error
279+
console.log(response);
280+
281+
// Publish message to topic as application/json
282+
await client.pubsub.publish(pubSubName, topic, { hello: "world" });
283+
284+
// Publish message to topic as application/cloudevents+json
285+
// You can also use the cloudevent SDK to create cloud events https://github.com/cloudevents/sdk-javascript
286+
const cloudEvent = {
287+
specversion: "1.0",
288+
source: "/some/source",
289+
type: "example",
290+
id: "1234",
291+
};
292+
await client.pubsub.publish(pubSubName, topic, cloudEvent);
279293
}
280294

281295
start().catch((e) => {

examples/pubsub/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ dapr init
1111
# Install dependenies
1212
npm install
1313

14-
# Start a RabbitMQ Container (for the binding example part)
14+
# Start a RabbitMQ Container.
1515
# note: mgmt interface at http://localhost:15672
1616
docker run -d --rm --hostname my-rabbitmq --name my-rabbitmq \
1717
-e RABBITMQ_DEFAULT_USER=test-user -e RABBITMQ_DEFAULT_PASS=test-password \

examples/pubsub/package-lock.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/pubsub/src/index.ts

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ See the License for the specific language governing permissions and
1111
limitations under the License.
1212
*/
1313

14-
import { DaprServer, DaprClient } from "@dapr/dapr";
14+
import { DaprClient, DaprServer } from "@dapr/dapr";
1515

1616
// Common settings
1717
const serverHost = "127.0.0.1"; // App Host of this Example Server
@@ -31,11 +31,29 @@ async function start() {
3131
});
3232
await server.start();
3333

34-
// Publish a message
35-
console.log("[Dapr-JS][Example] Publishing message");
34+
// Wait for 1 second to allow the server to start.
35+
await new Promise((resolve) => setTimeout(resolve, 1000));
3636

37-
// Internally, the message will be serialized using JSON.stringify()
38-
await client.pubsub.publish("my-pubsub-component", "my-topic", { hello: "world" });
37+
let response;
38+
39+
console.log("[Dapr-JS][Example] Publishing a plain message");
40+
response = await client.pubsub.publish("my-pubsub-component", "my-topic", "hello, world!");
41+
console.log(`[Dapr-JS][Example] Publish response: ${JSON.stringify(response)}`);
42+
43+
console.log("[Dapr-JS][Example] Publishing a JSON message");
44+
response = await client.pubsub.publish("my-pubsub-component", "my-topic", { hello: "world" });
45+
console.log(`[Dapr-JS][Example] Publish response: ${JSON.stringify(response)}`);
46+
47+
const cloudEvent = {
48+
specversion: "1.0",
49+
source: "/some/source",
50+
type: "example",
51+
id: "1234",
52+
};
53+
54+
console.log("[Dapr-JS][Example] Publishing a cloud event message");
55+
response = await client.pubsub.publish("my-pubsub-component", "my-topic", cloudEvent);
56+
console.log(`[Dapr-JS][Example] Publish response: ${JSON.stringify(response)}`);
3957
}
4058

4159
start().catch((e) => {

src/implementation/Client/GRPCClient/pubsub.ts

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import { Logger } from "../../../logger/Logger";
1818
import * as SerializerUtil from "../../../utils/Serializer.util";
1919
import { KeyValueType } from "../../../types/KeyValue.type";
2020
import { createGRPCMetadata } from "../../../utils/Client.util";
21+
import { PubSubPublishResponseType } from "../../../types/pubsub/PubSubPublishResponse.type";
2122

2223
// https://docs.dapr.io/reference/api/pubsub_api/
2324
export default class GRPCClientPubSub implements IClientPubSub {
@@ -30,12 +31,21 @@ export default class GRPCClientPubSub implements IClientPubSub {
3031
this.logger = new Logger("GRPCClient", "PubSub", client.getOptions().logger);
3132
}
3233

33-
// @todo: should return a specific typed Promise<TypePubSubPublishResponse> instead of Promise<any>
34-
async publish(pubSubName: string, topic: string, data: object = {}, metadata?: KeyValueType): Promise<boolean> {
34+
async publish(
35+
pubSubName: string,
36+
topic: string,
37+
data: object | string,
38+
metadata?: KeyValueType,
39+
): Promise<PubSubPublishResponseType> {
3540
const msgService = new PublishEventRequest();
3641
msgService.setPubsubName(pubSubName);
3742
msgService.setTopic(topic);
38-
msgService.setData(SerializerUtil.serializeGrpc(data).serializedData);
43+
44+
if (data) {
45+
const serialized = SerializerUtil.serializeGrpc(data);
46+
msgService.setData(serialized.serializedData);
47+
msgService.setDataContentType(serialized.contentType);
48+
}
3949

4050
const client = await this.client.getClient();
4151
const grpcMetadata = createGRPCMetadata(metadata);
@@ -44,11 +54,10 @@ export default class GRPCClientPubSub implements IClientPubSub {
4454
client.publishEvent(msgService, grpcMetadata, (err, _res) => {
4555
if (err) {
4656
this.logger.error(`publish failed: ${err}`);
47-
return reject(false);
57+
return reject({ error: err });
4858
}
4959

50-
// https://docs.dapr.io/reference/api/pubsub_api/#expected-http-response
51-
return resolve(true);
60+
return resolve({});
5261
});
5362
});
5463
}

src/implementation/Client/HTTPClient/pubsub.ts

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ import HTTPClient from "./HTTPClient";
1515
import IClientPubSub from "../../../interfaces/Client/IClientPubSub";
1616
import { Logger } from "../../../logger/Logger";
1717
import { KeyValueType } from "../../../types/KeyValue.type";
18-
import { createHTTPMetadataQueryParam } from "../../../utils/Client.util";
18+
import { createHTTPMetadataQueryParam, getContentType } from "../../../utils/Client.util";
19+
import { PubSubPublishResponseType } from "../../../types/pubsub/PubSubPublishResponse.type";
20+
import { THTTPExecuteParams } from "../../../types/http/THTTPExecuteParams.type";
1921

2022
// https://docs.dapr.io/reference/api/pubsub_api/
2123
export default class HTTPClientPubSub implements IClientPubSub {
@@ -27,22 +29,31 @@ export default class HTTPClientPubSub implements IClientPubSub {
2729
this.logger = new Logger("HTTPClient", "PubSub", client.getOptions().logger);
2830
}
2931

30-
async publish(pubSubName: string, topic: string, data: object = {}, metadata?: KeyValueType): Promise<boolean> {
32+
async publish(
33+
pubSubName: string,
34+
topic: string,
35+
data: object | string,
36+
metadata?: KeyValueType,
37+
): Promise<PubSubPublishResponseType> {
3138
const queryParams = createHTTPMetadataQueryParam(metadata);
39+
const params: THTTPExecuteParams = {
40+
method: "POST",
41+
headers: {
42+
"Content-Type": getContentType(data),
43+
},
44+
};
45+
46+
if (data) {
47+
params.body = JSON.stringify(data);
48+
}
3249

3350
try {
34-
await this.client.execute(`/publish/${pubSubName}/${topic}?${queryParams}`, {
35-
method: "POST",
36-
headers: {
37-
"Content-Type": "application/json",
38-
},
39-
body: JSON.stringify(data),
40-
});
51+
await this.client.execute(`/publish/${pubSubName}/${topic}?${queryParams}`, params);
4152
} catch (e: any) {
4253
this.logger.error(`publish failed: ${e}`);
43-
return false;
54+
return { error: e };
4455
}
4556

46-
return true;
57+
return {};
4758
}
4859
}

src/implementation/Server/HTTPServer/HTTPServer.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,10 @@ export default class HTTPServer implements IServer {
112112
this.serverAddress = `http://${host}:${port}`;
113113

114114
// Add PubSub Routes
115-
this.logger.info(`Registering ${this.serverImpl.pubSubSubscriptions.length} PubSub Subscriptions`);
115+
this.logger.info(`Registering ${Object.keys(this.serverImpl.pubSubSubscriptions).length} PubSub Subscriptions`);
116116
this.server.get("/dapr/subscribe", (req, res) => {
117117
res.send(this.serverImpl.generateDaprPubSubSubscriptionList());
118-
this.logger.info(`Registered ${this.serverImpl.pubSubSubscriptions.length} PubSub Subscriptions`);
118+
this.logger.info(`Registered ${Object.keys(this.serverImpl.pubSubSubscriptions).length} PubSub Subscriptions`);
119119
});
120120

121121
this.isInitialized = true;

src/interfaces/Client/IClientPubSub.ts

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,25 @@ limitations under the License.
1212
*/
1313

1414
import { KeyValueType } from "../../types/KeyValue.type";
15+
import { PubSubPublishResponseType } from "../../types/pubsub/PubSubPublishResponse.type";
1516

1617
export default interface IClientPubSub {
1718
/**
18-
* Publish a message to a topic.
19-
* @param pubSubName name of the pubsub
19+
* Publish data to a topic.
20+
* If the data is a valid cloud event, it will be published with Content-Type: application/cloudevents+json.
21+
* Otherwise, if it's a JSON object, it will be published with Content-Type: application/json.
22+
* Otherwise, it will be published with Content-Type: text/plain.
23+
* @param pubSubName name of the pubsub component
2024
* @param topic name of the topic
2125
* @param data data to publish
2226
* @param metadata metadata for the message
27+
*
28+
* @returns response from the publish
2329
*/
24-
publish(pubSubName: string, topic: string, data?: object, metadata?: KeyValueType): Promise<boolean>;
30+
publish(
31+
pubSubName: string,
32+
topic: string,
33+
data?: object | string,
34+
metadata?: KeyValueType,
35+
): Promise<PubSubPublishResponseType>;
2536
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
Copyright 2022 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
/**
15+
* PubSubPublishResponseType defines the response from a publish.
16+
*/
17+
export type PubSubPublishResponseType = {
18+
// error contains the error if the publish failed.
19+
error?: Error;
20+
};

src/utils/Client.util.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,41 @@ export function createConfigurationType(configDict: Map<string, ConfigurationIte
7979
});
8080
return configMap;
8181
}
82+
83+
/**
84+
* Checks if the input object is a valid Cloud Event.
85+
* A valid Cloud Event is a JSON object that contains id, source, type, and specversion.
86+
* See https://github.com/cloudevents/spec/blob/v1.0/spec.md#required-attributes
87+
* @param str input object
88+
* @returns true if the object is a valid Cloud Event
89+
*/
90+
function isCloudEvent(obj: object): boolean {
91+
const requiredAttributes = ["id", "source", "type", "specversion"];
92+
return (
93+
typeof obj === "object" &&
94+
obj !== null &&
95+
requiredAttributes.every((attr) => {
96+
return Object.prototype.hasOwnProperty.call(obj, attr);
97+
})
98+
);
99+
}
100+
101+
/**
102+
* Gets the Content-Type for the input data.
103+
* If the data is a valid Cloud Event, the Content-Type is "application/cloudevents+json".
104+
* If the data is a JSON object, the Content-Type is "application/json".
105+
* Otherwise, the Content-Type is "text/plain".
106+
* @param data input data
107+
* @returns Content-Type header value
108+
*/
109+
export function getContentType(data: object | string): string {
110+
if (typeof data === "string") {
111+
return "text/plain";
112+
}
113+
114+
if (isCloudEvent(data)) {
115+
return "application/cloudevents+json";
116+
} else {
117+
return "application/json";
118+
}
119+
}

0 commit comments

Comments
 (0)