Skip to content

Commit d672195

Browse files
authored
Handle errors that may happen in the web socket receiver thread (#235)
1 parent 1db6ef4 commit d672195

File tree

3 files changed

+20
-6
lines changed

3 files changed

+20
-6
lines changed

shell/agents/Microsoft.Azure.Agent/AzureCopilotReceiver.cs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System.Collections.Concurrent;
22
using System.Net.WebSockets;
3+
using System.Runtime.ExceptionServices;
34
using System.Text.Json;
45

56
namespace Microsoft.Azure.Agent;
@@ -26,7 +27,6 @@ private AzureCopilotReceiver(ClientWebSocket webSocket)
2627
}
2728

2829
internal int Watermark { get; private set; }
29-
internal BlockingCollection<CopilotActivity> ActivityQueue => _activityQueue;
3030

3131
internal static async Task<AzureCopilotReceiver> CreateAsync(string streamUrl)
3232
{
@@ -52,6 +52,7 @@ private async Task ProcessActivities()
5252
if (result.MessageType is WebSocketMessageType.Close)
5353
{
5454
closingMessage = "Close message received";
55+
_activityQueue.Add(new CopilotActivity { Error = new ConnectionDroppedException("The server websocket is closing. Connection dropped.") });
5556
}
5657
}
5758
catch (OperationCanceledException)
@@ -65,6 +66,7 @@ private async Task ProcessActivities()
6566
{
6667
// TODO: log the closing request.
6768
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, closingMessage, CancellationToken.None);
69+
_activityQueue.CompleteAdding();
6870
break;
6971
}
7072

@@ -98,8 +100,20 @@ private async Task ProcessActivities()
98100
}
99101
}
100102

101-
// TODO: log the current state of the web socket
102-
// TODO: handle error state, such as 'aborted'
103+
// TODO: log the current state of the web socket.
104+
_activityQueue.Add(new CopilotActivity { Error = new ConnectionDroppedException($"The websocket got in '{_webSocket.State}' state. Connection dropped.") });
105+
_activityQueue.CompleteAdding();
106+
}
107+
108+
internal CopilotActivity Take(CancellationToken cancellationToken)
109+
{
110+
CopilotActivity activity = _activityQueue.Take(cancellationToken);
111+
if (activity.Error is not null)
112+
{
113+
ExceptionDispatchInfo.Capture(activity.Error).Throw();
114+
}
115+
116+
return activity;
103117
}
104118

105119
public void Dispose()

shell/agents/Microsoft.Azure.Agent/ChatSession.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ private async Task StartConversationAsync(IHost host, CancellationToken cancella
142142

143143
while (true)
144144
{
145-
CopilotActivity activity = _copilotReceiver.ActivityQueue.Take(cancellationToken);
145+
CopilotActivity activity = _copilotReceiver.Take(cancellationToken);
146146
if (activity.IsMessage && activity.IsFromCopilot && _copilotReceiver.Watermark is 0)
147147
{
148148
activity.ExtractMetadata(out _, out ConversationState conversationState);
@@ -259,7 +259,7 @@ internal async Task<CopilotResponse> GetChatResponseAsync(string input, IStatusC
259259

260260
while (true)
261261
{
262-
CopilotActivity activity = _copilotReceiver.ActivityQueue.Take(cancellationToken);
262+
CopilotActivity activity = _copilotReceiver.Take(cancellationToken);
263263

264264
if (activity.ReplyToId != activityId)
265265
{

shell/agents/Microsoft.Azure.Agent/Schema.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ internal CopilotActivity ReadChunk(CancellationToken cancellationToken)
159159
return null;
160160
}
161161

162-
CopilotActivity activity = _receiver.ActivityQueue.Take(cancellationToken);
162+
CopilotActivity activity = _receiver.Take(cancellationToken);
163163

164164
if (!activity.IsMessageUpdate)
165165
{

0 commit comments

Comments
 (0)