Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions src/Worker/Core/Logs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,48 @@ namespace Microsoft.DurableTask
/// <summary>
/// Log messages.
/// </summary>
/// <remarks>
/// NOTE: Trying to make logs consistent with https://github.com/Azure/durabletask/blob/main/src/DurableTask.Core/Logging/LogEvents.cs.
/// </remarks>
static partial class Logs
{
[LoggerMessage(EventId = 15, Level = LogLevel.Error, Message = "Unhandled exception in entity operation {entityInstanceId}/{operationName}.")]
public static partial void OperationError(this ILogger logger, Exception ex, EntityInstanceId entityInstanceId, string operationName);

[LoggerMessage(EventId = 55, Level = LogLevel.Information, Message = "{instanceId}: Evaluating custom retry handler for failed '{name}' task. Attempt = {attempt}.")]
public static partial void RetryingTask(this ILogger logger, string instanceId, string name, int attempt);

[LoggerMessage(EventId = 600, Level = LogLevel.Information, Message = "'{Name}' orchestration with ID '{InstanceId}' started.")]
public static partial void OrchestrationStarted(this ILogger logger, string instanceId, string name);

[LoggerMessage(EventId = 601, Level = LogLevel.Information, Message = "'{Name}' orchestration with ID '{InstanceId}' completed.")]
public static partial void OrchestrationCompleted(this ILogger logger, string instanceId, string name);

[LoggerMessage(EventId = 602, Level = LogLevel.Information, Message = "'{Name}' orchestration with ID '{InstanceId}' failed.")]
public static partial void OrchestrationFailed(this ILogger logger, Exception ex, string instanceId, string name);

[LoggerMessage(EventId = 603, Level = LogLevel.Information, Message = "'{Name}' activity of orchestration ID '{InstanceId}' started.")]
public static partial void ActivityStarted(this ILogger logger, string instanceId, string name);

[LoggerMessage(EventId = 604, Level = LogLevel.Information, Message = "'{Name}' activity of orchestration ID '{InstanceId}' completed.")]
public static partial void ActivityCompleted(this ILogger logger, string instanceId, string name);

[LoggerMessage(EventId = 605, Level = LogLevel.Information, Message = "'{Name}' activity of orchestration ID '{InstanceId}' failed.")]
public static partial void ActivityFailed(this ILogger logger, Exception ex, string instanceId, string name);

/// <summary>
/// Creates a logger named "Microsoft.DurableTask.Worker" with an optional subcategory.
/// </summary>
/// <param name="loggerFactory">The logger factory to use to create the logger.</param>
/// <param name="subcategory">The subcategory of the logger. For example, "Activities" or "Orchestrations".
/// </param>
/// <returns>The generated <see cref="ILogger"/>.</returns>
internal static ILogger CreateWorkerLogger(ILoggerFactory loggerFactory, string? subcategory = null)
{
string categoryName = "Microsoft.DurableTask.Worker";
if (!string.IsNullOrEmpty(subcategory))
{
categoryName += "." + subcategory;
}

return loggerFactory.CreateLogger(categoryName);
}
}
}
2 changes: 1 addition & 1 deletion src/Worker/Core/Shims/DurableTaskShimFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public TaskActivity CreateActivity(TaskName name, ITaskActivity activity)
{
Check.NotDefault(name);
Check.NotNull(activity);
return new TaskActivityShim(this.options.DataConverter, name, activity);
return new TaskActivityShim(this.loggerFactory, this.options.DataConverter, name, activity);
}

/// <summary>
Expand Down
32 changes: 27 additions & 5 deletions src/Worker/Core/Shims/TaskActivityShim.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License.

using DurableTask.Core;
using Microsoft.Extensions.Logging;

namespace Microsoft.DurableTask.Worker.Shims;

