Skip to content

Commit 8c01eb3

Browse files
Merge pull request #148 from XavierGeerinck/load-test
feat(test): Add load test for PubSub
2 parents ac25aeb + 362f850 commit 8c01eb3

File tree

14 files changed

+158
-150
lines changed

14 files changed

+158
-150
lines changed

package.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
"description": "The official Dapr (https://dapr.io) SDK for Node.js",
55
"types": "http/index.d.ts",
66
"scripts": {
7+
"test": "npm run test:unit:all && npm run test:e2e:all",
8+
"test:load": "jest --runInBand --detectOpenHandles --forceExit",
9+
"test:load:http": "TEST_SECRET_1=secret_val_1 TEST_SECRET_2=secret_val_2 dapr run --app-id test-suite --app-protocol http --app-port 50001 --dapr-http-port 50000 --components-path ./test/components npm run test:load 'test/load'",
710
"test:e2e": "jest --runInBand --detectOpenHandles --forceExit",
811
"test:e2e:grpc:main": "TEST_SECRET_1=secret_val_1 TEST_SECRET_2=secret_val_2 dapr run --app-id test-suite --app-protocol grpc --app-port 50001 --dapr-grpc-port 50000 --components-path ./test/components npm run test:e2e 'test/e2e/main.grpc.test.ts'",
912
"test:e2e:http:main": "TEST_SECRET_1=secret_val_1 TEST_SECRET_2=secret_val_2 dapr run --app-id test-suite --app-protocol http --app-port 50001 --dapr-http-port 50000 --components-path ./test/components npm run test:e2e 'test/e2e/main.http.test.ts'",

src/implementation/Client/GRPCClient/pubsub.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ export default class GRPCClientPubSub implements IClientPubSub {
1111
}
1212

1313
// @todo: should return a specific typed Promise<TypePubSubPublishResponse> instead of Promise<any>
14-
async publish(pubSubName: string, topic: string, data: object = {}): Promise<any> {
14+
async publish(pubSubName: string, topic: string, data: object = {}): Promise<boolean> {
1515
const msgService = new PublishEventRequest();
1616
msgService.setPubsubName(pubSubName);
1717
msgService.setTopic(topic);
@@ -21,11 +21,12 @@ export default class GRPCClientPubSub implements IClientPubSub {
2121
const client = this.client.getClient();
2222
client.publishEvent(msgService, (err, res) => {
2323
if (err) {
24-
return reject(err);
24+
console.error(err);
25+
return reject(false);
2526
}
2627

2728
// https://docs.dapr.io/reference/api/pubsub_api/#expected-http-response
28-
return resolve({});
29+
return resolve(true);
2930
});
3031
});
3132
}

src/implementation/Client/HTTPClient/HTTPClient.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import fetch from "node-fetch";
22
import { CommunicationProtocolEnum } from "../../..";
33
import IClient from "../../../interfaces/Client/IClient";
4+
import http from "node:http";
5+
import https from "node:https";
46

57
export default class HTTPClient implements IClient {
68
private readonly isInitialized: boolean;
@@ -9,6 +11,9 @@ export default class HTTPClient implements IClient {
911
private readonly clientPort: string;
1012
private readonly clientUrl: string;
1113

14+
private readonly httpAgent;
15+
private readonly httpsAgent;
16+
1217
constructor(host = "127.0.0.1", port = "50050") {
1318
this.isInitialized = true;
1419
this.clientHost = host;
@@ -21,6 +26,9 @@ export default class HTTPClient implements IClient {
2126
}
2227

2328
this.client = fetch;
29+
30+
this.httpAgent = new http.Agent({ keepAlive: true, keepAliveMsecs: 30 * 1000 });
31+
this.httpsAgent = new https.Agent({ keepAlive: true, keepAliveMsecs: 30 * 1000 });
2432
}
2533

2634
getClient(): typeof fetch {
@@ -64,7 +72,14 @@ export default class HTTPClient implements IClient {
6472
}
6573
}
6674

75+
6776
const urlFull = url.startsWith("http") ? url : `${this.clientUrl}${url}`;
77+
78+
// Decide which agent to use
79+
// we use an agent so we can reuse an open connection, limiting handshake requirements
80+
const agent = urlFull.startsWith("https") ? this.httpsAgent : this.httpAgent;
81+
params.agent = agent;
82+
6883
// console.log(`${params.method} - ${urlFull} (${params.body})`);
6984
const res = await fetch(urlFull, params);
7085

src/implementation/Client/HTTPClient/pubsub.ts

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,20 @@ export default class HTTPClientPubSub implements IClientPubSub {
99
this.client = client;
1010
}
1111

12-
async publish(pubSubName: string, topic: string, data: object = {}): Promise<void> {
13-
await this.client.execute(`/publish/${pubSubName}/${topic}`, {
14-
method: 'POST',
15-
headers: {
16-
'Content-Type': 'application/json',
17-
},
18-
body: JSON.stringify(data),
19-
});
12+
async publish(pubSubName: string, topic: string, data: object = {}): Promise<boolean> {
13+
try {
14+
await this.client.execute(`/publish/${pubSubName}/${topic}`, {
15+
method: 'POST',
16+
headers: {
17+
'Content-Type': 'application/json',
18+
},
19+
body: JSON.stringify(data),
20+
});
21+
} catch (e) {
22+
console.error(e);
23+
return false;
24+
}
25+
26+
return true;
2027
}
2128
}

src/implementation/Server/DaprServer.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ export default class DaprServer {
9393
await this.daprServer.startServer(this.serverHost, this.serverPort.toString());
9494
}
9595

96+
async stopServer(): Promise<void> {
97+
await this.daprServer.stopServer();
98+
}
99+
96100
getDaprClient(): IServer {
97101
return this.daprServer;
98102
}

src/implementation/Server/GRPCServer/GRPCServer.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,20 @@ export default class GRPCServer implements IServer {
9999
this.isInitialized = true;
100100
}
101101

102+
async stopServer(): Promise<void> {
103+
return new Promise((resolve, reject) => {
104+
105+
this.server.tryShutdown((err) => {
106+
if (err) {
107+
return reject(err);
108+
}
109+
110+
this.isInitialized = false;
111+
return resolve();
112+
});
113+
});
114+
}
115+
102116
private async initializeBind(): Promise<void> {
103117
console.log(`[Dapr-JS][gRPC] Starting to listen on ${this.serverHost}:${this.serverPort}`);
104118
return new Promise((resolve, reject) => {

src/implementation/Server/HTTPServer/HTTPServer.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,4 +118,9 @@ export default class HTTPServer implements IServer {
118118
// We are initialized
119119
this.isInitialized = true;
120120
}
121+
122+
async stopServer(): Promise<void> {
123+
await this.server.close();
124+
this.isInitialized = false;
125+
}
121126
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
export default interface IClientPubSub {
2-
publish(pubSubName: string, topic: string, data?: object): Promise<void>
2+
publish(pubSubName: string, topic: string, data?: object): Promise<boolean>
33
}

src/interfaces/Server/IServer.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
export default interface IServer {
2-
getServerAddress(): string;
3-
getServer(): any; // this is dependent on the implementation
4-
getServerImpl(): any; // this is dependent on the implementation
5-
close(): Promise<void>;
6-
startServer(host: string, port: string): Promise<void>;
2+
getServerAddress(): string;
3+
getServer(): any; // this is dependent on the implementation
4+
getServerImpl(): any; // this is dependent on the implementation
5+
close(): Promise<void>;
6+
startServer(host: string, port: string): Promise<void>;
7+
stopServer(): Promise<void>;
78
}

test/components/pubsub-mqtt.yaml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# https://docs.dapr.io/reference/components-reference/supported-bindings/rabbitmq/
2+
apiVersion: dapr.io/v1alpha1
3+
kind: Component
4+
metadata:
5+
name: pubsub-mqtt
6+
namespace: default
7+
spec:
8+
type: pubsub.mqtt
9+
version: v1
10+
metadata:
11+
- name: url
12+
value: "tcp://admin:public@localhost:1883"
13+
- name: qos
14+
value: 1
15+
- name: cleanSession
16+
value: "true"
17+
- name: backOffMaxRetries
18+
value: "0"

0 commit comments

Comments
 (0)