Skip to content

Commit 921468c

Browse files
authored
Implement cloneConcurrency config (#258)
Add new acceptance test that did not exist before for concurrency behavior in git clone flow.
1 parent 0724471 commit 921468c

File tree

5 files changed

+204
-1
lines changed

5 files changed

+204
-1
lines changed

packages/zpm-config/schema.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@
1414
},
1515
"default": ["main", "master"]
1616
},
17+
"cloneConcurrency": {
18+
"type": "usize",
19+
"description": "The number of concurrent git clone operations Yarn can perform",
20+
"default": 2
21+
},
1722
"compressionLevel": {
1823
"type": ["zpm_formats::CompressionAlgorithm", "null"],
1924
"description": "The compression level to use for the packed file"

packages/zpm/src/git.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,12 @@ pub async fn clone_repository(context: &InstallContext<'_>, source: &GitSource,
265265
return Ok(clone_dir);
266266
}
267267

268+
let _clone_permit
269+
= project.clone_limiter
270+
.acquire()
271+
.await
272+
.expect("The clone limiter semaphore should not be closed");
273+
268274
git_clone_into(source, commit, &clone_dir, &project.http_client.config).await?;
269275
Ok(clone_dir)
270276
}

packages/zpm/src/project.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ pub struct Project {
7373
pub last_modified_at: LastModifiedAt,
7474
pub install_state: Option<InstallState>,
7575
pub http_client: std::sync::Arc<HttpClient>,
76+
pub clone_limiter: std::sync::Arc<tokio::sync::Semaphore>,
7677
}
7778

7879
impl Project {
@@ -144,6 +145,16 @@ impl Project {
144145
config.settings.enable_global_cache.source = config.settings.enable_migration_mode.source;
145146
}
146147

148+
let clone_concurrency
149+
= config.settings.clone_concurrency.value;
150+
151+
if clone_concurrency == 0 {
152+
return Err(Error::InvalidConfigValue("cloneConcurrency".to_string(), "must be >= 1".to_string()));
153+
}
154+
155+
let clone_limiter
156+
= Arc::new(tokio::sync::Semaphore::new(clone_concurrency));
157+
147158
let root_workspace
148159
= Workspace::from_root_path(&project_cwd)?;
149160

@@ -181,6 +192,7 @@ impl Project {
181192
last_modified_at,
182193
install_state: None,
183194
http_client,
195+
clone_limiter,
184196
})
185197
}
186198

tests/acceptance-tests/pkg-tests-specs/sources/commands/config/get.test.js

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,37 @@ describe(`Commands`, () => {
4343
}),
4444
);
4545

