Skip to content

Commit 9f69606

Browse files
DrRataplan authored and line-o committed
feat(get): add a new threads option to limit load on server
Downloading large collections caused an unpredictable number of requests to run in parallel. The new limit caps how many download requests run at the same time. Also use Promise.all and correctly awaited promises in general.
1 parent 99c0e8b commit 9f69606

File tree

2 files changed

+94
-30
lines changed

2 files changed

+94
-30
lines changed

commands/get.js

Lines changed: 56 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import { resolve, join, posix, dirname, basename } from 'node:path'
2-
import { writeFileSync, statSync, existsSync, mkdirSync } from 'node:fs'
2+
import { statSync, existsSync, mkdirSync } from 'node:fs'
3+
import { writeFile } from 'node:fs/promises'
34
import { connect } from '@existdb/node-exist'
5+
import Bottleneck from 'bottleneck'
46

57
/**
68
* @typedef { import("@existdb/node-exist").NodeExist } NodeExist
@@ -14,6 +16,8 @@ import { connect } from '@existdb/node-exist'
1416
* @prop {Number} depth how many levels to traverse down for recursive and tree views
1517
* @prop {String[]} include filter items
1618
* @prop {String[]} exclude filter items
19+
* @prop {Number} threads How many resources should be downloaded at the same time
20+
* @prop {Number} mintime How long a download should take at least
1721
*/
1822

