Skip to content

Commit fccca12

Browse files
authored
feat(publish): enable throttling when publishing modules (lerna#4013)
1 parent 10fcb3a commit fccca12

File tree

5 files changed

+174
-1
lines changed

5 files changed

+174
-1
lines changed

libs/commands/publish/README.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ This is useful when a previous `lerna publish` failed to publish all packages to
7070
- [`--registry <url>`](#--registry-url)
7171
- [`--tag-version-prefix`](#--tag-version-prefix)
7272
- [`--temp-tag`](#--temp-tag)
73+
- [`--throttle`](#--throttle)
7374
- [`--yes`](#--yes)
7475
- [`--summary-file <dir>`](#--summary-file)
7576

@@ -302,6 +303,20 @@ new version(s) to the dist-tag configured by [`--dist-tag`](#--dist-tag-tag) (de
302303
This is not generally necessary, as Lerna will publish packages in topological
303304
order (all dependencies before dependents) by default.
304305

306+
### `--throttle`
307+
308+
This option class allows to throttle the timing at which modules are published to the configured registry.
309+
310+
- `--throttle`: Enable throttling when publishing modules
311+
- `--throttle-size`: The amount of modules that may be published at once (defaults to `25`)
312+
- `--throttle-delay`: How long to wait after a module was successfully published (defaults to 30 seconds)
313+
314+
This is usefull to avoid errors/retries when publishing to rate-limited repositories on huge monorepos:
315+
316+
```bash
317+
lerna publish from-git --throttle --throttle-delay=$((3600*24))
318+
```
319+
305320
### `--yes`
306321

307322
```sh

libs/commands/publish/src/command.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,18 @@ const command: CommandModule = {
107107
describe: "Create a temporary tag while publishing.",
108108
type: "boolean",
109109
},
110+
throttle: {
111+
describe: "Throttle module publication. This is implicit if a throttle size or delay is provided",
112+
type: "boolean",
113+
},
114+
"throttle-size": {
115+
describe: "Bucket size used to throttle module publication.",
116+
type: "number",
117+
},
118+
"throttle-delay": {
119+
describe: "Delay between throttle bucket items publications (in seconds).",
120+
type: "number",
121+
},
110122
"no-verify-access": {
111123
// proxy for --verify-access
112124
describe: "Do not verify package read-write access for current npm user.",

libs/commands/publish/src/index.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ import { getTwoFactorAuthRequired } from "./lib/get-two-factor-auth-required";
5151
import { gitCheckout } from "./lib/git-checkout";
5252
import { interpolate } from "./lib/interpolate";
5353
import { removeTempLicenses } from "./lib/remove-temp-licenses";
54+
import { Queue, TailHeadQueue } from "./lib/throttle-queue";
5455
import { verifyNpmPackageAccess } from "./lib/verify-npm-package-access";
5556

5657
module.exports = function factory(argv: Arguments<PublishCommandConfigOptions>) {
@@ -87,6 +88,9 @@ interface PublishCommandConfigOptions extends CommandConfigOptions {
8788
rejectCycles?: boolean;
8889
distTag?: string;
8990
preDistTag?: string;
91+
throttle?: boolean;
92+
throttleSize?: number;
93+
throttleDelay?: number;
9094
}
9195

9296
class PublishCommand extends Command {
@@ -973,14 +977,29 @@ class PublishCommand extends Command {
973977
};
974978
process.on("log", logListener);
975979

980+
let queue: Queue | undefined = undefined;
981+
if (this.options.throttle) {
982+
const DEFAULT_QUEUE_THROTTLE_SIZE = 25;
983+
const DEFAULT_QUEUE_THROTTLE_DELAY = 30;
984+
queue = new TailHeadQueue(
985+
this.options.throttleSize !== undefined ? this.options.throttleSize : DEFAULT_QUEUE_THROTTLE_SIZE,
986+
(this.options.throttleDelay !== undefined
987+
? this.options.throttleDelay
988+
: DEFAULT_QUEUE_THROTTLE_DELAY) * 1000
989+
);
990+
}
976991
const mapper = pPipe(
977992
...[
978993
(pkg: Package) => {
979994
const preDistTag = this.getPreDistTag(pkg);
980995
const tag = !this.options.tempTag && preDistTag ? preDistTag : opts.tag;
981996
const pkgOpts = Object.assign({}, opts, { tag });
982997

983-
return pulseTillDone(npmPublish(pkg, pkg.packed.tarFilePath, pkgOpts, this.otpCache))
998+
return pulseTillDone(
999+
queue
1000+
? queue.queue(() => npmPublish(pkg, pkg.packed.tarFilePath, pkgOpts, this.otpCache))
1001+
: npmPublish(pkg, pkg.packed.tarFilePath, pkgOpts, this.otpCache)
1002+
)
9841003
.then(() => {
9851004
this.publishedPackages.push(pkg);
9861005

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import * as throttling from "./throttle-queue";
2+
3+
describe("verifyTailHeadQueueBehavior", () => {
4+
test("immediately runs all provided resolving promises within queue size", async () => {
5+
const count = 100;
6+
const queue = new throttling.TailHeadQueue(count, 1000);
7+
const queue_promises = Array(count)
8+
.fill(undefined)
9+
.map(() => queue.queue(async () => Date.now()));
10+
const acc = await Promise.all(queue_promises);
11+
expect(acc.length).toBe(count);
12+
acc.sort();
13+
expect(acc[count - 1] - acc[0]).toBeLessThan(100);
14+
});
15+
test("immediately runs all provided rejecting promises within queue size", async () => {
16+
const count = 100;
17+
const queue = new throttling.TailHeadQueue(count, 1000);
18+
const queue_promises = Array(count)
19+
.fill(undefined)
20+
.map(() => queue.queue(async () => new Promise((_, r) => r(Date.now()))));
21+
const acc = await Promise.allSettled(queue_promises);
22+
expect(acc.length).toBe(count);
23+
const resolved_sorted_acc = acc
24+
.map((r) => {
25+
expect(r.status).toBe("rejected");
26+
if (r.status === "rejected") {
27+
return r.reason;
28+
}
29+
})
30+
.sort();
31+
expect(resolved_sorted_acc[count - 1] - resolved_sorted_acc[0]).toBeLessThan(100);
32+
});
33+
test("runs all provided resolving promises with a delay if they exceed queue size", async () => {
34+
const count = 100;
35+
const queue = new throttling.TailHeadQueue(count / 3 + 1, 2 * 1000);
36+
const queue_promises = Array(count)
37+
.fill(undefined)
38+
.map(() => queue.queue(async () => Date.now()));
39+
const acc = await Promise.all(queue_promises);
40+
expect(acc.length).toBe(count);
41+
acc.sort();
42+
const total_time = acc[count - 1] - acc[0];
43+
expect(total_time).toBeGreaterThan(3.8 * 1000);
44+
expect(total_time).toBeLessThan(4.2 * 1000);
45+
});
46+
});
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/**
2+
* A queue abstraction for tasks that may require a delayed execution
3+
*/
4+
export interface Queue {
5+
/**
6+
* Run a task once the queue is cleared
7+
* @param f A function that will be executed once the queue is useable
8+
* @return A promise that wraps the value returned by f
9+
*/
10+
queue<T>(f: () => Promise<T>): Promise<T>;
11+
}
12+
13+
/**
14+
* A sized queue that adds a delay between the end of an item's execution.
15+
*
16+
* Use only with async code, multi-threading is not supported.
17+
*/
18+
export class TailHeadQueue implements Queue {
19+
private queue_list: ((v: any) => any)[];
20+
private queue_size: number;
21+
private queue_period: number;
22+
private allowance: number;
23+
private last_end: number[];
24+
25+
/**
26+
* @param size The number of items that may run concurrently
27+
* @param period The time between the end of the execution of an item and the start of the execution of the next one (ms)
28+
*/
29+
constructor(size, period) {
30+
this.queue_list = [];
31+
this.queue_size = Math.floor(size);
32+
this.queue_period = period;
33+
this.allowance = this.queue_size;
34+
this.last_end = [];
35+
}
36+
37+
/**
38+
* Validate the execution of a queue item and schedule the execution of the next one
39+
*/
40+
_on_settled() {
41+
const next = this.queue_list.shift();
42+
if (next !== undefined) {
43+
setTimeout(next, this.queue_period);
44+
} else {
45+
// If we ever reach this point and THEN a new item is queued, we need to
46+
// explicitly keep track of execution times to be able to add a delay.
47+
//
48+
// We can't simply wrap this in a setTimeout as it would add a flat
49+
// this.queue_period delay to lerna's execution end after the last item
50+
// in the queue is processed.
51+
this.last_end.push(Date.now());
52+
this.allowance += 1;
53+
}
54+
}
55+
56+
async queue<T>(f: () => Promise<T>): Promise<T> {
57+
let p: Promise<T>;
58+
if (this.allowance > 0) {
59+
this.allowance -= 1;
60+
// Check if the queue should delay the promise's execution
61+
if (this.allowance + 1 <= this.last_end.length) {
62+
const time_offset = Date.now() - (this.last_end.shift() || 0);
63+
if (time_offset < this.queue_period) {
64+
p = new Promise((r) => setTimeout(r, this.queue_period - time_offset)).then(f);
65+
}
66+
}
67+
if (p === undefined) {
68+
p = f();
69+
}
70+
} else {
71+
p = new Promise((r) => {
72+
this.queue_list.push(r);
73+
}).then(f);
74+
}
75+
return p.finally(() => {
76+
// Don't wait on _on_settled as it is for the queue's continuation
77+
// and should not delay processing of the queued item's result.
78+
this._on_settled();
79+
});
80+
}
81+
}

0 commit comments

Comments
 (0)