Skip to content

Commit 7cf5c32

Browse files
authored
fix(implementation): Check if data is Buffer already (#293)
* fix(implementation): Check if data is Buffer already Sometimes we need to send bytes to downstream, For example sending data with avro format to Kafka, we should change its format anymore. Fixes #292 Signed-off-by: Alex Ji <[email protected]> * fix(implementation): Add unit test and fix invoker as well Signed-off-by: Alex Ji <[email protected]> * data should be const Signed-off-by: Alex Ji <[email protected]> * Change serializeGrpc method so that it can return both the data and the content-type based on if it's a Buffer or not. Signed-off-by: Alex Ji <[email protected]> * Refactor Serializer.util.ts Signed-off-by: Alex Ji <[email protected]>
1 parent 5780786 commit 7cf5c32

File tree

5 files changed

+58
-4
lines changed

5 files changed

+58
-4
lines changed

src/implementation/Client/GRPCClient/binding.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ limitations under the License.
1414
import GRPCClient from './GRPCClient';
1515
import { InvokeBindingRequest, InvokeBindingResponse } from '../../../proto/dapr/proto/runtime/v1/dapr_pb';
1616
import IClientBinding from '../../../interfaces/Client/IClientBinding';
17+
import * as SerializerUtil from "../../../utils/Serializer.util";
1718

1819
// https://docs.dapr.io/reference/api/bindings_api/
1920
export default class GRPCClientBinding implements IClientBinding {
@@ -30,7 +31,7 @@ export default class GRPCClientBinding implements IClientBinding {
3031
const msgService = new InvokeBindingRequest();
3132
msgService.setName(bindingName);
3233
msgService.setOperation(operation);
33-
msgService.setData(Buffer.from(JSON.stringify(data), "utf-8"));
34+
msgService.setData(SerializerUtil.serializeGrpc(data).serializedData);
3435

3536
return new Promise((resolve, reject) => {
3637
const client = this.client.getClient();

src/implementation/Client/GRPCClient/invoker.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import { HTTPExtension, InvokeRequest, InvokeResponse } from '../../../proto/dap
1919
import { InvokeServiceRequest } from '../../../proto/dapr/proto/runtime/v1/dapr_pb';
2020
import * as HttpVerbUtil from "../../../utils/HttpVerb.util";
2121
import IClientInvoker from '../../../interfaces/Client/IClientInvoker';
22+
import * as SerializerUtil from "../../../utils/Serializer.util"
2223

2324
// https://docs.dapr.io/reference/api/service_invocation_api/
2425
export default class GRPCClientInvoker implements IClientInvoker {
@@ -54,13 +55,14 @@ export default class GRPCClientInvoker implements IClientInvoker {
5455
httpExtension.setVerb(HttpVerbUtil.convertHttpVerbStringToNumber(method));
5556

5657
const msgSerialized = new Any();
57-
msgSerialized.setValue(Buffer.from(JSON.stringify(data), "utf-8"));
58+
const {serializedData, contentType} = SerializerUtil.serializeGrpc(data);
59+
msgSerialized.setValue(serializedData);
5860

5961
const msgInvoke = new InvokeRequest();
6062
msgInvoke.setMethod(methodName);
6163
msgInvoke.setHttpExtension(httpExtension);
6264
msgInvoke.setData(msgSerialized);
63-
msgInvoke.setContentType("application/json");
65+
msgInvoke.setContentType(contentType);
6466

6567
msgInvokeService.setMessage(msgInvoke);
6668

src/implementation/Client/GRPCClient/pubsub.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import GRPCClient from './GRPCClient';
1515
import { PublishEventRequest } from "../../../proto/dapr/proto/runtime/v1/dapr_pb";
1616
import IClientPubSub from "../../../interfaces/Client/IClientPubSub";
1717
import { Logger } from '../../../logger/Logger';
18+
import * as SerializerUtil from "../../../utils/Serializer.util";
1819

1920
// https://docs.dapr.io/reference/api/pubsub_api/
2021
export default class GRPCClientPubSub implements IClientPubSub {
@@ -32,7 +33,7 @@ export default class GRPCClientPubSub implements IClientPubSub {
3233
const msgService = new PublishEventRequest();
3334
msgService.setPubsubName(pubSubName);
3435
msgService.setTopic(topic);
35-
msgService.setData(Buffer.from(JSON.stringify(data), "utf-8"));
36+
msgService.setData(SerializerUtil.serializeGrpc(data).serializedData);
3637

3738
return new Promise((resolve, reject) => {
3839
const client = this.client.getClient();

src/utils/Serializer.util.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
Copyright 2022 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
export function serializeGrpc(data: any): {serializedData: Buffer, contentType: string} {
15+
let serializedData = data;
16+
let contentType = "application/octet-stream";
17+
if (!(data instanceof Buffer)) {
18+
serializedData = Buffer.from(JSON.stringify(data), "utf-8");
19+
contentType = "application/json";
20+
}
21+
22+
return {serializedData, contentType};
23+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
Copyright 2022 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
import * as SerializerUtil from "../../../src/utils/Serializer.util"
15+
16+
describe('serializer', () => {
17+
it('Object should be serialized to Buffer', () => {
18+
const data = SerializerUtil.serializeGrpc({ Hello: 'World' });
19+
expect(Buffer.compare(data.serializedData, Buffer.from(JSON.stringify({ Hello: 'World' })))).toEqual(0);
20+
expect(data.contentType).toEqual("application/json");
21+
});
22+
it('Buffer object should not be serialized again', () => {
23+
const data = SerializerUtil.serializeGrpc(Buffer.from('Hello World'));
24+
expect(Buffer.compare(data.serializedData, Buffer.from('Hello World'))).toEqual(0);
25+
expect(data.contentType).toEqual("application/octet-stream");
26+
});
27+
});

0 commit comments

Comments
 (0)