Skip to content

Commit 573bbd0

Browse files
SoTrxuponleir
andauthored
Adding optional attributes in state management API (#476)
* Adding optional parameters concurrency, consistency and etag in state management Signed-off-by: SoTrx <[email protected]> * style(state): Changing one-liners ifs into multiline ifs and adding newlines Signed-off-by: SoTrx <[email protected]> * refactor(state): Renaming enums methods matchStateConsistency -> getStateConsistencyValue matchStateConcurrency -> getStateConcurrencyValue Signed-off-by: SoTrx <[email protected]> * fix(state): Querystring in get request was not used Signed-off-by: SoTrx <[email protected]> * refactor(state): Renaming StateConcurrency and StateConsistency enums to match other exported enums Signed-off-by: SoTrx <[email protected]> * chore(state): Adding StateConcurrencyEnum and StateConsistencyEnum as exported memeber for the whole lib Signed-off-by: SoTrx <[email protected]> * style : prettier Signed-off-by: SoTrx <[email protected]> * style : prettier Signed-off-by: SoTrx <[email protected]> * fix(state): Remapping enums values to their string representation when present in body Signed-off-by: SoTrx <[email protected]> * style(utils): Removing unused import Signed-off-by: SoTrx <[email protected]> --------- Signed-off-by: SoTrx <[email protected]> Co-authored-by: SoTrx <[email protected]>
1 parent d742c36 commit 573bbd0

File tree

13 files changed

+304
-40
lines changed

13 files changed

+304
-40
lines changed

src/enum/StateConcurrency.enum.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@ See the License for the specific language governing permissions and
1111
limitations under the License.
1212
*/
1313

14-
export enum EStateConcurrency {
14+
export enum StateConcurrencyEnum {
1515
CONCURRENCY_UNSPECIFIED = 0,
1616
CONCURRENCY_FIRST_WRITE = 1,
1717
CONCURRENCY_LAST_WRITE = 2,
1818
}
19+
20+
export default StateConcurrencyEnum;

src/enum/StateConsistency.enum.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ See the License for the specific language governing permissions and
1111
limitations under the License.
1212
*/
1313

14-
export enum EStateConsistency {
14+
export enum StateConsistencyEnum {
1515
CONSISTENCY_UNSPECIFIED = 0,
1616
CONSISTENCY_EVENTUAL = 1,
1717
CONSISTENCY_STRONG = 2,
1818
}
19+
export default StateConsistencyEnum;

src/implementation/Client/GRPCClient/state.ts

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ import { Settings } from "../../../utils/Settings.util";
3838
import { addMetadataToMap } from "../../../utils/Client.util";
3939
import { StateSaveResponseType } from "../../../types/state/StateSaveResponseType";
4040
import { StateSaveOptions } from "../../../types/state/StateSaveOptions.type";
41+
import { StateDeleteOptions } from "../../../types/state/StateDeleteOptions.type";
42+
import { StateGetOptions } from "../../../types/state/StateGetOptions.type";
43+
import { IStateOptions } from "../../../types/state/StateOptions.type";
4144

4245
// https://docs.dapr.io/reference/api/state_api/
4346
export default class GRPCClientState implements IClientState {
@@ -63,6 +66,15 @@ export default class GRPCClientState implements IClientState {
6366
"utf-8",
6467
),
6568
);
69+
70+
if (stateObject?.etag) {
71+
const etag = new Etag();
72+
etag.setValue(stateObject.etag);
73+
si.setEtag(etag);
74+
}
75+
76+
si.setOptions(this._configureStateOptions(stateObject?.options));
77+
6678
// Merge metadata from stateObject and options.
6779
// Note, metadata from options will override metadata from stateObject.
6880
// See https://github.com/dapr/dapr/blob/029ec8cb7a1c88ec5d222bc2b0d1d53541217f19/pkg/http/api.go#L1525-L1532
@@ -89,13 +101,14 @@ export default class GRPCClientState implements IClientState {
89101
});
90102
}
91103

92-
async get(storeName: string, key: string): Promise<KeyValueType | string> {
104+
async get(storeName: string, key: string, options?: Partial<StateGetOptions>): Promise<KeyValueType | string> {
93105
const msgService = new GetStateRequest();
94106
msgService.setStoreName(storeName);
95107
msgService.setKey(key);
96108

97-
// @todo: https://docs.dapr.io/reference/api/state_api/#optional-behaviors
98-
// msgService.setConsistency()
109+
if (options?.consistency) {
110+
msgService.setConsistency(options.consistency as any);
111+
}
99112

100113
const client = await this.client.getClient();
101114

@@ -158,14 +171,18 @@ export default class GRPCClientState implements IClientState {
158171
});
159172
}
160173

161-
async delete(storeName: string, key: string): Promise<void> {
174+
async delete(storeName: string, key: string, options?: StateDeleteOptions): Promise<StateSaveResponseType> {
162175
const msgService = new DeleteStateRequest();
163176
msgService.setStoreName(storeName);
164177
msgService.setKey(key);
165178

166-
// @todo: implement below
167-
// msgService.setEtag();
168-
// msgService.setOptions();
179+
if (options?.etag) {
180+
const etag = new Etag();
181+
etag.setValue(options.etag);
182+
msgService.setEtag(etag);
183+
}
184+
185+
msgService.setOptions(this._configureStateOptions(options));
169186

170187
const client = await this.client.getClient();
171188

@@ -176,7 +193,7 @@ export default class GRPCClientState implements IClientState {
176193
}
177194

178195
// https://docs.dapr.io/reference/api/state_api/#http-response-3
179-
return resolve();
196+
return resolve({});
180197
});
181198
});
182199
}
@@ -200,13 +217,7 @@ export default class GRPCClientState implements IClientState {
200217
si.setEtag(etag);
201218
}
202219

