1414
1515using System ;
1616using System . Collections . Generic ;
17- using System . ComponentModel ;
1817using System . IO ;
18+ using System . Linq ;
1919using System . Net ;
2020using System . Net . Http ;
2121using System . Net . Http . Headers ;
2727using Seq . Api . Model . Root ;
2828using Seq . Api . Serialization ;
2929using System . Threading ;
30- using Seq . Api . Streams ;
3130using System . Net . WebSockets ;
31+ using System . Runtime . CompilerServices ;
3232using Seq . Api . Model . Shared ;
33+ using Seq . Api . Streams ;
3334
3435namespace Seq . Api . Client
3536{
@@ -38,13 +39,13 @@ namespace Seq.Api.Client
3839 /// </summary>
3940 public sealed class SeqApiClient : IDisposable
4041 {
41- readonly string _apiKey ;
42-
4342 // Future versions of Seq may not completely support vN-1 features, however
4443 // providing this as an Accept header will ensure what compatibility is available
4544 // can be utilized.
4645 const string SeqApiV11MediaType = "application/vnd.datalust.seq.v11+json" ;
4746
47+ // ReSharper disable once NotAccessedField.Local
48+ readonly bool _defaultMessageHandler ;
4849 readonly CookieContainer _cookies = new ( ) ;
4950 readonly JsonSerializer _serializer = JsonSerializer . Create (
5051 new JsonSerializerSettings
@@ -54,36 +55,6 @@ public sealed class SeqApiClient : IDisposable
5455 FloatParseHandling = FloatParseHandling . Decimal
5556 } ) ;
5657
57- /// <summary>
58- /// Construct a <see cref="SeqApiClient"/>.
59- /// </summary>
60- /// <param name="serverUrl">The base URL of the Seq server.</param>
61- /// <param name="apiKey">An API key to use when making requests to the server, if required.</param>
62- /// <param name="useDefaultCredentials">Whether default credentials will be sent with HTTP requests; the default is <c>true</c>.</param>
63- [ Obsolete ( "Prefer `SeqApiClient(serverUrl, apiKey, createHttpMessageHandler)` instead." ) , EditorBrowsable ( EditorBrowsableState . Never ) ]
64- public SeqApiClient ( string serverUrl , string apiKey , bool useDefaultCredentials )
65- : this ( serverUrl , apiKey , handler => handler . UseDefaultCredentials = useDefaultCredentials )
66- {
67- }
68-
69- /// <summary>
70- /// Construct a <see cref="SeqApiClient"/>.
71- /// </summary>
72- /// <param name="serverUrl">The base URL of the Seq server.</param>
73- /// <param name="apiKey">An API key to use when making requests to the server, if required.</param>
74- /// <param name="configureHttpClientHandler">An optional callback to configure the <see cref="HttpClientHandler"/> used when making HTTP requests
75- /// to the Seq API.</param>
76- [ Obsolete ( "Prefer `SeqApiClient(serverUrl, apiKey, createHttpMessageHandler)` instead." ) , EditorBrowsable ( EditorBrowsableState . Never ) ]
77- public SeqApiClient ( string serverUrl , string apiKey , Action < HttpClientHandler > configureHttpClientHandler )
78- : this ( serverUrl , apiKey , cookies =>
79- {
80- var handler = new HttpClientHandler { CookieContainer = cookies } ;
81- configureHttpClientHandler ? . Invoke ( handler ) ;
82- return handler ;
83- } )
84- {
85- }
86-
8758 /// <summary>
8859 /// Construct a <see cref="SeqApiClient"/>.
8960 /// </summary>
@@ -93,6 +64,8 @@ public SeqApiClient(string serverUrl, string apiKey, Action<HttpClientHandler> c
9364 /// to the Seq API. The callback receives a <see cref="CookieContainer"/> that is shared with WebSocket requests made by the client.</param>
9465 public SeqApiClient ( string serverUrl , string apiKey = null , Func < CookieContainer , HttpMessageHandler > createHttpMessageHandler = null )
9566 {
67+ _defaultMessageHandler = createHttpMessageHandler == null ;
68+
9669 // This is required for compatibility with the obsolete constructor, which we can remove sometime in 2024.
9770 var httpMessageHandler = createHttpMessageHandler ? . Invoke ( _cookies ) ??
9871#if SOCKETS_HTTP_HANDLER
@@ -103,9 +76,6 @@ public SeqApiClient(string serverUrl, string apiKey = null, Func<CookieContainer
10376
10477 ServerUrl = serverUrl ?? throw new ArgumentNullException ( nameof ( serverUrl ) ) ;
10578
106- if ( ! string . IsNullOrEmpty ( apiKey ) )
107- _apiKey = apiKey ;
108-
10979 var baseAddress = serverUrl ;
11080 if ( ! baseAddress . EndsWith ( "/" , StringComparison . Ordinal ) )
11181 baseAddress += "/" ;
@@ -114,8 +84,8 @@ public SeqApiClient(string serverUrl, string apiKey = null, Func<CookieContainer
11484 HttpClient . BaseAddress = new Uri ( baseAddress ) ;
11585 HttpClient . DefaultRequestHeaders . Accept . Add ( new MediaTypeWithQualityHeaderValue ( SeqApiV11MediaType ) ) ;
11686
117- if ( _apiKey != null )
118- HttpClient . DefaultRequestHeaders . Add ( "X-Seq-ApiKey" , _apiKey ) ;
87+ if ( ! string . IsNullOrEmpty ( apiKey ) )
88+ HttpClient . DefaultRequestHeaders . Add ( "X-Seq-ApiKey" , apiKey ) ;
11989 }
12090
12191 /// <summary>
@@ -323,9 +293,27 @@ public async Task<TResponse> DeleteAsync<TEntity, TResponse>(ILinked entity, str
323293 /// <param name="parameters">Named parameters to substitute into the link template, if required.</param>
324294 /// <param name="cancellationToken">A <see cref="CancellationToken"/> supporting cancellation.</param>
325295 /// <returns>A stream of values from the websocket.</returns>
326- public async Task < ObservableStream < TEntity > > StreamAsync < TEntity > ( ILinked entity , string link , IDictionary < string , object > parameters = null , CancellationToken cancellationToken = default )
296+ public IAsyncEnumerable < TEntity > StreamAsync < TEntity > ( ILinked entity , string link , IDictionary < string , object > parameters = null , CancellationToken cancellationToken = default )
297+ {
298+ return WebSocketStreamAsync < NoMessage , TEntity > ( entity , link , default , parameters , delegate { } , reader => _serializer . Deserialize < TEntity > ( new JsonTextReader ( reader ) ) , cancellationToken ) ;
299+ }
300+
301+ /// <summary>
302+ /// Connect to a websocket at the address specified by following <paramref name="link"/> from <paramref name="entity"/>.
303+ /// When the WebSocket opens, a single message <paramref name="message"/> is sent, and then messages received on
304+ /// socket are returned.
305+ /// </summary>
306+ /// <typeparam name="TEntity">The type of the values received over the websocket.</typeparam>
307+ /// <typeparam name="TMessage">The type of message to send.</typeparam>
308+ /// <param name="entity">An entity previously retrieved from the API.</param>
309+ /// <param name="link">The name of the outbound link template present in <paramref name="entity"/>'s <see cref="ILinked.Links"/> collection.</param>
310+ /// <param name="message">The message to send at establishment of the WebSocket connection.</param>
311+ /// <param name="parameters">Named parameters to substitute into the link template, if required.</param>
312+ /// <param name="cancellationToken">A <see cref="CancellationToken"/> supporting cancellation.</param>
313+ /// <returns>A stream of values from the websocket.</returns>
314+ public IAsyncEnumerable < TEntity > StreamSendAsync < TMessage , TEntity > ( ILinked entity , string link , TMessage message , IDictionary < string , object > parameters = null , CancellationToken cancellationToken = default )
327315 {
328- return await WebSocketStreamAsync ( entity , link , parameters , reader => _serializer . Deserialize < TEntity > ( new JsonTextReader ( reader ) ) , cancellationToken ) ;
316+ return WebSocketStreamAsync ( entity , link , message , parameters , ( writer , m ) => _serializer . Serialize ( writer , m ) , reader => _serializer . Deserialize < TEntity > ( new JsonTextReader ( reader ) ) , cancellationToken ) ;
329317 }
330318
331319 /// <summary>
@@ -336,23 +324,106 @@ public async Task<ObservableStream<TEntity>> StreamAsync<TEntity>(ILinked entity
336324 /// <param name="parameters">Named parameters to substitute into the link template, if required.</param>
337325 /// <param name="cancellationToken">A <see cref="CancellationToken"/> supporting cancellation.</param>
338326 /// <returns>A stream of raw messages from the websocket.</returns>
339- public async Task < ObservableStream < string > > StreamTextAsync ( ILinked entity , string link , IDictionary < string , object > parameters = null , CancellationToken cancellationToken = default )
327+ public IAsyncEnumerable < string > StreamTextAsync ( ILinked entity , string link , IDictionary < string , object > parameters = null , CancellationToken cancellationToken = default )
328+ {
329+ return WebSocketStreamAsync < NoMessage , string > ( entity , link , default , parameters , delegate { } , reader => reader . ReadToEnd ( ) , cancellationToken ) ;
330+ }
331+
332+ /// <summary>
333+ /// Connect to a websocket at the address specified by following <paramref name="link"/> from <paramref name="entity"/>.
334+ /// When the WebSocket opens, a single message <paramref name="message"/> is sent, and then messages received on
335+ /// socket are returned.
336+ /// </summary>
337+ /// <typeparam name="TMessage">The type of message to send.</typeparam>
338+ /// <param name="entity">An entity previously retrieved from the API.</param>
339+ /// <param name="link">The name of the outbound link template present in <paramref name="entity"/>'s <see cref="ILinked.Links"/> collection.</param>
340+ /// <param name="message">The message to send at establishment of the WebSocket connection.</param>
341+ /// <param name="parameters">Named parameters to substitute into the link template, if required.</param>
342+ /// <param name="cancellationToken">A <see cref="CancellationToken"/> supporting cancellation.</param>
343+ /// <returns>A stream of raw messages from the websocket.</returns>
344+ public IAsyncEnumerable < string > StreamTextSendAsync < TMessage > ( ILinked entity , string link , TMessage message , IDictionary < string , object > parameters = null , CancellationToken cancellationToken = default )
340345 {
341- return await WebSocketStreamAsync ( entity , link , parameters , reader => reader . ReadToEnd ( ) , cancellationToken ) ;
346+ return WebSocketStreamAsync ( entity , link , message , parameters , delegate { } , reader => reader . ReadToEnd ( ) , cancellationToken ) ;
342347 }
343348
344- async Task < ObservableStream < T > > WebSocketStreamAsync < T > ( ILinked entity , string link , IDictionary < string , object > parameters , Func < TextReader , T > deserialize , CancellationToken cancellationToken = default )
349+ readonly struct NoMessage
350+ {
351+ // Type marker only.
352+ }
353+
354+ async IAsyncEnumerable < T > WebSocketStreamAsync < TMessage , T > ( ILinked entity , string link , TMessage message , IDictionary < string , object > parameters , Action < TextWriter , TMessage > serialize , Func < TextReader , T > deserialize , [ EnumeratorCancellation ] CancellationToken cancellationToken = default )
345355 {
346356 var linkUri = ResolveLink ( entity , link , parameters ) ;
347357
348- var socket = new ClientWebSocket ( ) ;
349- socket . Options . Cookies = _cookies ;
350- if ( _apiKey != null )
351- socket . Options . SetRequestHeader ( "X-Seq-ApiKey" , _apiKey ) ;
358+ using var socket = new ClientWebSocket ( ) ;
359+
360+ #if WEBSOCKET_USE_HTTPCLIENT
361+ if ( _defaultMessageHandler )
362+ {
363+ await socket . ConnectAsync ( new Uri ( linkUri ) , HttpClient , cancellationToken ) . WithApiExceptions ( ) . ConfigureAwait ( false ) ;
364+ }
365+ else
366+ #endif
367+ {
368+ socket . Options . Cookies = _cookies ;
352369
353- await socket . ConnectAsync ( new Uri ( linkUri ) , cancellationToken ) ;
370+ foreach ( var header in HttpClient . DefaultRequestHeaders )
371+ {
372+ socket . Options . SetRequestHeader ( header . Key , header . Value . FirstOrDefault ( ) ) ;
373+ }
354374
355- return new ObservableStream < T > ( socket , deserialize ) ;
375+ await socket . ConnectAsync ( new Uri ( linkUri ) , cancellationToken ) . WithApiExceptions ( )
376+ . ConfigureAwait ( false ) ;
377+ }
378+
379+ var buffer = new byte [ 16 * 1024 ] ;
380+ var current = new MemoryStream ( ) ;
381+ var encoding = new UTF8Encoding ( false ) ;
382+ var reader = new StreamReader ( current , encoding ) ;
383+
384+ if ( message is not NoMessage )
385+ {
386+ var w = new StreamWriter ( current , encoding ) ;
387+ serialize ( w , message ) ;
388+ // ReSharper disable once MethodHasAsyncOverload
389+ w . Flush ( ) ;
390+ await socket . SendAsync ( new ArraySegment < byte > ( current . GetBuffer ( ) , 0 , ( int ) current . Length ) ,
391+ WebSocketMessageType . Text , true , cancellationToken ) . WithApiExceptions ( ) . ConfigureAwait ( false ) ;
392+ current . Position = 0 ;
393+ current . SetLength ( 0 ) ;
394+ }
395+
396+ while ( socket . State == WebSocketState . Open )
397+ {
398+ cancellationToken . ThrowIfCancellationRequested ( ) ;
399+
400+ var received = await socket . ReceiveAsync ( new ArraySegment < byte > ( buffer ) , cancellationToken ) . WithApiExceptions ( ) . ConfigureAwait ( false ) ;
401+ if ( received . MessageType == WebSocketMessageType . Close )
402+ {
403+ if ( socket . State != WebSocketState . Closed )
404+ await socket . CloseOutputAsync ( WebSocketCloseStatus . NormalClosure , "" , cancellationToken ) . WithApiExceptions ( ) . ConfigureAwait ( false ) ;
405+
406+ if ( received . CloseStatus != WebSocketCloseStatus . NormalClosure )
407+ {
408+ throw new SeqApiException ( received . CloseStatusDescription ?? received . CloseStatus . ToString ( ) ! ) ;
409+ }
410+ }
411+ else
412+ {
413+ current . Write ( buffer , 0 , received . Count ) ;
414+
415+ if ( received . EndOfMessage )
416+ {
417+ current . Position = 0 ;
418+ var value = deserialize ( reader ) ;
419+
420+ current . SetLength ( 0 ) ;
421+ reader . DiscardBufferedData ( ) ;
422+
423+ yield return value ;
424+ }
425+ }
426+ }
356427 }
357428
358429 async Task < T > HttpGetAsync < T > ( string url , CancellationToken cancellationToken = default )
0 commit comments