46+
test(
47+
`it should print cloneConcurrency default value`,
48+
makeTemporaryEnv({}, async ({path, run, source}) => {
49+
await expect(run(`config`, `get`, `--json`, `cloneConcurrency`)).resolves.toMatchObject({
50+
stdout: `2\n`,
51+
});
52+
}),
53+
);
54+
55+
test(
56+
`it should print cloneConcurrency configured value`,
57+
makeTemporaryEnv({}, async ({path, run, source}) => {
58+
await xfs.writeFilePromise(`${path}/.yarnrc.yml`, `cloneConcurrency: 7\n`);
59+
60+
await expect(run(`config`, `get`, `--json`, `cloneConcurrency`)).resolves.toMatchObject({
61+
stdout: `7\n`,
62+
});
63+
}),
64+
);
65+
66+
test(
67+
`it should reject cloneConcurrency lower than 1`,
68+
makeTemporaryEnv({}, async ({path, run, source}) => {
69+
await xfs.writeFilePromise(`${path}/.yarnrc.yml`, `cloneConcurrency: 0\n`);
70+
71+
await expect(run(`config`, `get`, `cloneConcurrency`)).rejects.toMatchObject({
72+
stdout: expect.stringContaining(`Invalid config value for cloneConcurrency (must be >= 1)`),
73+
});
74+
}),
75+
);
76+
4677
test(
4778
`it should support printing sub-keys`,
4879
makeTemporaryEnv({}, async ({path, run, source}) => {

tests/acceptance-tests/pkg-tests-specs/sources/protocols/git.test.ts

Lines changed: 150 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import {execUtils, semverUtils} from '@yarnpkg/core';
2-
import {npath} from '@yarnpkg/fslib';
2+
import {npath, ppath, xfs} from '@yarnpkg/fslib';
33
import {tests} from 'pkg-tests-core';
44

55
const TESTED_URLS = {
@@ -21,6 +21,110 @@ const TESTED_URLS = {
2121
[`https://github.com/yarnpkg/util-deprecate.git#b3562c2798507869edb767da869cd7b85487726d`]: {version: `1.0.0`, runOnCI: true},
2222
};
2323

24+
const makeCloneMetricsWrapper = ({realGitPath, metricsDir}: {realGitPath: string, metricsDir: string}) => `
25+
#!/usr/bin/env node
26+
const fs = require('fs');
27+
const path = require('path');
28+
const {spawn} = require('child_process');
29+
30+
const realGitPath = ${JSON.stringify(realGitPath)};
31+
const metricsDir = ${JSON.stringify(metricsDir)};
32+
const stateFile = path.join(metricsDir, 'state');
33+
const lockDir = path.join(metricsDir, 'lock');
34+
35+
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));
36+
37+
async function withLock(fn) {
38+
while (true) {
39+
try {
40+
await fs.promises.mkdir(lockDir);
41+
break;
42+
} catch (error) {
43+
if (error && error.code === 'EEXIST') {
44+
await sleep(10);
45+
continue;
46+
}
47+
48+
throw error;
49+
}
50+
}
51+
52+
try {
53+
return await fn();
54+
} finally {
55+
await fs.promises.rmdir(lockDir);
56+
}
57+
}
58+
59+
async function readState() {
60+
const content = await fs.promises.readFile(stateFile, 'utf8');
61+
const [currentLine = '0', maxLine = '0'] = content.trim().split(/\\r\\n|\\r|\\n/);
62+
63+
return {
64+
current: Number(currentLine),
65+
max: Number(maxLine),
66+
};
67+
}
68+
69+
async function writeState(current, max) {
70+
await fs.promises.writeFile(stateFile, \`\${current}\\n\${max}\\n\`);
71+
}
72+
73+
async function incrementCounter() {
74+
await withLock(async () => {
75+
const {current, max} = await readState();
76+
const nextCurrent = current + 1;
77+
const nextMax = Math.max(max, nextCurrent);
78+
79+
await writeState(nextCurrent, nextMax);
80+
});
81+
}
82+
83+
async function decrementCounter() {
84+
await withLock(async () => {
85+
const {current, max} = await readState();
86+
await writeState(current - 1, max);
87+
});
88+
}
89+
90+
function runGit(args) {
91+
return new Promise((resolve, reject) => {
92+
const child = spawn(realGitPath, args, {stdio: 'inherit'});
93+
94+
child.on('error', reject);
95+
child.on('exit', code => {
96+
resolve(typeof code === 'number' ? code : 1);
97+
});
98+
});
99+
}
100+
101+
async function main() {
102+
const args = process.argv.slice(2);
103+
104+
if (args[0] === 'clone') {
105+
await incrementCounter();
106+
await sleep(200);
107+
108+
const exitCode = await (async () => {
109+
try {
110+
return await runGit(args);
111+
} finally {
112+
await decrementCounter();
113+
}
114+
})();
115+
116+
process.exit(exitCode);
117+
}
118+
119+
process.exit(await runGit(args));
120+
}
121+
122+
main().catch(error => {
123+
console.error(error);
124+
process.exit(1);
125+
});
126+
`.trimStart();
127+
24128
describe(`Protocols`, () => {
25129
describe(`git:`, () => {
26130
for (const [url, {version, runOnCI}] of Object.entries(TESTED_URLS)) {
@@ -156,6 +260,51 @@ describe(`Protocols`, () => {
156260
),
157261
);
158262

263+
tests.testIf(
264+
() => process.platform !== `win32`,
265+
`it should respect cloneConcurrency when cloning git repositories`,
266+
makeTemporaryEnv(
267+
{
268+
dependencies: {
269+
[`pkg-a`]: tests.startPackageServer().then(url => `${url}/repositories/deep-projects.git#cwd=projects/pkg-a`),
270+
[`pkg-b`]: tests.startPackageServer().then(url => `${url}/repositories/deep-projects.git#cwd=projects/pkg-b`),
271+
[`lib-a`]: tests.startPackageServer().then(url => `${url}/repositories/deep-projects.git#cwd=projects/pkg-a&workspace=lib`),
272+
[`lib-b`]: tests.startPackageServer().then(url => `${url}/repositories/deep-projects.git#cwd=projects/pkg-b&workspace=lib`),
273+
},
274+
},
275+
{
276+
cloneConcurrency: 1,
277+
},
278+
async ({path, run, source}) => {
279+
const {stdout: gitPathStdout} = await execUtils.execvp(`which`, [`git`], {cwd: path});
280+
const realGitPath = gitPathStdout.trim();
281+
282+
const binDir = ppath.join(path, `bin`);
283+
const metricsDir = ppath.join(path, `.clone-metrics`);
284+
const stateFile = ppath.join(metricsDir, `state`);
285+
const wrapperPath = ppath.join(binDir, `git`);
286+
287+
await xfs.mkdirPromise(binDir, {recursive: true});
288+
await xfs.mkdirPromise(metricsDir, {recursive: true});
289+
await xfs.writeFilePromise(stateFile, `0\n0\n`);
290+
await xfs.writeFilePromise(wrapperPath, makeCloneMetricsWrapper({
291+
realGitPath,
292+
metricsDir: npath.fromPortablePath(metricsDir),
293+
}));
294+
295+
await xfs.chmodPromise(wrapperPath, 0o755);
296+
297+
await run(`install`);
298+
299+
const [currentLine, maxLine] = (await xfs.readFilePromise(stateFile, `utf8`)).trim().split(/\r\n|\r|\n/);
300+
301+
expect(Number(currentLine)).toBe(0);
302+
expect(Number(maxLine)).toBeGreaterThan(0);
303+
expect(Number(maxLine)).toBeLessThanOrEqual(1);
304+
},
305+
),
306+
);
307+
159308
test(
160309
`it should use Yarn Classic to setup classic repositories`,
161310
makeTemporaryEnv(

0 commit comments

Comments
 (0)