Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion packages/common/src/converter/failure-converter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ import {
ApplicationFailure,
CancelledFailure,
ChildWorkflowFailure,
decodeApplicationFailureCategory,
decodeRetryState,
decodeTimeoutType,
encodeApplicationFailureCategory,
encodeRetryState,
encodeTimeoutType,
FAILURE_SOURCE,
Expand Down Expand Up @@ -127,7 +129,9 @@ export class DefaultFailureConverter implements FailureConverter {
failure.applicationFailureInfo.type,
Boolean(failure.applicationFailureInfo.nonRetryable),
arrayFromPayloads(payloadConverter, failure.applicationFailureInfo.details?.payloads),
this.optionalFailureToOptionalError(failure.cause, payloadConverter)
this.optionalFailureToOptionalError(failure.cause, payloadConverter),
undefined,
decodeApplicationFailureCategory(failure.applicationFailureInfo.category)
);
}
if (failure.serverFailureInfo) {
Expand Down Expand Up @@ -273,6 +277,7 @@ export class DefaultFailureConverter implements FailureConverter {
? { payloads: toPayloads(payloadConverter, ...err.details) }
: undefined,
nextRetryDelay: msOptionalToTs(err.nextRetryDelay),
category: encodeApplicationFailureCategory(err.category),
},
};
}
Expand Down
41 changes: 38 additions & 3 deletions packages/common/src/failure.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,34 @@ export const [encodeRetryState, decodeRetryState] = makeProtoEnumConverters<
'RETRY_STATE_'
);

/**
* A category to describe the severity and change the observability behavior of an application failure.
*
* Currently, observability behavior changes are limited to:
* - activities that fail due to a BENIGN application failure emit DEBUG level logs and do not record metrics
*
* @experimental Category is a new feature and may be subject to change.
*/
export const ApplicationFailureCategory = {
// The only category defined so far; the key and its string value are intentionally identical.
BENIGN: 'BENIGN',
} as const;

/**
* Union of the string values of the `ApplicationFailureCategory` constant object (currently only `'BENIGN'`).
*
* Shares its name with the constant so that the same identifier can be used both as a value and as a type.
*/
export type ApplicationFailureCategory = (typeof ApplicationFailureCategory)[keyof typeof ApplicationFailureCategory];

/**
* Converter pair built by `makeProtoEnumConverters` for translating between the SDK's
* `ApplicationFailureCategory` string values and the `temporal.api.enums.v1.ApplicationErrorCategory` proto enum.
*
* - `encodeApplicationFailureCategory` is applied to an `ApplicationFailure`'s `category` when building the proto failure.
* - `decodeApplicationFailureCategory` is applied to the proto `category` field when rebuilding an `ApplicationFailure`.
*
* NOTE(review): how `undefined`, `null` and unrecognized values are handled is decided inside
* `makeProtoEnumConverters`, which is not visible here — confirm there before relying on it.
*/
export const [encodeApplicationFailureCategory, decodeApplicationFailureCategory] = makeProtoEnumConverters<
temporal.api.enums.v1.ApplicationErrorCategory,
typeof temporal.api.enums.v1.ApplicationErrorCategory,
keyof typeof temporal.api.enums.v1.ApplicationErrorCategory,
typeof ApplicationFailureCategory,
'APPLICATION_ERROR_CATEGORY_'
>(
{
// Numeric values are presumably the proto enum's numbers; verify against the ApplicationErrorCategory definition.
[ApplicationFailureCategory.BENIGN]: 1,
UNSPECIFIED: 0,
} as const,
// Presumably the common prefix of the proto enum member names — TODO confirm against makeProtoEnumConverters.
'APPLICATION_ERROR_CATEGORY_'
);

export type WorkflowExecution = temporal.api.common.v1.IWorkflowExecution;

/**
Expand Down Expand Up @@ -172,7 +200,8 @@ export class ApplicationFailure extends TemporalFailure {
public readonly nonRetryable?: boolean | undefined | null,
public readonly details?: unknown[] | undefined | null,
cause?: Error,
public readonly nextRetryDelay?: Duration | undefined | null
public readonly nextRetryDelay?: Duration | undefined | null,
public readonly category?: ApplicationFailureCategory | undefined | null
) {
super(message, cause);
}
Expand All @@ -195,8 +224,8 @@ export class ApplicationFailure extends TemporalFailure {
* By default, will be retryable (unless its `type` is included in {@link RetryPolicy.nonRetryableErrorTypes}).
*/
public static create(options: ApplicationFailureOptions): ApplicationFailure {
const { message, type, nonRetryable = false, details, nextRetryDelay, cause } = options;
return new this(message, type, nonRetryable, details, cause, nextRetryDelay);
const { message, type, nonRetryable = false, details, nextRetryDelay, cause, category } = options;
return new this(message, type, nonRetryable, details, cause, nextRetryDelay, category);
}

/**
Expand Down Expand Up @@ -261,6 +290,12 @@ export interface ApplicationFailureOptions {
* Cause of the failure
*/
cause?: Error;

/**
* Severity category of the application error.
* Affects worker-side logging and metrics behavior of this failure.
*/
category?: ApplicationFailureCategory;
}

