Skip to content

Commit 222b2a8

Browse files
authored
fixes 707: use sql.unnest instead of sql.join for bulk insert (getodk#771)
We use a function called insertMany to do bulk inserting of certain things including Form Fields, Submission Attachments, Comments, Audits, and Client Audits. insertMany was using sql.join internally, but broke down in extreme cases such as a very large form with 25K fields. In such a form, we store ~10 columns about each form field, so sql.join was forming a query with 250,000 parameters that was too much. Slonik throws 'Maximum call stack size exceeded' due to a bug gajus/slonik#344. Even after solving that bug, it is not possible to execute sql query with 250K parameters in Postgresql because database supports only 65535 parameters. Right way to insert bulk rows in postgresql is to use unnest function, for e.g: INSERT INTO table (col1, col2) SELECT * FROM UNNEST($1, $2) as t where $1 is an array of values for col1 and $2 is an array of values for col2. Slonik has sql.unnest to implement similar query, e.g., when inserting 25K of 10-column form fields, it will have 10 parameters where each is a 25K length array of every value for that one column. The one caveat of using sql.unnest is that it takes a second required argument to list out the types of each column. Most of the code in this PR is about modifying our frames to also define field types on any frame that needs to use insertMany. If field types are missing and insertMany is called anyway, there is a 500 Problem explaining that. To summarize - insertMany changed to use sql.unnest instead of sql.join. - Frames that use insertMany must now also specify a fieldTypes array for the properties in that frame.
1 parent 8302818 commit 222b2a8

File tree

7 files changed

+83
-27
lines changed

7 files changed

+83
-27
lines changed

lib/model/frame.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
// except according to the terms contained in the LICENSE file.
99

1010
const { raw } = require('slonik-sql-tag-raw');
11-
const { pick, without } = require('ramda');
11+
const { pick, without, remove, indexOf } = require('ramda');
1212
const uuid = require('uuid/v4');
1313
const { pickAll, noargs } = require('../util/util');
1414
const Option = require('../util/option');
@@ -26,6 +26,10 @@ const table = (name, to) => (def) => {
2626
def.from = name;
2727
def.to = to || name.substring(0, name.length - 1); // take off the s
2828
};
29+
const fieldTypes = (types) => (def) => {
30+
def.fieldTypes = types;
31+
};
32+
2933
const from = (fro) => (def) => { def.from = fro; };
3034
const into = (to) => (def) => { def.to = to; };
3135

@@ -66,6 +70,8 @@ class Frame {
6670
const Frame = class extends this { static get def() { return def; } };
6771
Frame.fieldlist = raw(def.fields.map((s) => `"${s}"`).join(','));
6872
Frame.insertfields = without([ 'id' ], def.fields);
73+
const indexOfId = indexOf('id', def.fields);
74+
Frame.insertFieldTypes = indexOfId > -1 ? remove(indexOf('id', def.fields), 1, def.fieldTypes ?? []) : def.fieldTypes;
6975
Frame.insertlist = raw(Frame.insertfields.map((s) => `"${s}"`).join(','));
7076
Frame.hasCreatedAt = def.fields.includes('createdAt');
7177
Frame.hasUpdatedAt = def.fields.includes('updatedAt');
@@ -156,5 +162,5 @@ Frame.def = { fields: [], readable: [], hasCreatedAt: false, hasUpdatedAt: false
156162
// util func to ensure the frame we fetched successfully joined to the intended def.
157163
const ensureDef = rejectIf(((f) => f.def.id == null), noargs(Problem.user.notFound));
158164

159-
module.exports = { Frame, table, from, into, aux, species, embedded, readable, writable, ensureDef };
165+
module.exports = { Frame, table, fieldTypes, from, into, aux, species, embedded, readable, writable, ensureDef };
160166

lib/model/frames.js

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
// except according to the terms contained in the LICENSE file.
99

1010
const { sql } = require('slonik');
11-
const { Frame, table, readable, writable, aux, species, embedded } = require('./frame');
11+
const { Frame, table, readable, writable, aux, species, embedded, fieldTypes } = require('./frame');
1212
const { isBlank } = require('../util/util');
1313
const Option = require('../util/option');
1414

@@ -45,6 +45,7 @@ class Audit extends Frame.define(
4545
'details', readable, 'loggedAt', readable,
4646
'notes', readable,
4747
'claimed', 'processed', 'lastFailure', 'failures',
48+
fieldTypes(['int4', 'int4', 'text', 'varchar', 'jsonb', 'timestamptz', 'text', 'timestamptz', 'timestamptz', 'timestamptz', 'int4']),
4849
embedded('actor'), embedded('actee')
4950
) {
5051
// TODO: sort of duplicative of Audits.log
@@ -71,7 +72,12 @@ class Audit extends Frame.define(
7172
const { Blob } = require('./frames/blob');
7273

7374
const { headers } = require('../data/client-audits');
74-
const ClientAudit = Frame.define(table('client_audits'), 'blobId', 'remainder', ...headers);
75+
const ClientAudit = Frame.define(
76+
table('client_audits'),
77+
'blobId', 'remainder',
78+
...headers,
79+
fieldTypes(['int4', 'jsonb', ...headers.map(() => 'text')])
80+
);
7581

7682
const Comment = Frame.define(
7783
table('comments'),
@@ -103,7 +109,8 @@ Form.Attachment = class extends Frame.define(
103109
'formId', 'formDefId',
104110
'blobId', 'datasetId',
105111
'name', readable, 'type', readable,
106-
'updatedAt', readable
112+
'updatedAt', readable,
113+
fieldTypes(['int4', 'int4', 'int4', 'int4', 'text', 'text', 'timestamptz'])
107114
) {
108115
forApi() {
109116
const data = { name: this.name, type: this.type, exists: (this.blobId != null || this.datasetId != null), blobExists: this.blobId != null, datasetExists: this.datasetId != null };
@@ -171,7 +178,8 @@ class Session extends Frame.define(
171178
const { Submission } = require('./frames/submission');
172179
Submission.Attachment = class extends Frame.define(
173180
table('submission_attachments'),
174-
'submissionDefId', 'blobId', 'name', 'index', 'isClientAudit'
181+
'submissionDefId', 'blobId', 'name', 'index', 'isClientAudit',
182+
fieldTypes(['int4', 'int4', 'text', 'int4', 'bool'])
175183
) {
176184
forApi() {
177185
return { name: this.name, exists: (this.blobId != null) };

lib/model/frames/form.js

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
// As with Users/Actors, this information is merged together upon return via the
2626
// API. This is partly for legacy reasons: Forms and FormsDef did not used to
2727

28-
const { Frame, table, readable, writable, into, embedded } = require('../frame');
28+
const { Frame, table, readable, writable, into, embedded, fieldTypes } = require('../frame');
2929
const { injectPublicKey, addVersionSuffix } = require('../../data/schema');
3030
const { Key } = require('./key');
3131
const { md5sum, shasum, sha256sum, digestWith, generateVersionSuffix } = require('../../util/crypto');
@@ -184,15 +184,17 @@ Form.Field = class extends Frame.define(
184184
'formId', 'schemaId',
185185
'path', readable, 'name', readable,
186186
'type', readable, 'binary', readable,
187-
'order', 'selectMultiple', readable
187+
'order', 'selectMultiple', readable,
188+
fieldTypes(['int4', 'int4', 'text', 'text', 'varchar', 'bool', 'int4', 'bool' ])
188189
) {
189190
isStructural() { return (this.type === 'repeat') || (this.type === 'structure'); }
190191
};
191192

192193
Form.FieldValue = Frame.define(
193194
table('form_field_values'),
194195
'formId', 'submissionDefId',
195-
'path', 'value'
196+
'path', 'value',
197+
fieldTypes(['int4', 'int4', 'text', 'text'])
196198
);
197199

198200

lib/util/db.js

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
// except according to the terms contained in the LICENSE file.
99

1010
const { inspect } = require('util');
11-
const { mergeRight, pick, always } = require('ramda');
11+
const { mergeRight, pick, always, without, remove, indexOf } = require('ramda');
1212
const { sql } = require('slonik');
1313
const { raw } = require('slonik-sql-tag-raw');
1414
const { reject } = require('./promise');
@@ -151,15 +151,35 @@ values (${sql.join(keys.map(_assign(obj)), sql`,`)})
151151
returning *`;
152152
};
153153

154+
// Arguments:
155+
// objs: An array of Frames to be inserted in the database table.
156+
// Frame should defined fieldTypes for this function to work
157+
// because Slonik's sql.unnest function has a mandatory columnType argument
154158
const insertMany = (objs) => {
155159
if (objs.length === 0) return sql`select true`;
156160
const Type = objs[0].constructor;
161+
if (!Type.def.fieldTypes) throw Problem.internal.fieldTypesNotDefined(Type.from);
162+
163+
let columns; let rows; let columnTypes; let
164+
selectExp;
165+
166+
// we need to set clock_timestamp if there's createdAt column
167+
// Slonik doesn't support setting sql identitfier for sql.unnest yet
168+
if (Type.hasCreatedAt) {
169+
columns = sql`"createdAt", ${raw(without(['createdAt'], Type.insertfields).map((s) => `"${s}"`).join(','))}`;
170+
rows = objs.map(obj => without(['createdAt'], Type.insertfields).map(_assign(obj)));
171+
columnTypes = remove(indexOf(['createdAt'], Type.insertfields), 1, Type.insertFieldTypes);
172+
selectExp = sql`clock_timestamp(), *`;
173+
} else {
174+
columns = Type.insertlist;
175+
rows = objs.map(obj => Type.insertfields.map(_assign(obj)));
176+
columnTypes = Type.insertFieldTypes;
177+
selectExp = sql`*`;
178+
}
179+
157180
return sql`
158-
insert into ${raw(Type.table)} (${Type.insertlist})
159-
values ${sql.join(
160-
objs.map((obj) => sql`(${sql.join(Type.insertfields.map(_assign(obj)), sql`,`)})`),
161-
sql`,`
162-
)}`;
181+
INSERT INTO ${raw(Type.table)} (${columns})
182+
SELECT ${selectExp} FROM ${sql.unnest(rows, columnTypes)} AS t`;
163183
};
164184

165185
// generic update utility

lib/util/problem.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,9 @@ const problems = {
226226
enketoUnexpectedResponse: problem(500.4, () => 'The Enketo service returned an unexpected response.'),
227227

228228
// returned if ODK Analytics returns an unexpected response.
229-
analyticsUnexpectedResponse: problem(500.5, () => 'The Analytics service returned an unexpected response.')
229+
analyticsUnexpectedResponse: problem(500.5, () => 'The Analytics service returned an unexpected response.'),
230+
231+
fieldTypesNotDefined: problem(500.6, (frame) => `fieldTypes are not defined on the ${frame} Frame, please define them to use insertMany.`)
230232
}
231233
};
232234

test/integration/other/encryption.js

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,11 +143,13 @@ describe('managed encryption', () => {
143143
return Submission.fromXml(xml)
144144
.then((partial) => hijacked.SubmissionAttachments.create(partial, {}, []))
145145
.then(() => {
146+
// values to sql query of insertMany are passed as array of arrays
146147
results[0].values.should.eql([
147-
null, null, 'zulu.file', 0, false,
148-
null, null, 'alpha.file', 1, false,
149-
null, null, 'bravo.file', 2, false,
150-
null, null, 'submission.xml.enc', 3, null
148+
[null, null, null, null],
149+
[null, null, null, null],
150+
['zulu.file', 'alpha.file', 'bravo.file', 'submission.xml.enc'],
151+
[0, 1, 2, 3],
152+
[false, false, false, null]
151153
]);
152154
});
153155
}));

test/unit/util/db.js

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
const appRoot = require('app-root-path');
22
const { sql } = require('slonik');
3+
const { fieldTypes } = require('../../../lib/model/frame');
34
// eslint-disable-next-line import/no-dynamic-require
45
const { Frame, table, into } = require(appRoot + '/lib/model/frame');
56
// eslint-disable-next-line import/no-dynamic-require
@@ -312,23 +313,38 @@ returning *`);
312313

313314
describe('insertMany', () => {
314315
const { insertMany } = util;
315-
const T = Frame.define(table('dogs'), 'x', 'y');
316+
const T = Frame.define(table('dogs'), 'x', 'y', fieldTypes(['text', 'text']));
316317

317318
it('should do nothing if given no data', () => {
318319
insertMany([]).should.eql(sql`select true`);
319320
});
320321

321322
it('should insert all data', () => {
322-
insertMany([ new T({ x: 2 }), new T({ y: 3 }) ]).should.eql(sql`
323-
insert into dogs ("x","y")
324-
values (${2},${null}),(${null},${3})`);
323+
const query = insertMany([ new T({ x: 2 }), new T({ y: 3 }) ]);
324+
query.sql.should.be.eql(`
325+
INSERT INTO dogs ("x","y")
326+
SELECT * FROM unnest($1::"text"[], $2::"text"[]) AS t`);
327+
query.values.should.be.eql([
328+
[2, null],
329+
[null, 3]
330+
]);
325331
});
326332

327333
it('should insert createdAt and strange values', () => {
334+
const U = Frame.define(table('dogs'), 'x', 'createdAt', fieldTypes(['timestamptz', 'timestamptz']));
335+
const query = insertMany([ new U({ x: new Date('2000-01-01') }), new U() ]);
336+
query.sql.should.be.eql(`
337+
INSERT INTO dogs ("createdAt", "x")
338+
SELECT clock_timestamp(), * FROM unnest($1::"timestamptz"[]) AS t`);
339+
query.values.should.be.eql([
340+
['2000-01-01T00:00:00.000Z', null]
341+
]);
342+
});
343+
344+
it('should throw fieldTypesNotDefined', () => {
328345
const U = Frame.define(table('dogs'), 'x', 'createdAt');
329-
insertMany([ new U({ x: new Date('2000-01-01') }), new U() ]).should.eql(sql`
330-
insert into dogs ("x","createdAt")
331-
values (${'2000-01-01T00:00:00.000Z'},${sql`clock_timestamp()`}),(${null},${sql`clock_timestamp()`})`);
346+
(() => insertMany([ new U({ x: new Date('2000-01-01') }), new U() ]))
347+
.should.throw('fieldTypes are not defined on the dogs Frame, please define them to use insertMany.');
332348
});
333349
});
334350

0 commit comments

Comments
 (0)