Skip to content

Commit 4fc3a2d

Browse files
Add if PubSub was successful or not, implement shutdown to close handle and work on load tests
Signed-off-by: Xavier Geerinck <[email protected]>
1 parent 2f58df2 commit 4fc3a2d

File tree

12 files changed

+135
-70
lines changed

12 files changed

+135
-70
lines changed

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
"types": "http/index.d.ts",
66
"scripts": {
77
"test": "npm run test:unit:all && npm run test:e2e:all",
8+
"test:load": "jest --runInBand --detectOpenHandles --forceExit",
9+
"test:load:http": "TEST_SECRET_1=secret_val_1 TEST_SECRET_2=secret_val_2 dapr run --app-id test-suite --app-protocol http --app-port 50001 --dapr-http-port 50000 --components-path ./test/components npm run test:load 'test/load'",
810
"test:e2e": "jest --runInBand --detectOpenHandles --forceExit",
911
"test:e2e:all": "npm run test:e2e:grpc:main && npm run test:e2e:http:main && npm run test:e2e:http:actors",
1012
"test:e2e:grpc:main": "TEST_SECRET_1=secret_val_1 TEST_SECRET_2=secret_val_2 dapr run --app-id test-suite --app-protocol grpc --app-port 50001 --dapr-grpc-port 50000 --components-path ./test/components npm run test:e2e 'test/e2e/main.grpc.test.ts'",

src/implementation/Client/GRPCClient/pubsub.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ export default class GRPCClientPubSub implements IClientPubSub {
1111
}
1212

1313
// @todo: should return a specific typed Promise<TypePubSubPublishResponse> instead of Promise<any>
14-
async publish(pubSubName: string, topic: string, data: object = {}): Promise<any> {
14+
async publish(pubSubName: string, topic: string, data: object = {}): Promise<Boolean> {
1515
const msgService = new PublishEventRequest();
1616
msgService.setPubsubName(pubSubName);
1717
msgService.setTopic(topic);
@@ -21,11 +21,12 @@ export default class GRPCClientPubSub implements IClientPubSub {
2121
const client = this.client.getClient();
2222
client.publishEvent(msgService, (err, res) => {
2323
if (err) {
24-
return reject(err);
24+
console.error(err);
25+
return reject(false);
2526
}
2627

2728
// https://docs.dapr.io/reference/api/pubsub_api/#expected-http-response
28-
return resolve({});
29+
return resolve(true);
2930
});
3031
});
3132
}

src/implementation/Client/HTTPClient/pubsub.ts

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,20 @@ export default class HTTPClientPubSub implements IClientPubSub {
99
this.client = client;
1010
}
1111

12-
async publish(pubSubName: string, topic: string, data: object = {}): Promise<void> {
13-
await this.client.execute(`/publish/${pubSubName}/${topic}`, {
14-
method: 'POST',
15-
headers: {
16-
'Content-Type': 'application/json',
17-
},
18-
body: JSON.stringify(data),
19-
});
12+
async publish(pubSubName: string, topic: string, data: object = {}): Promise<Boolean> {
13+
try {
14+
await this.client.execute(`/publish/${pubSubName}/${topic}`, {
15+
method: 'POST',
16+
headers: {
17+
'Content-Type': 'application/json',
18+
},
19+
body: JSON.stringify(data),
20+
});
21+
} catch (e) {
22+
console.error(e);
23+
return false;
24+
}
25+
26+
return true;
2027
}
2128
}

src/implementation/Server/DaprServer.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ export default class DaprServer {
9393
await this.daprServer.startServer(this.serverHost, this.serverPort.toString());
9494
}
9595

96+
async stopServer(): Promise<void> {
97+
await this.daprServer.stopServer();
98+
}
99+
96100
getDaprClient(): IServer {
97101
return this.daprServer;
98102
}

src/implementation/Server/GRPCServer/GRPCServer.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,20 @@ export default class GRPCServer implements IServer {
9999
this.isInitialized = true;
100100
}
101101

102+
async stopServer(): Promise<void> {
103+
return new Promise((resolve, reject) => {
104+
105+
this.server.tryShutdown((err) => {
106+
if (err) {
107+
return reject(err);
108+
}
109+
110+
this.isInitialized = false;
111+
return resolve();
112+
});
113+
});
114+
}
115+
102116
private async initializeBind(): Promise<void> {
103117
console.log(`[Dapr-JS][gRPC] Starting to listen on ${this.serverHost}:${this.serverPort}`);
104118
return new Promise((resolve, reject) => {

src/implementation/Server/HTTPServer/HTTPServer.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,4 +118,9 @@ export default class HTTPServer implements IServer {
118118
// We are initialized
119119
this.isInitialized = true;
120120
}
121+
122+
async stopServer(): Promise<void> {
123+
await this.server.close();
124+
this.isInitialized = false;
125+
}
121126
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
export default interface IClientPubSub {
2-
publish(pubSubName: string, topic: string, data?: object): Promise<void>
2+
publish(pubSubName: string, topic: string, data?: object): Promise<Boolean>
33
}

src/interfaces/Server/IServer.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
export default interface IServer {
2-
getServerAddress(): string;
3-
getServer(): any; // this is dependent on the implementation
4-
getServerImpl(): any; // this is dependent on the implementation
5-
close(): Promise<void>;
6-
startServer(host: string, port: string): Promise<void>;
2+
getServerAddress(): string;
3+
getServer(): any; // this is dependent on the implementation
4+
getServerImpl(): any; // this is dependent on the implementation
5+
close(): Promise<void>;
6+
startServer(host: string, port: string): Promise<void>;
7+
stopServer(): Promise<void>;
78
}

test/components/pubsub-mqtt.yaml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# https://docs.dapr.io/reference/components-reference/supported-bindings/rabbitmq/
2+
apiVersion: dapr.io/v1alpha1
3+
kind: Component
4+
metadata:
5+
name: pubsub-mqtt
6+
namespace: default
7+
spec:
8+
type: pubsub.mqtt
9+
version: v1
10+
metadata:
11+
- name: url
12+
value: "tcp://admin:public@localhost:1883"
13+
- name: qos
14+
value: 1
15+
- name: cleanSession
16+
value: "true"
17+
- name: backOffMaxRetries
18+
value: "0"

test/e2e/main.grpc.test.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ describe('grpc/main', () => {
5555
// @ts-ignore
5656
expect(mockPubSubSubscribe.mock.calls[0][0]['hello']).toEqual('world');
5757
});
58+
59+
it('should receive if it was successful or not', async () => {
60+
const res = await client.pubsub.publish('pubsub-redis', 'test-topic', { hello: 'world' });
61+
expect(res).toEqual(true);
62+
});
5863
});
5964

6065
describe('invoker', () => {

0 commit comments

Comments
 (0)