Skip to content

Commit 93aafd0

Browse files
authored
Abstract away rollback management (#2889)
1 parent 14c9bd8 commit 93aafd0

File tree

5 files changed

+82
-27
lines changed

5 files changed

+82
-27
lines changed

src/components/file/file.service.ts

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import {
44
} from '@aws-sdk/client-s3';
55
import { Injectable } from '@nestjs/common';
66
import { bufferFromStream, cleanJoin, Nil } from '@seedcompany/common';
7-
import { Connection } from 'cypher-query-builder';
87
import { fileTypeFromStream } from 'file-type';
98
import { intersection } from 'lodash';
109
import { Duration } from 'luxon';
@@ -23,7 +22,13 @@ import {
2322
UnauthorizedException,
2423
} from '~/common';
2524
import { withAddedPath } from '~/common/url.util';
26-
import { ConfigService, IEventBus, ILogger, Logger } from '~/core';
25+
import {
26+
ConfigService,
27+
IEventBus,
28+
ILogger,
29+
Logger,
30+
RollbackManager,
31+
} from '~/core';
2732
import { FileBucket } from './bucket';
2833
import {
2934
CreateDefinedFileVersionInput,
@@ -56,7 +61,7 @@ export class FileService {
5661
constructor(
5762
private readonly bucket: FileBucket,
5863
private readonly repo: FileRepository,
59-
private readonly db: Connection,
64+
private readonly rollbacks: RollbackManager,
6065
private readonly config: ConfigService,
6166
private readonly mediaService: MediaService,
6267
private readonly eventBus: IEventBus,
@@ -384,28 +389,17 @@ export class FileService {
384389
if (existingUpload.status === 'rejected') {
385390
await this.bucket.moveObject(`temp/${uploadId}`, uploadId);
386391

387-
// A bit of a hacky way to move files back to the temp/ folder on
388-
// mutation error / transaction rollback. This prevents orphaned files in bucket.
389-
const tx = this.db.currentTransaction;
390-
// The mutation can be retried multiple times, when neo4j deems the error
391-
// is retry-able, but we only want to attach this rollback logic once.
392-
if (tx && !(tx as any).__S3_ROLLBACK) {
393-
(tx as any).__S3_ROLLBACK = true;
394-
395-
const orig = tx.rollback.bind(tx);
396-
tx.rollback = async () => {
397-
// Undo above operation by moving it back to temp folder.
398-
await this.bucket
399-
.moveObject(uploadId, `temp/${uploadId}`)
400-
.catch((e) => {
401-
this.logger.error('Failed to move file back to temp holding', {
402-
uploadId,
403-
exception: e,
404-
});
392+
// Undo the above operation by moving it back to temp folder.
393+
this.rollbacks.add(async () => {
394+
await this.bucket
395+
.moveObject(uploadId, `temp/${uploadId}`)
396+
.catch((e) => {
397+
this.logger.error('Failed to move file back to temp holding', {
398+
uploadId,
399+
exception: e,
405400
});
406-
await orig();
407-
};
408-
}
401+
});
402+
});
409403
}
410404

411405
await this.mediaService.detectAndSave(fv, media);

src/core/database/database.module.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { DatabaseService } from './database.service';
99
import { IndexerModule } from './indexer/indexer.module';
1010
import { MigrationModule } from './migration/migration.module';
1111
import { ParameterTransformer } from './parameter-transformer.service';
12+
import { RollbackManager } from './rollback-manager';
1213
import { TransactionalMutationsInterceptor } from './transactional-mutations.interceptor';
1314

1415
@Module({
@@ -18,8 +19,9 @@ import { TransactionalMutationsInterceptor } from './transactional-mutations.int
1819
DatabaseService,
1920
ParameterTransformer,
2021
{ provide: APP_INTERCEPTOR, useClass: TransactionalMutationsInterceptor },
22+
RollbackManager,
2123
],
22-
exports: [CypherFactory, DatabaseService, IndexerModule],
24+
exports: [CypherFactory, DatabaseService, IndexerModule, RollbackManager],
2325
})
2426
export class DatabaseModule implements OnApplicationShutdown {
2527
constructor(

src/core/database/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ export { DtoRepository } from './dto.repository';
77
export * from './indexer';
88
export * from './migration';
99
export * from './db-type';
10+
export * from './rollback-manager';
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import { Injectable } from '@nestjs/common';
2+
import { GqlContextHost } from '../graphql';
3+
4+
type RollbackFn = () => Promise<void>;
5+
6+
interface RollbackRef {
7+
fn: RollbackFn;
8+
remove: () => void;
9+
}
10+
11+
@Injectable()
12+
export class RollbackManager {
13+
private readonly functionsByContext = new WeakMap<
14+
GqlContextHost['context'],
15+
Set<RollbackFn>
16+
>();
17+
18+
constructor(private readonly contextHost: GqlContextHost) {}
19+
20+
/**
21+
* Add a function to call whenever a rollback is triggered.
22+
*/
23+
add(fn: RollbackFn): RollbackRef {
24+
const remove = () => this.functions.delete(fn);
25+
this.functions.add(fn);
26+
return { fn, remove };
27+
}
28+
29+
/**
30+
* Call all the stored rollback functions, and clear the list.
31+
*/
32+
async runAndClear() {
33+
const functions = this.functions;
34+
for (const fn of functions) {
35+
await fn();
36+
functions.delete(fn);
37+
}
38+
}
39+
40+
private get functions() {
41+
const contextId = this.contextHost.context;
42+
if (!this.functionsByContext.has(contextId)) {
43+
this.functionsByContext.set(contextId, new Set());
44+
}
45+
return this.functionsByContext.get(contextId)!;
46+
}
47+
}

src/core/database/transactional-mutations.interceptor.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,18 @@ import { GqlContextType, GqlExecutionContext } from '@nestjs/graphql';
88
import { Connection } from 'cypher-query-builder';
99
import { GraphQLResolveInfo } from 'graphql';
1010
import { from, lastValueFrom } from 'rxjs';
11+
import { RollbackManager } from './rollback-manager';
1112

1213
/**
1314
* Run all mutations in a neo4j transaction.
1415
* This allows automatic rollbacks on error.
1516
*/
1617
@Injectable()
1718
export class TransactionalMutationsInterceptor implements NestInterceptor {
18-
constructor(private readonly db: Connection) {}
19+
constructor(
20+
private readonly db: Connection,
21+
private readonly rollbacks: RollbackManager,
22+
) {}
1923

2024
async intercept(context: ExecutionContext, next: CallHandler) {
2125
if (context.getType<GqlContextType>() !== 'graphql') {
@@ -29,7 +33,14 @@ export class TransactionalMutationsInterceptor implements NestInterceptor {
2933
}
3034

3135
return from(
32-
this.db.runInTransaction(async () => await lastValueFrom(next.handle())),
36+
this.db.runInTransaction(async () => {
37+
try {
38+
return await lastValueFrom(next.handle());
39+
} catch (e) {
40+
await this.rollbacks.runAndClear();
41+
throw e;
42+
}
43+
}),
3344
);
3445
}
3546
}

0 commit comments

Comments
 (0)