Skip to content

Commit 310cfca

Browse files
Make sure gRPC works
1 parent 285f8be commit 310cfca

File tree

12 files changed

+160
-32
lines changed

12 files changed

+160
-32
lines changed

src/implementation/Client/GRPCClient/health.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import GRPCClient from './GRPCClient';
22
import IClientHealth from '../../../interfaces/Client/IClientHealth';
3+
import { GetMetadataResponse } from '../../../proto/dapr/proto/runtime/v1/dapr_pb';
4+
import { Empty } from "google-protobuf/google/protobuf/empty_pb";
35

46
// https://docs.dapr.io/reference/api/health_api/
57
export default class GRPCClientHealth implements IClientHealth {
@@ -9,7 +11,22 @@ export default class GRPCClientHealth implements IClientHealth {
911
this.client = client;
1012
}
1113

14+
// There is no gRPC implementation of /healthz, so we try to fetch the metadata
1215
async isHealthy(): Promise<boolean> {
13-
return true;
16+
return new Promise((resolve, reject) => {
17+
const client = this.client.getClient();
18+
19+
try {
20+
client.getMetadata(new Empty(), (err, res: GetMetadataResponse) => {
21+
if (err) {
22+
return resolve(false);
23+
}
24+
25+
return resolve(true);
26+
});
27+
} catch (e) {
28+
return resolve(false);
29+
}
30+
});
1431
}
1532
}

src/implementation/Client/GRPCClient/state.ts

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import GRPCClient from './GRPCClient';
2-
import { DeleteStateRequest, ExecuteStateTransactionRequest, GetBulkStateRequest, GetBulkStateResponse, GetStateRequest, GetStateResponse, SaveStateRequest, TransactionalStateOperation } from '../../../proto/dapr/proto/runtime/v1/dapr_pb';
2+
import { DeleteStateRequest, ExecuteStateTransactionRequest, GetBulkStateRequest, GetBulkStateResponse, GetStateRequest, GetStateResponse, QueryStateRequest, QueryStateResponse, SaveStateRequest, TransactionalStateOperation } from '../../../proto/dapr/proto/runtime/v1/dapr_pb';
33
import { Etag, StateItem, StateOptions } from '../../../proto/dapr/proto/common/v1/common_pb';
44
import { KeyValuePairType } from '../../../types/KeyValuePair.type';
55
import { OperationType } from '../../../types/Operation.type';
66
import { IRequestMetadata } from '../../../types/RequestMetadata.type';
77
import IClientState from '../../../interfaces/Client/IClientState';
88
import { KeyValueType } from '../../../types/KeyValue.type';
99
import { merge } from '../../../utils/Map.util';
10+
import { StateQueryType } from '../../../types/StateQuery.type';
11+
import { StateQueryResponseType } from '../../../types/StateQueryResponse.type';
1012

1113
// https://docs.dapr.io/reference/api/state_api/
1214
export default class GRPCClientState implements IClientState {
@@ -25,7 +27,7 @@ export default class GRPCClientState implements IClientState {
2527
si.setValue(Buffer.from(typeof stateObject.value === "object" ? JSON.stringify(stateObject.value) : stateObject.value.toString(), "utf-8"));
2628
stateList.push(si);
2729
}
28-
30+
2931
const msgService = new SaveStateRequest();
3032
msgService.setStoreName(storeName);
3133
msgService.setStatesList(stateList);
@@ -48,7 +50,7 @@ export default class GRPCClientState implements IClientState {
4850
msgService.setStoreName(storeName);
4951
msgService.setKey(key)
5052

51-
53+
5254
// @todo: https://docs.dapr.io/reference/api/state_api/#optional-behaviors
5355
// msgService.setConsistency()
5456

@@ -96,7 +98,7 @@ export default class GRPCClientState implements IClientState {
9698
let data: string;
9799
try {
98100
data = JSON.parse(resDataStr);
99-
} catch(e) {
101+
} catch (e) {
100102
data = resDataStr;
101103
}
102104
return {
@@ -180,4 +182,33 @@ export default class GRPCClientState implements IClientState {
180182
});
181183
});
182184
}
185+
186+
async query(storeName: string, query: StateQueryType): Promise<StateQueryResponseType> {
187+
const msgService = new QueryStateRequest();
188+
msgService.setStoreName(storeName);
189+
msgService.setQuery(JSON.stringify(query))
190+
191+
return new Promise((resolve, reject) => {
192+
const client = this.client.getClient();
193+
client.queryStateAlpha1(msgService, (err, res: QueryStateResponse) => {
194+
if (err) {
195+
return reject(err);
196+
}
197+
198+
// https://docs.dapr.io/reference/api/state_api/#response-body
199+
// map the res from gRPC
200+
let resMapped: StateQueryResponseType = {
201+
results: res.getResultsList().map((i) => ({
202+
key: i.getKey(),
203+
data: i.getData(),
204+
etag: i.getEtag(),
205+
error: i.getError(),
206+
})),
207+
token: res.getToken()
208+
};
209+
210+
return resolve(resMapped);
211+
});
212+
});
213+
}
183214
}

