@@ -19,14 +19,17 @@ public sealed class DashScopeClientWebSocket : IDisposable
19
19
AllowSynchronousContinuations = true
20
20
} ;
21
21
22
- private readonly ClientWebSocket _socket = new ( ) ;
22
+ private readonly IClientWebSocket _socket ;
23
23
private Task ? _receiveTask ;
24
24
private TaskCompletionSource < bool > _taskStartedSignal = new ( ) ;
25
+ private Channel < byte > ? _binaryOutput ;
25
26
26
27
/// <summary>
27
28
/// The binary output.
28
29
/// </summary>
29
- public Channel < byte > BinaryOutput { get ; private set ; } = Channel . CreateUnbounded < byte > ( UnboundedChannelOptions ) ;
30
+ public ChannelReader < byte > BinaryOutput
31
+ => _binaryOutput ? . Reader
32
+ ?? throw new InvalidOperationException ( "Please call ResetOutput() before accessing output" ) ;
30
33
31
34
/// <summary>
32
35
/// A task that completed when received task-started event.
@@ -45,6 +48,7 @@ public sealed class DashScopeClientWebSocket : IDisposable
45
48
/// <param name="workspaceId">Optional workspace id.</param>
46
49
public DashScopeClientWebSocket ( string apiKey , string ? workspaceId = null )
47
50
{
51
+ _socket = new ClientWebSocketWrapper ( new ClientWebSocket ( ) ) ;
48
52
_socket . Options . SetRequestHeader ( "X-DashScope-DataInspection" , "enable" ) ;
49
53
_socket . Options . SetRequestHeader ( "Authorization" , $ "bearer { apiKey } ") ;
50
54
if ( string . IsNullOrEmpty ( workspaceId ) == false )
@@ -53,6 +57,15 @@ public DashScopeClientWebSocket(string apiKey, string? workspaceId = null)
53
57
}
54
58
}
55
59
60
+ /// <summary>
61
+ /// Initiate a <see cref="DashScopeClientWebSocket"/> with a pre-configured <see cref="ClientWebSocket"/>.
62
+ /// </summary>
63
+ /// <param name="socket">Pre-configured <see cref="ClientWebSocket"/>.</param>
64
+ internal DashScopeClientWebSocket ( IClientWebSocket socket )
65
+ {
66
+ _socket = socket ;
67
+ }
68
+
56
69
/// <summary>
57
70
/// Start a websocket connection.
58
71
/// </summary>
@@ -74,8 +87,9 @@ public async Task ConnectAsync<TOutput>(Uri uri, CancellationToken cancellationT
74
87
/// </summary>
75
88
public void ResetOutput ( )
76
89
{
77
- BinaryOutput . Writer . TryComplete ( ) ;
78
- BinaryOutput = Channel . CreateUnbounded < byte > ( UnboundedChannelOptions ) ;
90
+ _binaryOutput ? . Writer . TryComplete ( ) ;
91
+ _binaryOutput = Channel . CreateUnbounded < byte > ( UnboundedChannelOptions ) ;
92
+ _taskStartedSignal . TrySetResult ( false ) ;
79
93
_taskStartedSignal = new TaskCompletionSource < bool > ( ) ;
80
94
}
81
95
@@ -129,7 +143,7 @@ public Task SendMessageAsync<TInput, TParameter>(
129
143
{
130
144
for ( var i = 0 ; i < result . Count ; i ++ )
131
145
{
132
- await BinaryOutput . Writer . WriteAsync ( buffer [ i ] , cancellationToken ) ;
146
+ await _binaryOutput ! . Writer . WriteAsync ( buffer [ i ] , cancellationToken ) ;
133
147
}
134
148
135
149
return null ;
@@ -177,7 +191,7 @@ public async Task ReceiveMessagesAsync<TOutput>(CancellationToken cancellationTo
177
191
break ;
178
192
case "task-finished" :
179
193
State = DashScopeWebSocketState . Ready ;
180
- BinaryOutput . Writer . Complete ( ) ;
194
+ _binaryOutput ? . Writer . Complete ( ) ;
181
195
break ;
182
196
case "task-failed" :
183
197
await CloseAsync ( cancellationToken ) ;
@@ -216,7 +230,7 @@ private void Dispose(bool disposing)
216
230
{
217
231
// Dispose managed resources.
218
232
_socket . Dispose ( ) ;
219
- BinaryOutput . Writer . TryComplete ( ) ;
233
+ _binaryOutput ? . Writer . TryComplete ( ) ;
220
234
}
221
235
}
222
236
0 commit comments