1923
/**
@@ -57,10 +61,7 @@ const xmlBooleanSetting = {
5761
return value
5862
}
5963
}
60-
const serializationOptionNames = [
61-
'insert-final-newline',
62-
'omit-xml-declaration'
63-
]
64+
const serializationOptionNames = ['insert-final-newline', 'omit-xml-declaration']
6465

6566
const serializationDefaults = {
6667
// "exist:indent": "no",
@@ -71,7 +72,7 @@ const serializationDefaults = {
7172

7273
function getSerializationOptions (options) {
7374
const serializationOptions = serializationDefaults
74-
serializationOptionNames.forEach(o => {
75+
serializationOptionNames.forEach((o) => {
7576
if (o in options) {
7677
serializationOptions[o] = options[o]
7778
}
@@ -83,6 +84,7 @@ function getSerializationOptions (options) {
8384
/**
8485
* Download a single resource from an existdb instance
8586
* @param {NodeExist.BoundModules} db NodeExist client
87+
* @param {GetOptions} options
8688
* @param {Boolean} verbose
8789
* @param {ResourceInfo} resource
8890
* @param {String} directory
@@ -100,7 +102,7 @@ async function downloadResource (db, options, resource, directory, collection, r
100102
}
101103
const localName = rename || posix.basename(resource.name)
102104
const localPath = join(directory, localName)
103-
await writeFileSync(localPath, fileContents)
105+
await writeFile(localPath, fileContents)
104106

105107
if (verbose) {
106108
console.log(`✔︎ downloaded resource ${localPath}`)
@@ -115,11 +117,13 @@ async function downloadResource (db, options, resource, directory, collection, r
115117
/**
116118
* download a collection from an existdb instance
117119
* @param {NodeExist} db NodeExist client
120+
* @param {GetOptions} options
118121
* @param {boolean} verbose
119122
* @param {String} collection
120123
* @param {String} baseCollection
124+
* @param {Bottleneck} limiter
121125
*/
122-
async function downloadCollection (db, options, collection, baseCollection, directory) {
126+
async function downloadCollection (db, options, collection, baseCollection, directory, limiter) {
123127
const absCollection = posix.join(baseCollection, collection)
124128
const { verbose } = options
125129
try {
@@ -134,12 +138,21 @@ async function downloadCollection (db, options, collection, baseCollection, dire
134138
}
135139

136140
const targetDir = posix.join(directory, collection)
137-
await collectionMeta.documents.forEach(
138-
async resource => downloadResource(db, options, resource, targetDir, absCollection))
141+
// Download all documents. Do this in parallel, but not everything at once. Pool that work so we don't take down the
142+
// server
143+
await Promise.all(
144+
collectionMeta.documents.map(async (resource) => {
145+
await limiter.schedule(() => downloadResource(db, options, resource, targetDir, absCollection))
146+
})
147+
)
139148

140149
// recursive (optional?)
141-
await collectionMeta.collections.forEach(
142-
async collection => downloadCollection(db, options, collection, absCollection, targetDir))
150+
151+
// There should always be fewer collections than resources, so no need for pooling. Go over them one by one. No need
152+
// to do this in parallel
153+
for (const collection of collectionMeta.collections) {
154+
await downloadCollection(db, options, collection, absCollection, targetDir, limiter)
155+
}
143156

144157
return true
145158
} catch (e) {
@@ -189,12 +202,13 @@ async function getPathInfo (db, path) {
189202
*/
190203
async function downloadCollectionOrResource (db, source, target, options) {
191204
// read parameters
192-
// const start = Date.now()
205+
// const start = Date.now()
193206
const root = resolve(target)
194207

195208
if (options.verbose) {
196209
console.log('Downloading:', source, 'to', root)
197-
console.log('Server:',
210+
console.log(
211+
'Server:',
198212
(db.client.isSecure ? 'https' : 'http') + '://' + db.client.options.host + ':' + db.client.options.port,
199213
'(v' + options.version + ')'
200214
)
@@ -205,6 +219,7 @@ async function downloadCollectionOrResource (db, source, target, options) {
205219
if (options.exclude.length) {
206220
console.log('Exclude:\n', ...options.exclude, '\n')
207221
}
222+
console.log(`Downloading up to ${options.threads} resources at a time`)
208223
}
209224

210225
// initial file
@@ -260,10 +275,13 @@ async function downloadCollectionOrResource (db, source, target, options) {
260275
throw Error(`${source} is a collection but ${root} is not a directory`)
261276
}
262277

278+
const limiter = new Bottleneck({
279+
maxConcurrent: options.threads,
280+
minTime: options.mintime
281+
})
282+
263283
// download collection into a folder
264-
return await downloadCollection(db, options,
265-
posix.basename(info.name),
266-
posix.dirname(info.name), root)
284+
return await downloadCollection(db, options, posix.basename(info.name), posix.dirname(info.name), root, limiter)
267285
}
268286

269287
export const command = ['get [options] <source> <target>', 'download', 'fetch']
@@ -301,14 +319,34 @@ export function builder (yargs) {
301319
type: 'boolean',
302320
default: false
303321
})
322+
.option('t', {
323+
alias: 'threads',
324+
describe: 'The maximum number of concurrent threads that will be used to dowload data',
325+
type: 'number',
326+
default: 4
327+
})
328+
.option('m', {
329+
alias: 'mintime',
330+
describe: 'The minimum time each dowload will take',
331+
type: 'number',
332+
default: 0
333+
})
304334
.nargs({ i: 1, e: 1 })
305335
}
306336

307337
export async function handler (argv) {
308338
if (argv.help) {
309339
return 0
310340
}
311-
const { source } = argv
341+
342+
const { threads, mintime, source } = argv
343+
344+
if (typeof mintime !== 'number' || mintime < 0) {
345+
throw Error('Invalid value for option "mintime"; must be an integer equal or greater than zero.')
346+
}
347+
if (typeof threads !== 'number' || threads <= 0) {
348+
throw Error('Invalid value for option "threads"; must be an integer equal or greater than zero.')
349+
}
312350

313351
const target = argv.target ? argv.target : '.'
314352

spec/tests/get.js

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,16 @@ const testCollection = '/db/' + testCollectionName
77

88
async function removeLocalDownload () {
99
const { stderrRmLocal } = await run('rm', ['-rf', testCollectionName])
10-
if (stderrRmLocal) { return console.error(stderrRmLocal) }
10+
if (stderrRmLocal) {
11+
return console.error(stderrRmLocal)
12+
}
1113
}
1214

1315
async function removeRemoteCollection (t) {
1416
const { stderrRmRemote } = await run('xst', ['rm', '-rf', testCollection])
15-
if (stderrRmRemote) { return console.error(stderrRmRemote) }
17+
if (stderrRmRemote) {
18+
return console.error(stderrRmRemote)
19+
}
1620
}
1721

1822
async function tearDown (t) {
@@ -51,7 +55,9 @@ async function prepare (t) {
5155
storeResourceQuery(testCollection, 'test.xq', '"1"')
5256
].join(',')
5357
const { stderr, stdout } = await run('xst', ['run', query], asAdmin)
54-
if (stderr) { return t.fail(stderr) }
58+
if (stderr) {
59+
return t.fail(stderr)
60+
}
5561
t.true(stdout)
5662
}
5763

@@ -61,7 +67,9 @@ function storeResourceQuery (collection, fileName, content) {
6167

6268
test("calling 'xst get --help'", async (t) => {
6369
const { stderr, stdout } = await run('xst', ['get', '--help'])
64-
if (stderr) { return t.fail(stderr) }
70+
if (stderr) {
71+
return t.fail(stderr)
72+
}
6573

6674
t.ok(stdout, stdout)
6775
t.end()
@@ -72,8 +80,10 @@ test('with test collection', async (t) => {
7280
t.test(prepare)
7381

7482
t.test(`can get ${testCollection} as admin`, async (st) => {
75-
const { stderr, stdout } = await run('xst', ['get', testCollection, '.'], asAdmin)
76-
if (stderr) { return st.fail(stderr) }
83+
const { stderr, stdout } = await run('xst', ['get', testCollection, '.', '--threads', '2'], asAdmin)
84+
if (stderr) {
85+
return st.fail(stderr)
86+
}
7787
st.plan(3)
7888

7989
st.notOk(stdout, 'no output')
@@ -84,7 +94,7 @@ test('with test collection', async (t) => {
8494
})
8595

8696
t.test(`cannot get ${testCollection} as guest`, async (st) => {
87-
const { stderr, stdout } = await run('xst', ['get', testCollection, '.'])
97+
const { stderr, stdout } = await run('xst', ['get', testCollection, '.', '--threads', '2'])
8898
if (stdout) {
8999
await removeLocalDownload()
90100
return st.ok(stderr)
@@ -98,7 +108,7 @@ test('with test collection', async (t) => {
98108
})
99109

100110
t.test(`'xst get --verbose ${testCollection}' as admin`, async (st) => {
101-
const { stderr, stdout } = await run('xst', ['get', '--verbose', testCollection, '.'], asAdmin)
111+
const { stderr, stdout } = await run('xst', ['get', '--verbose', testCollection, '.', '--threads', '2'], asAdmin)
102112
st.plan(9)
103113
st.equal(stderr, 'Connecting to https://localhost:8443 as admin\n', stderr)
104114

@@ -110,8 +120,16 @@ test('with test collection', async (t) => {
110120
st.equal(lines[2], 'User: admin')
111121
st.equal(lines.length, 16, 'all expected lines in verbose output')
112122
// files are not downloaded in reproducible order
113-
st.equal(lines.filter(l => /^ created directory [/\w]+\/get-test/.exec(l)).length, 3, 'notify 3 directories created')
114-
st.equal(lines.filter(l => /^ downloaded resource [/\w]+\/get-test/.exec(l)).length, 9, 'notify 9 resources downloaded')
123+
st.equal(
124+
lines.filter((l) => /^ created directory [/\w]+\/get-test/.exec(l)).length,
125+
3,
126+
'notify 3 directories created'
127+
)
128+
st.equal(
129+
lines.filter((l) => /^ downloaded resource [/\w]+\/get-test/.exec(l)).length,
130+
9,
131+
'notify 9 resources downloaded'
132+
)
115133

116134
st.deepEqual(readdirSync(testCollectionName), expectedDirectoryListing, 'all files were downloaded')
117135
st.deepEqual(readdirSync(testCollectionName + '/subcollection'), ['b'], 'subcollection contents were downloaded')
@@ -123,15 +141,23 @@ test('with test collection', async (t) => {
123141

124142
t.test(`'xst get ${testCollection}/empty-subcollection ${additionalTestDirectory}' as admin`, async (st) => {
125143
await run('mkdir', [additionalTestDirectory])
126-
const { stderr, stdout } = await run('xst', ['get', testCollection + '/empty-subcollection', additionalTestDirectory], asAdmin)
144+
const { stderr, stdout } = await run(
145+
'xst',
146+
['get', testCollection + '/empty-subcollection', additionalTestDirectory],
147+
asAdmin
148+
)
127149
if (stderr) {
128150
st.fail(stderr)
129151
return st.end()
130152
}
131153
st.plan(2)
132154
st.notOk(stdout, 'no output')
133155

134-
st.deepEqual(readdirSync(additionalTestDirectory), ['empty-subcollection'], `empty subcollection was created in ${additionalTestDirectory} folder`)
156+
st.deepEqual(
157+
readdirSync(additionalTestDirectory),
158+
['empty-subcollection'],
159+
`empty subcollection was created in ${additionalTestDirectory} folder`
160+
)
135161
await run('rm', ['-rf', additionalTestDirectory])
136162
})
137163

0 commit comments

Comments
 (0)