203-
if (o.request.options) {
204-
const so = new StateOptions();
205-
so.setConsistency(o.request.options.consistency as any);
206-
so.setConcurrency(o.request.options.concurrency as any);
207-
208-
si.setOptions(so);
209-
}
220+
si.setOptions(this._configureStateOptions(o.request?.options));
210221

211222
const transactionItem = new TransactionalStateOperation();
212223
transactionItem.setOperationtype(o.operation);
@@ -272,4 +283,21 @@ export default class GRPCClientState implements IClientState {
272283
});
273284
});
274285
}
286+
287+
_configureStateOptions(opt?: Partial<IStateOptions>): StateOptions | undefined {
288+
if (opt === undefined) {
289+
return undefined;
290+
}
291+
292+
const stateOptions = new StateOptions();
293+
if (opt?.consistency) {
294+
stateOptions.setConsistency(opt.consistency as any);
295+
}
296+
297+
if (opt?.concurrency) {
298+
stateOptions.setConcurrency(opt.concurrency as any);
299+
}
300+
301+
return stateOptions;
302+
}
275303
}

src/implementation/Client/HTTPClient/state.ts

Lines changed: 58 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@ import { KeyValueType } from "../../../types/KeyValue.type";
2020
import { StateQueryType } from "../../../types/state/StateQuery.type";
2121
import { StateQueryResponseType } from "../../../types/state/StateQueryResponse.type";
2222
import { StateGetBulkOptions } from "../../../types/state/StateGetBulkOptions.type";
23-
import { createHTTPMetadataQueryParam } from "../../../utils/Client.util";
23+
import { createHTTPQueryParam, getStateConcurrencyValue, getStateConsistencyValue } from "../../../utils/Client.util";
2424
import { Settings } from "../../../utils/Settings.util";
2525
import { Logger } from "../../../logger/Logger";
2626
import { StateSaveResponseType } from "../../../types/state/StateSaveResponseType";
2727
import { StateSaveOptions } from "../../../types/state/StateSaveOptions.type";
28+
import { StateDeleteOptions } from "../../../types/state/StateDeleteOptions.type";
29+
import { THTTPExecuteParams } from "../../../types/http/THTTPExecuteParams.type";
30+
import { StateGetOptions } from "../../../types/state/StateGetOptions.type";
2831

