Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 1 addition & 16 deletions packages/apollo-cli/src/commands/assembly/add-from-fasta.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ import type {
} from '@apollo-annotation/shared'
import { Args, Flags } from '@oclif/core'
import { ObjectId } from 'bson'
import type { Response } from 'undici'

import { FileCommand } from '../../fileCommand.js'
import { queryApollo, submitAssembly } from '../../utils.js'
import { isFileId, submitAssembly } from '../../utils.js'

export default class AddFasta extends FileCommand {
static summary = 'Add a new assembly from fasta input'
Expand Down Expand Up @@ -235,17 +234,3 @@ function isValidHttpUrl(x: string) {
}
return url.protocol === 'http:' || url.protocol === 'https:'
}

async function isFileId(x: string, address: string, accessToken: string) {
if (x.length != 24) {
return false
}
const files: Response = await queryApollo(address, accessToken, 'files')
const json = (await files.json()) as object[]
for (const fileDoc of json) {
if (fileDoc['_id' as keyof typeof fileDoc] === x) {
return true
}
}
return false
}
26 changes: 19 additions & 7 deletions packages/apollo-cli/src/commands/feature/import.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { FileCommand } from '../../fileCommand.js'
import {
convertAssemblyNameToId,
createFetchErrorMessage,
isFileId,
localhostToAddress,
} from '../../utils.js'

