Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion packages/common/src/converter/failure-converter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ import {
ApplicationFailure,
CancelledFailure,
ChildWorkflowFailure,
decodeApplicationFailureCategory,
decodeRetryState,
decodeTimeoutType,
encodeApplicationFailureCategory,
encodeRetryState,
encodeTimeoutType,
FAILURE_SOURCE,
Expand Down Expand Up @@ -127,7 +129,9 @@ export class DefaultFailureConverter implements FailureConverter {
failure.applicationFailureInfo.type,
Boolean(failure.applicationFailureInfo.nonRetryable),
arrayFromPayloads(payloadConverter, failure.applicationFailureInfo.details?.payloads),
this.optionalFailureToOptionalError(failure.cause, payloadConverter)
this.optionalFailureToOptionalError(failure.cause, payloadConverter),
undefined,
decodeApplicationFailureCategory(failure.applicationFailureInfo.category)
);
}
if (failure.serverFailureInfo) {
Expand Down Expand Up @@ -273,6 +277,7 @@ export class DefaultFailureConverter implements FailureConverter {
? { payloads: toPayloads(payloadConverter, ...err.details) }
: undefined,
nextRetryDelay: msOptionalToTs(err.nextRetryDelay),
category: encodeApplicationFailureCategory(err.category),
},
};
}
Expand Down
36 changes: 33 additions & 3 deletions packages/common/src/failure.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,29 @@ export const [encodeRetryState, decodeRetryState] = makeProtoEnumConverters<
'RETRY_STATE_'
);

export const ApplicationFailureCategory = {
/**
* BENIGN category errors emit DEBUG level logs and do not record metrics
*/
BENIGN: 'BENIGN',
} as const;

export type ApplicationFailureCategory = (typeof ApplicationFailureCategory)[keyof typeof ApplicationFailureCategory];

export const [encodeApplicationFailureCategory, decodeApplicationFailureCategory] = makeProtoEnumConverters<
temporal.api.enums.v1.ApplicationErrorCategory,
typeof temporal.api.enums.v1.ApplicationErrorCategory,
keyof typeof temporal.api.enums.v1.ApplicationErrorCategory,
typeof ApplicationFailureCategory,
'APPLICATION_ERROR_CATEGORY_'
>(
{
[ApplicationFailureCategory.BENIGN]: 1,
UNSPECIFIED: 0,
} as const,
'APPLICATION_ERROR_CATEGORY_'
);

export type WorkflowExecution = temporal.api.common.v1.IWorkflowExecution;

/**
Expand Down Expand Up @@ -172,7 +195,8 @@ export class ApplicationFailure extends TemporalFailure {
public readonly nonRetryable?: boolean | undefined | null,
public readonly details?: unknown[] | undefined | null,
cause?: Error,
public readonly nextRetryDelay?: Duration | undefined | null
public readonly nextRetryDelay?: Duration | undefined | null,
public readonly category?: ApplicationFailureCategory | undefined | null
) {
super(message, cause);
}
Expand All @@ -195,8 +219,8 @@ export class ApplicationFailure extends TemporalFailure {
* By default, will be retryable (unless its `type` is included in {@link RetryPolicy.nonRetryableErrorTypes}).
*/
public static create(options: ApplicationFailureOptions): ApplicationFailure {
const { message, type, nonRetryable = false, details, nextRetryDelay, cause } = options;
return new this(message, type, nonRetryable, details, cause, nextRetryDelay);
const { message, type, nonRetryable = false, details, nextRetryDelay, cause, category } = options;
return new this(message, type, nonRetryable, details, cause, nextRetryDelay, category);
}

/**
Expand Down Expand Up @@ -261,6 +285,12 @@ export interface ApplicationFailureOptions {
* Cause of the failure
*/
cause?: Error;

/**
* Severity category of the application error.
* Maps to corresponding client-side logging/metrics behaviors.
*/
category?: ApplicationFailureCategory;
}

