From f0d836f4b53e713f02e199203c8da95dfe81b7e8 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 8 Jan 2025 16:55:48 +0000 Subject: [PATCH 1/2] Add more logging around new metadata system --- .../metadata/updateMetadata.server.ts | 38 +++++++++++++++++-- references/nextjs-realtime/src/trigger/csv.ts | 18 --------- 2 files changed, 34 insertions(+), 22 deletions(-) diff --git a/apps/webapp/app/services/metadata/updateMetadata.server.ts b/apps/webapp/app/services/metadata/updateMetadata.server.ts index ffab7d4807..6f1cb82adb 100644 --- a/apps/webapp/app/services/metadata/updateMetadata.server.ts +++ b/apps/webapp/app/services/metadata/updateMetadata.server.ts @@ -351,6 +351,15 @@ export class UpdateMetadataService extends BaseService { }); if (result.count === 0) { + if (this.flushLoggingEnabled) { + logger.debug( + `[UpdateMetadataService][updateRunMetadataWithOperations] Optimistic lock failed for run ${runId}`, + { + metadataVersion: run.metadataVersion, + } + ); + } + // If this was our last attempt, buffer the operations and return optimistically if (attempts === MAX_RETRIES) { this.#ingestRunOperations(runId, operations); @@ -363,6 +372,15 @@ export class UpdateMetadataService extends BaseService { continue; } + if (this.flushLoggingEnabled) { + logger.debug( + `[UpdateMetadataService][updateRunMetadataWithOperations] Updated metadata for run ${runId}`, + { + metadata: applyResults.newMetadata, + } + ); + } + // Success! Return the new metadata return applyResults.newMetadata; } @@ -383,10 +401,15 @@ export class UpdateMetadataService extends BaseService { metadataPacket.data !== "{}" || (existingMetadata.data && metadataPacket.data !== existingMetadata.data) ) { - logger.debug(`Updating metadata directly for run`, { - metadata: metadataPacket.data, - runId, - }); + if (this.flushLoggingEnabled) { + logger.debug( + `[UpdateMetadataService][updateRunMetadataDirectly] Updating metadata directly for run`, + { + metadata: metadataPacket.data, + runId, + } + ); + } // Update the metadata without version check await this._prisma.taskRun.update({ @@ -416,6 +439,13 @@ export class UpdateMetadataService extends BaseService { }; }); + if (this.flushLoggingEnabled) { + logger.debug(`[UpdateMetadataService] Ingesting operations for run`, { + runId, + bufferedOperations, + }); + } + const existingBufferedOperations = this._bufferedOperations.get(runId) ?? []; this._bufferedOperations.set(runId, [...existingBufferedOperations, ...bufferedOperations]); diff --git a/references/nextjs-realtime/src/trigger/csv.ts b/references/nextjs-realtime/src/trigger/csv.ts index 3acdddd4d4..50167b484c 100644 --- a/references/nextjs-realtime/src/trigger/csv.ts +++ b/references/nextjs-realtime/src/trigger/csv.ts @@ -62,16 +62,6 @@ export const handleCSVUpload = schemaTask({ const successfulRows = results.runs.filter((r) => r.ok); const failedRows = results.runs.filter((r) => !r.ok); - const firstSuccessfulRow = successfulRows[0]; - - if (firstSuccessfulRow) { - const stream = await metadata.fetchStream(firstSuccessfulRow.id); - - for await (const value of stream) { - logger.info(`Stream value from ${firstSuccessfulRow.id}`, { value }); - } - } - return { file, rows, @@ -93,14 +83,6 @@ export const handleCSVRow = schemaTask({ metadata.parent.increment("processedRows", 1).append("rowRuns", ctx.run.id); - await metadata.parent.stream( - ctx.run.id, - (async function* () { - yield "hello"; - yield "world"; - })() - ); - return row; }, }); From badf513312cb35f0a1a150783ec8f5f07e522e0e Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 8 Jan 2025 20:04:05 +0000 Subject: [PATCH 2/2] Make the authenticated env optional when updating run metadata --- .../app/routes/api.v1.runs.$runId.metadata.ts | 2 +- .../services/metadata/updateMetadata.server.ts | 16 ++++++++++------ .../app/v3/services/finalizeTaskRun.server.ts | 4 ++-- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts b/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts index 1ebf43bc3d..ce9222a18a 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts @@ -16,7 +16,7 @@ const { action } = createActionApiRoute( method: "PUT", }, async ({ authentication, body, params }) => { - const result = await updateMetadataService.call(authentication.environment, params.runId, body); + const result = await updateMetadataService.call(params.runId, body, authentication.environment); if (!result) { return json({ error: "Task Run not found" }, { status: 404 }); diff --git a/apps/webapp/app/services/metadata/updateMetadata.server.ts b/apps/webapp/app/services/metadata/updateMetadata.server.ts index 6f1cb82adb..47a9a8f5cb 100644 --- a/apps/webapp/app/services/metadata/updateMetadata.server.ts +++ b/apps/webapp/app/services/metadata/updateMetadata.server.ts @@ -229,17 +229,21 @@ export class UpdateMetadataService extends BaseService { } public async call( - environment: AuthenticatedEnvironment, runId: string, - body: UpdateMetadataRequestBody + body: UpdateMetadataRequestBody, + environment?: AuthenticatedEnvironment ) { const runIdType = runId.startsWith("run_") ? "friendly" : "internal"; const taskRun = await this._prisma.taskRun.findFirst({ - where: { - runtimeEnvironmentId: environment.id, - ...(runIdType === "internal" ? { id: runId } : { friendlyId: runId }), - }, + where: environment + ? { + runtimeEnvironmentId: environment.id, + ...(runIdType === "internal" ? { id: runId } : { friendlyId: runId }), + } + : { + ...(runIdType === "internal" ? { id: runId } : { friendlyId: runId }), + }, select: { id: true, status: true, diff --git a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts index c31486021b..3c87dd5f41 100644 --- a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts +++ b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts @@ -70,9 +70,9 @@ export class FinalizeTaskRunService extends BaseService { completedAt, }); - if (env && metadata) { + if (metadata) { try { - await updateMetadataService.call(env, id, metadata); + await updateMetadataService.call(id, metadata, env); } catch (e) { logger.error("[FinalizeTaskRunService] Failed to update metadata", { taskRun: id,