Skip to content

Commit 55b1958

Browse files
authored
1.4.3
* version bump * add maxwps/COUCH_MAX_WPS config for rate limiting writes * 1.4.3
1 parent ddd3320 commit 55b1958

File tree

6 files changed

+849
-464
lines changed

6 files changed

+849
-464
lines changed

README.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,8 @@ If you are importing data into a CouchDB database that already contains data, an
221221
* COUCH_PREVIEW - run in preview mode
222222
* COUCH_IGNORE_FIELDS - a comma-separated list of field names to ignore on import or export e.g. price,url,image
223223
* COUCH_OVERWRITE - overwrite existing document revisions with supplied data
224+
* COUCH_PARALLELISM - the maximum number of HTTP requests to have in flight at any one time (default: 1)
225+
* COUCH_MAX_WPS - the maximum number of write API calls to make per second (rate limiting) (default: 0 - no rate limiting)
224226
225227
## Command-line parameters
226228
@@ -239,6 +241,7 @@ You can also configure `couchimport` and `couchexport` using command-line parame
239241
* `--preview`/`-p` - if 'true', runs in preview mode (default false)
240242
* `--ignorefields`/`-i` - a comma-separated list of fields to ignore input or output (default none)
241243
* `--parallelism` - the number of HTTP requests to have in flight at any one time (default 1)
244+
* `--maxwps` - the maximum number of write API calls to make per second (default 0 - no rate limiting)
242245
* `--overwrite`/`-o` - overwrite existing document revisions with supplied data (default: false)
243246
244247
e.g.
@@ -387,10 +390,12 @@ The emitted data is an object containing:
387390
* failed - the number of documents failed to write in the last batch
388391
* totalfailed - the number of documents that failed to write in total
389392
390-
## Parallelism
393+
## Parallelism & Rate limiting
391394
392395
Using the `COUCH_PARALLELISM` environment variable or the `--parallelism` command-line option, couchimport can be configured to write data in multiple parallel operations. If you have the network bandwidth, this can significantly speed up large data imports e.g.
393396
394397
```sh
395398
cat bigdata.csv | couchimport --database mydb --parallelism 10 --delimiter ","
396399
```
400+
401+
This can be combined with the `COUCH_MAX_WPS`/`--maxwps` parameter to limit the number of write API calls dispatched per second, to make sure you don't exceed the number of writes permitted on a rate-limited service.

app.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ const importStream = function (rs, opts, callback) {
1919
opts = defaults.merge(opts)
2020

2121
// load dependencies
22-
const writer = require('./includes/writer.js')(opts.url, opts.database, opts.buffer, opts.parallelism, opts.ignorefields, opts.overwrite)
22+
const writer = require('./includes/writer.js')(opts.url, opts.database, opts.buffer, opts.parallelism, opts.ignorefields, opts.overwrite, opts.maxwps)
2323
const transformer = require('./includes/transformer.js')(opts.transform, opts.meta)
2424
const JSONStream = require('JSONStream')
2525
if (opts.type === 'jsonl') {

includes/args.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ const parse = function () {
3838
describe: 'the number of HTTP requests to have in-flight at any one time',
3939
default: process.env.COUCH_PARALLELISM ? parseInt(process.env.COUCH_PARALLELISM) : 1
4040
})
41+
.option('maxwps', {
42+
number: true,
43+
describe: 'the maximum number of write operations to perform per second',
44+
default: process.env.COUCH_MAX_WPS ? parseInt(process.env.COUCH_MAX_WPS) : 0
45+
})
4146
.option('type', {
4247
alias: 't',
4348
describe: 'the type of file being imported',

includes/writer.js

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
const async = require('async')
2+
const qrate = require('qrate')
23
const debug = require('debug')('couchimport')
34
const iam = require('./iam.js')
45
const axios = require('axios').default
@@ -7,7 +8,7 @@ const axios = require('axios').default
78
const IAM_API_KEY = process.env.IAM_API_KEY ? process.env.IAM_API_KEY : null
89
let iamAccessToken = null
910

10-
module.exports = function (couchURL, couchDatabase, bufferSize, parallelism, ignoreFields, overwrite) {
11+
module.exports = function (couchURL, couchDatabase, bufferSize, parallelism, ignoreFields, overwrite, maxwps) {
1112
const stream = require('stream')
1213

1314
let buffer = []
@@ -25,7 +26,7 @@ module.exports = function (couchURL, couchDatabase, bufferSize, parallelism, ign
2526
}
2627

2728
// process the writes in bulk as a queue
28-
const q = async.queue(async (payload) => {
29+
const q = qrate(async (payload) => {
2930
// detected whether we need to supply new_edits = false
3031
let allHaveRev = true
3132
for (var i in payload.docs) {
@@ -116,7 +117,7 @@ module.exports = function (couchURL, couchDatabase, bufferSize, parallelism, ign
116117
totalfailed += failed
117118
writer.emit('written', { documents: ok, failed: failed, total: written, totalfailed: totalfailed })
118119
debug({ documents: ok, failed: failed, total: written, totalfailed: totalfailed })
119-
}, parallelism)
120+
}, parallelism, maxwps || undefined)
120121

121122
// write the contents of the buffer to CouchDB in blocks of 500
122123
const processBuffer = function (flush, callback) {

0 commit comments

Comments
 (0)