Skip to content

Commit 9dbad97

Browse files
Make sure server starts before doing anything, instead of hardcoded 250ms we now check the Healthz endpoint
Signed-off-by: Xavier Geerinck <[email protected]>
1 parent 136b7cc commit 9dbad97

File tree

11 files changed

+75
-75
lines changed

11 files changed

+75
-75
lines changed

src/implementation/Server/DaprServer.ts

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,8 @@ import HTTPServerPubSub from './HTTPServer/pubsub';
1616
import HTTPServerBinding from './HTTPServer/binding';
1717
import HTTPServerInvoker from './HTTPServer/invoker';
1818
import HTTPServerActor from './HTTPServer/actor';
19-
import GRPCClient from '../Client/GRPCClient/GRPCClient';
20-
import HTTPClient from '../Client/HTTPClient/HTTPClient';
2119
import { DaprClientOptions } from '../../types/DaprClientOptions';
20+
import { DaprClient } from '../..';
2221

2322
export default class DaprServer {
2423
// App details
@@ -27,14 +26,13 @@ export default class DaprServer {
2726
// Dapr Sidecar
2827
private readonly daprHost: string;
2928
private readonly daprPort: string;
30-
private readonly communicationProtocol: CommunicationProtocolEnum;
3129

3230
readonly daprServer: IServer;
3331
readonly pubsub: IServerPubSub;
3432
readonly binding: IServerBinding;
3533
readonly invoker: IServerInvoker;
3634
readonly actor: IServerActor;
37-
readonly clientOptions: DaprClientOptions;
35+
readonly client: DaprClient;
3836

3937
constructor(
4038
serverHost = "127.0.0.1"
@@ -50,9 +48,9 @@ export default class DaprServer {
5048
this.serverPort = serverPort;
5149
this.daprHost = daprHost;
5250
this.daprPort = daprPort;
53-
this.clientOptions = clientOptions;
5451

55-
this.communicationProtocol = communicationProtocol;
52+
// Create a client to interface with the sidecar from the server side
53+
this.client = new DaprClient(daprHost, daprPort, communicationProtocol, clientOptions);
5654

5755
// If DAPR_SERVER_PORT was not set, we set it
5856
process.env.DAPR_SERVER_PORT = this.serverPort;
@@ -70,26 +68,24 @@ export default class DaprServer {
7068
// Builder
7169
switch (communicationProtocol) {
7270
case CommunicationProtocolEnum.GRPC: {
73-
const client = new GRPCClient(daprHost, daprPort, this.clientOptions);
74-
const server = new GRPCServer();
75-
71+
const server = new GRPCServer(this.client);
7672
this.daprServer = server;
77-
this.pubsub = new GRPCServerPubSub(server, client);
78-
this.binding = new GRPCServerBinding(server, client);
79-
this.invoker = new GRPCServerInvoker(server, client);
80-
this.actor = new GRPCServerActor(server, client);
73+
74+
this.pubsub = new GRPCServerPubSub(server);
75+
this.binding = new GRPCServerBinding(server);
76+
this.invoker = new GRPCServerInvoker(server);
77+
this.actor = new GRPCServerActor(server);
8178
break;
8279
}
8380
case CommunicationProtocolEnum.HTTP:
8481
default: {
85-
const client = new HTTPClient(daprHost, daprPort, this.clientOptions);
86-
const server = new HTTPServer();
87-
82+
const server = new HTTPServer(this.client);
8883
this.daprServer = server;
89-
this.pubsub = new HTTPServerPubSub(server, client);
90-
this.binding = new HTTPServerBinding(server, client);
91-
this.invoker = new HTTPServerInvoker(server, client);
92-
this.actor = new HTTPServerActor(server, client);
84+
85+
this.pubsub = new HTTPServerPubSub(server);
86+
this.binding = new HTTPServerBinding(server);
87+
this.invoker = new HTTPServerInvoker(server);
88+
this.actor = new HTTPServerActor(server, this.client);
9389
break;
9490
}
9591
}

src/implementation/Server/GRPCServer/GRPCServer.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ import * as grpc from "@grpc/grpc-js";
22
import GRPCServerImpl from "./GRPCServerImpl";
33
import { AppCallbackService } from "../../../proto/dapr/proto/runtime/v1/appcallback_grpc_pb";
44
import IServer from "../../../interfaces/Server/IServer";
5+
import GRPCClient from "../../Client/GRPCClient/GRPCClient";
6+
import { DaprClient } from "../../..";
57

68
// eslint-disable-next-line
79
export interface IServerType extends grpc.Server { }
@@ -15,12 +17,14 @@ export default class GRPCServer implements IServer {
1517
server: IServerType;
1618
serverImpl: IServerImplType;
1719
serverCredentials: grpc.ServerCredentials;
20+
client: DaprClient;
1821

19-
constructor() {
22+
constructor(client: DaprClient) {
2023
this.isInitialized = false;
2124

2225
this.serverHost = "";
2326
this.serverPort = "";
27+
this.client = client;
2428

2529
// Create Server
2630
this.server = new grpc.Server();

src/implementation/Server/GRPCServer/actor.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,11 @@ import GRPCClient from '../../Client/GRPCClient/GRPCClient';
77
// https://docs.dapr.io/reference/api/bindings_api/
88
export default class GRPCServerActor implements IServerActor {
99
server: GRPCServer;
10-
client: GRPCClient;
1110

12-
constructor(server: GRPCServer, client: GRPCClient) {
13-
this.server = server;
14-
this.client = client;
11+
constructor(server: GRPCServer) {
12+
this.server = server;
1513
}
16-
14+
1715
deactivateActor(actorType: string, actorId: string): Promise<void> {
1816
throw new Error('Method not implemented.');
1917
}
@@ -25,7 +23,7 @@ export default class GRPCServerActor implements IServerActor {
2523
getRegisteredActors(): Promise<string[]> {
2624
throw new Error('Method not implemented.');
2725
}
28-
26+
2927
registerActor<T extends AbstractActor>(cls: Class<T>): Promise<void> {
3028
throw new Error('Method not implemented.');
3129
}

src/implementation/Server/GRPCServer/binding.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,11 @@ import GRPCClient from '../../Client/GRPCClient/GRPCClient';
66
// https://docs.dapr.io/reference/api/bindings_api/
77
export default class DaprBinding implements IServerBinding {
88
server: GRPCServer;
9-
client: GRPCClient;
109

11-
constructor(server: GRPCServer, client: GRPCClient) {
12-
this.server = server;
13-
this.client = client;
10+
constructor(server: GRPCServer) {
11+
this.server = server;
1412
}
15-
13+
1614
// Receive an input from an external system
1715
async receive(bindingName: string, cb: TypeDaprBindingCallback): Promise<any> {
1816
console.log(`Registering GRPC onBindingInput Handler: Binding = ${bindingName}`);

src/implementation/Server/GRPCServer/invoker.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,11 @@ import GRPCClient from '../../Client/GRPCClient/GRPCClient';
88
// https://docs.dapr.io/reference/api/service_invocation_api/
99
export default class DaprInvoker implements IServerInvoker {
1010
server: GRPCServer;
11-
client: GRPCClient;
1211

13-
constructor(server: GRPCServer, client: GRPCClient) {
14-
this.server = server;
15-
this.client = client;
12+
constructor(server: GRPCServer) {
13+
this.server = server;
1614
}
17-
15+
1816
async listen(methodName: string, cb: TypeDaprInvokerCallback, options: InvokerListenOptionsType = {}): Promise<any> {
1917
const httpMethod: HttpMethod = options?.method?.toLowerCase() as HttpMethod || HttpMethod.GET;
2018
console.log(`Registering onInvoke Handler ${httpMethod} /${methodName}`);

src/implementation/Server/GRPCServer/pubsub.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,15 @@
11
import GRPCServer from "./GRPCServer";
22
import { TypeDaprPubSubCallback } from "../../../types/DaprPubSubCallback.type";
33
import IServerPubSub from "../../../interfaces/Server/IServerPubSub";
4-
import GRPCClient from "../../Client/GRPCClient/GRPCClient";
54

65
// https://docs.dapr.io/reference/api/pubsub_api/
76
export default class DaprPubSub implements IServerPubSub {
87
server: GRPCServer;
9-
client: GRPCClient;
108

11-
constructor(server: GRPCServer, client: GRPCClient) {
12-
this.server = server;
13-
this.client = client;
9+
constructor(server: GRPCServer) {
10+
this.server = server;
1411
}
15-
12+
1613
async subscribe(pubSubName: string, topic: string, cb: TypeDaprPubSubCallback): Promise<void> {
1714
console.log(`Registering onTopicEvent Handler: PubSub = ${pubSubName}; Topic = ${topic}`);
1815
this.server.getServerImpl().registerPubSubSubscriptionHandler(pubSubName, topic, cb);

src/implementation/Server/HTTPServer/HTTPServer.ts

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import bodyParser from "body-parser";
33
import HTTPServerImpl from "./HTTPServerImpl";
44
import IServer from "../../../interfaces/Server/IServer";
55
import * as NodeJSUtils from "../../../utils/NodeJS.util";
6+
import { DaprClient } from "../../..";
67

78
// eslint-disable-next-line
89
export interface IServerImplType extends HTTPServerImpl { }
@@ -16,11 +17,13 @@ export default class HTTPServer implements IServer {
1617
server: IServerType;
1718
serverAddress: string;
1819
serverImpl: IServerImplType;
19-
serverStartupDelay = 250;
20+
serverStartupDelay = 1000; // @todo: use health api https://docs.dapr.io/reference/api/health_api/
21+
client: DaprClient;
2022

21-
constructor() {
23+
constructor(client: DaprClient) {
2224
this.serverHost = "";
2325
this.serverPort = "";
26+
this.client = client;
2427

2528
this.isInitialized = false;
2629

@@ -94,15 +97,29 @@ export default class HTTPServer implements IServer {
9497
this.server.get('/dapr/subscribe', (req, res) => {
9598
res.send(this.serverImpl.pubSubSubscriptionRoutes);
9699
console.log(`[Dapr API][PubSub] Registered ${this.serverImpl.pubSubSubscriptionRoutes.length} PubSub Subscriptions`);
97-
})
100+
});
98101

99102
// We need to call the Singleton to start listening on the port, else Dapr will not pick it up correctly
100103
// Dapr will probe every 50ms to see if we are listening on our port: https://github.com/dapr/dapr/blob/a43712c97ead550ca2f733e9f7e7769ecb195d8b/pkg/runtime/runtime.go#L1694
101104
// if we are using actors we will change this to 4s to let the placement tables update
102-
console.log(`Letting Dapr pick-up the server (${this.serverStartupDelay}ms)`);
103-
await NodeJSUtils.sleep(this.serverStartupDelay);
105+
let isHealthy = false;
106+
let isHealthyRetryCount = 0;
107+
let isHealthyMaxRetryCount = 60; // 1s startup delay and we try max for 60s
108+
109+
console.log(`[Dapr-JS] Letting Dapr pick-up the server (Maximum 60s wait time)`);
110+
while (!isHealthy) {
111+
console.log(`[Dapr-JS] - Waiting till Dapr Started (#${isHealthyRetryCount})`);
112+
await NodeJSUtils.sleep(this.serverStartupDelay);
113+
isHealthy = await this.client.health.isHealthy();
114+
isHealthyRetryCount++;
115+
116+
if (isHealthyRetryCount > isHealthyMaxRetryCount) {
117+
throw new Error("DAPR_SIDECAR_COULD_NOT_BE_STARTED");
118+
}
119+
}
104120

105121
// We are initialized
122+
console.log(`[Dapr-JS] Server Started`);
106123
this.isInitialized = true;
107124
}
108125

src/implementation/Server/HTTPServer/actor.ts

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,30 +7,31 @@ import IRequest from '../../../types/http/IRequest';
77
import IResponse from '../../../types/http/IResponse';
88
import { GetRegisteredActorsType } from '../../../types/response/GetRegisteredActors.type';
99
import BufferSerializer from '../../../actors/runtime/BufferSerializer';
10-
import HTTPClient from '../../Client/HTTPClient/HTTPClient';
10+
import { DaprClient } from '../../..';
1111

1212
// https://docs.dapr.io/reference/api/bindings_api/
1313
export default class HTTPServerActor implements IServerActor {
1414
private readonly server: HTTPServer;
15-
private readonly client: HTTPClient;
15+
private readonly client: DaprClient;
1616
private readonly serializer: BufferSerializer;
1717

18-
constructor(server: HTTPServer, client: HTTPClient) {
18+
constructor(server: HTTPServer, client: DaprClient) {
1919
this.client = client;
2020
this.server = server;
2121
this.serializer = new BufferSerializer();
2222
}
2323

24-
async deactivateActor(actorType: string, actorId: string): Promise<void> {
25-
await this.client.execute(`http://localhost:${this.server.serverPort}/actors/${actorType}/${actorId}`, { method: "DELETE" });
26-
}
24+
// async deactivateActor(actorType: string, actorId: string): Promise<void> {
25+
// await this.client.execute(`http://localhost:${this.server.serverPort}/actors/${actorType}/${actorId}`, { method: "DELETE" });
26+
// await this.client
27+
// }
2728

2829
async registerActor<T extends AbstractActor>(cls: Class<T>): Promise<void> {
29-
ActorRuntime.getInstance(this.client).registerActor(cls);
30+
ActorRuntime.getInstance(this.client.getDaprClient()).registerActor(cls);
3031
}
3132

3233
async getRegisteredActors(): Promise<string[]> {
33-
return await ActorRuntime.getInstance(this.client).getRegisteredActorTypes();
34+
return await ActorRuntime.getInstance(this.client.getDaprClient()).getRegisteredActorTypes();
3435
}
3536

3637
/**
@@ -61,7 +62,7 @@ export default class HTTPServerActor implements IServerActor {
6162
}
6263

6364
private async handlerConfig(req: IRequest, res: IResponse): Promise<void> {
64-
const actorRuntime = ActorRuntime.getInstance(this.client);
65+
const actorRuntime = ActorRuntime.getInstance(this.client.getDaprClient());
6566

6667
const result: GetRegisteredActorsType = {
6768
entities: actorRuntime.getRegisteredActorTypes(),
@@ -73,7 +74,7 @@ export default class HTTPServerActor implements IServerActor {
7374

7475
private async handlerDeactivate(req: IRequest, res: IResponse): Promise<void> {
7576
const { actorTypeName, actorId } = req.params;
76-
const result = await ActorRuntime.getInstance(this.client).deactivate(actorTypeName, actorId);
77+
const result = await ActorRuntime.getInstance(this.client.getDaprClient()).deactivate(actorTypeName, actorId);
7778
return this.handleResult(res, result);
7879
}
7980

@@ -84,7 +85,7 @@ export default class HTTPServerActor implements IServerActor {
8485
// @todo: reentrancy id? (https://github.com/dapr/python-sdk/blob/master/ext/flask_dapr/flask_dapr/actor.py#L91)
8586

8687
const dataSerialized = this.serializer.serialize(body);
87-
const result = await ActorRuntime.getInstance(this.client).invoke(actorTypeName, actorId, methodName, dataSerialized);
88+
const result = await ActorRuntime.getInstance(this.client.getDaprClient()).invoke(actorTypeName, actorId, methodName, dataSerialized);
8889
return this.handleResult(res, result);
8990
}
9091

@@ -93,7 +94,7 @@ export default class HTTPServerActor implements IServerActor {
9394
const body = req.body;
9495

9596
const dataSerialized = this.serializer.serialize(body);
96-
const result = await ActorRuntime.getInstance(this.client).fireTimer(actorTypeName, actorId, timerName, dataSerialized);
97+
const result = await ActorRuntime.getInstance(this.client.getDaprClient()).fireTimer(actorTypeName, actorId, timerName, dataSerialized);
9798
return res.send(result, 200);
9899
}
99100

@@ -102,7 +103,7 @@ export default class HTTPServerActor implements IServerActor {
102103
const body = req.body;
103104

104105
const dataSerialized = this.serializer.serialize(body);
105-
const result = await ActorRuntime.getInstance(this.client).fireReminder(actorTypeName, actorId, reminderName, dataSerialized);
106+
const result = await ActorRuntime.getInstance(this.client.getDaprClient()).fireReminder(actorTypeName, actorId, reminderName, dataSerialized);
106107
return res.send(result, 200);
107108
}
108109

src/implementation/Server/HTTPServer/binding.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,14 @@
11
import HTTPServer from './HTTPServer';
22
import HttpStatusCode from '../../../enum/HttpStatusCode.enum';
33
import IServerBinding from '../../../interfaces/Server/IServerBinding';
4-
import HTTPClient from '../../Client/HTTPClient/HTTPClient';
54

65
// https://docs.dapr.io/reference/api/bindings_api/
76
type FunctionDaprInputCallback = (data: any) => Promise<any>;
87

98
export default class HTTPServerBinding implements IServerBinding {
109
private readonly server: HTTPServer;
11-
private readonly client: HTTPClient;
1210

13-
constructor(server: HTTPServer, client: HTTPClient) {
14-
this.client = client;
11+
constructor(server: HTTPServer) {
1512
this.server = server;
1613
}
1714

src/implementation/Server/HTTPServer/invoker.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,12 @@ import { InvokerListenOptionsType } from '../../../types/InvokerListenOptions.ty
33
import { HttpMethod } from '../../../enum/HttpMethod.enum';
44
import HTTPServer from './HTTPServer';
55
import IServerInvoker from '../../../interfaces/Server/IServerInvoker';
6-
import HTTPClient from '../../Client/HTTPClient/HTTPClient';
76

87
// https://docs.dapr.io/reference/api/service_invocation_api/
98
export default class HTTPServerInvoker implements IServerInvoker {
109
private readonly server: HTTPServer;
11-
private readonly client: HTTPClient;
1210

13-
constructor(server: HTTPServer, client: HTTPClient) {
14-
this.client = client;
11+
constructor(server: HTTPServer) {
1512
this.server = server;
1613
}
1714

@@ -28,10 +25,10 @@ export default class HTTPServerInvoker implements IServerInvoker {
2825
contentType: req.headers['content-type']
2926
}
3027
});
31-
28+
3229
// Make sure we close the request after the callback
3330
// @TODO this should send header and http status code to client (same as grpc)
34-
31+
3532
if (!res.writableEnded) {
3633
if (invokeResponse) {
3734
return res.end(JSON.stringify(invokeResponse));

0 commit comments

Comments
 (0)