diff --git a/src/DurableTask.AzureStorage/AnalyticsEventSource.cs b/src/DurableTask.AzureStorage/AnalyticsEventSource.cs index ababb3083..5f04c0bca 100644 --- a/src/DurableTask.AzureStorage/AnalyticsEventSource.cs +++ b/src/DurableTask.AzureStorage/AnalyticsEventSource.cs @@ -1019,7 +1019,7 @@ public void DiscardingWorkItem( ExtensionVersion); } - [Event(EventIds.ProcessingMessage, Level = EventLevel.Informational, Task = Tasks.Processing, Opcode = EventOpcode.Receive, Version = 5)] + [Event(EventIds.ProcessingMessage, Level = EventLevel.Informational, Task = Tasks.Processing, Opcode = EventOpcode.Receive, Version = 6)] public void ProcessingMessage( Guid relatedActivityId, string Account, @@ -1030,6 +1030,7 @@ public void ProcessingMessage( string ExecutionId, string MessageId, int Age, + string PartitionId, long SequenceNumber, int Episode, bool IsExtendedSession, @@ -1047,6 +1048,7 @@ public void ProcessingMessage( ExecutionId ?? string.Empty, MessageId, Age, + PartitionId, SequenceNumber, Episode, IsExtendedSession, diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 4dd87b4d2..04a32e9fd 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -752,7 +752,10 @@ async Task LockNextTaskOrchestrationWorkItemAsync(boo } else { - session.TraceProcessingMessage(message, isExtendedSession: false); + session.TraceProcessingMessage( + message, + isExtendedSession: false, + partitionId: session.ControlQueue.Name); } } @@ -1525,7 +1528,7 @@ public async Task LockNextTaskActivityWorkItem( }); TraceMessageReceived(this.settings, session.MessageData, this.azureStorageClient.QueueAccountName); - session.TraceProcessingMessage(message, isExtendedSession: false); + session.TraceProcessingMessage(message, isExtendedSession: false, this.workItemQueue.Name); if (!this.activeActivitySessions.TryAdd(message.Id, session)) { diff --git a/src/DurableTask.AzureStorage/Logging/LogEvents.cs b/src/DurableTask.AzureStorage/Logging/LogEvents.cs index 721577ab6..0d9f91214 100644 --- a/src/DurableTask.AzureStorage/Logging/LogEvents.cs +++ b/src/DurableTask.AzureStorage/Logging/LogEvents.cs @@ -2500,6 +2500,7 @@ public ProcessingMessage( string executionId, string messageId, int age, + string partitionId, long sequenceNumber, int episode, bool isExtendedSession) @@ -2513,6 +2514,7 @@ public ProcessingMessage( this.ExecutionId = executionId; this.MessageId = messageId; this.Age = age; + this.PartitionId = partitionId; this.SequenceNumber = sequenceNumber; this.Episode = episode; this.IsExtendedSession = isExtendedSession; @@ -2544,6 +2546,9 @@ public ProcessingMessage( [StructuredLogField] public int Age { get; } + [StructuredLogField] + public string PartitionId { get; } + [StructuredLogField] public long SequenceNumber { get; } @@ -2572,6 +2577,7 @@ void IEventSourceEvent.WriteEventSource() => AnalyticsEventSource.Log.Processing this.ExecutionId, this.MessageId, this.Age, + this.PartitionId, this.SequenceNumber, this.Episode, this.IsExtendedSession, diff --git a/src/DurableTask.AzureStorage/Logging/LogHelper.cs b/src/DurableTask.AzureStorage/Logging/LogHelper.cs index 104615ada..e8fe9438e 100644 --- a/src/DurableTask.AzureStorage/Logging/LogHelper.cs +++ b/src/DurableTask.AzureStorage/Logging/LogHelper.cs @@ -823,6 +823,7 @@ internal void ProcessingMessage( string executionId, string messageId, int age, + string partitionId, long sequenceNumber, int episode, bool isExtendedSession) @@ -837,6 +838,7 @@ internal void ProcessingMessage( executionId, messageId, age, + partitionId, sequenceNumber, episode, isExtendedSession); diff --git a/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs b/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs index f11c82e8b..719694e97 100644 --- a/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs +++ b/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs @@ -114,7 +114,7 @@ public async Task> FetchNewOrchestrationMessagesAsync( var messages = new List(this.CurrentMessageBatch.Count); foreach (MessageData msg in this.CurrentMessageBatch) { - this.TraceProcessingMessage(msg, isExtendedSession: true); + this.TraceProcessingMessage(msg, isExtendedSession: true, partitionId: this.ControlQueue.Name); messages.Add(msg.TaskMessage); } diff --git a/src/DurableTask.AzureStorage/Messaging/Session.cs b/src/DurableTask.AzureStorage/Messaging/Session.cs index b0771740f..dc81c9139 100644 --- a/src/DurableTask.AzureStorage/Messaging/Session.cs +++ b/src/DurableTask.AzureStorage/Messaging/Session.cs @@ -45,7 +45,7 @@ public void StartNewLogicalTraceScope() AnalyticsEventSource.SetLogicalTraceActivityId(this.TraceActivityId); } - public void TraceProcessingMessage(MessageData data, bool isExtendedSession) + public void TraceProcessingMessage(MessageData data, bool isExtendedSession, string partitionId) { if (data == null) { @@ -64,7 +64,8 @@ public void TraceProcessingMessage(MessageData data, bool isExtendedSession) taskMessage.OrchestrationInstance.InstanceId, taskMessage.OrchestrationInstance.ExecutionId, queueMessage.MessageId, - Math.Max(0, (int)DateTimeOffset.UtcNow.Subtract(queueMessage.InsertedOn.Value).TotalMilliseconds), + age: Math.Max(0, (int)DateTimeOffset.UtcNow.Subtract(queueMessage.InsertedOn.Value).TotalMilliseconds), + partitionId, data.SequenceNumber, data.Episode.GetValueOrDefault(-1), isExtendedSession);