Skip to content

Commit 75859d8

Browse files
authored
Add bulk publish support (#430)
* WIP Add bulk publish support Signed-off-by: Shubham Sharma <[email protected]> * WIP add proto and partial grpc impl Signed-off-by: Shubham Sharma <[email protected]> * WIP more Signed-off-by: Shubham Sharma <[email protected]> * Change HTTP client behavior and update bulk pubsub client Signed-off-by: Shubham Sharma <[email protected]> * Prettify!!! Signed-off-by: Shubham Sharma <[email protected]> * Fix UX and add examples Signed-off-by: Shubham Sharma <[email protected]> * Fix example gRPC instructions Signed-off-by: Shubham Sharma <[email protected]> * Add docs Signed-off-by: Shubham Sharma <[email protected]> * Revert example to HTTP Signed-off-by: Shubham Sharma <[email protected]> * Refactor creating a response Signed-off-by: Shubham Sharma <[email protected]> * Refactor types Signed-off-by: Shubham Sharma <[email protected]> * Refactor types (2) Signed-off-by: Shubham Sharma <[email protected]> * Add util unit tests Signed-off-by: Shubham Sharma <[email protected]> * Fix build Signed-off-by: Shubham Sharma <[email protected]> * Add e2e test suite Signed-off-by: Shubham Sharma <[email protected]> * Remove duplicate code Signed-off-by: Shubham Sharma <[email protected]> * Fix package.json Signed-off-by: Shubham Sharma <[email protected]> * Remove log Signed-off-by: Shubham Sharma <[email protected]> * Update Dapr reference in workflow Signed-off-by: Shubham Sharma <[email protected]> * Update hash Signed-off-by: Shubham Sharma <[email protected]> * Nit changes based on SIG spec Signed-off-by: Shubham Sharma <[email protected]> * Address comments Signed-off-by: Shubham Sharma <[email protected]> * Fix e2e test Signed-off-by: Shubham Sharma <[email protected]> * Update proto and response Signed-off-by: Shubham Sharma <[email protected]> * Fix unit test Signed-off-by: Shubham Sharma <[email protected]> * update example Signed-off-by: Shubham Sharma <[email protected]> * Re-add gRPC test Signed-off-by: Shubham Sharma <[email protected]> * Fix gRPC error handling Signed-off-by: Shubham Sharma <[email protected]> * Add e2e test Signed-off-by: Shubham Sharma <[email protected]> * Update comment Signed-off-by: Shubham Sharma <[email protected]> * Fix build Signed-off-by: Shubham Sharma <[email protected]> * Fix e2e test merge issue Signed-off-by: Shubham Sharma <[email protected]> * Revert test e2e workflow ref Signed-off-by: Shubham Sharma <[email protected]> --------- Signed-off-by: Shubham Sharma <[email protected]>
1 parent 25771f2 commit 75859d8

File tree

17 files changed

+582
-42
lines changed

17 files changed

+582
-42
lines changed

daprdocs/content/en/js-sdk-docs/js-client/_index.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,36 @@ async function start() {
304304
id: "1234",
305305
};
306306
await client.pubsub.publish(pubSubName, topic, cloudEvent);
307+
308+
// Publish multiple messages to a topic as text/plain
309+
await client.pubsub.publishBulk(pubSubName, topic, ["message 1", "message 2", "message 3"]);
310+
311+
// Publish multiple messages to a topic as application/json
312+
await client.pubsub.publishBulk(pubSubName, topic, [
313+
{ hello: "message 1" },
314+
{ hello: "message 2" },
315+
{ hello: "message 3" },
316+
]);
317+
318+
// Publish multiple messages with explicit bulk publish messages
319+
const bulkPublishMessages = [
320+
{
321+
entryID: "entry-1",
322+
contentType: "application/json",
323+
event: { hello: "foo message 1" },
324+
},
325+
{
326+
entryID: "entry-2",
327+
contentType: "application/cloudevents+json",
328+
event: { ...cloudEvent, data: "foo message 2", datacontenttype: "text/plain" },
329+
},
330+
{
331+
entryID: "entry-3",
332+
contentType: "text/plain",
333+
event: "foo message 3",
334+
},
335+
];
336+
await client.pubsub.publishBulk(pubSubName, topic, bulkPublishMessages);
307337
}
308338

