Skip to content

Commit a06edbc

Browse files
Add config API unsubscribe
Signed-off-by: Xavier Geerinck <[email protected]>
1 parent e2046e8 commit a06edbc

File tree

5 files changed

+86
-16
lines changed

5 files changed

+86
-16
lines changed

src/implementation/Client/GRPCClient/configuration.ts

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,13 @@ limitations under the License.
1313

1414
import GRPCClient from './GRPCClient';
1515
import * as grpc from "@grpc/grpc-js";
16-
import { GetConfigurationRequest, GetConfigurationResponse, SubscribeConfigurationRequest, SubscribeConfigurationResponse } from '../../../proto/dapr/proto/runtime/v1/dapr_pb';
16+
import { GetConfigurationRequest, GetConfigurationResponse, SubscribeConfigurationRequest, SubscribeConfigurationResponse, UnsubscribeConfigurationRequest, UnsubscribeConfigurationResponse } from '../../../proto/dapr/proto/runtime/v1/dapr_pb';
1717
import IClientConfiguration from '../../../interfaces/Client/IClientConfiguration';
1818
import { KeyValueType } from '../../../types/KeyValue.type';
1919
import { GetConfigurationResponse as GetConfigurationResponseResult } from '../../../types/configuration/GetConfigurationResponse';
2020
import { SubscribeConfigurationResponse as SubscribeConfigurationResponseResult } from '../../../types/configuration/SubscribeConfigurationResponse';
2121
import { SubscribeConfigurationCallback } from '../../../types/configuration/SubscribeConfigurationCallback';
22+
import { SubscribeConfigurationStream } from '../../../types/configuration/SubscribeConfigurationStream';
2223

2324
export default class GRPCClientConfiguration implements IClientConfiguration {
2425
client: GRPCClient;
@@ -68,19 +69,19 @@ export default class GRPCClientConfiguration implements IClientConfiguration {
6869
});
6970
}
7071

