@@ -11,10 +11,13 @@ namespace Cnblogs.DashScope.Core;
11
11
/// </summary>
12
12
public sealed class DashScopeClientWebSocket : IDisposable
13
13
{
14
- private static readonly UnboundedChannelOptions UnboundedChannelOptions = new ( )
15
- {
16
- SingleWriter = true ,
17
- } ;
14
+ private static readonly UnboundedChannelOptions UnboundedChannelOptions =
15
+ new ( )
16
+ {
17
+ SingleWriter = true ,
18
+ SingleReader = true ,
19
+ AllowSynchronousContinuations = true
20
+ } ;
18
21
19
22
private readonly ClientWebSocket _socket = new ( ) ;
20
23
private Task ? _receiveTask ;
@@ -63,14 +66,15 @@ public async Task ConnectAsync<TOutput>(Uri uri, CancellationToken cancellationT
63
66
{
64
67
await _socket . ConnectAsync ( uri , cancellationToken ) ;
65
68
_receiveTask = ReceiveMessagesAsync < TOutput > ( cancellationToken ) ;
66
- State = DashScopeWebSocketState . Connected ;
69
+ State = DashScopeWebSocketState . Ready ;
67
70
}
68
71
69
72
/// <summary>
70
73
/// Reset binary output.
71
74
/// </summary>
72
75
public void ResetOutput ( )
73
76
{
77
+ BinaryOutput . Writer . TryComplete ( ) ;
74
78
BinaryOutput = Channel . CreateUnbounded < byte > ( UnboundedChannelOptions ) ;
75
79
_taskStartedSignal = new TaskCompletionSource < bool > ( ) ;
76
80
}
@@ -84,17 +88,22 @@ public void ResetOutput()
84
88
/// <typeparam name="TInput">Type of the input.</typeparam>
85
89
/// <typeparam name="TParameter">Type of the parameter.</typeparam>
86
90
/// <exception cref="OperationCanceledException">The <paramref name="cancellationToken"/> is requested.</exception>
87
- /// <exception cref="InvalidOperationException">Websocket is not connected.</exception>
91
+ /// <exception cref="InvalidOperationException">Websocket is not connected or already closed .</exception>
88
92
/// <exception cref="ObjectDisposedException">The underlying websocket has already been closed.</exception>
89
93
public Task SendMessageAsync < TInput , TParameter > (
90
94
DashScopeWebSocketRequest < TInput , TParameter > request ,
91
95
CancellationToken cancellationToken = default )
92
96
where TInput : class
93
97
where TParameter : class
94
98
{
99
+ if ( State == DashScopeWebSocketState . Closed )
100
+ {
101
+ throw new InvalidOperationException ( "Socket is already closed." ) ;
102
+ }
103
+
104
+ var json = JsonSerializer . Serialize ( request , DashScopeDefaults . SerializationOptions ) ;
95
105
return _socket . SendAsync (
96
- new ArraySegment < byte > (
97
- Encoding . UTF8 . GetBytes ( JsonSerializer . Serialize ( request , DashScopeDefaults . SerializationOptions ) ) ) ,
106
+ new ArraySegment < byte > ( Encoding . UTF8 . GetBytes ( json ) ) ,
98
107
WebSocketMessageType . Text ,
99
108
true ,
100
109
cancellationToken ) ;
@@ -110,7 +119,6 @@ public Task SendMessageAsync<TInput, TParameter>(
110
119
try
111
120
{
112
121
var result = await _socket . ReceiveAsync ( segment , cancellationToken ) ;
113
-
114
122
if ( result . MessageType == WebSocketMessageType . Close )
115
123
{
116
124
await CloseAsync ( cancellationToken ) ;
@@ -168,7 +176,7 @@ public async Task ReceiveMessagesAsync<TOutput>(CancellationToken cancellationTo
168
176
_taskStartedSignal . TrySetResult ( true ) ;
169
177
break ;
170
178
case "task-finished" :
171
- State = DashScopeWebSocketState . Connected ;
179
+ State = DashScopeWebSocketState . Ready ;
172
180
BinaryOutput . Writer . Complete ( ) ;
173
181
break ;
174
182
case "task-failed" :
@@ -199,7 +207,7 @@ public async Task ReceiveMessagesAsync<TOutput>(CancellationToken cancellationTo
199
207
public async Task CloseAsync ( CancellationToken cancellationToken = default )
200
208
{
201
209
await _socket . CloseAsync ( WebSocketCloseStatus . NormalClosure , "Closing" , cancellationToken ) ;
202
- Dispose ( ) ;
210
+ State = DashScopeWebSocketState . Closed ;
203
211
}
204
212
205
213
private void Dispose ( bool disposing )
@@ -208,7 +216,6 @@ private void Dispose(bool disposing)
208
216
{
209
217
// Dispose managed resources.
210
218
_socket . Dispose ( ) ;
211
- State = DashScopeWebSocketState . Closed ;
212
219
BinaryOutput . Writer . TryComplete ( ) ;
213
220
}
214
221
}
0 commit comments