/**
Expand Down
9 changes: 2 additions & 7 deletions packages/core-bridge/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,9 +520,7 @@ mod config {
impl From<PollerBehavior> for CorePollerBehavior {
fn from(val: PollerBehavior) -> Self {
match val {
PollerBehavior::SimpleMaximum { maximum } => {
Self::SimpleMaximum(maximum)
}
PollerBehavior::SimpleMaximum { maximum } => Self::SimpleMaximum(maximum),
PollerBehavior::Autoscaling {
minimum,
maximum,
Expand Down Expand Up @@ -771,10 +769,7 @@ mod custom_slot_supplier {
slot_type: SK::kind().into(),
task_queue: ctx.task_queue().to_string(),
worker_identity: ctx.worker_identity().to_string(),
worker_deployment_version: ctx
.worker_deployment_version()
.clone()
.map(Into::into),
worker_deployment_version: ctx.worker_deployment_version().clone().map(Into::into),
is_sticky: ctx.is_sticky(),
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ExecutionContext } from 'ava';
import * as workflow from '@temporalio/workflow';
import { HandlerUnfinishedPolicy } from '@temporalio/common';
import { ApplicationFailureCategory, HandlerUnfinishedPolicy } from '@temporalio/common';
import { LogEntry } from '@temporalio/worker';
import { WorkflowFailedError, WorkflowUpdateFailedError } from '@temporalio/client';
import { Context, helpers, makeTestFunction } from './helpers-integration';
Expand Down Expand Up @@ -469,3 +469,44 @@ async function assertWorkflowUpdateFailedBecauseWorkflowCompleted(t: ExecutionCo
t.true((cause as workflow.ApplicationFailure).type === 'AcceptedUpdateCompletedWorkflow');
t.regex((cause as workflow.ApplicationFailure).message, /Workflow completed before the Update completed/);
}

/**
 * Workflow that invokes `throwApplicationFailureActivity` exactly once (no retries) and
 * lets whatever the activity throws propagate out of the workflow.
 *
 * @param useBenign forwarded unchanged to the activity as its only argument
 */
export async function raiseErrorWorkflow(useBenign: boolean): Promise<void> {
  // A single attempt keeps the run to one activity execution per workflow.
  const activities = workflow.proxyActivities({
    startToCloseTimeout: '10s',
    retry: { maximumAttempts: 1 },
  });
  await activities.throwApplicationFailureActivity(useBenign);
}

/**
 * Verifies that the worker logs 'Activity failed' at DEBUG level when the activity throws an
 * `ApplicationFailure` with the BENIGN category, and at WARN level when no category is set.
 */
test('Application failure category controls log level', async (t) => {
  const { createWorker, startWorkflow } = helpers(t);
  const worker = await createWorker({
    activities: {
      async throwApplicationFailureActivity(useBenign: boolean) {
        throw workflow.ApplicationFailure.create({
          category: useBenign ? ApplicationFailureCategory.BENIGN : undefined,
        });
      },
    },
  });

  // Runs the workflow once, requires it to fail, then checks the level of the 'Activity failed' log entry.
  const assertActivityFailureLogLevel = async (
    useBenign: boolean,
    expectedLevel: LogEntry['level']
  ): Promise<void> => {
    const handle = await startWorkflow(raiseErrorWorkflow, { args: [useBenign] });

    // The workflow must fail; asserting the rejection explicitly prevents the log checks
    // from being silently skipped if the workflow unexpectedly completes.
    await t.throwsAsync(handle.result(), { instanceOf: WorkflowFailedError });

    // Fall back to an empty list so a missing log bucket surfaces as a failed assertion, not a TypeError.
    const logs = recordedLogs[handle.workflowId] ?? [];
    const activityFailureLog = logs.find((log) => log.message.includes('Activity failed'));
    t.is(activityFailureLog?.level, expectedLevel);
  };

  await worker.runUntil(async () => {
    // Run with BENIGN
    await assertActivityFailureLogLevel(true, 'DEBUG');
    // Run without BENIGN
    await assertActivityFailureLogLevel(false, 'WARN');
  });
});
8 changes: 7 additions & 1 deletion packages/worker/src/activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { asyncLocalStorage, CompleteAsyncError, Context, Info } from '@temporali
import {
ActivityFunction,
ApplicationFailure,
ApplicationFailureCategory,
CancelledFailure,
ensureApplicationFailure,
FAILURE_SOURCE,
Expand Down Expand Up @@ -142,7 +143,12 @@ export class Activity {
} else if (error instanceof CompleteAsyncError) {
this.workerLogger.debug('Activity will complete asynchronously', { durationMs });
} else {
this.workerLogger.warn('Activity failed', { error, durationMs });
if (error instanceof ApplicationFailure && error.category === ApplicationFailureCategory.BENIGN) {
// Downgrade log level to DEBUG for benign application errors.
this.workerLogger.debug('Activity failed', { error, durationMs });
} else {
this.workerLogger.warn('Activity failed', { error, durationMs });
}
}
}
}
Expand Down
Loading