Skip to content

Commit 0bde4cc

Browse files
authored
feat: add built-in support for concurrency limiting (#559)
Inspired by fork-ts-checker-webpack-plugin-limiter plugin - thanks @vicaryCloses: #441
1 parent 0f7020d commit 0bde4cc

File tree

6 files changed

+132
-36
lines changed

6 files changed

+132
-36
lines changed

src/ForkTsCheckerWebpackPlugin.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,18 @@ import { tapAfterCompileToAddDependencies } from './hooks/tapAfterCompileToAddDe
2020
import { tapErrorToLogMessage } from './hooks/tapErrorToLogMessage';
2121
import { getForkTsCheckerWebpackPluginHooks } from './hooks/pluginHooks';
2222
import { tapAfterEnvironmentToPatchWatching } from './hooks/tapAfterEnvironmentToPatchWatching';
23+
import { createPool, Pool } from './utils/async/pool';
24+
import os from 'os';
2325

2426
class ForkTsCheckerWebpackPlugin implements webpack.Plugin {
27+
/**
28+
* Current version of the plugin
29+
*/
2530
static readonly version: string = '{{VERSION}}'; // will be replaced by the @semantic-release/exec
31+
/**
32+
* Default pool for the plugin concurrency limit
33+
*/
34+
static readonly pool: Pool = createPool(Math.max(1, os.cpus().length));
2635

2736
private readonly options: ForkTsCheckerWebpackPluginOptions;
2837

src/hooks/tapAfterCompileToGetIssues.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import webpack from 'webpack';
2-
import path from 'path';
32
import { ForkTsCheckerWebpackPluginConfiguration } from '../ForkTsCheckerWebpackPluginConfiguration';
43
import { ForkTsCheckerWebpackPluginState } from '../ForkTsCheckerWebpackPluginState';
54
import { getForkTsCheckerWebpackPluginHooks } from './pluginHooks';

src/hooks/tapDoneToAsyncGetIssues.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import webpack from 'webpack';
22
import chalk from 'chalk';
3-
import path from 'path';
43
import { ForkTsCheckerWebpackPluginConfiguration } from '../ForkTsCheckerWebpackPluginConfiguration';
54
import { ForkTsCheckerWebpackPluginState } from '../ForkTsCheckerWebpackPluginState';
65
import { getForkTsCheckerWebpackPluginHooks } from './pluginHooks';

src/hooks/tapStartToConnectAndRunReporter.ts

Lines changed: 39 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { tapDoneToAsyncGetIssues } from './tapDoneToAsyncGetIssues';
88
import { tapAfterCompileToGetIssues } from './tapAfterCompileToGetIssues';
99
import { interceptDoneToGetWebpackDevServerTap } from './interceptDoneToGetWebpackDevServerTap';
1010
import { Issue } from '../issue';
11+
import { ForkTsCheckerWebpackPlugin } from '../ForkTsCheckerWebpackPlugin';
1112

1213
function tapStartToConnectAndRunReporter(
1314
compiler: webpack.Compiler,
@@ -76,40 +77,44 @@ function tapStartToConnectAndRunReporter(
7677
rejectIssues = reject;
7778
});
7879
const previousReportPromise = state.reportPromise;
79-
state.reportPromise = new Promise(async (resolve) => {
80-
change = await hooks.start.promise(change, compilation);
81-
82-
try {
83-
await reporter.connect();
84-
85-
const previousReport = await previousReportPromise;
86-
if (previousReport) {
87-
await previousReport.close();
88-
}
89-
90-
const report = await reporter.getReport(change);
91-
resolve(report);
92-
93-
report
94-
.getDependencies()
95-
.then(resolveDependencies)
96-
.catch(rejectedDependencies)
97-
.finally(() => {
98-
// get issues after dependencies are resolved as it can be blocking
99-
report.getIssues().then(resolveIssues).catch(rejectIssues);
100-
});
101-
} catch (error) {
102-
if (error instanceof OperationCanceledError) {
103-
hooks.canceled.call(compilation);
104-
} else {
105-
hooks.error.call(error, compilation);
106-
}
107-
108-
resolve(undefined);
109-
resolveDependencies(undefined);
110-
resolveIssues(undefined);
111-
}
112-
});
80+
state.reportPromise = ForkTsCheckerWebpackPlugin.pool.submit(
81+
(done) =>
82+
new Promise(async (resolve) => {
83+
change = await hooks.start.promise(change, compilation);
84+
85+
try {
86+
await reporter.connect();
87+
88+
const previousReport = await previousReportPromise;
89+
if (previousReport) {
90+
await previousReport.close();
91+
}
92+
93+
const report = await reporter.getReport(change);
94+
resolve(report);
95+
96+
report
97+
.getDependencies()
98+
.then(resolveDependencies)
99+
.catch(rejectedDependencies)
100+
.finally(() => {
101+
// get issues after dependencies are resolved as it can be blocking
102+
report.getIssues().then(resolveIssues).catch(rejectIssues).finally(done);
103+
});
104+
} catch (error) {
105+
if (error instanceof OperationCanceledError) {
106+
hooks.canceled.call(compilation);
107+
} else {
108+
hooks.error.call(error, compilation);
109+
}
110+
111+
resolve(undefined);
112+
resolveDependencies(undefined);
113+
resolveIssues(undefined);
114+
done();
115+
}
116+
})
117+
);
113118
});
114119
}
115120

