Skip to content

Commit aac0a45

Browse files
authored
feat(pubsub): Allow setting explicit content-type during publish operations (#466)
* Add publish options to DaprClient Signed-off-by: Shubham Sharma <[email protected]> * Add e2e test Signed-off-by: Shubham Sharma <[email protected]> * Add unit test Signed-off-by: Shubham Sharma <[email protected]> * Fix example Signed-off-by: Shubham Sharma <[email protected]> * Remove debug logs Signed-off-by: Shubham Sharma <[email protected]> * Use content type in serializer Signed-off-by: Shubham Sharma <[email protected]> * Add unit test Signed-off-by: Shubham Sharma <[email protected]> * Run prettier Signed-off-by: Shubham Sharma <[email protected]> * Fix test Signed-off-by: Shubham Sharma <[email protected]> --------- Signed-off-by: Shubham Sharma <[email protected]>
1 parent 5d767c6 commit aac0a45

File tree

12 files changed

+149
-37
lines changed

12 files changed

+149
-37
lines changed

daprdocs/content/en/js-sdk-docs/js-client/_index.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,13 +304,18 @@ async function start() {
304304
const topic = "topic-a";
305305

306306
// Publish message to topic as text/plain
307+
// Note, the content type is inferred from the message type unless specified explicitly
307308
const response = await client.pubsub.publish(pubSubName, topic, "hello, world!");
308309
// If publish fails, response contains the error
309310
console.log(response);
310311

311312
// Publish message to topic as application/json
312313
await client.pubsub.publish(pubSubName, topic, { hello: "world" });
313314

315+
// Publish a JSON message as plain text
316+
const options = { contentType: "text/plain" };
317+
await client.pubsub.publish(pubSubName, topic, { hello: "world" }, options);
318+
314319
// Publish message to topic as application/cloudevents+json
315320
// You can also use the cloudevent SDK to create cloud events https://github.com/cloudevents/sdk-javascript
316321
const cloudEvent = {
@@ -321,6 +326,10 @@ async function start() {
321326
};
322327
await client.pubsub.publish(pubSubName, topic, cloudEvent);
323328

329+
// Publish a cloudevent as raw payload
330+
const options = { metadata: { rawPayload: true } };
331+
await client.pubsub.publish(pubSubName, topic, "hello, world!", options);
332+
324333
// Publish multiple messages to a topic as text/plain
325334
await client.pubsub.publishBulk(pubSubName, topic, ["message 1", "message 2", "message 3"]);
326335

examples/pubsub/package-lock.json

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/pubsub/src/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,13 @@ async function start() {
3838
});
3939

4040
// Publish multiple messages to a topic with default config.
41-
await client.pubsub.subscribeBulk("my-pubsub-component", "my-topic", async (data: Record<string, any>) => {
41+
await server.pubsub.subscribeBulk("my-pubsub-component", "my-topic", async (data: Record<string, any>) => {
4242
// The library parses JSON when possible.
4343
console.log(`[Dapr-JS][Example] Received on subscription: ${JSON.stringify(data)}`);
4444
});
4545

4646
// Publish multiple messages to a topic with specific maxMessagesCount and maxAwaitDurationMs.
47-
await client.pubsub.subscribeBulk(
47+
await server.pubsub.subscribeBulk(
4848
"my-pubsub-component",
4949
"my-topic",
5050
async (data: Record<string, any>) => {

src/implementation/Client/GRPCClient/pubsub.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import { addMetadataToMap, getBulkPublishEntries, getBulkPublishResponse } from
2525
import { PubSubPublishResponseType } from "../../../types/pubsub/PubSubPublishResponse.type";
2626
import { PubSubBulkPublishResponse } from "../../../types/pubsub/PubSubBulkPublishResponse.type";
2727
import { PubSubBulkPublishMessage } from "../../../types/pubsub/PubSubBulkPublishMessage.type";
28+
import { PubSubPublishOptions } from "../../../types/pubsub/PubSubPublishOptions.type";
2829

2930
// https://docs.dapr.io/reference/api/pubsub_api/
3031
export default class GRPCClientPubSub implements IClientPubSub {
@@ -41,19 +42,19 @@ export default class GRPCClientPubSub implements IClientPubSub {
4142
pubSubName: string,
4243
topic: string,
4344
data: object | string,
44-
metadata?: KeyValueType,
45+
options: PubSubPublishOptions = {},
4546
): Promise<PubSubPublishResponseType> {
4647
const msgService = new PublishEventRequest();
4748
msgService.setPubsubName(pubSubName);
4849
msgService.setTopic(topic);
4950

5051
if (data) {
51-
const serialized = SerializerUtil.serializeGrpc(data);
52+
const serialized = SerializerUtil.serializeGrpc(data, options.contentType);
5253
msgService.setData(serialized.serializedData);
5354
msgService.setDataContentType(serialized.contentType);
5455
}
5556

56-
addMetadataToMap(msgService.getMetadataMap(), metadata);
57+
addMetadataToMap(msgService.getMetadataMap(), options.metadata);
5758

5859
const client = await this.client.getClient();
5960
return new Promise((resolve, reject) => {

src/implementation/Client/HTTPClient/HTTPClient.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -132,13 +132,11 @@ export default class HTTPClient implements IClient {
132132

133133
// Set Body and Content-Type Header
134134
if (params?.body) {
135-
const { serializedData, contentType } = SerializerUtil.serializeHttp(params?.body);
136-
137-
// Don't overwrite it
138-
if (!params?.headers?.["Content-Type"]) {
139-
clientOptions.headers["Content-Type"] = contentType;
140-
}
135+
// If content-type is already present, use that to serialize the data.
136+
const headerContentType = params?.headers?.["Content-Type"] ?? undefined;
137+
const { serializedData, contentType } = SerializerUtil.serializeHttp(params?.body, headerContentType);
141138

139+
clientOptions.headers["Content-Type"] = contentType;
142140
clientOptions.body = serializedData;
143141
}
144142

src/implementation/Client/HTTPClient/pubsub.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import { PubSubBulkPublishResponse } from "../../../types/pubsub/PubSubBulkPubli
2525
import { PubSubBulkPublishMessage } from "../../../types/pubsub/PubSubBulkPublishMessage.type";
2626
import { PubSubBulkPublishEntry } from "../../../types/pubsub/PubSubBulkPublishEntry.type";
2727
import { PubSubPublishResponseType } from "../../../types/pubsub/PubSubPublishResponse.type";
28+
import { PubSubPublishOptions } from "../../../types/pubsub/PubSubPublishOptions.type";
2829

2930
// https://docs.dapr.io/reference/api/pubsub_api/
3031
export default class HTTPClientPubSub implements IClientPubSub {
@@ -40,14 +41,22 @@ export default class HTTPClientPubSub implements IClientPubSub {
4041
pubSubName: string,
4142
topic: string,
4243
data: object | string,
43-
metadata?: KeyValueType,
44+
options: PubSubPublishOptions = {},
4445
): Promise<PubSubPublishResponseType> {
45-
const queryParams = createHTTPMetadataQueryParam(metadata);
46+
const queryParams = createHTTPMetadataQueryParam(options.metadata);
47+
48+
// Set content type if provided.
49+
// If not, HTTPClient will infer it from the data.
50+
const headers: KeyValueType = {};
51+
if (options.contentType) {
52+
headers["Content-Type"] = options.contentType;
53+
}
4654

4755
try {
4856
await this.client.execute(`/publish/${pubSubName}/${topic}?${queryParams}`, {
4957
method: "POST",
5058
body: data,
59+
headers,
5160
});
5261
} catch (e: any) {
5362
this.logger.error(`publish failed: ${e}`);
@@ -72,7 +81,7 @@ export default class HTTPClientPubSub implements IClientPubSub {
7281
};
7382

7483
const entries = getBulkPublishEntries(messages);
75-
params.body = JSON.stringify(entries);
84+
params.body = entries;
7685

7786
try {
7887
await this.client.executeWithApiVersion(

src/interfaces/Client/IClientPubSub.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ limitations under the License.
1414
import { KeyValueType } from "../../types/KeyValue.type";
1515
import { PubSubBulkPublishMessage } from "../../types/pubsub/PubSubBulkPublishMessage.type";
1616
import { PubSubBulkPublishResponse } from "../../types/pubsub/PubSubBulkPublishResponse.type";
17+
import { PubSubPublishOptions } from "../../types/pubsub/PubSubPublishOptions.type";
1718
import { PubSubPublishResponseType } from "../../types/pubsub/PubSubPublishResponse.type";
1819

1920
export default interface IClientPubSub {
@@ -33,7 +34,7 @@ export default interface IClientPubSub {
3334
pubSubName: string,
3435
topic: string,
3536
data?: object | string,
36-
metadata?: KeyValueType,
37+
options?: PubSubPublishOptions,
3738
): Promise<PubSubPublishResponseType>;
3839

3940
/**
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
Copyright 2023 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
import { KeyValueType } from "../KeyValue.type";
15+
16+
export type PubSubPublishOptions = {
17+
/**
18+
* The content type of the message.
19+
* This is optional and will be inferred from the payload if not provided.
20+
*/
21+
contentType?: string;
22+
23+
/**
24+
* Metadata to be passed to the publish operation.
25+
*/
26+
metadata?: KeyValueType;
27+
};

src/utils/Serializer.util.ts

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,17 @@ limitations under the License.
1414
import { getContentType } from "./Client.util";
1515
import { BodyInit } from "node-fetch";
1616

17-
export function serializeGrpc(data: any): { serializedData: Buffer; contentType: string } {
17+
/**
18+
* Serialize data for gRPC requests.
19+
* If no content type is provided, it will be inferred from the data.
20+
* @param data data to serialize
21+
* @param inContentType content type of the data
22+
* @returns serialized data and content type
23+
*/
24+
export function serializeGrpc(data: any, inContentType?: string): { serializedData: Buffer; contentType: string } {
1825
let serializedData: Buffer = data;
1926

20-
const contentType = getContentType(data);
27+
const contentType = inContentType ?? getContentType(data);
2128

2229
switch (contentType) {
2330
case "application/json":
@@ -36,14 +43,24 @@ export function serializeGrpc(data: any): { serializedData: Buffer; contentType:
3643
return { serializedData, contentType };
3744
}
3845

39-
export function serializeHttp(data: any): {
46+
/**
47+
* Serialize data for HTTP requests.
48+
* If no content type is provided, it will be inferred from the data.
49+
* @param data data to serialize
50+
* @param inContentType content type of the data
51+
* @returns serialized data and content type
52+
*/
53+
export function serializeHttp(
54+
data: any,
55+
inContentType?: string,
56+
): {
4057
// https://developer.mozilla.org/en-US/docs/Web/API/fetch#body
4158
serializedData: BodyInit;
4259
contentType: string;
4360
} {
4461
let serializedData: BodyInit;
4562

46-
const contentType = getContentType(data);
63+
const contentType = inContentType ?? getContentType(data);
4764

4865
switch (contentType) {
4966
case "application/json":

test/e2e/common/server.test.ts

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ describe("common/server", () => {
154154

155155
describe("pubsub", () => {
156156
runIt(
157-
"should by default mark messagess as processed successfully (SUCCESS) and the same message should not be received anymore",
157+
"should mark messages as processed successfully (SUCCESS) by-default, and the same message should not be received anymore",
158158
async (server: DaprServer, protocol: string) => {
159159
const res = await server.client.pubsub.publish(pubSubName, getTopic(topicWithStatusCb, protocol), "SUCCESS");
160160
expect(res.error).toBeUndefined();
@@ -170,7 +170,7 @@ describe("common/server", () => {
170170
);
171171

172172
runIt(
173-
"should mark messagess as retried (RETRY) and the same message should be received again until we send SUCCESS",
173+
"should mark messages as retried (RETRY), and the same message should be received again until we send SUCCESS",
174174
async (server: DaprServer, protocol: string) => {
175175
const res = await server.client.pubsub.publish(
176176
pubSubName,
@@ -189,7 +189,7 @@ describe("common/server", () => {
189189
);
190190

191191
runIt(
192-
"should mark messagess as dropped (DROP) and the message should be deadlettered",
192+
"should mark messages as dropped (DROP), and the message should be deadlettered",
193193
async (server: DaprServer, protocol: string) => {
194194
const res = await server.client.pubsub.publish(pubSubName, getTopic(topicWithStatusCb, protocol), "DROP");
195195
expect(res.error).toBeUndefined();
@@ -229,7 +229,7 @@ describe("common/server", () => {
229229
{
230230
message: "Message 1!",
231231
},
232-
{ rawPayload: "true" },
232+
{ metadata: { rawPayload: "true" } },
233233
);
234234

235235
const res2 = await server.client.pubsub.publish(
@@ -238,7 +238,7 @@ describe("common/server", () => {
238238
{
239239
message: "Message 2!",
240240
},
241-
{ rawPayload: "true" },
241+
{ metadata: { rawPayload: "true" } },
242242
);
243243

244244
expect(res1.error).toBeUndefined();
@@ -320,7 +320,7 @@ describe("common/server", () => {
320320
{
321321
message: "Message 1!",
322322
},
323-
{ rawPayload: "true" },
323+
{ metadata: { rawPayload: "true" } },
324324
);
325325

326326
const res2 = await server.client.pubsub.publish(
@@ -329,7 +329,7 @@ describe("common/server", () => {
329329
{
330330
message: "Message 2!",
331331
},
332-
{ rawPayload: "true" },
332+
{ metadata: { rawPayload: "true" } },
333333
);
334334

335335
expect(res1.error).toBeUndefined();
@@ -355,7 +355,7 @@ describe("common/server", () => {
355355
expect(mockSubscribeHandler.mock.calls[0][0]).toEqual({ message: "Hello, world!" });
356356
});
357357

358-
runIt("should be able to send and receive cloud events", async (server: DaprServer, protocol: string) => {
358+
runIt("should be able to send and receive cloudevents", async (server: DaprServer, protocol: string) => {
359359
const ce = {
360360
specversion: "1.0",
361361
type: "com.github.pull.create",
@@ -375,6 +375,28 @@ describe("common/server", () => {
375375
expect(mockSubscribeHandler.mock.calls[0][0]).toEqual("Hello, world!");
376376
});
377377

378+
runIt("should be able to send cloudevents as JSON and receive it", async (server: DaprServer, protocol: string) => {
379+
const ce = {
380+
specversion: "1.0",
381+
type: "com.github.pull.create",
382+
source: "https://github.com/cloudevents/spec/pull",
383+
id: "A234-1234-1234",
384+
data: "Hello, world!",
385+
datacontenttype: "text/plain",
386+
};
387+
388+
const options = { contentType: "application/json" };
389+
const res = await server.client.pubsub.publish(pubSubName, getTopic(topicDefault, protocol), ce, options);
390+
expect(res.error).toBeUndefined();
391+
392+
// Delay a bit for event to arrive
393+
await new Promise((resolve, _reject) => setTimeout(resolve, 500));
394+
expect(mockSubscribeHandler.mock.calls.length).toBe(1);
395+
// The cloudevent should contain an inner cloudevent since the content type was application/json
396+
const innerCe: any = mockSubscribeHandler.mock.calls[0][0];
397+
expect(innerCe["data"]).toEqual("Hello, world!");
398+
});
399+
378400
runIt(
379401
"should be able to send plain events and receive as raw payload",
380402
async (server: DaprServer, protocol: string) => {
@@ -418,7 +440,7 @@ describe("common/server", () => {
418440
pubSubName,
419441
getTopic(topicRawPayload, protocol),
420442
"Hello, world!",
421-
{ rawPayload: "true" },
443+
{ metadata: { rawPayload: "true" } },
422444
);
423445
expect(res.error).toBeUndefined();
424446

@@ -437,7 +459,7 @@ describe("common/server", () => {
437459
pubSubName,
438460
getTopic(topicRawPayload, protocol),
439461
{ message: "Hello, world!" },
440-
{ rawPayload: "true" },
462+
{ metadata: { rawPayload: "true" } },
441463
);
442464
expect(res.error).toBeUndefined();
443465

0 commit comments

Comments
 (0)