src/implementation/Client/HTTPClient/HTTPClient.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ export default class HTTPClient implements IClient {
7373
this.httpsAgent.destroy();
7474
}
7575

76+
async executeWithApiVersion(apiVersion: string = "v1.0", url: string, params: any = {}): Promise<object | string> {
77+
const newClientUrl = this.clientUrl.replace("v1.0", apiVersion);
78+
return await this.execute(`${newClientUrl}${url}`, params);
79+
}
80+
7681
async execute(url: string, params: any = {}): Promise<object | string> {
7782
if (!params?.headers) {
7883
params.headers = {};

src/implementation/Client/HTTPClient/state.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import { OperationType } from '../../../types/Operation.type';
44
import { IRequestMetadata } from '../../../types/RequestMetadata.type';
55
import IClientState from '../../../interfaces/Client/IClientState';
66
import { KeyValueType } from '../../../types/KeyValue.type';
7+
import { StateQueryType } from '../../../types/StateQuery.type';
8+
import { StateQueryResponseType } from '../../../types/StateQueryResponse.type';
79

810
// https://docs.dapr.io/reference/api/state_api/
911
export default class HTTPClientState implements IClientState {
@@ -61,4 +63,18 @@ export default class HTTPClientState implements IClientState {
6163
})
6264
});
6365
}
66+
67+
async query(storeName: string, query: StateQueryType): Promise<StateQueryResponseType> {
68+
const result = await this.client.executeWithApiVersion("v1.0-alpha1", `/state/${storeName}/query`, {
69+
method: 'POST',
70+
headers: {
71+
"Content-Type": "application/json"
72+
},
73+
body: JSON.stringify({
74+
query
75+
})
76+
});
77+
78+
return result as StateQueryResponseType;
79+
}
6480
}

src/implementation/Server/GRPCServer/GRPCServer.ts

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ import * as grpc from "@grpc/grpc-js";
22
import GRPCServerImpl from "./GRPCServerImpl";
33
import { AppCallbackService } from "../../../proto/dapr/proto/runtime/v1/appcallback_grpc_pb";
44
import IServer from "../../../interfaces/Server/IServer";
5-
import GRPCClient from "../../Client/GRPCClient/GRPCClient";
65
import { DaprClient } from "../../..";
6+
import * as NodeJSUtils from "../../../utils/NodeJS.util";
77

