diff --git a/packages/common/src/converter/failure-converter.ts b/packages/common/src/converter/failure-converter.ts index 767248792..fe1837054 100644 --- a/packages/common/src/converter/failure-converter.ts +++ b/packages/common/src/converter/failure-converter.ts @@ -3,8 +3,10 @@ import { ApplicationFailure, CancelledFailure, ChildWorkflowFailure, + decodeApplicationFailureCategory, decodeRetryState, decodeTimeoutType, + encodeApplicationFailureCategory, encodeRetryState, encodeTimeoutType, FAILURE_SOURCE, @@ -127,7 +129,9 @@ export class DefaultFailureConverter implements FailureConverter { failure.applicationFailureInfo.type, Boolean(failure.applicationFailureInfo.nonRetryable), arrayFromPayloads(payloadConverter, failure.applicationFailureInfo.details?.payloads), - this.optionalFailureToOptionalError(failure.cause, payloadConverter) + this.optionalFailureToOptionalError(failure.cause, payloadConverter), + undefined, + decodeApplicationFailureCategory(failure.applicationFailureInfo.category) ); } if (failure.serverFailureInfo) { @@ -273,6 +277,7 @@ export class DefaultFailureConverter implements FailureConverter { ? { payloads: toPayloads(payloadConverter, ...err.details) } : undefined, nextRetryDelay: msOptionalToTs(err.nextRetryDelay), + category: encodeApplicationFailureCategory(err.category), }, }; } diff --git a/packages/common/src/failure.ts b/packages/common/src/failure.ts index 0ef2fbeb3..f5f0e48c2 100644 --- a/packages/common/src/failure.ts +++ b/packages/common/src/failure.ts @@ -101,6 +101,34 @@ export const [encodeRetryState, decodeRetryState] = makeProtoEnumConverters< 'RETRY_STATE_' ); +/** + * A category to describe the severity and change the observability behavior of an application failure. + * + * Currently, observability behaviour changes are limited to: + * - activities that fail due to a BENIGN application failure emit DEBUG level logs and do not record metrics + * + * @experimental Category is a new feature and may be subject to change. + */ +export const ApplicationFailureCategory = { + BENIGN: 'BENIGN', +} as const; + +export type ApplicationFailureCategory = (typeof ApplicationFailureCategory)[keyof typeof ApplicationFailureCategory]; + +export const [encodeApplicationFailureCategory, decodeApplicationFailureCategory] = makeProtoEnumConverters< + temporal.api.enums.v1.ApplicationErrorCategory, + typeof temporal.api.enums.v1.ApplicationErrorCategory, + keyof typeof temporal.api.enums.v1.ApplicationErrorCategory, + typeof ApplicationFailureCategory, + 'APPLICATION_ERROR_CATEGORY_' +>( + { + [ApplicationFailureCategory.BENIGN]: 1, + UNSPECIFIED: 0, + } as const, + 'APPLICATION_ERROR_CATEGORY_' +); + export type WorkflowExecution = temporal.api.common.v1.IWorkflowExecution; /** @@ -172,7 +200,8 @@ export class ApplicationFailure extends TemporalFailure { public readonly nonRetryable?: boolean | undefined | null, public readonly details?: unknown[] | undefined | null, cause?: Error, - public readonly nextRetryDelay?: Duration | undefined | null + public readonly nextRetryDelay?: Duration | undefined | null, + public readonly category?: ApplicationFailureCategory | undefined | null ) { super(message, cause); } @@ -195,8 +224,8 @@ export class ApplicationFailure extends TemporalFailure { * By default, will be retryable (unless its `type` is included in {@link RetryPolicy.nonRetryableErrorTypes}). */ public static create(options: ApplicationFailureOptions): ApplicationFailure { - const { message, type, nonRetryable = false, details, nextRetryDelay, cause } = options; - return new this(message, type, nonRetryable, details, cause, nextRetryDelay); + const { message, type, nonRetryable = false, details, nextRetryDelay, cause, category } = options; + return new this(message, type, nonRetryable, details, cause, nextRetryDelay, category); } /** @@ -261,6 +290,12 @@ export interface ApplicationFailureOptions { * Cause of the failure */ cause?: Error; + + /** + * Severity category of the application error. + * Affects worker-side logging and metrics behavior of this failure. + */ + category?: ApplicationFailureCategory; } /** diff --git a/packages/core-bridge/src/worker.rs b/packages/core-bridge/src/worker.rs index 7a418beca..872bfd60e 100644 --- a/packages/core-bridge/src/worker.rs +++ b/packages/core-bridge/src/worker.rs @@ -520,9 +520,7 @@ mod config { impl From for CorePollerBehavior { fn from(val: PollerBehavior) -> Self { match val { - PollerBehavior::SimpleMaximum { maximum } => { - Self::SimpleMaximum(maximum) - } + PollerBehavior::SimpleMaximum { maximum } => Self::SimpleMaximum(maximum), PollerBehavior::Autoscaling { minimum, maximum, @@ -771,10 +769,7 @@ mod custom_slot_supplier { slot_type: SK::kind().into(), task_queue: ctx.task_queue().to_string(), worker_identity: ctx.worker_identity().to_string(), - worker_deployment_version: ctx - .worker_deployment_version() - .clone() - .map(Into::into), + worker_deployment_version: ctx.worker_deployment_version().clone().map(Into::into), is_sticky: ctx.is_sticky(), }; diff --git a/packages/test/src/test-integration-workflows-with-recorded-logs.ts b/packages/test/src/test-integration-workflows-with-recorded-logs.ts index 000c3bfa1..859598b0a 100644 --- a/packages/test/src/test-integration-workflows-with-recorded-logs.ts +++ b/packages/test/src/test-integration-workflows-with-recorded-logs.ts @@ -1,6 +1,6 @@ import { ExecutionContext } from 'ava'; import * as workflow from '@temporalio/workflow'; -import { HandlerUnfinishedPolicy } from '@temporalio/common'; +import { ApplicationFailureCategory, HandlerUnfinishedPolicy } from '@temporalio/common'; import { LogEntry } from '@temporalio/worker'; import { WorkflowFailedError, WorkflowUpdateFailedError } from '@temporalio/client'; import { Context, helpers, makeTestFunction } from './helpers-integration'; @@ -469,3 +469,44 @@ async function assertWorkflowUpdateFailedBecauseWorkflowCompleted(t: ExecutionCo t.true((cause as workflow.ApplicationFailure).type === 'AcceptedUpdateCompletedWorkflow'); t.regex((cause as workflow.ApplicationFailure).message, /Workflow completed before the Update completed/); } + +export async function raiseErrorWorkflow(useBenign: boolean): Promise { + await workflow + .proxyActivities({ startToCloseTimeout: '10s', retry: { maximumAttempts: 1 } }) + .throwApplicationFailureActivity(useBenign); +} + +test('Application failure category controls log level', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker({ + activities: { + async throwApplicationFailureActivity(useBenign: boolean) { + throw workflow.ApplicationFailure.create({ + category: useBenign ? ApplicationFailureCategory.BENIGN : undefined, + }); + }, + }, + }); + + await worker.runUntil(async () => { + // Run with BENIGN + let handle = await startWorkflow(raiseErrorWorkflow, { args: [true] }); + try { + await handle.result(); + } catch (_) { + const logs = recordedLogs[handle.workflowId]; + const activityFailureLog = logs.find((log) => log.message.includes('Activity failed')); + t.true(activityFailureLog !== undefined && activityFailureLog.level === 'DEBUG'); + } + + // Run without BENIGN + handle = await startWorkflow(raiseErrorWorkflow, { args: [false] }); + try { + await handle.result(); + } catch (_) { + const logs = recordedLogs[handle.workflowId]; + const activityFailureLog = logs.find((log) => log.message.includes('Activity failed')); + t.true(activityFailureLog !== undefined && activityFailureLog.level === 'WARN'); + } + }); +}); diff --git a/packages/worker/src/activity.ts b/packages/worker/src/activity.ts index df0c10b1d..7614017d0 100644 --- a/packages/worker/src/activity.ts +++ b/packages/worker/src/activity.ts @@ -3,6 +3,7 @@ import { asyncLocalStorage, CompleteAsyncError, Context, Info } from '@temporali import { ActivityFunction, ApplicationFailure, + ApplicationFailureCategory, CancelledFailure, ensureApplicationFailure, FAILURE_SOURCE, @@ -142,7 +143,12 @@ export class Activity { } else if (error instanceof CompleteAsyncError) { this.workerLogger.debug('Activity will complete asynchronously', { durationMs }); } else { - this.workerLogger.warn('Activity failed', { error, durationMs }); + if (error instanceof ApplicationFailure && error.category === ApplicationFailureCategory.BENIGN) { + // Downgrade log level to DEBUG for benign application errors. + this.workerLogger.debug('Activity failed', { error, durationMs }); + } else { + this.workerLogger.warn('Activity failed', { error, durationMs }); + } } } }