33using System . Collections . Concurrent ;
44using System . Collections . Generic ;
55using System . IO . Pipelines ;
6+ using System . Reactive . Linq ;
7+ using System . Linq ;
68using System . Text ;
79using System . Threading ;
810using System . Threading . Tasks ;
@@ -27,7 +29,8 @@ public partial class RSocket : IRSocketProtocol
2729 private int NewStreamId ( ) => Interlocked . Add ( ref StreamId , 2 ) ; //TODO SPEC: To reuse or not... Should tear down the client if this happens or have to skip in-use IDs.
2830
2931 private ConcurrentDictionary < int , IRSocketStream > Dispatcher = new ConcurrentDictionary < int , IRSocketStream > ( ) ;
30- private int StreamDispatch ( IRSocketStream transform ) { var id = NewStreamId ( ) ; Dispatcher [ id ] = transform ; return id ; }
32+ private int StreamDispatch ( int id , IRSocketStream transform ) { Dispatcher [ id ] = transform ; return id ; }
33+ private int StreamDispatch ( IRSocketStream transform ) => StreamDispatch ( NewStreamId ( ) , transform ) ;
3134 //TODO Stream Destruction - i.e. removal from the dispatcher.
3235
3336 protected IDisposable ChannelSubscription ; //TODO Tracking state for channels
@@ -81,16 +84,16 @@ public async Task<IRSocketChannel> RequestChannel(IRSocketStream stream, ReadOnl
8184 var id = StreamDispatch ( stream ) ;
8285 new RSocketProtocol . RequestChannel ( id , data , metadata , initialRequest : Options . GetInitialRequestSize ( initial ) ) . Write ( Transport . Output , data , metadata ) ;
8386 await Transport . Output . FlushAsync ( ) ;
84- var channel = new Channel ( this , id ) ;
87+ var channel = new ChannelHandler ( this , id ) ;
8588 return channel ;
8689 }
8790
88- protected class Channel : IRSocketChannel //TODO hmmm...
91+ protected class ChannelHandler : IRSocketChannel //TODO hmmm...
8992 {
9093 readonly RSocket Socket ;
9194 readonly int Stream ;
9295
93- public Channel ( RSocket socket , int stream ) { Socket = socket ; Stream = stream ; }
96+ public ChannelHandler ( RSocket socket , int stream ) { Socket = socket ; Stream = stream ; }
9497
9598 public Task Send ( ( ReadOnlySequence < byte > metadata , ReadOnlySequence < byte > data ) value )
9699 {
@@ -176,6 +179,12 @@ async Task Respond(int stream)
176179 }
177180
178181
182+ public void Stream < TRequest , TResult > (
183+ Func < ( ReadOnlySequence < byte > Data , ReadOnlySequence < byte > Metadata ) , TRequest > requestTransform ,
184+ Func < TRequest , IAsyncEnumerable < TResult > > producer ,
185+ Func < TResult , ( ReadOnlySequence < byte > Data , ReadOnlySequence < byte > Metadata ) > resultTransform ) =>
186+ Streamer = ( request ) => from result in producer ( requestTransform ( request ) ) select resultTransform ( result ) ;
187+
179188 public Func < ( ReadOnlySequence < byte > Data , ReadOnlySequence < byte > Metadata ) , IAsyncEnumerable < ( ReadOnlySequence < byte > data , ReadOnlySequence < byte > metadata ) > > Streamer { get ; set ; } = request => throw new NotImplementedException ( ) ;
180189
181190 void IRSocketProtocol . RequestStream ( in RSocketProtocol . RequestStream message , ReadOnlySequence < byte > metadata , ReadOnlySequence < byte > data )
@@ -184,44 +193,96 @@ void IRSocketProtocol.RequestStream(in RSocketProtocol.RequestStream message, Re
184193 async Task Stream ( int stream )
185194 {
186195 var source = Streamer ( ( data , metadata ) ) ; //TODO Handle Errors.
187- var enumerator = source . GetAsyncEnumerator ( ) ;
188- try
189- {
190- while ( await enumerator . MoveNextAsync ( ) )
191- {
192- var ( Data , Metadata ) = enumerator . Current ;
193- new RSocketProtocol . Payload ( stream , Data , Metadata , next : true ) . Write ( Transport . Output , Data , Metadata ) ;
194- await Transport . Output . FlushAsync ( ) ;
195- }
196- new RSocketProtocol . Payload ( stream , complete : true ) . Write ( Transport . Output ) ;
197- await Transport . Output . FlushAsync ( ) ;
198- }
199- finally { await enumerator . DisposeAsync ( ) ; }
196+ await ForEach ( source ,
197+ action : value => new RSocketProtocol . Payload ( stream , value . data , value . metadata , next : true ) . WriteFlush ( Transport . Output , value . data , value . metadata ) ,
198+ final : ( ) => new RSocketProtocol . Payload ( stream , complete : true ) . WriteFlush ( Transport . Output ) ) ;
199+
200+ //var source = Streamer((data, metadata)); //TODO Handle Errors.
201+ //var enumerator = source.GetAsyncEnumerator();
202+ //try
203+ //{
204+ // while (await enumerator.MoveNextAsync())
205+ // {
206+ // var (Data, Metadata) = enumerator.Current;
207+ // new RSocketProtocol.Payload(stream, Data, Metadata, next: true).Write(Transport.Output, Data, Metadata);
208+ // await Transport.Output.FlushAsync();
209+ // }
210+ // new RSocketProtocol.Payload(stream, complete: true).Write(Transport.Output);
211+ // await Transport.Output.FlushAsync();
212+ //}
213+ //finally { await enumerator.DisposeAsync(); }
200214 }
201215 }
202216
203217
204- //public void Channel<TSource, TResult>(IAsyncEnumerable<TSource> outgoing,
218+ //public void Channel<TSource, TResult>(Func<(ReadOnlySequence<byte> Data, ReadOnlySequence<byte> Metadata),
219+ // (IAsyncEnumerable<(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata)> Incoming,
220+ // IAsyncEnumerable<(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata)> Outgoing)>
221+
222+
205223 // Func<TSource, (ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata)> outgoingMapper,
206224 // Func<(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata), TResult> incomingMapper)
207225 //{
208- // var receiver = new Receiver<TSource>(stream => Task.CompletedTask, sourceMapper);
226+ // var observable = Observable.Create<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>(observer => {
227+ // Subscriber(observer).ConfigureAwait(false);
228+ // });
229+ // return observable
230+ // .Select(value => Mapper((value.data, value.metadata)))
231+ // .ToAsyncEnumerable()
232+
233+ // .GetAsyncEnumerator(cancellation);
234+
235+ // var receiver = new Receiver<TResult>(stream => Task.CompletedTask, incomingMapper);
209236
210237 // Channeler = request =>
211238 // (
212- // receiver,
239+ // receiver,
213240 // );
214241 //}
215242
243+ public void Channel < TRequest , TIncoming , TOutgoing > ( Func < TRequest , IObservable < TIncoming > , IAsyncEnumerable < TOutgoing > > pipeline ,
244+ Func < ( ReadOnlySequence < byte > Data , ReadOnlySequence < byte > Metadata ) , TRequest > requestTransform ,
245+ Func < ( ReadOnlySequence < byte > Data , ReadOnlySequence < byte > Metadata ) , TIncoming > incomingTransform ,
246+ Func < TOutgoing , ( ReadOnlySequence < byte > Data , ReadOnlySequence < byte > Metadata ) > outgoingTransform ) =>
247+ Channeler = ( request , incoming ) => from result in pipeline ( requestTransform ( request ) , from item in incoming select incomingTransform ( item ) ) select outgoingTransform ( result ) ;
248+
249+
250+ public Func < ( ReadOnlySequence < byte > Data , ReadOnlySequence < byte > Metadata ) , IObservable < ( ReadOnlySequence < byte > data , ReadOnlySequence < byte > metadata ) > , IAsyncEnumerable < ( ReadOnlySequence < byte > data , ReadOnlySequence < byte > metadata ) > > Channeler { get ; set ; } = ( request , incoming ) => throw new NotImplementedException ( ) ;
216251
217- //public Func<(ReadOnlySequence<byte> Data, ReadOnlySequence<byte> Metadata),
218- // (IObservable<(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata)> Incoming,
219- // IAsyncEnumerable<(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata)> Outgoing)>
220- // Channeler { get; set; } = (request, incoming) => throw new NotImplementedException();
221252
222253 void IRSocketProtocol . RequestChannel ( in RSocketProtocol . RequestChannel message , ReadOnlySequence < byte > metadata , ReadOnlySequence < byte > data )
223254 {
224- throw new NotImplementedException ( ) ;
255+ Channel ( message . Stream ) . Start ( ) ;
256+ async Task Channel ( int stream , CancellationToken cancel = default )
257+ {
258+ //TODO Elsewhere in previous changes, bad Disposable.Empty
259+ var inc = Observable . Create < ( ReadOnlySequence < byte > metadata , ReadOnlySequence < byte > data ) > ( observer => ( ) => StreamDispatch ( stream , observer ) ) ;
260+ var outgoing = Channeler ( ( data , metadata ) , inc ) ; //TODO Handle Errors.
261+
262+ await ForEach ( outgoing ,
263+ action : value => new RSocketProtocol . Payload ( stream , value . data , value . metadata , next : true ) . WriteFlush ( Transport . Output , value . data , value . metadata ) ,
264+ final : ( ) => new RSocketProtocol . Payload ( stream , complete : true ) . WriteFlush ( Transport . Output ) ) ;
265+
266+ //var outgoingstream = ForEach(outgoing,
267+ // action: value => new RSocketProtocol.Payload(stream, value.data, value.metadata, next: true).WriteFlush(Transport.Output, value.data, value.metadata),
268+ // final: () => new RSocketProtocol.Payload(stream, complete: true).WriteFlush(Transport.Output));
269+
270+ //var enumerator = outgoing.GetAsyncEnumerator();
271+ //try
272+ //{
273+ // while (await enumerator.MoveNextAsync())
274+ // {
275+ // var (Data, Metadata) = enumerator.Current;
276+ // new RSocketProtocol.Payload(stream, Data, Metadata, next: true).Write(Transport.Output, Data, Metadata);
277+ // await Transport.Output.FlushAsync();
278+ // }
279+ // new RSocketProtocol.Payload(stream, complete: true).Write(Transport.Output);
280+ // await Transport.Output.FlushAsync();
281+ //}
282+ //finally { await enumerator.DisposeAsync(); }
283+ }
284+
285+
225286 // Channel(message.Stream).Start();
226287
227288 // //new Receiver<bool>()
@@ -252,5 +313,17 @@ void IRSocketProtocol.RequestChannel(in RSocketProtocol.RequestChannel message,
252313 // }
253314 // }
254315 }
316+
317+ internal static async Task ForEach < TSource > ( IAsyncEnumerable < TSource > source , Func < TSource , Task > action , CancellationToken cancel = default , Func < Task > final = default )
318+ {
319+ //No idea why this isn't public... https://github.com/dotnet/reactive/blob/master/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ForEach.cs#L58
320+ var enumerator = AsyncEnumerableExtensions . WithCancellation ( source , cancel ) . ConfigureAwait ( false ) . GetAsyncEnumerator ( ) ;
321+ try
322+ {
323+ while ( await enumerator . MoveNextAsync ( ) ) { await action ( enumerator . Current ) ; }
324+ await final ? . Invoke ( ) ;
325+ }
326+ finally { await enumerator . DisposeAsync ( ) ; }
327+ }
255328 }
256329}
0 commit comments