@@ -307,6 +307,40 @@ await connection.ReceiveJsonMessage(
307
307
}
308
308
}
309
309
310
+ [ Fact ]
311
+ public async Task CanCancelTokenDuringStream_SendsCancelInvocation ( )
312
+ {
313
+ using ( StartVerifiableLog ( ) )
314
+ {
315
+ var connection = new TestConnection ( ) ;
316
+ var hubConnection = CreateHubConnection ( connection , loggerFactory : LoggerFactory ) ;
317
+
318
+ await hubConnection . StartAsync ( ) . DefaultTimeout ( ) ;
319
+
320
+ using var cts = new CancellationTokenSource ( ) ;
321
+ var asyncEnumerable = hubConnection . StreamAsync < int > ( "Stream" , 1 , cts . Token ) ;
322
+
323
+ await using var e = asyncEnumerable . GetAsyncEnumerator ( cts . Token ) ;
324
+ var task = e . MoveNextAsync ( ) ;
325
+
326
+ var item = await connection . ReadSentJsonAsync ( ) . DefaultTimeout ( ) ;
327
+ var invocationId = item [ "invocationId" ] ;
328
+ await connection . ReceiveJsonMessage (
329
+ new { type = HubProtocolConstants . StreamItemMessageType , invocationId , item = 1 }
330
+ ) . DefaultTimeout ( ) ;
331
+
332
+ await task . DefaultTimeout ( ) ;
333
+ cts . Cancel ( ) ;
334
+
335
+ item = await connection . ReadSentJsonAsync ( ) . DefaultTimeout ( ) ;
336
+ Assert . Equal ( HubProtocolConstants . CancelInvocationMessageType , item [ "type" ] ) ;
337
+ Assert . Equal ( invocationId , item [ "invocationId" ] ) ;
338
+
339
+ // Stream on client-side completes on cancellation
340
+ await Assert . ThrowsAsync < TaskCanceledException > ( async ( ) => await e . MoveNextAsync ( ) ) . DefaultTimeout ( ) ;
341
+ }
342
+ }
343
+
310
344
[ Fact ]
311
345
public async Task ConnectionTerminatedIfServerTimeoutIntervalElapsesWithNoMessages ( )
312
346
{
0 commit comments