/**
Expand Down
9 changes: 2 additions & 7 deletions packages/core-bridge/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,9 +520,7 @@ mod config {
impl From<PollerBehavior> for CorePollerBehavior {
fn from(val: PollerBehavior) -> Self {
match val {
PollerBehavior::SimpleMaximum { maximum } => {
Self::SimpleMaximum(maximum)
}
PollerBehavior::SimpleMaximum { maximum } => Self::SimpleMaximum(maximum),
PollerBehavior::Autoscaling {
minimum,
maximum,
Expand Down Expand Up @@ -771,10 +769,7 @@ mod custom_slot_supplier {
slot_type: SK::kind().into(),
task_queue: ctx.task_queue().to_string(),
worker_identity: ctx.worker_identity().to_string(),
worker_deployment_version: ctx
.worker_deployment_version()
.clone()
.map(Into::into),
worker_deployment_version: ctx.worker_deployment_version().clone().map(Into::into),
is_sticky: ctx.is_sticky(),
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ExecutionContext } from 'ava';
import * as workflow from '@temporalio/workflow';
import { HandlerUnfinishedPolicy } from '@temporalio/common';
import { ApplicationFailureCategory, HandlerUnfinishedPolicy } from '@temporalio/common';
import { LogEntry } from '@temporalio/worker';
import { WorkflowFailedError, WorkflowUpdateFailedError } from '@temporalio/client';
import { Context, helpers, makeTestFunction } from './helpers-integration';
Expand Down Expand Up @@ -469,3 +469,44 @@ async function assertWorkflowUpdateFailedBecauseWorkflowCompleted(t: ExecutionCo
t.true((cause as workflow.ApplicationFailure).type === 'AcceptedUpdateCompletedWorkflow');
t.regex((cause as workflow.ApplicationFailure).message, /Workflow completed before the Update completed/);
}

export async function raiseErrorWorkflow(useBenign: boolean): Promise<void> {
await workflow
.proxyActivities({ startToCloseTimeout: '10s', retry: { maximumAttempts: 1 } })
.throwApplicationFailureActivity(useBenign);
}

test('Application failure category controls log level', async (t) => {
const { createWorker, startWorkflow } = helpers(t);
const worker = await createWorker({
activities: {
async throwApplicationFailureActivity(useBenign: boolean) {
throw workflow.ApplicationFailure.create({
category: useBenign ? ApplicationFailureCategory.BENIGN : undefined,
});
},
},
});

await worker.runUntil(async () => {
// Run with BENIGN
let handle = await startWorkflow(raiseErrorWorkflow, { args: [true] });
try {
await handle.result();
} catch (_) {
const logs = recordedLogs[handle.workflowId];
const activityFailureLog = logs.find((log) => log.message.includes('Activity failed'));
t.true(activityFailureLog !== undefined && activityFailureLog.level === 'DEBUG');
}

// Run without BENIGN
handle = await startWorkflow(raiseErrorWorkflow, { args: [false] });
try {
await handle.result();
} catch (_) {
const logs = recordedLogs[handle.workflowId];
const activityFailureLog = logs.find((log) => log.message.includes('Activity failed'));
t.true(activityFailureLog !== undefined && activityFailureLog.level === 'WARN');
}
});
});
8 changes: 7 additions & 1 deletion packages/worker/src/activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { asyncLocalStorage, CompleteAsyncError, Context, Info } from '@temporali
import {
ActivityFunction,
ApplicationFailure,
ApplicationFailureCategory,
CancelledFailure,
ensureApplicationFailure,
FAILURE_SOURCE,
Expand Down Expand Up @@ -142,7 +143,12 @@ export class Activity {
} else if (error instanceof CompleteAsyncError) {
this.workerLogger.debug('Activity will complete asynchronously', { durationMs });
} else {
this.workerLogger.warn('Activity failed', { error, durationMs });
if (error instanceof ApplicationFailure && error.category === ApplicationFailureCategory.BENIGN) {
// Downgrade log level to DEBUG for benign application errors.
this.workerLogger.debug('Activity failed', { error, durationMs });
} else {
this.workerLogger.warn('Activity failed', { error, durationMs });
}
}
}
}
Expand Down
Loading