Expand Down Expand Up @@ -46,9 +47,7 @@ export default class Import extends FileCommand {
public async run(): Promise<void> {
const { args, flags } = await this.parse(Import)

if (!fs.existsSync(args['input-file'])) {
this.error(`File "${args['input-file']}" does not exist`)
}
const inputFile = args['input-file']

const access = await this.getAccess()

Expand All @@ -63,13 +62,26 @@ export default class Import extends FileCommand {
)
}

const uploadId = await this.uploadFile(
let uploadId
const inputFileIsFileId = await isFileId(
inputFile,
access.address,
access.accessToken,
args['input-file'],
'text/x-gff3',
false,
)
if (inputFileIsFileId) {
uploadId = inputFile
} else {
if (!fs.existsSync(inputFile)) {
this.error(`File "${inputFile}" does not exist`)
}
uploadId = await this.uploadFile(
access.address,
access.accessToken,
inputFile,
'text/x-gff3',
false,
)
}

const body: SerializedAddFeaturesFromFileChange = {
typeName: 'AddFeaturesFromFileChange',
Expand Down
18 changes: 18 additions & 0 deletions packages/apollo-cli/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -471,3 +471,21 @@ export async function idReader(
}
return ids
}

export async function isFileId(
x: string,
address: string,
accessToken: string,
) {
if (x.length != 24) {
return false
}
const files: Response = await queryApollo(address, accessToken, 'files')
const json = (await files.json()) as object[]
for (const fileDoc of json) {
if (fileDoc['_id' as keyof typeof fileDoc] === x) {
return true
}
}
return false
}
131 changes: 75 additions & 56 deletions packages/apollo-collaboration-server/src/changes/changes.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import {
UnprocessableEntityException,
} from '@nestjs/common'
import { InjectModel } from '@nestjs/mongoose'
import { type FilterQuery, Model } from 'mongoose'
import { type FilterQuery, Model, Types } from 'mongoose'

import { CountersService } from '../counters/counters.service.js'
import { FilesService } from '../files/files.service.js'
Expand Down Expand Up @@ -142,18 +142,18 @@ export class ChangesService {
this.logger.debug(
'*** INSERT DATA EXCEPTION - Start to clean up old temporary documents...',
)
await this.assemblyModel.deleteMany({
$and: [{ status: -1, user: uniqUserId }],
})
await this.featureModel.deleteMany({
$and: [{ status: -1, user: uniqUserId }],
})
await this.refSeqModel.deleteMany({
$and: [{ status: -1, user: uniqUserId }],
})
await this.refSeqChunkModel.deleteMany({
$and: [{ status: -1, user: uniqUserId }],
})
await this.assemblyModel
.deleteMany({ $and: [{ status: -1, user: uniqUserId }] })
.exec()
await this.featureModel
.deleteMany({ $and: [{ status: -1, user: uniqUserId }] })
.exec()
await this.refSeqModel
.deleteMany({ $and: [{ status: -1, user: uniqUserId }] })
.exec()
await this.refSeqChunkModel
.deleteMany({ $and: [{ status: -1, user: uniqUserId }] })
.exec()
throw new UnprocessableEntityException(String(error))
}

Expand Down Expand Up @@ -185,68 +185,58 @@ export class ChangesService {

await this.featureModel.db.transaction(async () => {
try {
await this.featureModel.updateMany(
{
$and: [{ status: -1, user: uniqUserId, _id: addedFeature._id }],
},
{ $set: { status: 0 } },
)
await this.featureModel
.updateMany(
{
$and: [
{ status: -1, user: uniqUserId, _id: addedFeature._id },
],
},
{ $set: { status: 0 } },
)
.exec()
} catch (error) {
const err = error as Error
this.logger.error(
`Error setting status of add feature change to 0: ${err.message}`,
)
await this.featureModel.deleteMany({
$and: [{ status: -1, user: uniqUserId, _id: addedFeature._id }],
})
await this.featureModel
.deleteMany({
$and: [{ status: -1, user: uniqUserId, _id: addedFeature._id }],
})
.exec()
}
})
}
}

if (STATUS_ZERO_CHANGE_TYPES.has(change.typeName)) {
this.logger.debug?.('*** TEMPORARY DATA INSERTTED ***')
// manual finalization of change since the data is too big for a transaction
this.logger.debug('*** TEMPORARY DATA INSERTED ***')
// Set "temporary document" -status --> "valid" -status i.e. (-1 --> 0)
await this.featureModel.db.transaction(async () => {
this.logger.debug(
'Updates "temporary document" -status --> "valid" -status',
)
try {
// We cannot use Mongo 'session' / transaction here because Mongo has 16 MB limit for transaction
await this.assemblyModel.updateMany(
{ $and: [{ status: -1, user: uniqUserId }] },
{ $set: { status: 0 } },
)
await this.refSeqChunkModel.updateMany(
{ $and: [{ status: -1, user: uniqUserId }] },
{ $set: { status: 0 } },
)
await this.featureModel.updateMany(
{ $and: [{ status: -1, user: uniqUserId }] },
{ $set: { status: 0 } },
)
await this.refSeqModel.updateMany(
{ $and: [{ status: -1, user: uniqUserId }] },
{ $set: { status: 0 } },
)
await this.batchUpdateMany(this.assemblyModel, uniqUserId)
await this.batchUpdateMany(this.refSeqChunkModel, uniqUserId)
await this.batchUpdateMany(this.featureModel, uniqUserId)
await this.batchUpdateMany(this.refSeqModel, uniqUserId)
} catch (error) {
// Clean up old "temporary document" -documents
this.logger.debug(
'*** UPDATE STATUS EXCEPTION - Start to clean up old temporary documents...',
)
// We cannot use Mongo 'session' / transaction here because Mongo has 16 MB limit for transaction
await this.assemblyModel.deleteMany({
$and: [{ status: -1, user: uniqUserId }],
})
await this.featureModel.deleteMany({
$and: [{ status: -1, user: uniqUserId }],
})
await this.refSeqModel.deleteMany({
$and: [{ status: -1, user: uniqUserId }],
})
await this.refSeqChunkModel.deleteMany({
$and: [{ status: -1, user: uniqUserId }],
})
await this.assemblyModel
.deleteMany({ $and: [{ status: -1, user: uniqUserId }] })
.exec()
await this.featureModel
.deleteMany({ $and: [{ status: -1, user: uniqUserId }] })
.exec()
await this.refSeqModel
.deleteMany({ $and: [{ status: -1, user: uniqUserId }] })
.exec()
await this.refSeqChunkModel
.deleteMany({ $and: [{ status: -1, user: uniqUserId }] })
.exec()
throw new UnprocessableEntityException(String(error))
}
})
Expand Down Expand Up @@ -333,4 +323,33 @@ export class ChangesService {

return change
}

async batchUpdateMany(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
model: Model<any>,
uniqUserId: string,
) {
let docsToUpdate = await model
.find({ $and: [{ status: -1, user: uniqUserId }] })
.limit(1000)
.exec()
let updatedCount = 0
while (docsToUpdate.length > 0) {
const lengthBefore = updatedCount
updatedCount += docsToUpdate.length
this.logger.debug(
`Finalizing ${model.collection.name} ${lengthBefore} to ${updatedCount}`,
)
const idsToUpdate = docsToUpdate.map(
(doc) => (doc as { _id: Types.ObjectId })._id,
)
await model
.updateMany({ _id: idsToUpdate }, { $set: { status: 0 } })
.exec()
docsToUpdate = await model
.find({ $and: [{ status: -1, user: uniqUserId }] })
.limit(1000)
.exec()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,12 @@ export class ChecksService {
}

async checkFeatures(docs: FeatureDocument[], checkTimestamps = true) {
if (docs.length > 1) {
this.logger.debug(`Checking ${docs.length} features`)
}
let docsChecked = 1
for (const doc of docs) {
if (docsChecked % 1000 === 0) {
this.logger.debug(`checked ${docsChecked} features`)
}
// @ts-expect-error ownerDocument does exist, TS just doesn't know it
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
if (doc.ownerDocument() === doc && doc.status === 0) {
await this.checkFeature(doc, checkTimestamps)
}
docsChecked += 1
}
}

Expand Down