Skip to content

Commit 6a0c864

Browse files
Channel-Producer updates (interim). Committing to merge forthcoming changes to simplify from the System.Reactive.Linq oprerators pull request.
1 parent bff537f commit 6a0c864

File tree

3 files changed

+62
-55
lines changed

3 files changed

+62
-55
lines changed

RSocket.Core.Tests/ServerTests.cs

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,17 @@ await yield.ReturnAsync((
7777
//[TestMethod]
7878
//public async Task ServerRequestChannelTest()
7979
//{
80-
// Server.Streamer = ((ReadOnlySequence<byte> Data, ReadOnlySequence<byte> Metadata) request) => new System.Collections.Async.AsyncEnumerable<(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata)>(async yield =>
80+
// Server.Channeler = ((ReadOnlySequence<byte> Data, ReadOnlySequence<byte> Metadata) request, IObservable<(ReadOnlySequence<byte> Data, ReadOnlySequence<byte> Metadata)> incoming) =>
8181
// {
82-
// foreach (var index in Enumerable.Range(0, 3))
83-
// { await Task.CompletedTask; await yield.ReturnAsync((request.Data, request.Metadata)); }
84-
// }).ToAsyncEnumerable();
82+
// Action<(ReadOnlySequence<byte> Data, ReadOnlySequence<byte> Metadata)> onNext = value => { };
83+
// Action OnCompleted = () => { };
84+
// var enumerable = new System.Collections.Async.AsyncEnumerable<(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata)>(async yield =>
85+
// {
86+
// foreach (var index in Enumerable.Range(0, 3))
87+
// { await Task.CompletedTask; await yield.ReturnAsync((request.Data, request.Metadata)); }
88+
// }).ToAsyncEnumerable();
89+
// return (onNext, OnCompleted, enumerable);
90+
// };
8591

8692
// var (data, metadata) = ("TEST DATA", "METADATA?_____");
8793
// var list = await StringClient.RequestStream(data, metadata).ToListAsync();
@@ -101,22 +107,4 @@ public void TestInitialize()
101107
StringClient = new RSocketClient.ForStrings(Client);
102108
}
103109
}
104-
105-
106-
public class Sample
107-
{
108-
static Random random = new Random(1234);
109-
public int Id = random.Next(1000000);
110-
public string Name = nameof(Sample) + random.Next(10000).ToString();
111-
public DateTime Created = DateTime.Now;
112-
113-
public static implicit operator string(Sample value) => string.Join('|', value.Id, value.Name, value.Created);
114-
public static implicit operator ReadOnlySequence<byte>(Sample value) => new ReadOnlySequence<byte>(Encoding.UTF8.GetBytes(string.Join('|', value.Id, value.Name, value.Created)));
115-
public static implicit operator Sample(string value) { var values = value.Split('|'); return new Sample(values[0], values[1], values[2]); }
116-
public static implicit operator Sample(ReadOnlySequence<byte> value) => Encoding.UTF8.GetString(value.ToArray());
117-
118-
public Sample() { }
119-
public Sample(string id, string name, string created) { Id = int.Parse(id); Name = name; Created = DateTime.Parse(created); }
120-
public ReadOnlySequence<byte> Bytes => this;
121-
}
122110
}

