Skip to content

Commit 53a6ed9

Browse files
authored
Fb/exclusive init (#52)
* serialize init so that double execution is really avoided * start of semaphore.test.js * makeExclusiveQueuing and makeExclusiveReturning * add semaphore test * semaphore test 2 * semaphore test 3 * semaphore test 4
1 parent 4041862 commit 53a6ed9

File tree

5 files changed

+162
-11
lines changed

5 files changed

+162
-11
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
"index.cds"
1010
],
1111
"scripts": {
12+
"postinstall": "npx patch-package",
1213
"test": "jest",
1314
"test:remove-inline-snapshots": "npx replace '\\.toMatchInlineSnapshot\\(\\s*`[\\s\\S]*?`\\s*\\);' '.toMatchInlineSnapshot();' test -r --include='*.test.js'",
1415
"lint": "npm run prettier && npm run eslint",

src/featureToggles.js

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ const { HandlerCollection } = require("./shared/handlerCollection");
2626
const { ENV, isObject, tryRequire } = require("./shared/static");
2727
const { promiseAllDone } = require("./shared/promiseAllDone");
2828
const { LimitedLazyCache } = require("./shared/cache");
29+
const { Semaphore } = require("./shared/semaphore");
2930

3031
const ENV_UNIQUE_NAME = process.env[ENV.UNIQUE_NAME];
3132
const DEFAULT_REDIS_CHANNEL = process.env[ENV.REDIS_CHANNEL] || "features";
@@ -644,7 +645,7 @@ class FeatureToggles {
644645
* Initialize needs to run and finish before other APIs are called. It processes the configuration, sets up
645646
* related internal state, and starts communication with redis.
646647
*/
647-
async initializeFeatures({ config: configInput, configFile: configFilepath = DEFAULT_CONFIG_FILEPATH }) {
648+
async _initializeFeatures({ config: configInput, configFile: configFilepath = DEFAULT_CONFIG_FILEPATH } = {}) {
648649
if (this.__isInitialized) {
649650
return;
650651
}
@@ -760,6 +761,10 @@ class FeatureToggles {
760761
return this;
761762
}
762763

764+
async initializeFeatures(options) {
765+
return await Semaphore.makeExclusiveReturning(this._initializeFeatures.bind(this))(options);
766+
}
767+
763768
// ========================================
764769
// END OF INITIALIZE SECTION
765770
// ========================================

src/redisWrapper.js

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ const INTEGRATION_MODE = Object.freeze({
2222
const CF_REDIS_SERVICE_LABEL = "redis-cache";
2323

2424
const logger = new Logger(COMPONENT_NAME);
25-
const watchedGetSetSemaphore = new Semaphore();
2625

2726
const MODE = Object.freeze({
2827
RAW: "raw",
@@ -300,15 +299,6 @@ const setObject = async (key, value, options) => {
300299
*/
301300
const del = async (key) => await _clientExec("DEL", { key });
302301

303-
const _watchedGetSetExclusive = async (key, newValueCallback, options) => {
304-
await watchedGetSetSemaphore.acquire();
305-
try {
306-
return await _watchedGetSet(key, newValueCallback, options);
307-
} finally {
308-
watchedGetSetSemaphore.release();
309-
}
310-
};
311-
312302
const _watchedGetSet = async (key, newValueCallback, { field, mode = MODE.OBJECT, attempts = 10 } = {}) => {
313303
const useHash = field !== undefined;
314304
if (!mainClient) {
@@ -373,6 +363,8 @@ const _watchedGetSet = async (key, newValueCallback, { field, mode = MODE.OBJECT
373363
);
374364
};
375365

366+
const _watchedGetSetExclusive = Semaphore.makeExclusiveQueuing(_watchedGetSet);
367+
376368
/**
377369
* @callback NewValueCallback
378370
* @see watchedGetSet

src/shared/semaphore.js

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,41 @@ class Semaphore {
5353
this.resolveCurrentSemaphore();
5454
}
5555
}
56+
57+
/**
58+
* Take an async function and turn it into an exclusively executing async function. Calls during async execution will
59+
* be queued and executed serially.
60+
*/
61+
static makeExclusiveQueuing(cb) {
62+
const semaphore = new Semaphore();
63+
return async (...args) => {
64+
await semaphore.acquire();
65+
try {
66+
return await cb(...args);
67+
} finally {
68+
semaphore.release();
69+
}
70+
};
71+
}
72+
73+
/**
74+
* Take an async function and turn it into an exclusively executing async function. Calls during async execution will
75+
* be returned.
76+
*/
77+
static makeExclusiveReturning(cb) {
78+
let isRunning;
79+
return async (...args) => {
80+
if (isRunning) {
81+
return;
82+
}
83+
isRunning = true;
84+
try {
85+
return await cb(...args);
86+
} finally {
87+
isRunning = false;
88+
}
89+
};
90+
}
5691
}
5792

5893
module.exports = { Semaphore };

test/shared/semaphore.test.js

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
"use strict";
2+
const { promisify, format } = require("util");
3+
4+
const { Semaphore } = require("../../src/shared/semaphore");
5+
6+
const sleep = promisify(setTimeout);
7+
8+
describe("semaphore", () => {
9+
let executionLog;
10+
const n = 3;
11+
const m = 2;
12+
const result = "finished";
13+
const runner = async (index) => {
14+
executionLog.push(format("started %d", index));
15+
await sleep(10);
16+
executionLog.push(format("finished %d", index));
17+
return result;
18+
};
19+
20+
beforeEach(() => {
21+
executionLog = [];
22+
});
23+
24+
test("non-exclusive", async () => {
25+
const resultsPrimary = await Promise.all(Array.from({ length: n }, (_, i) => runner(i + 1)));
26+
27+
const resultsSecondary = await Promise.all(Array.from({ length: m }, (_, i) => runner(n + i + 1)));
28+
expect(resultsPrimary).toMatchInlineSnapshot(`
29+
[
30+
"finished",
31+
"finished",
32+
"finished",
33+
]
34+
`);
35+
expect(resultsSecondary).toMatchInlineSnapshot(`
36+
[
37+
"finished",
38+
"finished",
39+
]
40+
`);
41+
expect(executionLog).toMatchInlineSnapshot(`
42+
[
43+
"started 1",
44+
"started 2",
45+
"started 3",
46+
"finished 1",
47+
"finished 2",
48+
"finished 3",
49+
"started 4",
50+
"started 5",
51+
"finished 4",
52+
"finished 5",
53+
]
54+
`);
55+
});
56+
57+
test("exclusive queueing", async () => {
58+
const exclusiveRunner = Semaphore.makeExclusiveQueuing(runner);
59+
const resultsPrimary = await Promise.all(Array.from({ length: n }, (_, i) => exclusiveRunner(i + 1)));
60+
61+
const resultsSecondary = await Promise.all(Array.from({ length: m }, (_, i) => exclusiveRunner(n + i + 1)));
62+
expect(resultsPrimary).toMatchInlineSnapshot(`
63+
[
64+
"finished",
65+
"finished",
66+
"finished",
67+
]
68+
`);
69+
expect(resultsSecondary).toMatchInlineSnapshot(`
70+
[
71+
"finished",
72+
"finished",
73+
]
74+
`);
75+
expect(executionLog).toMatchInlineSnapshot(`
76+
[
77+
"started 1",
78+
"finished 1",
79+
"started 2",
80+
"finished 2",
81+
"started 3",
82+
"finished 3",
83+
"started 4",
84+
"finished 4",
85+
"started 5",
86+
"finished 5",
87+
]
88+
`);
89+
});
90+
91+
test("exclusive returning", async () => {
92+
const exclusiveRunner = Semaphore.makeExclusiveReturning(runner);
93+
const resultsPrimary = await Promise.all(Array.from({ length: n }, (_, i) => exclusiveRunner(i + 1)));
94+
95+
const resultsSecondary = await Promise.all(Array.from({ length: m }, (_, i) => exclusiveRunner(n + i + 1)));
96+
expect(resultsPrimary).toMatchInlineSnapshot(`
97+
[
98+
"finished",
99+
undefined,
100+
undefined,
101+
]
102+
`);
103+
expect(resultsSecondary).toMatchInlineSnapshot(`
104+
[
105+
"finished",
106+
undefined,
107+
]
108+
`);
109+
expect(executionLog).toMatchInlineSnapshot(`
110+
[
111+
"started 1",
112+
"finished 1",
113+
"started 4",
114+
"finished 4",
115+
]
116+
`);
117+
});
118+
});

0 commit comments

Comments
 (0)