309339
start().catch((e) => {

examples/pubsub/README.md

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,17 @@ npm run start:dapr-http
2929

3030
By default, the example uses HTTP. To use gRPC instead:
3131

32-
- Add `CommunicationProtocolEnum.GRPC` to the DaprClient object creation:
32+
- Add `CommunicationProtocolEnum.GRPC` to the DaprServer and DaprClient object creations and update the client port to `DAPR_GRPC_PORT`:
3333

3434
```typescript
35-
const client = new DaprClient(daprHost, process.env.DAPR_HTTP_PORT, CommunicationProtocolEnum.GRPC);
35+
const server = new DaprServer(
36+
serverHost,
37+
serverPort,
38+
daprHost,
39+
process.env.DAPR_GRPC_PORT,
40+
CommunicationProtocolEnum.GRPC,
41+
);
42+
const client = new DaprClient(daprHost, process.env.DAPR_GRPC_PORT, CommunicationProtocolEnum.GRPC);
3643
```
3744

3845
- To run:

examples/pubsub/src/index.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,37 @@ async function start() {
5454
console.log("[Dapr-JS][Example] Publishing a cloud event message");
5555
response = await client.pubsub.publish("my-pubsub-component", "my-topic", cloudEvent);
5656
console.log(`[Dapr-JS][Example] Publish response: ${JSON.stringify(response)}`);
57+
58+
console.log("[Dapr-JS][Example] Bulk publishing multiple plain messages");
59+
const messages = ["message 1", "message 2", "message 3"];
60+
response = await client.pubsub.publishBulk("my-pubsub-component", "my-topic", messages);
61+
console.log(`[Dapr-JS][Example] Bulk publish response: ${JSON.stringify(response)}`);
62+
63+
console.log("[Dapr-JS][Example] Bulk publishing multiple JSON messages");
64+
const jsonMessages = [{ hello: "message 1" }, { hello: "message 2" }, { hello: "message 3" }];
65+
response = await client.pubsub.publishBulk("my-pubsub-component", "my-topic", jsonMessages);
66+
console.log(`[Dapr-JS][Example] Bulk publish response: ${JSON.stringify(response)}`);
67+
68+
console.log("[Dapr-JS][Example] Bulk publishing with entryID and custom content type");
69+
const bulkPublishMessages = [
70+
{
71+
entryID: "entry-1",
72+
contentType: "application/json",
73+
event: { hello: "foo message 1" },
74+
},
75+
{
76+
entryID: "entry-2",
77+
contentType: "application/cloudevents+json",
78+
event: { ...cloudEvent, data: "foo message 2", datacontenttype: "text/plain" },
79+
},
80+
{
81+
entryID: "entry-3",
82+
contentType: "text/plain",
83+
event: "foo message 3",
84+
},
85+
];
86+
response = await client.pubsub.publishBulk("my-pubsub-component", "my-topic", bulkPublishMessages);
87+
console.log(`[Dapr-JS][Example] Bulk publish response: ${JSON.stringify(response)}`);
5788
}
5889

5990
start().catch((e) => {

package-lock.json

Lines changed: 14 additions & 30 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/implementation/Client/GRPCClient/pubsub.ts

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,19 @@ limitations under the License.
1212
*/
1313

1414
import GRPCClient from "./GRPCClient";
15-
import { PublishEventRequest } from "../../../proto/dapr/proto/runtime/v1/dapr_pb";
15+
import {
16+
BulkPublishRequest,
17+
BulkPublishRequestEntry,
18+
PublishEventRequest,
19+
} from "../../../proto/dapr/proto/runtime/v1/dapr_pb";
1620
import IClientPubSub from "../../../interfaces/Client/IClientPubSub";
1721
import { Logger } from "../../../logger/Logger";
1822
import * as SerializerUtil from "../../../utils/Serializer.util";
1923
import { KeyValueType } from "../../../types/KeyValue.type";
20-
import { addMetadataToMap } from "../../../utils/Client.util";
24+
import { addMetadataToMap, getBulkPublishEntries, getBulkPublishResponse } from "../../../utils/Client.util";
2125
import { PubSubPublishResponseType } from "../../../types/pubsub/PubSubPublishResponse.type";
26+
import { PubSubBulkPublishResponse } from "../../../types/pubsub/PubSubBulkPublishResponse.type";
27+
import { PubSubBulkPublishMessage } from "../../../types/pubsub/PubSubBulkPublishMessage.type";
2228

2329
// https://docs.dapr.io/reference/api/pubsub_api/
2430
export default class GRPCClientPubSub implements IClientPubSub {
@@ -61,4 +67,54 @@ export default class GRPCClientPubSub implements IClientPubSub {
6167
});
6268
});
6369
}
70+
71+
async publishBulk(
72+
pubSubName: string,
73+
topic: string,
74+
messages: PubSubBulkPublishMessage[],
75+
metadata?: KeyValueType | undefined,
76+
): Promise<PubSubBulkPublishResponse> {
77+
const bulkPublishRequest = new BulkPublishRequest();
78+
bulkPublishRequest.setPubsubName(pubSubName);
79+
bulkPublishRequest.setTopic(topic);
80+
81+
const entries = getBulkPublishEntries(messages);
82+
const serializedEntries = entries.map((entry) => {
83+
const serialized = SerializerUtil.serializeGrpc(entry.event);
84+
const bulkPublishEntry = new BulkPublishRequestEntry();
85+
bulkPublishEntry.setEvent(serialized.serializedData);
86+
bulkPublishEntry.setContentType(serialized.contentType);
87+
bulkPublishEntry.setEntryId(entry.entryID);
88+
return bulkPublishEntry;
89+
});
90+
91+
bulkPublishRequest.setEntriesList(serializedEntries);
92+
addMetadataToMap(bulkPublishRequest.getMetadataMap(), metadata);
93+
94+
const client = await this.client.getClient();
95+
return new Promise((resolve, _reject) => {
96+
client.bulkPublishEventAlpha1(bulkPublishRequest, (err, res) => {
97+
if (err) {
98+
return resolve(getBulkPublishResponse({ entries: entries, error: err }));
99+
}
100+
101+
const failedEntries = res.getFailedentriesList();
102+
if (failedEntries.length > 0) {
103+
return resolve(
104+
getBulkPublishResponse({
105+
entries: entries,
106+
response: {
107+
failedEntries: failedEntries.map((entry) => ({
108+
entryID: entry.getEntryId(),
109+
error: entry.getError(),
110+
})),
111+
},
112+
}),
113+
);
114+
}
115+
116+
return resolve({ failedMessages: [] });
117+
});
118+
});
119+
}
64120
}