RSocket.Core/IRSocketStream.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
namespace RSocket
66
{
77
/// <summary>
8-
/// A stream of items from an RSocket. This is simply an Observer of the protcol's tuples.
8+
/// A stream of items from an RSocket. This is simply an Observer of the protocol's tuples.
99
/// </summary>
1010
public interface IRSocketStream : IObserver<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>
1111
{

RSocket.Core/RSocket.cs

Lines changed: 51 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,8 @@ async Task Respond(int stream)
184184

185185
void IRSocketProtocol.RequestStream(in RSocketProtocol.RequestStream message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)
186186
{
187-
Respond(message.Stream).Start();
188-
async Task Respond(int stream)
187+
Stream(message.Stream).Start();
188+
async Task Stream(int stream)
189189
{
190190
var source = Streamer((data, metadata)); //TODO Handle Errors.
191191
var enumerator = source.GetAsyncEnumerator();
@@ -205,36 +205,55 @@ async Task Respond(int stream)
205205
}
206206

207207

208-
public Func<IAsyncEnumerable<(ReadOnlySequence<byte> Data, ReadOnlySequence<byte> Metadata)>, IAsyncEnumerable<(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata)>> Channeler { get; set; } = request => throw new NotImplementedException();
208+
//public void Channel<TSource, TResult>(IAsyncEnumerable<TSource> outgoing,
209+
// Func<TSource, (ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata)> outgoingMapper,
210+
// Func<(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata), TResult> incomingMapper)
211+
//{
212+
// var receiver = new Receiver<TSource>(stream => Task.CompletedTask, sourceMapper);
209213

210-
void IRSocketProtocol.RequestChannel(in RSocketProtocol.RequestChannel message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)
211-
{
212-
throw new NotImplementedException();
213-
//Respond(message.Stream).Start();
214-
215-
//new Receiver<bool>()
216-
217-
//new Receiver<bool>(stream => RequestFireAndForget(stream, data, metadata), _ => true).ExecuteAsync(result: true);
218-
//var id = StreamDispatch(stream);
219-
220-
221-
//async Task Respond(int stream)
222-
//{
223-
// var source = Channeler((data, metadata)); //TODO Handle Errors.
224-
// var enumerator = source.GetAsyncEnumerator();
225-
// try
226-
// {
227-
// while (await enumerator.MoveNextAsync())
228-
// {
229-
// var (Data, Metadata) = enumerator.Current;
230-
// new RSocketProtocol.Payload(stream, Data, Metadata, next: true).Write(Transport.Output, Data, Metadata);
231-
// await Transport.Output.FlushAsync();
232-
// }
233-
// new RSocketProtocol.Payload(stream, complete: true).Write(Transport.Output);
234-
// await Transport.Output.FlushAsync();
235-
// }
236-
// finally { await enumerator.DisposeAsync(); }
237-
//}
238-
}
214+
// Channeler = request =>
215+
// (
216+
// receiver,
217+
// );
218+
//}
219+
220+
221+
//public Func<(ReadOnlySequence<byte> Data, ReadOnlySequence<byte> Metadata),
222+
// (IObservable<(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata)> Incoming,
223+
// IAsyncEnumerable<(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata)> Outgoing)>
224+
// Channeler { get; set; } = (request, incoming) => throw new NotImplementedException();
225+
226+
//void IRSocketProtocol.RequestChannel(in RSocketProtocol.RequestChannel message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)
227+
//{
228+
// Channel(message.Stream).Start();
229+
230+
// //new Receiver<bool>()
231+
232+
// //new Receiver<bool>(stream => RequestFireAndForget(stream, data, metadata), _ => true).ExecuteAsync(result: true);
233+
// //var id = StreamDispatch(stream);
234+
235+
// async Task Channel(int stream)
236+
// {
237+
// var (Incoming, Outoing) = Channeler((data, metadata)); //TODO Handle Errors.
238+
239+
240+
// using (observable.Subscribe())
241+
// {
242+
// var enumerator = source.GetAsyncEnumerator();
243+
// try
244+
// {
245+
// while (await enumerator.MoveNextAsync())
246+
// {
247+
// var (Data, Metadata) = enumerator.Current;
248+
// new RSocketProtocol.Payload(stream, Data, Metadata, next: true).Write(Transport.Output, Data, Metadata);
249+
// await Transport.Output.FlushAsync();
250+
// }
251+
// new RSocketProtocol.Payload(stream, complete: true).Write(Transport.Output);
252+
// await Transport.Output.FlushAsync();
253+
// }
254+
// finally { await enumerator.DisposeAsync(); }
255+
// }
256+
// }
257+
//}
239258
}
240259
}

0 commit comments

Comments
 (0)