Skip to content

Commit 8184d92

Browse files
committed
WIP
1 parent 5d52f6b commit 8184d92

File tree

10 files changed

+82
-17
lines changed

10 files changed

+82
-17
lines changed

Directory.Packages.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
<MessagePackVersion>3.1.4</MessagePackVersion>
1010
<MicrosoftCodeAnalysisVersion>4.3.1</MicrosoftCodeAnalysisVersion>
1111
<MicrosoftCodeAnalysisVersionUnity>3.9.0</MicrosoftCodeAnalysisVersionUnity>
12-
<MulticasterVersion>2.1.1</MulticasterVersion>
12+
<MulticasterVersion>0.1.0-ci-experimental-altchannel-20260209-063841</MulticasterVersion>
1313
</PropertyGroup>
1414
<ItemGroup>
1515
<PackageVersion Include="BenchmarkDotNet" Version="0.13.12" />

NuGet.Config

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,14 @@
22
<configuration>
33
<packageSourceMapping>
44
<clear />
5+
<packageSource key="Canary-Build">
6+
<package pattern="Multicaster*" />
7+
</packageSource>
58
<packageSource key="nuget.org">
69
<package pattern="*" />
710
</packageSource>
811
</packageSourceMapping>
12+
<packageSources>
13+
<add key="Canary-Build" value="https://pkgs.dev.azure.com/cysharp/Public/_packaging/Canary-Build/nuget/v3/index.json" />
14+
</packageSources>
915
</configuration>

