Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
339 changes: 339 additions & 0 deletions AI Contexts/CSV/csv-v2-context-07.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ import { CsvImport } from '../../domain/CsvImport.js';
export interface CsvImportEntitiesImportsDataSource {
getAll(): Promise<CsvImport[]>;
getById(importId: string): Promise<ResultType<CsvImport, CsvImportDoesNotExistError>>;
cancel(importId: string): Promise<void>;
}
2 changes: 2 additions & 0 deletions app/api/csv.v2/application/contracts/CsvImportsDataSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@ import { CsvImport } from '../../domain/CsvImport.js';
export interface CsvImportsDataSource {
insert(doc: CsvImport): Promise<void>;
update(doc: CsvImport): Promise<void>;
cancel(importId: string): Promise<void>;
isCancelled(importId: string): Promise<boolean>;
getById(importId: string): Promise<ResultType<CsvImport, CsvImportDoesNotExistError>>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ class CsvCreateRelationshipEntitiesJob extends AbstractUseCase<Input, void, Deps
};
await this.deps.csvImportsDS.update(withStatus.withStats(updatedStats));
await this.deps.relationshipValuesDS.replaceValues(importId, relationshipDocs);
if (await this.deps.csvImportsDS.isCancelled(importId)) {
return;
}
await this.deps.jobsDispatcher.dispatch(CsvImportEntitiesJobHandler, {
tenantName,
userId,
Expand All @@ -139,6 +142,7 @@ class CsvCreateRelationshipEntitiesJob extends AbstractUseCase<Input, void, Deps
}) {
const { titlesByTemplate, totalTemplates, callbacks, importId, tenantName, userId } = params;
const observedTitles = CsvCreateRelationshipEntitiesJob.countObservedTitles(titlesByTemplate);
const shouldContinue = async () => !(await this.deps.csvImportsDS.isCancelled(importId));
if (!titlesByTemplate.size) {
return {
createdEntities: 0,
Expand Down Expand Up @@ -169,25 +173,40 @@ class CsvCreateRelationshipEntitiesJob extends AbstractUseCase<Input, void, Deps
tenantName,
userId,
}),
shouldContinue,
});
if (!(await shouldContinue())) {
return {
createdEntities,
relationshipDocs: [] as CsvImportRelationshipValues[],
observedTitles,
};
}
const relationshipDocs = await buildRelationshipAppliedValues({
entitiesDS: this.deps.entitiesDS,
importId,
titlesByTemplate,
chunkSize: RELATIONSHIP_TITLES_CHUNK_SIZE,
shouldContinue,
});

return { createdEntities, relationshipDocs, observedTitles };
}