Expand All @@ -11,17 +12,24 @@ namespace Microsoft.DurableTask.Worker.Shims;
class TaskActivityShim : TaskActivity
{
readonly ITaskActivity implementation;
readonly ILogger logger;
readonly DataConverter dataConverter;
readonly TaskName name;

/// <summary>
/// Initializes a new instance of the <see cref="TaskActivityShim"/> class.
/// </summary>
/// <param name="loggerFactory">The logger factory.</param>
/// <param name="dataConverter">The data converter.</param>
/// <param name="name">The name of the activity.</param>
/// <param name="implementation">The activity implementation to wrap.</param>
public TaskActivityShim(DataConverter dataConverter, TaskName name, ITaskActivity implementation)
public TaskActivityShim(
ILoggerFactory loggerFactory,
DataConverter dataConverter,
TaskName name,
ITaskActivity implementation)
{
this.logger = Logs.CreateWorkerLogger(Check.NotNull(loggerFactory), "Activities");
this.dataConverter = Check.NotNull(dataConverter);
this.name = Check.NotDefault(name);
this.implementation = Check.NotNull(implementation);
Expand All @@ -34,11 +42,25 @@ public TaskActivityShim(DataConverter dataConverter, TaskName name, ITaskActivit
string? strippedRawInput = StripArrayCharacters(rawInput);
object? deserializedInput = this.dataConverter.Deserialize(strippedRawInput, this.implementation.InputType);
TaskActivityContextWrapper contextWrapper = new(coreContext, this.name);
object? output = await this.implementation.RunAsync(contextWrapper, deserializedInput);

// Return the output (if any) as a serialized string.
string? serializedOutput = this.dataConverter.Serialize(output);
return serializedOutput;
string instanceId = coreContext.OrchestrationInstance.InstanceId;
this.logger.ActivityStarted(instanceId, this.name);

try
{
object? output = await this.implementation.RunAsync(contextWrapper, deserializedInput);

// Return the output (if any) as a serialized string.
string? serializedOutput = this.dataConverter.Serialize(output);
this.logger.ActivityCompleted(instanceId, this.name);

return serializedOutput;
}
catch (Exception e)
{
this.logger.ActivityFailed(e, instanceId, this.name);
throw;
}
}

/// <inheritdoc/>
Expand Down
22 changes: 20 additions & 2 deletions src/Worker/Core/Shims/TaskOrchestrationShim.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ partial class TaskOrchestrationShim : TaskOrchestration
{
readonly ITaskOrchestrator implementation;
readonly OrchestrationInvocationContext invocationContext;
readonly ILogger logger;

TaskOrchestrationContextWrapper? wrapperContext;

/// <summary>
Expand All @@ -31,12 +33,12 @@ public TaskOrchestrationShim(
{
this.invocationContext = Check.NotNull(invocationContext);
this.implementation = Check.NotNull(implementation);

this.logger = Logs.CreateWorkerLogger(this.invocationContext.LoggerFactory, "Orchestrations");
}

DataConverter DataConverter => this.invocationContext.Options.DataConverter;

ILoggerFactory LoggerFactory => this.invocationContext.LoggerFactory;

/// <inheritdoc/>
public override async Task<string?> Execute(OrchestrationContext innerContext, string rawInput)
{
Expand All @@ -48,15 +50,31 @@ public TaskOrchestrationShim(
object? input = this.DataConverter.Deserialize(rawInput, this.implementation.InputType);
this.wrapperContext = new(innerContext, this.invocationContext, input);

string instanceId = innerContext.OrchestrationInstance.InstanceId;
if (!innerContext.IsReplaying)
{
this.logger.OrchestrationStarted(instanceId, this.invocationContext.Name);
}

try
{
object? output = await this.implementation.RunAsync(this.wrapperContext, input);

if (!innerContext.IsReplaying)
{
this.logger.OrchestrationCompleted(instanceId, this.invocationContext.Name);
}

// Return the output (if any) as a serialized string.
return this.DataConverter.Serialize(output);
}
catch (TaskFailedException e)
{
if (!innerContext.IsReplaying)
{
this.logger.OrchestrationFailed(e, instanceId, this.invocationContext.Name);
}

// Convert back to something the Durable Task Framework natively understands so that
// failure details are correctly propagated.
throw new CoreTaskFailedException(e.Message, e.InnerException)
Expand Down
3 changes: 0 additions & 3 deletions src/Worker/Grpc/Logs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ namespace Microsoft.DurableTask.Worker.Grpc
/// <summary>
/// Log messages.
/// </summary>
/// <remarks>
/// NOTE: Trying to make logs consistent with https://github.com/Azure/durabletask/blob/main/src/DurableTask.Core/Logging/LogEvents.cs.
/// </remarks>
static partial class Logs
{
[LoggerMessage(EventId = 1, Level = LogLevel.Information, Message = "Durable Task gRPC worker starting and connecting to {endpoint}.")]
Expand Down
55 changes: 55 additions & 0 deletions test/Grpc.IntegrationTests/IntegrationTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,61 @@ protected IReadOnlyCollection<LogEntry> GetLogs()
return logs;
}

protected IReadOnlyCollection<LogEntry> GetLogs(string category)
{
this.logProvider.TryGetLogs(category, out IReadOnlyCollection<LogEntry> logs);
return logs ?? [];
}

protected static bool MatchLog(
LogEntry log,
string logEventName,
(Type exceptionType, string exceptionMessage)? exception,
params (string key, string value)[] metadataPairs)
{
if (log.EventId.Name != logEventName)
{
return false;
}

if (log.Exception is null != exception is null)
{
return false;
}
else if (exception is not null)
{
if (log.Exception?.GetType() != exception.Value.exceptionType)
{
return false;
}
if (log.Exception?.Message != exception.Value.exceptionMessage)
{
return false;
}
}

if (log.State is not IReadOnlyCollection<KeyValuePair<string, object>> metadataList)
{
return false;
}

Dictionary<string, object> state = new(metadataList);
foreach ((string key, string value) in metadataPairs)
{
if (!state.TryGetValue(key, out object? stateValue))
{
return false;
}

if (stateValue is not string stateString || stateString != value)
{
return false;
}
}

return true;
}

protected struct HostTestLifetime : IAsyncDisposable
{
readonly IHost host;
Expand Down
42 changes: 39 additions & 3 deletions test/Grpc.IntegrationTests/OrchestrationErrorHandling.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System.Runtime.Serialization;
using Microsoft.DurableTask.Client;
using Microsoft.DurableTask.Tests.Logging;
using Microsoft.DurableTask.Worker;
using Xunit.Abstractions;
using Xunit.Sdk;
Expand All @@ -28,8 +29,11 @@ public async Task UnhandledActivityException()
TaskName activityName = "FaultyActivity";

// Use local function definitions to simplify the validation of the call stacks
async Task MyOrchestrationImpl(TaskOrchestrationContext ctx) => await ctx.CallActivityAsync(activityName);
void MyActivityImpl(TaskActivityContext ctx) => throw new Exception(errorMessage, new CustomException("Inner!"));
async Task MyOrchestrationImpl(TaskOrchestrationContext ctx) =>
await ctx.CallActivityAsync(activityName);

void MyActivityImpl(TaskActivityContext ctx) =>
throw new InvalidOperationException(errorMessage, new CustomException("Inner!"));

await using HostTestLifetime server = await this.StartWorkerAsync(b =>
{
Expand Down Expand Up @@ -64,13 +68,45 @@ public async Task UnhandledActivityException()

// Check that the inner exception - i.e. the exact exception that failed the orchestration - was populated correctly
Assert.NotNull(failureDetails.InnerFailure);
Assert.Equal(typeof(Exception).FullName, failureDetails.InnerFailure!.ErrorType);
Assert.Equal(typeof(InvalidOperationException).FullName, failureDetails.InnerFailure!.ErrorType);
Assert.Equal(errorMessage, failureDetails.InnerFailure.ErrorMessage);

// Check that the inner-most exception was populated correctly too (the custom exception type)
Assert.NotNull(failureDetails.InnerFailure.InnerFailure);
Assert.Equal(typeof(CustomException).FullName, failureDetails.InnerFailure.InnerFailure!.ErrorType);
Assert.Equal("Inner!", failureDetails.InnerFailure.InnerFailure.ErrorMessage);

IReadOnlyCollection<LogEntry> workerLogs = this.GetLogs(category: "Microsoft.DurableTask.Worker");
Assert.NotEmpty(workerLogs);

// Check that the orchestrator and activity logs are present
Assert.Single(workerLogs, log => MatchLog(
log,
logEventName: "OrchestrationStarted",
exception: null,
("InstanceId", instanceId),
("Name", orchestratorName.Name)));

Assert.Single(workerLogs, log => MatchLog(
log,
logEventName: "ActivityStarted",
exception: null,
("InstanceId", instanceId),
("Name", activityName.Name)));

Assert.Single(workerLogs, log => MatchLog(
log,
logEventName: "ActivityFailed",
exception: (typeof(InvalidOperationException), errorMessage),
("InstanceId", instanceId),
("Name", activityName.Name)));

Assert.Single(workerLogs, log => MatchLog(
log,
logEventName: "OrchestrationFailed",
exception: (typeof(TaskFailedException), $"Task '{activityName}' (#0) failed with an unhandled exception: {errorMessage}"),
("InstanceId", instanceId),
("Name", orchestratorName.Name)));
}

/// <summary>
Expand Down
Loading
Loading