71-
async subscribe(storeName: string, cb: SubscribeConfigurationCallback): Promise<void> {
72+
async subscribe(storeName: string, cb: SubscribeConfigurationCallback): Promise<SubscribeConfigurationStream> {
7273
return this._subscribe(storeName, cb)
7374
}
7475

75-
async subscribeWithKeys(storeName: string, keys: string[], cb: SubscribeConfigurationCallback): Promise<void> {
76+
async subscribeWithKeys(storeName: string, keys: string[], cb: SubscribeConfigurationCallback): Promise<SubscribeConfigurationStream> {
7677
return this._subscribe(storeName, cb, keys)
7778
}
7879

79-
async subscribeWithMetadata(storeName: string, keys: string[], metadata: KeyValueType, cb: SubscribeConfigurationCallback): Promise<void> {
80+
async subscribeWithMetadata(storeName: string, keys: string[], metadata: KeyValueType, cb: SubscribeConfigurationCallback): Promise<SubscribeConfigurationStream> {
8081
return this._subscribe(storeName, cb, keys, metadata)
8182
}
8283

83-
async _subscribe(storeName: string, cb: SubscribeConfigurationCallback, keys?: string[], metadataObj?: KeyValueType): Promise<void> {
84+
async _subscribe(storeName: string, cb: SubscribeConfigurationCallback, keys?: string[], metadataObj?: KeyValueType): Promise<SubscribeConfigurationStream> {
8485
const metadata = new grpc.Metadata();
8586

8687
const msg = new SubscribeConfigurationRequest();
@@ -105,8 +106,11 @@ export default class GRPCClientConfiguration implements IClientConfiguration {
105106
// we will thus create a set with our listeners so we don't
106107
// break on multi listeners
107108
const stream = client.subscribeConfigurationAlpha1(msg, metadata);
109+
let streamId: string;
108110

109111
stream.on("data", async (data: SubscribeConfigurationResponse) => {
112+
streamId = data.getId();
113+
110114
const wrapped: SubscribeConfigurationResponseResult = {
111115
items: data.getItemsList().map((item) => ({
112116
key: item.getKey(),
@@ -122,6 +126,27 @@ export default class GRPCClientConfiguration implements IClientConfiguration {
122126

123127
await cb(wrapped);
124128
});
125-
}
126129

130+
return {
131+
stop: async () => {
132+
return new Promise((resolve, reject) => {
133+
const req = new UnsubscribeConfigurationRequest();
134+
req.setStoreName(storeName);
135+
req.setId(streamId);
136+
137+
client.unsubscribeConfigurationAlpha1(req, (err, res: UnsubscribeConfigurationResponse) => {
138+
if (err || !res.getOk()) {
139+
return reject(res.getMessage());
140+
}
141+
142+
// Clean up the node.js event emitter
143+
stream.removeAllListeners();
144+
stream.destroy();
145+
146+
return resolve();
147+
});
148+
})
149+
}
150+
};
151+
}
127152
}

src/implementation/Client/HTTPClient/configuration.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import { KeyValueType } from '../../../types/KeyValue.type';
1616
import { GetConfigurationResponse as GetConfigurationResponseResult } from '../../../types/configuration/GetConfigurationResponse';
1717
import HTTPClient from './HTTPClient';
1818
import { SubscribeConfigurationCallback } from '../../../types/configuration/SubscribeConfigurationCallback';
19+
import { SubscribeConfigurationStream } from '../../../types/configuration/SubscribeConfigurationStream';
1920

2021
export default class HTTPClientConfiguration implements IClientConfiguration {
2122
client: HTTPClient;
@@ -24,15 +25,15 @@ export default class HTTPClientConfiguration implements IClientConfiguration {
2425
this.client = client;
2526
}
2627

27-
async subscribe(_storeName: string, _cb: SubscribeConfigurationCallback): Promise<void> {
28+
async subscribe(_storeName: string, _cb: SubscribeConfigurationCallback): Promise<SubscribeConfigurationStream> {
2829
throw new Error('HTTP is currently not supported.');
2930
}
3031

31-
async subscribeWithKeys(_storeName: string, _keys: string[], _cb: SubscribeConfigurationCallback): Promise<void> {
32+
async subscribeWithKeys(_storeName: string, _keys: string[], _cb: SubscribeConfigurationCallback): Promise<SubscribeConfigurationStream> {
3233
throw new Error('HTTP is currently not supported.');
3334
}
3435

35-
async subscribeWithMetadata(_storeName: string, _keys: string[], _metadata: KeyValueType, _cb: SubscribeConfigurationCallback): Promise<void> {
36+
async subscribeWithMetadata(_storeName: string, _keys: string[], _metadata: KeyValueType, _cb: SubscribeConfigurationCallback): Promise<SubscribeConfigurationStream> {
3637
throw new Error('HTTP is currently not supported.');
3738
}
3839

src/interfaces/Client/IClientConfiguration.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,13 @@ limitations under the License.
1313

1414
import { GetConfigurationResponse } from "../../types/configuration/GetConfigurationResponse";
1515
import { SubscribeConfigurationCallback } from "../../types/configuration/SubscribeConfigurationCallback";
16+
import { SubscribeConfigurationStream } from "../../types/configuration/SubscribeConfigurationStream";
1617
import { KeyValueType } from "../../types/KeyValue.type";
1718

1819
export default interface IClientConfiguration {
1920
// https://github.com/dapr/dapr/blob/master/dapr/proto/runtime/v1/dapr.proto#L90
2021
get(storeName: string, keys?: string[], metadata?: KeyValueType): Promise<GetConfigurationResponse>;
21-
subscribe(storeName: string, cb: SubscribeConfigurationCallback): Promise<void>;
22-
subscribeWithKeys(storeName: string, keys: string[], cb: SubscribeConfigurationCallback): Promise<void>;
23-
subscribeWithMetadata(storeName: string, keys: string[], metadata: KeyValueType, cb: SubscribeConfigurationCallback): Promise<void>;
22+
subscribe(storeName: string, cb: SubscribeConfigurationCallback): Promise<SubscribeConfigurationStream>;
23+
subscribeWithKeys(storeName: string, keys: string[], cb: SubscribeConfigurationCallback): Promise<SubscribeConfigurationStream>;
24+
subscribeWithMetadata(storeName: string, keys: string[], metadata: KeyValueType, cb: SubscribeConfigurationCallback): Promise<SubscribeConfigurationStream>;
2425
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
Copyright 2022 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
export type SubscribeConfigurationStream = {
15+
stop: () => void;
16+
}

test/e2e/main.grpc.test.ts

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -317,31 +317,55 @@ describe('grpc/main', () => {
317317
it('should be able to subscribe to configuration item changes on specific keys', async () => {
318318
const m = jest.fn(async (_res: SubscribeConfigurationResponse) => { return; });
319319

320-
await client.configuration.subscribeWithKeys("config-redis", ["myconfigkey1", "myconfigkey2"], m);
320+
const stream1 = await client.configuration.subscribeWithKeys("config-redis", ["myconfigkey1", "myconfigkey2"], m);
321321
await DockerUtils.executeDockerCommand("dapr_redis redis-cli MSET myconfigkey1 key1_mynewvalue||1");
322322

323323
expect(m.mock.calls.length).toEqual(1);
324324
expect(m.mock.calls[0][0].items[0].key).toEqual("myconfigkey1");
325325
expect(m.mock.calls[0][0].items[0].value).toEqual("key1_mynewvalue");
326+
327+
await stream1.stop();
326328
});
327329

328330
it('should be able to subscribe with metadata', async () => {
329331
const m = jest.fn(async (_res: SubscribeConfigurationResponse) => { return; });
330332

331-
await client.configuration.subscribeWithMetadata("config-redis", ["myconfigkey1", "myconfigkey2"], { "hello": "world" }, m);
333+
const stream1 = await client.configuration.subscribeWithMetadata("config-redis", ["myconfigkey1", "myconfigkey2"], { "hello": "world" }, m);
334+
await DockerUtils.executeDockerCommand("dapr_redis redis-cli MSET myconfigkey1 key1_mynewvalue||1");
335+
336+
expect(m.mock.calls.length).toEqual(1);
337+
expect(m.mock.calls[0][0].items[0].key).toEqual("myconfigkey1");
338+
expect(m.mock.calls[0][0].items[0].value).toEqual("key1_mynewvalue");
339+
340+
await stream1.stop();
341+
});
342+
343+
it('should be able to unsubscribe', async () => {
344+
const m = jest.fn(async (_res: SubscribeConfigurationResponse) => { return; });
345+
346+
const stream1 = await client.configuration.subscribeWithMetadata("config-redis", ["myconfigkey1", "myconfigkey2"], { "hello": "world" }, m);
332347
await DockerUtils.executeDockerCommand("dapr_redis redis-cli MSET myconfigkey1 key1_mynewvalue||1");
333348

334349
expect(m.mock.calls.length).toEqual(1);
335350
expect(m.mock.calls[0][0].items[0].key).toEqual("myconfigkey1");
336351
expect(m.mock.calls[0][0].items[0].value).toEqual("key1_mynewvalue");
352+
353+
await stream1.stop();
354+
355+
await DockerUtils.executeDockerCommand("dapr_redis redis-cli MSET myconfigkey1 key1_mynewvalue2||1");
356+
357+
// Expect no change after stop
358+
expect(m.mock.calls.length).toEqual(1);
359+
expect(m.mock.calls[0][0].items[0].key).toEqual("myconfigkey1");
360+
expect(m.mock.calls[0][0].items[0].value).toEqual("key1_mynewvalue");
337361
});
338362

339363
it('should be able to subscribe to configuration items through multiple streams', async () => {
340364
const m1 = jest.fn(async (_res: SubscribeConfigurationResponse) => { return; });
341365
const m2 = jest.fn(async (_res: SubscribeConfigurationResponse) => { return; });
342366

343-
await client.configuration.subscribeWithKeys("config-redis", ["myconfigkey1"], m1);
344-
await client.configuration.subscribeWithKeys("config-redis", ["myconfigkey1"], m2);
367+
const stream1 = await client.configuration.subscribeWithKeys("config-redis", ["myconfigkey1"], m1);
368+
const stream2 = await client.configuration.subscribeWithKeys("config-redis", ["myconfigkey1"], m2);
345369

346370
await DockerUtils.executeDockerCommand("dapr_redis redis-cli MSET myconfigkey1 key1_mynewvalue||1");
347371

@@ -352,6 +376,9 @@ describe('grpc/main', () => {
352376
expect(m2.mock.calls.length).toEqual(1);
353377
expect(m2.mock.calls[0][0].items[0].key).toEqual("myconfigkey1");
354378
expect(m2.mock.calls[0][0].items[0].value).toEqual("key1_mynewvalue");
379+
380+
await stream1.stop();
381+
await stream2.stop();
355382
});
356383
});
357384

0 commit comments

Comments
 (0)