2932
// https://docs.dapr.io/reference/api/state_api/
3033
export default class HTTPClientState implements IClientState {
@@ -41,7 +44,15 @@ export default class HTTPClientState implements IClientState {
4144
stateObjects: KeyValuePairType[],
4245
options: StateSaveOptions = {},
4346
): Promise<StateSaveResponseType> {
44-
const queryParams = createHTTPMetadataQueryParam(options.metadata);
47+
const queryParams = createHTTPQueryParam({ data: options?.metadata, type: "metadata" });
48+
49+
for (const so of stateObjects) {
50+
const behavior = {
51+
consistency: getStateConsistencyValue(so?.options?.consistency),
52+
concurrency: getStateConcurrencyValue(so?.options?.concurrency),
53+
};
54+
so.options = Object.assign({}, so.options, behavior);
55+
}
4556

4657
try {
4758
await this.client.execute(`/state/${storeName}?${queryParams}`, {
@@ -56,36 +67,72 @@ export default class HTTPClientState implements IClientState {
5667
return {};
5768
}
5869

59-
async get(storeName: string, key: string): Promise<KeyValueType | string> {
60-
const result = await this.client.execute(`/state/${storeName}/${key}`);
70+
async get(storeName: string, key: string, options?: Partial<StateGetOptions>): Promise<KeyValueType | string> {
71+
const behavior = {
72+
consistency: getStateConsistencyValue(options?.consistency),
73+
};
74+
75+
const queryParams = createHTTPQueryParam({ data: options?.metadata, type: "metadata" }, { data: behavior });
76+
77+
const result = await this.client.execute(`/state/${storeName}/${key}?${queryParams}`);
78+
6179
return result as KeyValueType;
6280
}
6381

64-
async getBulk(storeName: string, keys: string[], options: StateGetBulkOptions = {}): Promise<KeyValueType[]> {
65-
const queryParams = createHTTPMetadataQueryParam(options.metadata);
82+
async getBulk(storeName: string, keys: string[], options?: StateGetBulkOptions): Promise<KeyValueType[]> {
83+
const queryParams = createHTTPQueryParam({ data: options?.metadata, type: "metadata" });
6684

6785
const result = await this.client.execute(`/state/${storeName}/bulk?${queryParams}`, {
6886
method: "POST",
6987
body: {
7088
keys,
71-
parallelism: options.parallelism ?? Settings.getDefaultStateGetBulkParallelism,
89+
parallelism: options?.parallelism ?? Settings.getDefaultStateGetBulkParallelism,
7290
},
7391
});
7492

7593
return result as KeyValueType[];
7694
}
7795

78-
async delete(storeName: string, key: string): Promise<void> {
79-
await this.client.execute(`/state/${storeName}/${key}`, {
80-
method: "DELETE",
81-
});
96+
async delete(storeName: string, key: string, options?: Partial<StateDeleteOptions>): Promise<StateSaveResponseType> {
97+
const behavior = {
98+
concurrency: getStateConcurrencyValue(options?.concurrency),
99+
consistency: getStateConsistencyValue(options?.consistency),
100+
};
101+
102+
const queryParams = createHTTPQueryParam({ data: options?.metadata, type: "metadata" }, { data: behavior });
103+
104+
// Managed headers
105+
const headers: THTTPExecuteParams["headers"] = {};
106+
if (options?.etag) {
107+
headers["If-Match"] = options.etag;
108+
}
109+
110+
try {
111+
await this.client.execute(`/state/${storeName}/${key}?${queryParams}`, {
112+
method: "DELETE",
113+
headers,
114+
});
115+
} catch (e: any) {
116+
this.logger.error(`Error deleting state from store ${storeName}, error: ${e}`);
117+
return { error: e };
118+
}
119+
120+
return {};
82121
}
83122

84123
async transaction(
85124
storeName: string,
86125
operations: OperationType[] = [],
87126
metadata: IRequestMetadata | null = null,
88127
): Promise<void> {
128+
for (const op of operations) {
129+
const behavior = {
130+
consistency: getStateConsistencyValue(op?.request?.options?.consistency),
131+
concurrency: getStateConcurrencyValue(op?.request.options?.concurrency),
132+
};
133+
op.request.options = Object.assign({}, op.request.options, behavior);
134+
}
135+
89136
await this.client.execute(`/state/${storeName}/transaction`, {
90137
method: "POST",
91138
body: {

src/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ import { PubSubBulkPublishMessage } from "./types/pubsub/PubSubBulkPublishMessag
3636
import HttpMethod from "./enum/HttpMethod.enum";
3737
import CommunicationProtocolEnum from "./enum/CommunicationProtocol.enum";
3838
import DaprPubSubStatusEnum from "./enum/DaprPubSubStatus.enum";
39+
import StateConcurrencyEnum from "./enum/StateConcurrency.enum";
40+
import StateConsistencyEnum from "./enum/StateConsistency.enum";
3941
import { StateGetBulkOptions } from "./types/state/StateGetBulkOptions.type";
4042

4143
export {
@@ -59,6 +61,8 @@ export {
5961
CommunicationProtocolEnum,
6062
DaprPubSubStatusEnum,
6163
PubSubBulkPublishMessage,
64+
StateConcurrencyEnum,
65+
StateConsistencyEnum,
6266
PubSubBulkPublishResponse,
6367
StateGetBulkOptions,
6468
};

src/interfaces/Client/IClientState.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@ import { StateQueryResponseType } from "../../types/state/StateQueryResponse.typ
2020
import { StateGetBulkOptions } from "../../types/state/StateGetBulkOptions.type";
2121
import { StateSaveResponseType } from "../../types/state/StateSaveResponseType";
2222
import { StateSaveOptions } from "../../types/state/StateSaveOptions.type";
23+
import { StateDeleteOptions } from "../../types/state/StateDeleteOptions.type";
24+
import { StateGetOptions } from "../../types/state/StateGetOptions.type";
2325

2426
export default interface IClientState {
2527
save(storeName: string, stateObjects: KeyValuePairType[], options?: StateSaveOptions): Promise<StateSaveResponseType>;
26-
get(storeName: string, key: string): Promise<KeyValueType | string>;
28+
get(storeName: string, key: string, options?: Partial<StateGetOptions>): Promise<KeyValueType | string>;
2729
getBulk(storeName: string, keys: string[], options?: StateGetBulkOptions): Promise<KeyValueType[]>;
28-
delete(storeName: string, key: string): Promise<void>;
30+
delete(storeName: string, key: string, options?: Partial<StateDeleteOptions>): Promise<StateSaveResponseType>;
2931
transaction(storeName: string, operations?: OperationType[], metadata?: IRequestMetadata | null): Promise<void>;
3032
query(storeName: string, query: StateQueryType): Promise<StateQueryResponseType>;
3133
}

src/types/KeyValuePair.type.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@ limitations under the License.
1212
*/
1313

1414
import { KeyValueType } from "./KeyValue.type";
15+
import { IStateOptions } from "./state/StateOptions.type";
1516

1617
export type KeyValuePairType = {
1718
key: string;
1819
value: any;
1920
etag?: string;
2021
metadata?: KeyValueType;
21-
options?: object;
22+
options?: IStateOptions;
2223
};
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
Copyright 2023 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
import { IRequestMetadata } from "../RequestMetadata.type";
15+
import { IStateOptions } from "./StateOptions.type";
16+
import { KeyValuePairType } from "../KeyValuePair.type";
17+
18+
export type StateDeleteOptions = IStateOptions & {
19+
/**
20+
* Metadata to be passed to the operation.
21+
*/
22+
metadata: IRequestMetadata;
23+
24+
/**
25+
* Optional Etag for Optimistic Concurrency Control
26+
*/
27+
etag: KeyValuePairType["etag"];
28+
};
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
Copyright 2023 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
import { IRequestMetadata } from "../RequestMetadata.type";
15+
import { IStateOptions } from "./StateOptions.type";
16+
17+
export type StateGetOptions = Pick<IStateOptions, "consistency"> & {
18+
/**
19+
* Metadata to be passed to the operation.
20+
*/
21+
metadata: IRequestMetadata;
22+
};

src/types/state/StateOptions.type.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ See the License for the specific language governing permissions and
1111
limitations under the License.
1212
*/
1313

14-
import { EStateConsistency } from "../../enum/StateConsistency.enum";
15-
import { EStateConcurrency } from "../../enum/StateConcurrency.enum";
14+
import { StateConsistencyEnum } from "../../enum/StateConsistency.enum";
15+
import { StateConcurrencyEnum } from "../../enum/StateConcurrency.enum";
1616

1717
export type IStateOptions = {
18-
concurrency: EStateConcurrency;
19-
consistency: EStateConsistency;
18+
concurrency: StateConcurrencyEnum;
19+
consistency: StateConsistencyEnum;
2020
};

0 commit comments

Comments
 (0)