88
// eslint-disable-next-line
99
export interface IServerType extends grpc.Server { }
@@ -17,6 +17,7 @@ export default class GRPCServer implements IServer {
1717
server: IServerType;
1818
serverImpl: IServerImplType;
1919
serverCredentials: grpc.ServerCredentials;
20+
serverStartupDelay = 1000; // @todo: use health api https://docs.dapr.io/reference/api/health_api/
2021
client: DaprClient;
2122

2223
constructor(client: DaprClient) {
@@ -73,9 +74,22 @@ export default class GRPCServer implements IServer {
7374

7475
// We need to call the Singleton to start listening on the port, else Dapr will not pick it up correctly
7576
// Dapr will probe every 50ms to see if we are listening on our port: https://github.com/dapr/dapr/blob/a43712c97ead550ca2f733e9f7e7769ecb195d8b/pkg/runtime/runtime.go#L1694
76-
console.log("[Dapr-JS][gRPC] Letting Dapr pick-up the server");
77-
const delayMs = 250;
78-
await (new Promise((resolve) => setTimeout(resolve, delayMs)));
77+
// if we are using actors we will change this to 4s to let the placement tables update
78+
let isHealthy = false;
79+
let isHealthyRetryCount = 0;
80+
let isHealthyMaxRetryCount = 60; // 1s startup delay and we try max for 60s
81+
82+
console.log(`[Dapr-JS] Letting Dapr pick-up the server (Maximum 60s wait time)`);
83+
while (!isHealthy) {
84+
console.log(`[Dapr-JS] - Waiting till Dapr Started (#${isHealthyRetryCount})`);
85+
await NodeJSUtils.sleep(this.serverStartupDelay);
86+
isHealthy = await this.client.health.isHealthy();
87+
isHealthyRetryCount++;
88+
89+
if (isHealthyRetryCount > isHealthyMaxRetryCount) {
90+
throw new Error("DAPR_SIDECAR_COULD_NOT_BE_STARTED");
91+
}
92+
}
7993

8094
// We are initialized
8195
this.isInitialized = true;

src/interfaces/Client/IClientState.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@ import { OperationType } from "../../types/Operation.type";
22
import { IRequestMetadata } from "../../types/RequestMetadata.type";
33
import { KeyValuePairType } from "../../types/KeyValuePair.type";
44
import { KeyValueType } from "../../types/KeyValue.type";
5+
import { StateQueryType } from "../../types/StateQuery.type";
6+
import { StateQueryResponseType } from "../../types/StateQueryResponse.type";
57

68
export default interface IClientState {
7-
save(storeName: string, stateObjects: KeyValuePairType[]): Promise<void>;
8-
get(storeName: string, key: string): Promise<KeyValueType | string>;
9-
getBulk(storeName: string, keys: string[], parallelism?: number, metadata?: string): Promise<KeyValueType[]>;
10-
delete(storeName: string, key: string): Promise<void>;
11-
transaction(storeName: string, operations?: OperationType[], metadata?: IRequestMetadata | null): Promise<void>;
9+
save(storeName: string, stateObjects: KeyValuePairType[]): Promise<void>;
10+
get(storeName: string, key: string): Promise<KeyValueType | string>;
11+
getBulk(storeName: string, keys: string[], parallelism?: number, metadata?: string): Promise<KeyValueType[]>;
12+
delete(storeName: string, key: string): Promise<void>;
13+
transaction(storeName: string, operations?: OperationType[], metadata?: IRequestMetadata | null): Promise<void>;
14+
query(storeName: string, query: StateQueryType): Promise<StateQueryResponseType>;
1215
}
Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,10 @@
11
import AbstractActor from "../../actors/runtime/AbstractActor";
22
import Class from "../../types/Class";
3-
import IResponse from "../../types/http/IResponse";
4-
import IRequest from "../../types/http/IRequest";
53

64
export default interface IServerActor {
7-
registerActor<T extends AbstractActor>(cls: Class<T>): Promise<void>;
8-
getRegisteredActors(): Promise<string[]>;
9-
init(): Promise<void>;
10-
deactivateActor(actorType: string, actorId: string): Promise<void>;
5+
registerActor<T extends AbstractActor>(cls: Class<T>): Promise<void>;
6+
getRegisteredActors(): Promise<string[]>;
7+
init(): Promise<void>;
118

12-
// @todo: do we want those in the interface? init should take care of it
13-
// handlerDeactivate(req: IRequest, res: IResponse): Promise<void>;
14-
// handlerMethod(req: IRequest, res: IResponse): Promise<void>;
15-
// handlerTimer(req: IRequest, res: IResponse): Promise<void>;
16-
// handlerReminder(req: IRequest, res: IResponse): Promise<void>;
9+
// deactivateActor(actorType: string, actorId: string): Promise<void>;
1710
}

src/types/StateQuery.type.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { Enumerable } from "./Enumerable.type"
2+
3+
export type StateQueryType = {
4+
filter: StateQueryFilter;
5+
sort: StateQuerySort[];
6+
pagination: StateQueryPagination;
7+
}
8+
9+
type StateQuerySort = {
10+
key: string;
11+
order?: "ASC" | "DESC";
12+
}
13+
14+
type StateQueryPagination = {
15+
limit: number;
16+
token?: string;
17+
}
18+
19+
type StateQueryFilter = {
20+
// Operations
21+
AND?: Enumerable<StateQueryFilterInput>;
22+
OR?: Enumerable<StateQueryFilterInput>;
23+
EQ?: Enumerable<StateQueryFilterInput>;
24+
IN?: Enumerable<StateQueryFilterInput>;
25+
}
26+
27+
type StateQueryFilterInput = {
28+
// Operations can be nested, so repeat here
29+
AND?: Enumerable<StateQueryFilterInput>;
30+
OR?: Enumerable<StateQueryFilterInput>;
31+
EQ?: Enumerable<StateQueryFilterInput>;
32+
IN?: Enumerable<StateQueryFilterInput>;
33+
34+
// Next to that we can have the { key: value }
35+
[key: string]: any;
36+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
export type StateQueryResponseType = {
2+
results: StateQueryResponseResult[]
3+
token?: string;
4+
}
5+
6+
type StateQueryResponseResult = {
7+
key: string;
8+
data: any; // byte array
9+
etag?: string;
10+
error?: string;
11+
}

test/e2e/actors.http.test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,12 @@ describe('http/actors', () => {
4545

4646
// Start server
4747
await server.start(); // Start the general server
48-
});
48+
}, 10 * 1000);
4949

50-
afterAll(async () => {
51-
await server.stop();
52-
await client.stop();
53-
});
50+
// afterAll(async () => {
51+
// await server.stop();
52+
// await client.stop();
53+
// });
5454

5555
describe('actorProxy', () => {
5656
it('should be able to create an actor object through the proxy', async () => {

0 commit comments

Comments
 (0)