Skip to content

Commit 311691e

Browse files
authored
Fix telemetry reporting 500 for successful activities/orchestrations with caught exceptions (#1270)
1 parent 59f6533 commit 311691e

File tree

5 files changed

+201
-15
lines changed

5 files changed

+201
-15
lines changed
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// ----------------------------------------------------------------------------------
2+
// Copyright Microsoft Corporation
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// You may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
// ----------------------------------------------------------------------------------
13+
#if !NET462
14+
#nullable enable
15+
namespace DurableTask.Core.Tests
16+
{
17+
using System.Collections.Generic;
18+
using System.Diagnostics;
19+
using DurableTask.Core.Entities.OperationFormat;
20+
using DurableTask.Core.Tracing;
21+
using Microsoft.VisualStudio.TestTools.UnitTesting;
22+
using DiagnosticsActivityStatusCode = System.Diagnostics.ActivityStatusCode;
23+
using TraceActivityStatusCode = DurableTask.Core.Tracing.ActivityStatusCode;
24+
25+
/// <summary>
/// Verifies the span status that <c>TraceHelper.EndActivitiesForProcessingEntityInvocation</c>
/// leaves on each entity-operation activity.
/// </summary>
[TestClass]
public class TraceHelperTests
{
    /// <summary>
    /// An activity that was marked as Error before the call must end up Ok
    /// when its operation result carries no error information.
    /// </summary>
    [TestMethod]
    public void EndActivitiesForEntityInvocationResetsSuccessfulStatus()
    {
        // Arrange: a started span that custom instrumentation already flagged as failed.
        Activity operationSpan = new Activity("entityOperation").Start();
        var spans = new List<Activity> { operationSpan };
        spans[0].SetStatus(TraceActivityStatusCode.Error, "instrumented error");

        // A default result has neither an error message nor failure details set by this test.
        var outcomes = new List<OperationResult> { new OperationResult() };

        // Act
        TraceHelper.EndActivitiesForProcessingEntityInvocation(spans, outcomes, batchFailureDetails: null);

        // Assert
        Assert.AreEqual(DiagnosticsActivityStatusCode.Ok, spans[0].Status);
    }

    /// <summary>
    /// An operation result that carries an error message must mark its activity as Error.
    /// </summary>
    [TestMethod]
    public void EndActivitiesForEntityInvocationMarksFailures()
    {
        // Arrange
        Activity operationSpan = new Activity("entityOperation").Start();
        var spans = new List<Activity> { operationSpan };

        var failedOutcome = new OperationResult { ErrorMessage = "entity failure" };
        var outcomes = new List<OperationResult> { failedOutcome };

        // Act
        TraceHelper.EndActivitiesForProcessingEntityInvocation(spans, outcomes, batchFailureDetails: null);

        // Assert
        Assert.AreEqual(DiagnosticsActivityStatusCode.Error, spans[0].Status);
    }
}
68+
}
69+
#endif

src/DurableTask.Core/TaskActivityDispatcher.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,12 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ =>
267267
eventToRespond = new TaskCompletedEvent(-1, scheduledEvent.EventId, null);
268268
}
269269

