Skip to content

Commit 70fb604

Browse files
CopilotYunchuWang
andcommitted
Add instance id logging scopes for orchestrations and activities
Co-authored-by: YunchuWang <[email protected]>
1 parent af83071 commit 70fb604

File tree

3 files changed

+182
-38
lines changed

3 files changed

+182
-38
lines changed

src/Worker/Core/Shims/TaskActivityShim.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ public TaskActivityShim(
4444
TaskActivityContextWrapper contextWrapper = new(coreContext, this.name);
4545

4646
string instanceId = coreContext.OrchestrationInstance.InstanceId;
47+
using IDisposable? scope = this.logger.BeginScope(new Dictionary<string, object?>
48+
{
49+
["InstanceId"] = instanceId,
50+
});
4751
this.logger.ActivityStarted(instanceId, this.name);
4852

4953
try

src/Worker/Core/Shims/TaskOrchestrationShim.cs

Lines changed: 42 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -57,55 +57,59 @@ public TaskOrchestrationShim(
5757

5858
/// <inheritdoc/>
5959
public override async Task<string?> Execute(OrchestrationContext innerContext, string rawInput)
60-
{
61-
Check.NotNull(innerContext);
62-
JsonDataConverterShim converterShim = new(this.invocationContext.Options.DataConverter);
63-
innerContext.MessageDataConverter = converterShim;
64-
innerContext.ErrorDataConverter = converterShim;
65-
66-
object? input = this.DataConverter.Deserialize(rawInput, this.implementation.InputType);
67-
this.wrapperContext = new(innerContext, this.invocationContext, input, this.properties);
68-
69-
string instanceId = innerContext.OrchestrationInstance.InstanceId;
70-
if (!innerContext.IsReplaying)
7160
{
72-
this.logger.OrchestrationStarted(instanceId, this.invocationContext.Name);
73-
}
61+
Check.NotNull(innerContext);
62+
JsonDataConverterShim converterShim = new(this.invocationContext.Options.DataConverter);
63+
innerContext.MessageDataConverter = converterShim;
64+
innerContext.ErrorDataConverter = converterShim;
7465

75-
try
76-
{
77-
object? output = await this.implementation.RunAsync(this.wrapperContext, input);
66+
object? input = this.DataConverter.Deserialize(rawInput, this.implementation.InputType);
67+
this.wrapperContext = new(innerContext, this.invocationContext, input, this.properties);
7868

69+
string instanceId = innerContext.OrchestrationInstance.InstanceId;
70+
using IDisposable? scope = this.logger.BeginScope(new Dictionary<string, object?>
71+
{
72+
["InstanceId"] = instanceId,
73+
});
7974
if (!innerContext.IsReplaying)
8075
{
81-
this.logger.OrchestrationCompleted(instanceId, this.invocationContext.Name);
76+
this.logger.OrchestrationStarted(instanceId, this.invocationContext.Name);
8277
}
8378

84-
// Return the output (if any) as a serialized string.
85-
return this.DataConverter.Serialize(output);
86-
}
87-
catch (TaskFailedException e)
88-
{
89-
if (!innerContext.IsReplaying)
79+
try
9080
{
91-
this.logger.OrchestrationFailed(e, instanceId, this.invocationContext.Name);
92-
}
81+
object? output = await this.implementation.RunAsync(this.wrapperContext, input);
82+
83+
if (!innerContext.IsReplaying)
84+
{
85+
this.logger.OrchestrationCompleted(instanceId, this.invocationContext.Name);
86+
}
9387

94-
// Convert back to something the Durable Task Framework natively understands so that
95-
// failure details are correctly propagated.
96-
throw new CoreTaskFailedException(e.Message, e.InnerException)
88+
// Return the output (if any) as a serialized string.
89+
return this.DataConverter.Serialize(output);
90+
}
91+
catch (TaskFailedException e)
9792
{
98-
FailureDetails = new FailureDetails(e,
99-
e.FailureDetails.ToCoreFailureDetails(),
100-
properties: e.FailureDetails.Properties),
101-
};
102-
}
103-
finally
104-
{
105-
// if user code crashed inside a critical section, or did not exit it, do that now
106-
this.wrapperContext.ExitCriticalSectionIfNeeded();
93+
if (!innerContext.IsReplaying)
94+
{
95+
this.logger.OrchestrationFailed(e, instanceId, this.invocationContext.Name);
96+
}
97+
98+
// Convert back to something the Durable Task Framework natively understands so that
99+
// failure details are correctly propagated.
100+
throw new CoreTaskFailedException(e.Message, e.InnerException)
101+
{
102+
FailureDetails = new FailureDetails(e,
103+
e.FailureDetails.ToCoreFailureDetails(),
104+
properties: e.FailureDetails.Properties),
105+
};
106+
}
107+
finally
108+
{
109+
// if user code crashed inside a critical section, or did not exit it, do that now
110+
this.wrapperContext.ExitCriticalSectionIfNeeded();
111+
}
107112
}
108-
}
109113

