Skip to content

Commit 868f11a

Browse files
Fix out-of-memory error on feature import (#762)
* Add ability for file in `file import` to be an id * Process updates in batches to avoid out-of-memory
1 parent 7abb9f5 commit 868f11a

File tree

5 files changed

+113
-87
lines changed

5 files changed

+113
-87
lines changed

packages/apollo-cli/src/commands/assembly/add-from-fasta.ts

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,9 @@ import type {
88
} from '@apollo-annotation/shared'
99
import { Args, Flags } from '@oclif/core'
1010
import { ObjectId } from 'bson'
11-
import type { Response } from 'undici'
1211

1312
import { FileCommand } from '../../fileCommand.js'
14-
import { queryApollo, submitAssembly } from '../../utils.js'
13+
import { isFileId, submitAssembly } from '../../utils.js'
1514

1615
export default class AddFasta extends FileCommand {
1716
static summary = 'Add a new assembly from fasta input'
@@ -235,17 +234,3 @@ function isValidHttpUrl(x: string) {
235234
}
236235
return url.protocol === 'http:' || url.protocol === 'https:'
237236
}
238-
239-
async function isFileId(x: string, address: string, accessToken: string) {
240-
if (x.length != 24) {
241-
return false
242-
}
243-
const files: Response = await queryApollo(address, accessToken, 'files')
244-
const json = (await files.json()) as object[]
245-
for (const fileDoc of json) {
246-
if (fileDoc['_id' as keyof typeof fileDoc] === x) {
247-
return true
248-
}
249-
}
250-
return false
251-
}

packages/apollo-cli/src/commands/feature/import.ts

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { FileCommand } from '../../fileCommand.js'
88
import {
99
convertAssemblyNameToId,
1010
createFetchErrorMessage,
11+
isFileId,
1112
localhostToAddress,
1213
} from '../../utils.js'
1314

@@ -46,9 +47,7 @@ export default class Import extends FileCommand {
4647
public async run(): Promise<void> {
4748
const { args, flags } = await this.parse(Import)
4849

49-
if (!fs.existsSync(args['input-file'])) {
50-
this.error(`File "${args['input-file']}" does not exist`)
51-
}
50+
const inputFile = args['input-file']
5251

5352
const access = await this.getAccess()
5453

@@ -63,13 +62,26 @@ export default class Import extends FileCommand {
6362
)
6463
}
6564

66-
const uploadId = await this.uploadFile(
65+
let uploadId
66+
const inputFileIsFileId = await isFileId(
67+
inputFile,
6768
access.address,
6869
access.accessToken,
69-
args['input-file'],
70-
'text/x-gff3',
71-
false,
7270
)
71+
if (inputFileIsFileId) {
72+
uploadId = inputFile
73+
} else {
74+
if (!fs.existsSync(inputFile)) {
75+
this.error(`File "${inputFile}" does not exist`)
76+
}
77+
uploadId = await this.uploadFile(
78+
access.address,
79+
access.accessToken,
80+
inputFile,
81+
'text/x-gff3',
82+
false,
83+
)
84+
}
7385

7486
const body: SerializedAddFeaturesFromFileChange = {
7587
typeName: 'AddFeaturesFromFileChange',

packages/apollo-cli/src/utils.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,3 +471,21 @@ export async function idReader(
471471
}
472472
return ids
473473
}
474+
475+
export async function isFileId(
476+
x: string,
477+
address: string,
478+
accessToken: string,
479+
) {
480+
if (x.length != 24) {
481+
return false
482+
}
483+
const files: Response = await queryApollo(address, accessToken, 'files')
484+
const json = (await files.json()) as object[]
485+
for (const fileDoc of json) {
486+
if (fileDoc['_id' as keyof typeof fileDoc] === x) {
487+
return true
488+
}
489+
}
490+
return false
491+
}

packages/apollo-collaboration-server/src/changes/changes.service.ts

Lines changed: 75 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import {
3838
UnprocessableEntityException,
3939
} from '@nestjs/common'
4040
import { InjectModel } from '@nestjs/mongoose'
41-
import { type FilterQuery, Model } from 'mongoose'
41+
import { type FilterQuery, Model, Types } from 'mongoose'
4242

4343
import { CountersService } from '../counters/counters.service.js'
4444
import { FilesService } from '../files/files.service.js'
@@ -142,18 +142,18 @@ export class ChangesService {
142142
this.logger.debug(
143143
'*** INSERT DATA EXCEPTION - Start to clean up old temporary documents...',
144144
)
145-
await this.assemblyModel.deleteMany({
146-
$and: [{ status: -1, user: uniqUserId }],
147-
})
148-
await this.featureModel.deleteMany({
149-
$and: [{ status: -1, user: uniqUserId }],
150-
})
151-
await this.refSeqModel.deleteMany({
152-
$and: [{ status: -1, user: uniqUserId }],
153-
})
154-
await this.refSeqChunkModel.deleteMany({
155-
$and: [{ status: -1, user: uniqUserId }],
156-
})
145+
await this.assemblyModel
146+
.deleteMany({ $and: [{ status: -1, user: uniqUserId }] })
147+
.exec()
148+
await this.featureModel
149+
.deleteMany({ $and: [{ status: -1, user: uniqUserId }] })
150+
.exec()
151+
await this.refSeqModel
152+
.deleteMany({ $and: [{ status: -1, user: uniqUserId }] })
153+
.exec()
154+
await this.refSeqChunkModel
155+
.deleteMany({ $and: [{ status: -1, user: uniqUserId }] })
156+
.exec()
157157
throw new UnprocessableEntityException(String(error))
158158
}
159159

@@ -185,68 +185,58 @@ export class ChangesService {
185185

186186
await this.featureModel.db.transaction(async () => {
187187
try {
188-
await this.featureModel.updateMany(
189-
{
190-
$and: [{ status: -1, user: uniqUserId, _id: addedFeature._id }],
191-
},
192-
{ $set: { status: 0 } },
193-
)
188+
await this.featureModel
189+
.updateMany(
190+
{
191+
$and: [
192+
{ status: -1, user: uniqUserId, _id: addedFeature._id },
193+
],
194+
},
195+
{ $set: { status: 0 } },
196+
)
197+
.exec()
194198
} catch (error) {
195199
const err = error as Error
196200
this.logger.error(
197201
`Error setting status of add feature change to 0: ${err.message}`,
198202
)
199-
await this.featureModel.deleteMany({
200-
$and: [{ status: -1, user: uniqUserId, _id: addedFeature._id }],
201-
})
203+
await this.featureModel
204+
.deleteMany({
205+
$and: [{ status: -1, user: uniqUserId, _id: addedFeature._id }],
206+
})
207+
.exec()
202208
}
203209
})
204210
}
205211
}
206212

207213
if (STATUS_ZERO_CHANGE_TYPES.has(change.typeName)) {
208-
this.logger.debug?.('*** TEMPORARY DATA INSERTTED ***')
214+
// manual finalization of change since the data is too big for a transaction
215+
this.logger.debug('*** TEMPORARY DATA INSERTED ***')
209216
// Set "temporary document" -status --> "valid" -status i.e. (-1 --> 0)
210217
await this.featureModel.db.transaction(async () => {
211-
this.logger.debug(
212-
'Updates "temporary document" -status --> "valid" -status',
213-
)
214218
try {
215-
// We cannot use Mongo 'session' / transaction here because Mongo has 16 MB limit for transaction
216-
await this.assemblyModel.updateMany(
217-
{ $and: [{ status: -1, user: uniqUserId }] },
218-
{ $set: { status: 0 } },
219-
)
220-
await this.refSeqChunkModel.updateMany(
221-
{ $and: [{ status: -1, user: uniqUserId }] },
222-
{ $set: { status: 0 } },
223-
)
224-
await this.featureModel.updateMany(
225-
{ $and: [{ status: -1, user: uniqUserId }] },
226-
{ $set: { status: 0 } },
227-
)
228-
await this.refSeqModel.updateMany(
229-
{ $and: [{ status: -1, user: uniqUserId }] },
230-
{ $set: { status: 0 } },
231-
)
219+
await this.batchUpdateMany(this.assemblyModel, uniqUserId)
220+
await this.batchUpdateMany(this.refSeqChunkModel, uniqUserId)
221+
await this.batchUpdateMany(this.featureModel, uniqUserId)
222+
await this.batchUpdateMany(this.refSeqModel, uniqUserId)
232223
} catch (error) {
233224
// Clean up old "temporary document" -documents
234225
this.logger.debug(
235226
'*** UPDATE STATUS EXCEPTION - Start to clean up old temporary documents...',
236227
)
237-
// We cannot use Mongo 'session' / transaction here because Mongo has 16 MB limit for transaction
238-
await this.assemblyModel.deleteMany({
239-
$and: [{ status: -1, user: uniqUserId }],
240-
})
241-
await this.featureModel.deleteMany({
242-
$and: [{ status: -1, user: uniqUserId }],
243-
})
244-
await this.refSeqModel.deleteMany({
245-
$and: [{ status: -1, user: uniqUserId }],
246-
})
247-
await this.refSeqChunkModel.deleteMany({
248-
$and: [{ status: -1, user: uniqUserId }],
249-
})
228+
await this.assemblyModel
229+
.deleteMany({ $and: [{ status: -1, user: uniqUserId }] })
230+
.exec()
231+
await this.featureModel
232+
.deleteMany({ $and: [{ status: -1, user: uniqUserId }] })
233+
.exec()
234+
await this.refSeqModel
235+
.deleteMany({ $and: [{ status: -1, user: uniqUserId }] })
236+
.exec()
237+
await this.refSeqChunkModel
238+
.deleteMany({ $and: [{ status: -1, user: uniqUserId }] })
239+
.exec()
250240
throw new UnprocessableEntityException(String(error))
251241
}
252242
})
@@ -333,4 +323,33 @@ export class ChangesService {
333323

334324
return change
335325
}
326+
327+
async batchUpdateMany(
328+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
329+
model: Model<any>,
330+
uniqUserId: string,
331+
) {
332+
let docsToUpdate = await model
333+
.find({ $and: [{ status: -1, user: uniqUserId }] })
334+
.limit(1000)
335+
.exec()
336+
let updatedCount = 0
337+
while (docsToUpdate.length > 0) {
338+
const lengthBefore = updatedCount
339+
updatedCount += docsToUpdate.length
340+
this.logger.debug(
341+
`Finalizing ${model.collection.name} ${lengthBefore} to ${updatedCount}`,
342+
)
343+
const idsToUpdate = docsToUpdate.map(
344+
(doc) => (doc as { _id: Types.ObjectId })._id,
345+
)
346+
await model
347+
.updateMany({ _id: idsToUpdate }, { $set: { status: 0 } })
348+
.exec()
349+
docsToUpdate = await model
350+
.find({ $and: [{ status: -1, user: uniqUserId }] })
351+
.limit(1000)
352+
.exec()
353+
}
354+
}
336355
}

packages/apollo-collaboration-server/src/checks/checks.service.ts

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,20 +74,12 @@ export class ChecksService {
7474
}
7575

7676
async checkFeatures(docs: FeatureDocument[], checkTimestamps = true) {
77-
if (docs.length > 1) {
78-
this.logger.debug(`Checking ${docs.length} features`)
79-
}
80-
let docsChecked = 1
8177
for (const doc of docs) {
82-
if (docsChecked % 1000 === 0) {
83-
this.logger.debug(`checked ${docsChecked} features`)
84-
}
8578
// @ts-expect-error ownerDocument does exist, TS just doesn't know it
8679
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
8780
if (doc.ownerDocument() === doc && doc.status === 0) {
8881
await this.checkFeature(doc, checkTimestamps)
8982
}
90-
docsChecked += 1
9183
}
9284
}
9385

0 commit comments

Comments
 (0)