src/utils/async/pool.ts

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// provide done callback because our promise chain is a little bit complicated
2+
type Task<T> = (done: () => void) => Promise<T>;
3+
4+
interface Pool {
5+
submit<T>(task: Task<T>): Promise<T>;
6+
size: number;
7+
readonly pending: number;
8+
}
9+
10+
function createPool(size: number): Pool {
11+
let pendingPromises: Promise<unknown>[] = [];
12+
13+
const pool = {
14+
async submit<T>(task: Task<T>): Promise<T> {
15+
while (pendingPromises.length >= pool.size) {
16+
await Promise.race(pendingPromises).catch(() => undefined);
17+
}
18+
19+
let resolve: (result: T) => void;
20+
let reject: (error: Error) => void;
21+
const taskPromise = new Promise<T>((taskResolve, taskReject) => {
22+
resolve = taskResolve;
23+
reject = taskReject;
24+
});
25+
26+
const donePromise = new Promise((doneResolve) => {
27+
task(() => {
28+
doneResolve(undefined);
29+
pendingPromises = pendingPromises.filter(
30+
(pendingPromise) => pendingPromise !== donePromise
31+
);
32+
})
33+
.then(resolve)
34+
.catch(reject);
35+
});
36+
pendingPromises.push(donePromise);
37+
38+
return taskPromise;
39+
},
40+
size,
41+
get pending() {
42+
return pendingPromises.length;
43+
},
44+
};
45+
46+
return pool;
47+
}
48+
49+
export { Pool, createPool };

test/unit/utils/async/pool.spec.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import { createPool } from 'lib/utils/async/pool';
2+
3+
describe('createPool', () => {
4+
it('creates new pool', () => {
5+
const pool = createPool(10);
6+
expect(pool).toBeDefined();
7+
expect(pool.size).toEqual(10);
8+
expect(pool.pending).toEqual(0);
9+
expect(pool.submit).toBeInstanceOf(Function);
10+
});
11+
12+
it('limits concurrency', async () => {
13+
const pool = createPool(2);
14+
const shortTask = jest.fn(async (done: () => void) => {
15+
setTimeout(done, 10);
16+
});
17+
const longTask = jest.fn(async (done: () => void) => {
18+
setTimeout(done, 10000);
19+
});
20+
21+
pool.submit(shortTask);
22+
pool.submit(shortTask);
23+
pool.submit(longTask);
24+
pool.submit(longTask);
25+
pool.submit(longTask);
26+
27+
expect(shortTask).toHaveBeenCalledTimes(2);
28+
expect(longTask).toHaveBeenCalledTimes(0);
29+
30+
await new Promise((resolve) => setTimeout(resolve, 200));
31+
32+
expect(shortTask).toHaveBeenCalledTimes(2);
33+
expect(longTask).toHaveBeenCalledTimes(2);
34+
});
35+
});

0 commit comments

Comments
 (0)