110114
/// <inheritdoc/>
111115
public override string? GetStatus()
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using DurableTask.Core;
5+
using Microsoft.DurableTask;
6+
using Microsoft.DurableTask.Converters;
7+
using Microsoft.DurableTask.Worker;
8+
using Microsoft.DurableTask.Worker.Shims;
9+
using Microsoft.Extensions.Logging;
10+
11+
namespace Microsoft.DurableTask.Worker.Shims;
12+
13+
public class TaskShimLoggingScopeTests
14+
{
15+
[Fact]
16+
public async Task TaskActivityShim_RunAsync_UsesInstanceIdScope()
17+
{
18+
// Arrange
19+
string instanceId = Guid.NewGuid().ToString("N");
20+
IDictionary<string, object?>? scopeState = null;
21+
Mock<ILogger> loggerMock = new();
22+
loggerMock.Setup(l => l.BeginScope(It.IsAny<IDictionary<string, object?>>()))
23+
.Callback((IDictionary<string, object?> state) => scopeState = state)
24+
.Returns(Mock.Of<IDisposable>());
25+
loggerMock.Setup(l => l.IsEnabled(It.IsAny<LogLevel>())).Returns(true);
26+
Mock<ILoggerFactory> loggerFactoryMock = new();
27+
loggerFactoryMock.Setup(f => f.CreateLogger(It.IsAny<string>())).Returns(loggerMock.Object);
28+
TaskActivityShim shim = new(loggerFactoryMock.Object, JsonDataConverter.Default, new TaskName("TestActivity"), new TestActivity());
29+
TaskContext coreContext = new(new OrchestrationInstance { InstanceId = instanceId });
30+
31+
// Act
32+
await shim.RunAsync(coreContext, "\"input\"");
33+
34+
// Assert
35+
scopeState.Should().NotBeNull();
36+
scopeState!.Should().ContainKey("InstanceId").WhoseValue.Should().Be(instanceId);
37+
}
38+
39+
[Fact]
40+
public async Task TaskOrchestrationShim_Execute_UsesInstanceIdScope()
41+
{
42+
// Arrange
43+
string instanceId = Guid.NewGuid().ToString("N");
44+
IDictionary<string, object?>? scopeState = null;
45+
Mock<ILogger> loggerMock = new();
46+
loggerMock.Setup(l => l.BeginScope(It.IsAny<IDictionary<string, object?>>()))
47+
.Callback((IDictionary<string, object?> state) => scopeState = state)
48+
.Returns(Mock.Of<IDisposable>());
49+
loggerMock.Setup(l => l.IsEnabled(It.IsAny<LogLevel>())).Returns(true);
50+
Mock<ILoggerFactory> loggerFactoryMock = new();
51+
loggerFactoryMock.Setup(f => f.CreateLogger(It.IsAny<string>())).Returns(loggerMock.Object);
52+
OrchestrationInvocationContext invocationContext = new(new TaskName("TestOrchestrator"), new DurableTaskWorkerOptions(), loggerFactoryMock.Object);
53+
TaskOrchestrationShim shim = new(invocationContext, new TestOrchestrator());
54+
TestOrchestrationContext innerContext = new(instanceId);
55+
56+
// Act
57+
await shim.Execute(innerContext, "\"input\"");
58+
59+
// Assert
60+
scopeState.Should().NotBeNull();
61+
scopeState!.Should().ContainKey("InstanceId").WhoseValue.Should().Be(instanceId);
62+
}
63+
64+
class TestActivity : TaskActivity<string, string>
65+
{
66+
public override Task<string> RunAsync(TaskActivityContext context, string input)
67+
{
68+
return Task.FromResult("ok");
69+
}
70+
}
71+
72+
class TestOrchestrator : TaskOrchestrator<string, string>
73+
{
74+
public override Task<string> RunAsync(TaskOrchestrationContext context, string input)
75+
{
76+
return Task.FromResult("ok");
77+
}
78+
}
79+
80+
class TestOrchestrationContext : OrchestrationContext
81+
{
82+
public TestOrchestrationContext(string instanceId)
83+
{
84+
this.OrchestrationInstance = new OrchestrationInstance
85+
{
86+
InstanceId = instanceId,
87+
ExecutionId = Guid.NewGuid().ToString("N"),
88+
};
89+
}
90+
91+
public override Task<TResult> ScheduleTask<TResult>(string name, string version, params object[] parameters)
92+
{
93+
throw new NotImplementedException();
94+
}
95+
96+
public override Task<T> CreateTimer<T>(DateTime fireAt, T state)
97+
{
98+
throw new NotImplementedException();
99+
}
100+
101+
public override Task<T> CreateTimer<T>(DateTime fireAt, T state, CancellationToken cancelToken)
102+
{
103+
throw new NotImplementedException();
104+
}
105+
106+
public override Task<T> CreateSubOrchestrationInstance<T>(string name, string version, object input)
107+
{
108+
throw new NotImplementedException();
109+
}
110+
111+
public override Task<T> CreateSubOrchestrationInstance<T>(string name, string version, string instanceId, object input)
112+
{
113+
throw new NotImplementedException();
114+
}
115+
116+
public override Task<T> CreateSubOrchestrationInstance<T>(string name, string version, string instanceId, object input, IDictionary<string, string> tags)
117+
{
118+
throw new NotImplementedException();
119+
}
120+
121+
public override void SendEvent(OrchestrationInstance orchestrationInstance, string eventName, object eventData)
122+
{
123+
throw new NotImplementedException();
124+
}
125+
126+
public override void ContinueAsNew(object input)
127+
{
128+
throw new NotImplementedException();
129+
}
130+
131+
public override void ContinueAsNew(string newVersion, object input)
132+
{
133+
throw new NotImplementedException();
134+
}
135+
}
136+
}

0 commit comments

Comments
 (0)