async execute(input: Input): Promise<void> {
const { importId, callbacks } = input;
if (await this.deps.csvImportsDS.isCancelled(importId)) {
return;
}

callbacks.onStart({ importId });
await this.setStatus(importId, CsvImportStatus.PreflightRelationshipsCreate);

try {
await this.runApplyFlow(input);
if (await this.deps.csvImportsDS.isCancelled(importId)) {
return;
}
callbacks.onSuccess({ importId });
} catch (error) {
await this.persistFailure(importId, error as Error);
Expand All @@ -203,6 +222,9 @@ class CsvCreateRelationshipEntitiesJob extends AbstractUseCase<Input, void, Deps
relationshipPendingValuesDS: this.deps.relationshipPendingValuesDS,
importId,
});
if (await this.deps.csvImportsDS.isCancelled(importId)) {
return;
}
const creation = await this.runCreation({
importId,
titlesByTemplate,
Expand Down
13 changes: 13 additions & 0 deletions app/api/csv.v2/application/jobs/CsvCreateThesauriValuesJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ class CsvCreateThesauriValuesJob extends AbstractUseCase<Input, void, Deps> {
thesauriTouched: totals.touched,
};
await this.deps.csvImportsDS.update(withStatus.withStats(updatedStats));
if (await this.deps.csvImportsDS.isCancelled(importId)) {
return;
}
await this.deps.jobsDispatcher.dispatch(CsvCreateRelationshipEntitiesJobHandler, {
tenantName,
userId,
Expand Down Expand Up @@ -206,6 +209,9 @@ class CsvCreateThesauriValuesJob extends AbstractUseCase<Input, void, Deps> {
// eslint-disable-next-line max-statements
async execute(input: Input): Promise<void> {
const { importId, callbacks, tenantName, userId } = input;
if (await this.deps.csvImportsDS.isCancelled(importId)) {
return;
}

callbacks.onStart({ importId });
await this.setStatus(importId, CsvImportStatus.PreflightThesauriCreate);
Expand All @@ -218,6 +224,10 @@ class CsvCreateThesauriValuesJob extends AbstractUseCase<Input, void, Deps> {

let index = 0;
for (const pendingDoc of pendingDocs) {
// eslint-disable-next-line no-await-in-loop
if (await this.deps.csvImportsDS.isCancelled(importId)) {
return;
}
index += 1;
// eslint-disable-next-line no-await-in-loop
const { updatedDoc } = await this.applyPendingThesaurusValues({
Expand All @@ -236,6 +246,9 @@ class CsvCreateThesauriValuesJob extends AbstractUseCase<Input, void, Deps> {
tenantName,
userId,
});
if (await this.deps.csvImportsDS.isCancelled(importId)) {
return;
}
callbacks.onSuccess({ importId });
} catch (error) {
await this.persistFailure(importId, error as Error);
Expand Down
26 changes: 26 additions & 0 deletions app/api/csv.v2/application/jobs/CsvExtractUploadedZipJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ class CsvExtractUploadedZipJob extends AbstractUseCase<Input, void, Deps> {
await this.setStatus(importId, CsvImportStatus.Failed);
}

private async isCancelled(importId: string) {
return this.deps.csvImportsDS.isCancelled(importId);
}

private async dispatchPreflight(importId: string, tenantName: string, userId: string) {
await this.deps.jobsDispatcher.dispatch(CsvPreflightJobHandler, {
tenantName,
Expand Down Expand Up @@ -127,6 +131,9 @@ class CsvExtractUploadedZipJob extends AbstractUseCase<Input, void, Deps> {
CsvImportStatus.ExtractingFilesDone
);
await this.deps.csvImportsDS.update(updated);
if (await this.deps.csvImportsDS.isCancelled(importId)) {
return;
}
await this.dispatchPreflight(importId, context.tenantName, context.userId);
});
}
Expand Down Expand Up @@ -160,18 +167,25 @@ class CsvExtractUploadedZipJob extends AbstractUseCase<Input, void, Deps> {
if (!rows.length) {
return;
}
if (await this.isCancelled(rows[0].importId)) {
return;
}
await this.transactionManager.run(async () => {
await this.deps.rowsDS.insertMany(rows);
});
}

private async stageRows(importId: string, destination: string, callbacks: Callbacks) {
if (await this.isCancelled(importId)) {
return;
}
await this.deps.rowsStager.stage({
importId,
destination,
onRowProgress: info => callbacks.onProgress({ type: 'rows', ...info }),
deleteRows: async () => this.deleteExistingRows(importId),
insertBatch: async rows => this.insertRowsBatch(rows),
shouldContinue: async () => !(await this.isCancelled(importId)),
});
}

Expand All @@ -184,6 +198,9 @@ class CsvExtractUploadedZipJob extends AbstractUseCase<Input, void, Deps> {
userId: string;
}) {
try {
if (await this.isCancelled(params.importId)) {
return;
}
const normalizeResult = await this.deps.fileNormalizer.normalize({
importId: params.importId,
destination: params.destination,
Expand All @@ -195,6 +212,9 @@ class CsvExtractUploadedZipJob extends AbstractUseCase<Input, void, Deps> {
processedFiles: info.processedFiles,
}),
});
if (await this.isCancelled(params.importId)) {
return;
}
await this.stageRows(params.importId, params.destination, params.callbacks);
await this.handleExtractionSuccess(
params.importId,
Expand All @@ -204,6 +224,9 @@ class CsvExtractUploadedZipJob extends AbstractUseCase<Input, void, Deps> {
},
normalizeResult
);
if (await this.isCancelled(params.importId)) {
return;
}
CsvExtractUploadedZipJob.emitSuccess(params.callbacks, params.importId);
} catch (e) {
await this.handleError(params.importId, params.callbacks, e as Error);
Expand All @@ -213,6 +236,9 @@ class CsvExtractUploadedZipJob extends AbstractUseCase<Input, void, Deps> {

async execute(input: Input): Promise<void> {
const { importId, callbacks, tenantName, userId } = input;
if (await this.isCancelled(importId)) {
return;
}

CsvExtractUploadedZipJob.emitStart(callbacks, importId);
await this.setStatus(importId, CsvImportStatus.ExtractingFiles);
Expand Down
11 changes: 10 additions & 1 deletion app/api/csv.v2/application/jobs/CsvImportEntitiesJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ class CsvImportEntitiesJob extends AbstractUseCase<Input, void, Deps> {

private async runImport(importId: string, callbacks: Callbacks) {
const context = await this.loadContext(importId);
const { entitiesCreated, csvImport, processedRows, shouldStop, stopReason } =
const { entitiesCreated, csvImport, processedRows, shouldStop, stopReason, cancelled } =
await processImportRows({
context,
callbacks,
Expand Down Expand Up @@ -199,6 +199,9 @@ class CsvImportEntitiesJob extends AbstractUseCase<Input, void, Deps> {
transactionManager: this.transactionManager,
fileStorage: this.deps.fileStorage,
});
if (cancelled || (await this.deps.csvImportsDS.isCancelled(importId))) {
return;
}
if (shouldStop) {
throw new NonRetryableJobError(new Error(stopReason || 'Stopped due to failure policy'));
}
Expand All @@ -207,12 +210,18 @@ class CsvImportEntitiesJob extends AbstractUseCase<Input, void, Deps> {

async execute(input: Input): Promise<void> {
const { importId, callbacks } = input;
if (await this.deps.csvImportsDS.isCancelled(importId)) {
return;
}

callbacks.onStart({ importId });
await this.setStatus(importId, CsvImportStatus.ImportEntities);

try {
await this.runImport(importId, callbacks);
if (await this.deps.csvImportsDS.isCancelled(importId)) {
return;
}
callbacks.onSuccess({ importId });
} catch (error) {
await this.persistFailure(importId, error as Error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const processImportRows = async (params: {
processedRows: number;
csvImport: CsvImport;
shouldStop: boolean;
cancelled: boolean;
stopReason?: string;
}> => {
const { context, callbacks, deps, batchSize, failurePolicy } = params;
Expand All @@ -60,6 +61,16 @@ const processImportRows = async (params: {
let consecutiveFailures = 0;

for (let offset = startOffset; offset < totalRows; offset += batchSize) {
// eslint-disable-next-line no-await-in-loop
if (await deps.csvImportsDS.isCancelled(context.csvImport.id)) {
return {
entitiesCreated,
processedRows,
csvImport: currentImport,
shouldStop: false,
cancelled: true,
};
}
// eslint-disable-next-line no-await-in-loop
const rows = await deps.rowsDS.getByImport(context.csvImport.id, offset, batchSize);
if (!rows.length) {
Expand Down Expand Up @@ -115,12 +126,19 @@ const processImportRows = async (params: {
processedRows,
csvImport: currentImport,
shouldStop: true,
cancelled: false,
stopReason: stopDecision.reason,
};
}
}

return { entitiesCreated, processedRows, csvImport: currentImport, shouldStop: false };
return {
entitiesCreated,
processedRows,
csvImport: currentImport,
shouldStop: false,
cancelled: false,
};
};

export { processImportRows };
26 changes: 24 additions & 2 deletions app/api/csv.v2/application/jobs/CsvPreflightJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,13 @@ export class CsvPreflightJob extends AbstractUseCase<Input, Output, Deps> {
const { importId, totalRows, callbacks } = params;
const rows: CsvImportRow[] = [];
let processedRows = 0;
let cancelled = false;
for (let offset = 0; offset < totalRows; offset += DEFAULT_SCAN_BATCH_SIZE) {
// eslint-disable-next-line no-await-in-loop
if (await this.deps.csvImportsDS.isCancelled(importId)) {
cancelled = true;
break;
}
// eslint-disable-next-line no-await-in-loop
const batch = await this.deps.rowsDS.getByImport(importId, offset, DEFAULT_SCAN_BATCH_SIZE);
if (!batch.length) {
Expand All @@ -149,7 +155,7 @@ export class CsvPreflightJob extends AbstractUseCase<Input, Output, Deps> {
offset,
});
}
return { rows, totalRows, processedRows };
return { rows, totalRows, processedRows, cancelled };
}

private static appendScanBatch(params: {
Expand Down Expand Up @@ -237,6 +243,9 @@ export class CsvPreflightJob extends AbstractUseCase<Input, Output, Deps> {
// eslint-disable-next-line max-statements
async execute(input: Input): Promise<Output> {
const { importId, callbacks, tenantName, userId } = input;
if (await this.deps.csvImportsDS.isCancelled(importId)) {
return { importId, status: CsvImportStatus.Cancelled };
}

callbacks.onStart({ importId });
await this.setStatus(importId, CsvImportStatus.PreflightScan);
Expand All @@ -246,7 +255,10 @@ export class CsvPreflightJob extends AbstractUseCase<Input, Output, Deps> {

try {
csvImport = await this.getImport(importId);
const { rows: stagedRows } = await this.getStagedRows(importId, callbacks);
const { rows: stagedRows, cancelled } = await this.getStagedRows(importId, callbacks);
if (cancelled || (await this.deps.csvImportsDS.isCancelled(importId))) {
return { importId, status: CsvImportStatus.Cancelled };
}
const template = await this.getTemplate(csvImport.templateId);
const [availableLanguages, defaultLanguage, settings] = await Promise.all([
this.deps.settingsDS.getLanguageKeys(),
Expand Down Expand Up @@ -322,20 +334,30 @@ export class CsvPreflightJob extends AbstractUseCase<Input, Output, Deps> {
relationshipPendingDocs
);

if (await this.deps.csvImportsDS.isCancelled(importId)) {
return { importId, status: CsvImportStatus.Cancelled };
}

await this.transactionManager.run(async () => {
const clearedFailure = CsvImportDomain.clearFailure(csvImport!);
const updated = CsvImportDomain.withStatus(
clearedFailure,
CsvImportStatus.PreflightScanDone
);
await this.deps.csvImportsDS.update(updated);
if (await this.deps.csvImportsDS.isCancelled(importId)) {
return;
}
await this.jobsDispatcher.dispatch(CsvCreateThesauriValuesJobHandler, {
tenantName,
userId,
importId,
});
});

if (await this.deps.csvImportsDS.isCancelled(importId)) {
return { importId, status: CsvImportStatus.Cancelled };
}
callbacks.onSuccess({ importId });
return { importId, status: CsvImportStatus.PreflightScanDone };
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ describe('CsvCreateThesauriValuesJob', () => {
const csvImportsDS = {
getById: jest.fn().mockResolvedValue(Result.ok(csvImport)),
update: jest.fn().mockResolvedValue(undefined),
isCancelled: jest.fn().mockResolvedValue(false),
};
const entry = new CsvThesauriPendingEntry({
propertyId: 'prop',
Expand Down
Loading
Loading