src/implementation/Client/HTTPClient/HTTPClient.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -197,14 +197,10 @@ export default class HTTPClient implements IClient {
197197
// 2XX -> OK; 3XX -> Redirects and Found
198198
if (res.status >= 200 && res.status <= 399) {
199199
return txtParsed;
200-
}
201-
202-
// All the others
203-
else {
204-
this.logger.debug(`Execute response with status: ${res.status} and text: ${txtParsed}`);
200+
} else {
205201
throw new Error(
206202
JSON.stringify({
207-
error: "UNKNOWN",
203+
error: res.statusText,
208204
error_msg: txt,
209205
status: res.status,
210206
}),

src/implementation/Client/HTTPClient/pubsub.ts

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,15 @@ import HTTPClient from "./HTTPClient";
1515
import IClientPubSub from "../../../interfaces/Client/IClientPubSub";
1616
import { Logger } from "../../../logger/Logger";
1717
import { KeyValueType } from "../../../types/KeyValue.type";
18-
import { createHTTPMetadataQueryParam } from "../../../utils/Client.util";
18+
import {
19+
createHTTPMetadataQueryParam,
20+
getBulkPublishEntries,
21+
getBulkPublishResponse,
22+
} from "../../../utils/Client.util";
23+
import { THTTPExecuteParams } from "../../../types/http/THTTPExecuteParams.type";
24+
import { PubSubBulkPublishResponse } from "../../../types/pubsub/PubSubBulkPublishResponse.type";
25+
import { PubSubBulkPublishMessage } from "../../../types/pubsub/PubSubBulkPublishMessage.type";
26+
import { PubSubBulkPublishEntry } from "../../../types/pubsub/PubSubBulkPublishEntry.type";
1927
import { PubSubPublishResponseType } from "../../../types/pubsub/PubSubPublishResponse.type";
2028

2129
// https://docs.dapr.io/reference/api/pubsub_api/
@@ -48,4 +56,55 @@ export default class HTTPClientPubSub implements IClientPubSub {
4856

4957
return {};
5058
}
59+
60+
async publishBulk(
61+
pubSubName: string,
62+
topic: string,
63+
messages: PubSubBulkPublishMessage[],
64+
metadata?: KeyValueType | undefined,
65+
): Promise<PubSubBulkPublishResponse> {
66+
const queryParams = createHTTPMetadataQueryParam(metadata);
67+
const params: THTTPExecuteParams = {
68+
method: "POST",
69+
headers: {
70+
"Content-Type": "application/json",
71+
},
72+
};
73+
74+
const entries = getBulkPublishEntries(messages);
75+
params.body = JSON.stringify(entries);
76+
77+
try {
78+
await this.client.executeWithApiVersion(
79+
"v1.0-alpha1",
80+
`/publish/bulk/${pubSubName}/${topic}?${queryParams}`,
81+
params,
82+
);
83+
} catch (error: any) {
84+
this.logger.error(`Failure publishing bulk messages: ${error}`);
85+
return this.handleBulkPublishError(entries, error);
86+
}
87+
88+
// If no error is thrown, all messages were published successfully
89+
return { failedMessages: [] };
90+
}
91+
92+
private async handleBulkPublishError(
93+
entries: PubSubBulkPublishEntry[],
94+
error: any,
95+
): Promise<PubSubBulkPublishResponse> {
96+
try {
97+
// If the error is returned by the bulk publish API,
98+
// parse the error message and return the response
99+
const err = JSON.parse(error.message);
100+
if (err.error_msg) {
101+
const bulkPublishResponse = JSON.parse(err.error_msg);
102+
return getBulkPublishResponse({ entries: entries, response: bulkPublishResponse });
103+
}
104+
} catch (_innerError: any) {
105+
// This can indicate a general error with the request (e.g., network error, invalid pubsub name, etc.).
106+
}
107+
108+
return getBulkPublishResponse({ entries: entries, error: error });
109+
}
51110
}

src/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ import GRPCClient from "./implementation/Client/GRPCClient/GRPCClient";
2828
import HTTPClient from "./implementation/Client/HTTPClient/HTTPClient";
2929
import { InvokerOptions } from "./types/InvokerOptions.type";
3030
import { DaprInvokerCallbackContent, DaprInvokerCallbackFunction } from "./types/DaprInvokerCallback.type";
31+
import { PubSubBulkPublishResponse } from "./types/pubsub/PubSubBulkPublishResponse.type";
32+
import { PubSubBulkPublishMessage } from "./types/pubsub/PubSubBulkPublishMessage.type";
3133
export {
3234
DaprClient,
3335
DaprServer,
@@ -47,4 +49,6 @@ export {
4749
InvokerOptions,
4850
DaprInvokerCallbackFunction as TypeDaprInvokerCallback,
4951
DaprInvokerCallbackContent,
52+
PubSubBulkPublishMessage,
53+
PubSubBulkPublishResponse,
5054
};

0 commit comments

Comments
 (0)