src/MagicOnion.Client.SourceGenerator/CodeGen/StaticStreamingHubClientGenerator.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -115,32 +115,32 @@ static void EmitHelperMethods(StreamingHubClientBuildContext ctx)
115115
if (ctx.EnableStreamingHubDiagnosticHandler)
116116
{
117117
ctx.Writer.AppendLineWithFormat($$"""
118-
global::System.Threading.Tasks.Task<TResponse> WriteMessageWithResponseDiagnosticTaskAsync<TRequest, TResponse>(int methodId, TRequest message, global::MagicOnion.TransportReliability reliability = global::MagicOnion.TransportReliability.Reliable, [global::System.Runtime.CompilerServices.CallerMemberName] string callerMemberName = default!)
118+
global::System.Threading.Tasks.Task<TResponse> WriteMessageWithResponseDiagnosticTaskAsync<TRequest, TResponse>(int methodId, TRequest message, [global::System.Runtime.CompilerServices.CallerMemberName] string callerMemberName = default!)
119119
{
120120
if (diagnosticHandler is null)
121121
{
122-
return base.WriteMessageWithResponseTaskAsync<TRequest, TResponse>(methodId, message, reliability);
122+
return base.WriteMessageWithResponseTaskAsync<TRequest, TResponse>(methodId, message);
123123
}
124124
125125
return diagnosticHandler.OnMethodInvoke(this, methodId, callerMemberName, message, isFireAndForget: true, base.WriteMessageWithResponseValueTaskOfTAsync<TRequest, TResponse>).AsTask();
126126
}
127127
128-
async global::System.Threading.Tasks.ValueTask WriteMessageWithResponseDiagnosticValueTaskAsync<TRequest, TResponse>(int methodId, TRequest message, global::MagicOnion.TransportReliability reliability = global::MagicOnion.TransportReliability.Reliable, [global::System.Runtime.CompilerServices.CallerMemberName] string callerMemberName = default!)
128+
async global::System.Threading.Tasks.ValueTask WriteMessageWithResponseDiagnosticValueTaskAsync<TRequest, TResponse>(int methodId, TRequest message, [global::System.Runtime.CompilerServices.CallerMemberName] string callerMemberName = default!)
129129
{
130130
if (diagnosticHandler is null)
131131
{
132-
await base.WriteMessageWithResponseValueTaskAsync<TRequest, TResponse>(methodId, message, reliability);
132+
await base.WriteMessageWithResponseValueTaskAsync<TRequest, TResponse>(methodId, message);
133133
return;
134134
}
135135
136136
await diagnosticHandler.OnMethodInvoke(this, methodId, callerMemberName, message, isFireAndForget: true, base.WriteMessageWithResponseValueTaskOfTAsync<TRequest, TResponse>);
137137
}
138138
139-
global::System.Threading.Tasks.ValueTask<TResponse> WriteMessageWithResponseDiagnosticValueTaskOfTAsync<TRequest, TResponse>(int methodId, TRequest message, global::MagicOnion.TransportReliability reliability = global::MagicOnion.TransportReliability.Reliable, [global::System.Runtime.CompilerServices.CallerMemberName] string callerMemberName = default!)
139+
global::System.Threading.Tasks.ValueTask<TResponse> WriteMessageWithResponseDiagnosticValueTaskOfTAsync<TRequest, TResponse>(int methodId, TRequest message, [global::System.Runtime.CompilerServices.CallerMemberName] string callerMemberName = default!)
140140
{
141141
if (diagnosticHandler is null)
142142
{
143-
return base.WriteMessageWithResponseValueTaskOfTAsync<TRequest, TResponse>(methodId, message, reliability);
143+
return base.WriteMessageWithResponseValueTaskOfTAsync<TRequest, TResponse>(methodId, message);
144144
}
145145
146146
return diagnosticHandler.OnMethodInvoke(this, methodId, callerMemberName, message, isFireAndForget: true, base.WriteMessageWithResponseValueTaskOfTAsync<TRequest, TResponse>);

src/MagicOnion.Client/Internal/ClientDataChannel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ async Task RunReceiveLoop()
162162
SendAckFromClient();
163163
connectedTcs.TrySetResult(true);
164164
break;
165-
case 0x03: // Data
165+
case 0x11: // Data (from Server)
166166
if (result.Buffer.Length > 1 + 8 + 8 /* Sequence */)
167167
{
168168
var sequence = BitConverter.ToUInt64(result.Buffer, 1 + 8);

src/MagicOnion.Client/StreamingHubClientBase.cs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ internal async Task __ConnectAndSubscribeAsync(CancellationToken connectAndSubsc
277277
}
278278
catch (Exception ex)
279279
{
280+
// TODO: Error logging / retry?
280281
dataChannel.Dispose();
281282
dataChannel = null;
282283
}
@@ -330,13 +331,50 @@ void EnsureConnected()
330331
}
331332
}
332333

334+
readonly Channel<StreamingHubPayload> receivedDataQueue = Channel.CreateUnbounded<StreamingHubPayload>();
335+
336+
async Task RunDataConsumerLoopAsync(SynchronizationContext? syncContext, CancellationToken cancellationToken)
337+
{
338+
while (!cancellationToken.IsCancellationRequested)
339+
{
340+
if (!await receivedDataQueue.Reader.WaitToReadAsync().ConfigureAwait(false))
341+
{
342+
break;
343+
}
344+
345+
while (receivedDataQueue.Reader.TryRead(out var payload))
346+
{
347+
ConsumeData(syncContext, payload);
348+
}
349+
}
350+
}
351+
352+
async Task RunDataChannelReaderLoopAsync(CancellationToken cancellationToken)
353+
{
354+
if (dataChannel is null) throw new InvalidOperationException();
355+
while (!cancellationToken.IsCancellationRequested && await dataChannel.DataReader.WaitToReadAsync().ConfigureAwait(false))
356+
{
357+
while (dataChannel.DataReader.TryRead(out var payload))
358+
{
359+
receivedDataQueue.Writer.TryWrite(payload);
360+
}
361+
}
362+
}
363+
333364
async Task StartSubscribe(SynchronizationContext? syncContext, Task<bool> firstMoveNext, CancellationToken subscriptionToken)
334365
{
335366
EnsureConnected();
336367

337368
var disconnectionReason = new DisconnectionReason(DisconnectionType.CompletedNormally, null);
338369
writerTask = RunWriterLoopAsync(subscriptionToken);
339370

371+
RunDataConsumerLoopAsync(syncContext, subscriptionToken);
372+
373+
if (dataChannel is not null)
374+
{
375+
RunDataChannelReaderLoopAsync(subscriptionToken);
376+
}
377+
340378
var reader = this.reader;
341379
try
342380
{
@@ -345,7 +383,7 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task<bool> firstM
345383
{
346384
try
347385
{
348-
ConsumeData(syncContext, reader.Current);
386+
receivedDataQueue.Writer.TryWrite(reader.Current);
349387
}
350388
catch (Exception ex)
351389
{
@@ -393,6 +431,7 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task<bool> firstM
393431
}
394432
finally
395433
{
434+
receivedDataQueue.Writer.TryComplete();
396435
disconnected = true;
397436

398437
try

src/MagicOnion.Server/Hubs/Internal/DataChannel/DataChannelServer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
7676
channel.SetConnected();
7777
break;
7878
case 0x10:
79-
// Data
79+
// Data (from Client)
8080
data = data.Slice(9);
8181
if (data.Length < 8 + 1)
8282
{

src/MagicOnion.Server/Hubs/Internal/DataChannel/ServerDataChannel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public void SendPayload(StreamingHubPayload payload)
102102
var span = array.AsSpan(0, length);
103103
try
104104
{
105-
span[0] = 0x10; // Data
105+
span[0] = 0x11; // Data (from Server)
106106
BitConverter.TryWriteBytes(span.Slice(1, 8), SessionId);
107107
BitConverter.TryWriteBytes(span.Slice(9, 8), outgoingSequence);
108108
payload.Span.CopyTo(span.Slice(17, payload.Length));

src/MagicOnion.Server/Hubs/Internal/HubReceiverMethodReliabilityMap.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System.Collections.Frozen;
22
using System.Reflection;
3+
using MagicOnion.Internal;
34

45
namespace MagicOnion.Server.Hubs.Internal;
56

@@ -17,7 +18,7 @@ public static HubReceiverMethodReliabilityMap Create<TReceiver>()
1718
var reliabilityByMethods = targetType.GetMethods()
1819
.Concat(targetType.GetInterfaces().SelectMany(x => x.GetMethods()))
1920
.Select(x => (x.Name, x.ReturnType, Reliability: x.GetCustomAttribute<TransportAttribute>()?.Reliability ?? reliabilityForType))
20-
.ToDictionary(k => k.Name, v => (v.Reliability, v.ReturnType, MethodId: v.Name.GetHashCode()));
21+
.ToDictionary(k => k.Name, v => (v.Reliability, v.ReturnType, MethodId: FNV1A32.GetHashCode(v.Name))); // TODO: MethodIdAttribute
2122

2223
// Validate
2324
if (reliabilityByMethods.FirstOrDefault(x => IsAwaitable(x.Value.ReturnType) && x.Value.Reliability != TransportReliability.Reliable) is { Key: not null } method)
Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,42 @@
11
using Cysharp.Runtime.Multicast.Remoting;
22
using MagicOnion.Internal;
3+
using MagicOnion.Server.Hubs.Internal.DataChannel;
34

45
namespace MagicOnion.Server.Hubs.Internal;
56

67
internal class MagicOnionRemoteReceiverWriter : IRemoteReceiverWriter
78
{
89
readonly StreamingServiceContext<StreamingHubPayload, StreamingHubPayload> writer;
910
readonly HubReceiverMethodReliabilityMap reliabilityMap;
11+
readonly ServerDataChannel? dataChannel;
1012

1113
public IRemoteClientResultPendingTaskRegistry PendingTasks { get; }
1214

13-
public MagicOnionRemoteReceiverWriter(StreamingServiceContext<StreamingHubPayload, StreamingHubPayload> writer, IRemoteClientResultPendingTaskRegistry pendingTasks, HubReceiverMethodReliabilityMap reliabilityMap)
15+
public MagicOnionRemoteReceiverWriter(
16+
StreamingServiceContext<StreamingHubPayload, StreamingHubPayload> writer,
17+
IRemoteClientResultPendingTaskRegistry pendingTasks,
18+
HubReceiverMethodReliabilityMap reliabilityMap,
19+
ServerDataChannel? dataChannel)
1420
{
1521
this.writer = writer;
1622
PendingTasks = pendingTasks;
1723
this.reliabilityMap = reliabilityMap;
24+
this.dataChannel = dataChannel;
1825
}
1926

20-
public void Write(ReadOnlyMemory<byte> payload)
27+
public void Write(InvocationWriteContext context)
2128
{
22-
writer.QueueResponseStreamWrite(StreamingHubPayloadPool.Shared.RentOrCreate(payload.Span));
29+
var payload = StreamingHubPayloadPool.Shared.RentOrCreate(context.Payload.Span);
30+
if (dataChannel is not null &&
31+
reliabilityMap.ReliabilityByMethodId.TryGetValue(context.MethodId, out var reliability) &&
32+
reliability != TransportReliability.Reliable)
33+
{
34+
// Unreliable or Reliable unordered
35+
dataChannel.SendPayload(payload);
36+
}
37+
else
38+
{
39+
writer.QueueResponseStreamWrite(payload);
40+
}
2341
}
2442
}

src/MagicOnion.Server/Hubs/StreamingHub.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public abstract class StreamingHubBase<THubInterface, TReceiver> : ServiceBase<T
4747
// HACK: If the ID of the message is `-1`, the client will ignore the message.
4848
static ReadOnlySpan<byte> MarkerResponseBytes => [0x93, 0xff, 0x00, 0x0c]; // MsgPack: [-1, 0, nil]
4949

50+
static readonly HubReceiverMethodReliabilityMap reliabilityMap = HubReceiverMethodReliabilityMap.Create<TReceiver>();
51+
5052
readonly Channel<(StreamingHubPayload Payload, UniqueHashDictionary<StreamingHubHandler> Handlers, int MethodId, int MessageId, ReadOnlyMemory<byte> Body, bool HasResponse)> requests
5153
= Channel.CreateBounded<(StreamingHubPayload, UniqueHashDictionary<StreamingHubHandler>, int, int, ReadOnlyMemory<byte>, bool)>(new BoundedChannelOptions(capacity: 10)
5254
{
@@ -106,12 +108,11 @@ async Task<DuplexStreamingResult<StreamingHubPayload, StreamingHubPayload>> IStr
106108
handlers = streamingHubFeature.Handlers;
107109

108110
var isDataChannelCreated = TryCreateDataChannel();
109-
var reliabilityMap = HubReceiverMethodReliabilityMap.Create<TReceiver>();
110111

111112
var remoteProxyFactory = serviceProvider.GetRequiredService<IRemoteProxyFactory>();
112113
var remoteSerializer = serviceProvider.GetRequiredService<IRemoteSerializer>();
113114
this.remoteClientResultPendingTasks = serviceProvider.GetRequiredService<IRemoteClientResultPendingTaskRegistry>();
114-
this.Client = remoteProxyFactory.CreateDirect<TReceiver>(new MagicOnionRemoteReceiverWriter(StreamingServiceContext, remoteClientResultPendingTasks, reliabilityMap), remoteSerializer);
115+
this.Client = remoteProxyFactory.CreateDirect<TReceiver>(new MagicOnionRemoteReceiverWriter(StreamingServiceContext, remoteClientResultPendingTasks, reliabilityMap, isDataChannelCreated ? dataChannel : null), remoteSerializer);
115116

116117
this.Group = new HubGroupRepository<TReceiver>(Client, StreamingServiceContext, streamingHubFeature.GroupProvider);
117118

0 commit comments

Comments
 (0)