270+
if (traceActivity != null && eventToRespond is TaskCompletedEvent)
271+
{
272+
// Ensure successful executions don't preserve a prior error status from custom instrumentation.
273+
traceActivity.SetStatus(ActivityStatusCode.OK, "Completed");
274+
}
275+
270276
var responseTaskMessage = new TaskMessage
271277
{
272278
Event = eventToRespond,

src/DurableTask.Core/TaskOrchestrationDispatcher.cs

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1043,6 +1043,8 @@ internal static bool ReconcileMessagesWithState(TaskOrchestrationWorkItem workIt
10431043
runtimeState.OrchestrationInstance!,
10441044
() => Utils.EscapeJson(JsonDataConverter.Default.Serialize(runtimeState.GetOrchestrationRuntimeStateDump(), true)));
10451045

1046+
SetOrchestrationActivityStatus(completeOrchestratorAction);
1047+
10461048
// Check to see if we need to start a new execution
10471049
if (completeOrchestratorAction.OrchestrationStatus == OrchestrationStatus.ContinuedAsNew)
10481050
{
@@ -1090,12 +1092,6 @@ internal static bool ReconcileMessagesWithState(TaskOrchestrationWorkItem workIt
10901092
subOrchestrationFailedEvent.FailureDetails = completeOrchestratorAction.FailureDetails;
10911093

10921094
taskMessage.Event = subOrchestrationFailedEvent;
1093-
1094-
if (completeOrchestratorAction.OrchestrationStatus == OrchestrationStatus.Failed)
1095-
{
1096-
DistributedTraceActivity.Current?.SetStatus(
1097-
ActivityStatusCode.Error, completeOrchestratorAction.Result);
1098-
}
10991095
}
11001096

11011097
ResetDistributedTraceActivity(runtimeState);
@@ -1107,12 +1103,6 @@ internal static bool ReconcileMessagesWithState(TaskOrchestrationWorkItem workIt
11071103
}
11081104
}
11091105

1110-
if (completeOrchestratorAction.OrchestrationStatus == OrchestrationStatus.Failed)
1111-
{
1112-
DistributedTraceActivity.Current?.SetStatus(
1113-
ActivityStatusCode.Error, completeOrchestratorAction.Result);
1114-
}
1115-
11161106
ResetDistributedTraceActivity(runtimeState);
11171107

11181108
return null;
@@ -1125,6 +1115,34 @@ private void ResetDistributedTraceActivity(OrchestrationRuntimeState runtimeStat
11251115
DistributedTraceActivity.Current = null;
11261116
}
11271117

1118+
/// <summary>
/// Copies the orchestration's completion status onto the current distributed trace activity.
/// Completed and ContinuedAsNew are reported as OK; Failed and Terminated are reported as Error.
/// Any other status leaves the activity's status untouched.
/// </summary>
/// <param name="completeOrchestratorAction">The completion action whose status is reflected on the trace.</param>
private static void SetOrchestrationActivityStatus(OrchestrationCompleteOrchestratorAction completeOrchestratorAction)
{
    var currentActivity = DistributedTraceActivity.Current;
    if (currentActivity == null)
    {
        // Nothing is being traced for this work item.
        return;
    }

    OrchestrationStatus finalStatus = completeOrchestratorAction.OrchestrationStatus;
    switch (finalStatus)
    {
        case OrchestrationStatus.Completed:
        case OrchestrationStatus.ContinuedAsNew:
            // Overwrites any status set earlier on this activity.
            currentActivity.SetStatus(ActivityStatusCode.OK, finalStatus.ToString());
            break;

        case OrchestrationStatus.Failed:
            // Prefer the structured failure message, then the raw result, then the status name.
            string failureDescription = completeOrchestratorAction.FailureDetails?.ErrorMessage
                ?? completeOrchestratorAction.Result
                ?? finalStatus.ToString();
            currentActivity.SetStatus(ActivityStatusCode.Error, failureDescription);
            break;

        case OrchestrationStatus.Terminated:
            currentActivity.SetStatus(ActivityStatusCode.Error, finalStatus.ToString());
            break;
    }
}
1145+
11281146
TaskMessage ProcessScheduleTaskDecision(
11291147
ScheduleTaskOrchestratorAction scheduleTaskOrchestratorAction,
11301148
OrchestrationRuntimeState runtimeState,

src/DurableTask.Core/Tracing/TraceHelper.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -602,7 +602,13 @@ internal static void EndActivitiesForProcessingEntityInvocation(List<Activity> t
602602
{
603603
if (result.ErrorMessage != null || result.FailureDetails != null)
604604
{
605-
activity.SetTag(Schema.Task.ErrorMessage, result.ErrorMessage ?? result.FailureDetails!.ErrorMessage);
605+
string errorDetails = result.ErrorMessage ?? result.FailureDetails!.ErrorMessage;
606+
activity.SetTag(Schema.Task.ErrorMessage, errorDetails);
607+
activity.SetStatus(ActivityStatusCode.Error, errorDetails);
608+
}
609+
else
610+
{
611+
activity.SetStatus(ActivityStatusCode.OK, "Completed");
606612
}
607613
if (result.StartTimeUtc is DateTime startTime)
608614
{
@@ -630,6 +636,7 @@ internal static void EndActivitiesForProcessingEntityInvocation(List<Activity> t
630636
if (activity != null)
631637
{
632638
activity.SetTag(Schema.Task.ErrorMessage, errorMessage);
639+
activity.SetStatus(ActivityStatusCode.Error, errorMessage);
633640
activity.Dispose();
634641
}
635642
}

test/DurableTask.Core.Tests/DispatcherMiddlewareTests.cs

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,14 @@ namespace DurableTask.Core.Tests
2626
using System.Xml;
2727
using DurableTask.Core.Command;
2828
using DurableTask.Core.History;
29+
using DurableTask.Core.Tracing;
2930
using DurableTask.Emulator;
3031
using DurableTask.Test.Orchestrations;
3132
using Microsoft.Extensions.Logging;
3233
using Microsoft.Extensions.Logging.Console;
3334
using Microsoft.VisualStudio.TestTools.UnitTesting;
35+
using DiagnosticsActivityStatusCode = System.Diagnostics.ActivityStatusCode;
36+
using TraceActivityStatusCode = DurableTask.Core.Tracing.ActivityStatusCode;
3437

3538
[TestClass]
3639
public class DispatcherMiddlewareTests
@@ -53,8 +56,15 @@ public void InitializeTests()
5356
// We use `GetAwaiter().GetResult()` because otherwise this method will fail with:
5457
// "X has wrong signature. The method must be non-static, public, does not return a value and should not take any parameter."
5558
this.worker
56-
.AddTaskOrchestrations(typeof(SimplestGreetingsOrchestration), typeof(ParentWorkflow), typeof(ChildWorkflow))
57-
.AddTaskActivities(typeof(SimplestGetUserTask), typeof(SimplestSendGreetingTask))
59+
.AddTaskOrchestrations(
60+
typeof(SimplestGreetingsOrchestration),
61+
typeof(ParentWorkflow),
62+
typeof(ChildWorkflow),
63+
typeof(ActivityStatusResetOrchestration))
64+
.AddTaskActivities(
65+
typeof(SimplestGetUserTask),
66+
typeof(SimplestSendGreetingTask),
67+
typeof(ActivityStatusResetActivity))
5868
.StartAsync().GetAwaiter().GetResult();
5969

6070
this.client = new TaskHubClient(service);
@@ -451,6 +461,82 @@ public async Task MockActivityOrchestration()
451461
Assert.AreEqual(OrchestrationStatus.Completed, state.OrchestrationStatus);
452462
Assert.AreEqual("FakeActivity,FakeActivityVersion,SomeInput", state.Output);
453463
}
464+
465+
/// <summary>
/// Runs an orchestration and an activity that both flag their own span as Error while still
/// completing successfully, then checks that the captured server spans report Ok.
/// </summary>
[TestMethod]
public async Task ActivityAndOrchestrationSpansResetStatuses()
{
    using var collector = new DurableActivityCollector();

    OrchestrationInstance started = await this.client.CreateOrchestrationInstanceAsync(
        typeof(ActivityStatusResetOrchestration),
        "payload");

    // Allow a much longer wait when stepping through under a debugger.
    int waitSeconds = Debugger.IsAttached ? 1000 : 10;
    OrchestrationState completedState = await this.client.WaitForOrchestrationAsync(
        started,
        TimeSpan.FromSeconds(waitSeconds));
    Assert.AreEqual(OrchestrationStatus.Completed, completedState.OrchestrationStatus);

    // Orchestration server span.
    string orchestrationTaskName = NameVersionHelper.GetDefaultName(typeof(ActivityStatusResetOrchestration));
    Activity? orchestrationSpan = collector.Find(
        TraceActivityConstants.Orchestration,
        orchestrationTaskName,
        ActivityKind.Server);
    Assert.IsNotNull(orchestrationSpan, "Expected orchestration server trace to be captured.");
    Assert.AreEqual(DiagnosticsActivityStatusCode.Ok, orchestrationSpan!.Status);

    // Activity server span.
    string activityTaskName = NameVersionHelper.GetDefaultName(typeof(ActivityStatusResetActivity));
    Activity? taskSpan = collector.Find(
        TraceActivityConstants.Activity,
        activityTaskName,
        ActivityKind.Server);
    Assert.IsNotNull(taskSpan, "Expected activity server trace to be captured.");
    Assert.AreEqual(DiagnosticsActivityStatusCode.Ok, taskSpan!.Status);
}
488+
489+
/// <summary>
/// Test helper that listens to the "DurableTask.Core" activity source and records every
/// activity that stops while the collector is alive.
/// </summary>
private sealed class DurableActivityCollector : IDisposable
{
    private readonly ActivityListener listener;

    // A queue (not a bag) is used so enumeration follows the order in which activities
    // stopped; Find relies on that order to return the most recently stopped match.
    private readonly ConcurrentQueue<Activity> activities = new ConcurrentQueue<Activity>();

    /// <summary>
    /// Creates the listener, requests full data for every sampled activity, and registers it.
    /// </summary>
    public DurableActivityCollector()
    {
        this.listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == "DurableTask.Core",
            Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
            SampleUsingParentId = (ref ActivityCreationOptions<string> _) => ActivitySamplingResult.AllData,
            ActivityStopped = activity => this.activities.Enqueue(activity),
        };

        ActivitySource.AddActivityListener(this.listener);
    }

    /// <summary>
    /// Returns the most recently stopped activity with the given kind whose task type and
    /// task name tags match exactly (ordinal comparison), or null when none was captured.
    /// </summary>
    /// <param name="taskType">Expected value of the task type tag.</param>
    /// <param name="taskName">Expected value of the task name tag.</param>
    /// <param name="kind">Expected activity kind.</param>
    public Activity? Find(string taskType, string taskName, ActivityKind kind)
    {
        return this.activities
            .Where(activity => activity.Kind == kind)
            .Where(activity => string.Equals(Convert.ToString(activity.GetTagItem(Schema.Task.Type)), taskType, StringComparison.Ordinal))
            .Where(activity => string.Equals(Convert.ToString(activity.GetTagItem(Schema.Task.Name)), taskName, StringComparison.Ordinal))
            .LastOrDefault();
    }

    /// <summary>
    /// Disposes the underlying listener.
    /// </summary>
    public void Dispose()
    {
        this.listener.Dispose();
    }
}
521+
522+
/// <summary>
/// Test orchestration that marks the current distributed trace activity as Error and then
/// completes normally, returning the output of <c>ActivityStatusResetActivity</c>.
/// </summary>
private sealed class ActivityStatusResetOrchestration : TaskOrchestration<string, string>
{
    /// <summary>
    /// Flags the trace as failed, schedules the activity, and returns its output
    /// (or an empty string when the activity output is null).
    /// </summary>
    public override async Task<string> RunTask(OrchestrationContext context, string input)
    {
        // Simulates custom instrumentation recording an error that the orchestration recovers from.
        DistributedTraceActivity.Current?.SetStatus(TraceActivityStatusCode.Error, "orchestration instrumentation error");

        string activityInput = input ?? "ok";
        string? activityResult = await context.ScheduleTask<string>(typeof(ActivityStatusResetActivity), activityInput);
        if (activityResult == null)
        {
            return string.Empty;
        }

        return activityResult;
    }
}
531+
532+
/// <summary>
/// Test activity that marks the ambient <see cref="Activity"/> as Error and then
/// returns successfully.
/// </summary>
private sealed class ActivityStatusResetActivity : TaskActivity<string, string>
{
    /// <summary>
    /// Flags the current span as failed and echoes the input, substituting "ok" for null.
    /// </summary>
    protected override string Execute(TaskContext context, string input)
    {
        // Simulates custom instrumentation recording an error that the activity recovers from.
        Activity.Current?.SetStatus(TraceActivityStatusCode.Error, "activity instrumentation error");

        if (input == null)
        {
            return "ok";
        }

        return input;
    }
}
454540
}
455541
}
456542
#endif

0 commit comments

Comments
 (0)