Skip to content

Commit 158b24e

Browse files
authored
Merge pull request #174 from Jubast/master
Add stream ref middleware
2 parents 5f1096e + f765897 commit 158b24e

File tree

8 files changed

+163
-28
lines changed

8 files changed

+163
-28
lines changed

Source/Orleankka/ActorSystem.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,14 @@ public abstract class ActorSystem : IActorSystem
5252
readonly IServiceProvider serviceProvider;
5353
readonly IGrainFactory grainFactory;
5454
readonly IActorRefMiddleware actorRefMiddleware;
55+
readonly IStreamRefMiddleware streamRefMiddleware;
5556

5657
protected ActorSystem(Assembly[] assemblies, IServiceProvider serviceProvider)
5758
{
5859
this.serviceProvider = serviceProvider;
5960
this.grainFactory = serviceProvider.GetService<IGrainFactory>();
6061
this.actorRefMiddleware = serviceProvider.GetService<IActorRefMiddleware>() ?? DefaultActorRefMiddleware.Instance;
62+
this.streamRefMiddleware = serviceProvider.GetService<IStreamRefMiddleware>() ?? DefaultStreamRefMiddleware.Instance;
6163

6264
Register(assemblies);
6365
}
@@ -97,7 +99,8 @@ public StreamRef<TItem> StreamOf<TItem>(StreamPath path)
9799
throw new ArgumentException("Stream path is empty", nameof(path));
98100

99101
var provider = serviceProvider.GetServiceByName<IStreamProvider>(path.Provider);
100-
return new StreamRef<TItem>(path, provider);
102+
103+
return new StreamRef<TItem>(path, provider, streamRefMiddleware);
101104
}
102105

103106
/// <inheritdoc />

Source/Orleankka/Receive.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,6 @@
55
namespace Orleankka
66
{
77
public delegate Task<object> Receive(object message);
8+
9+
public delegate Task Receive<in TMessage>(TMessage message);
810
}

Source/Orleankka/StreamRef.cs

Lines changed: 63 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
using Orleans.Streams;
1111
using Orleans.Runtime;
1212

13+
using Microsoft.Extensions.DependencyInjection;
14+
1315
namespace Orleankka
1416
{
1517
using Utility;
@@ -18,13 +20,20 @@ namespace Orleankka
1820
[DebuggerDisplay("s->{ToString()}")]
1921
public class StreamRef<TItem> : IEquatable<StreamRef<TItem>>, IEquatable<StreamPath>
2022
{
21-
[NonSerialized]
22-
readonly IStreamProvider provider;
23+
[NonSerialized] readonly IStreamProvider provider;
24+
[NonSerialized] readonly IStreamRefMiddleware middleware;
2325

24-
protected internal StreamRef(StreamPath path, IStreamProvider provider = null)
26+
protected StreamRef(StreamPath path)
2527
{
2628
Path = path;
29+
}
30+
31+
internal StreamRef(StreamPath path, IStreamProvider provider, IStreamRefMiddleware middleware)
32+
: this(path)
33+
{
34+
2735
this.provider = provider;
36+
this.middleware = middleware;
2837
}
2938

3039
[NonSerialized]
@@ -88,16 +97,28 @@ public virtual async Task Publish<TMessage>(TMessage message) where TMessage : P
8897
switch (message)
8998
{
9099
case NextItem<TItem> next:
91-
await Endpoint.OnNextAsync(next.Item, next.Token);
100+
await middleware.Publish(Path, next, async x =>
101+
{
102+
await Endpoint.OnNextAsync(x.Item, x.Token);
103+
});
92104
break;
93105
case NextItemBatch<TItem> next:
94-
await Endpoint.OnNextBatchAsync(next.Items, next.Token);
106+
await middleware.Publish(Path, next, async x =>
107+
{
108+
await Endpoint.OnNextBatchAsync(x.Items, x.Token);
109+
});
95110
break;
96111
case NotifyStreamError error:
97-
await Endpoint.OnErrorAsync(error.Exception);
112+
await middleware.Publish(Path, error, async x =>
113+
{
114+
await Endpoint.OnErrorAsync(x.Exception);
115+
});
98116
break;
99-
case NotifyStreamCompleted _:
100-
await Endpoint.OnCompletedAsync();
117+
case NotifyStreamCompleted completed:
118+
await middleware.Publish(Path, completed, async _ =>
119+
{
120+
await Endpoint.OnCompletedAsync();
121+
});
101122
break;
102123
default:
103124
throw new ArgumentOutOfRangeException(nameof(message), $"Unsupported type of publish message: '{message.GetType()}'");
@@ -130,7 +151,7 @@ public virtual async Task<StreamSubscription<TItem>> Subscribe<TOptions>(Func<St
130151

131152
async Task<StreamSubscription<TItem>> Subscribe(SubscribeReceiveItem o)
132153
{
133-
var observer = new Observer(this, callback);
154+
var observer = CreateObserver(callback);
134155

135156
var predicate = o.Filter != null
136157
? StreamFilter.Internal.Predicate
@@ -142,7 +163,7 @@ async Task<StreamSubscription<TItem>> Subscribe(SubscribeReceiveItem o)
142163

143164
async Task<StreamSubscription<TItem>> SubscribeBatch(SubscribeReceiveBatch o)
144165
{
145-
var observer = new BatchObserver(this, callback);
166+
var observer = CreateBatchObserver(callback);
146167
var handle = await Endpoint.SubscribeAsync(observer, o.Token);
147168
return new StreamSubscription<TItem>(this, handle);
148169
}
@@ -198,46 +219,66 @@ static object Deserialize(Type t, IDeserializationContext context)
198219
{
199220
var reader = context.StreamReader;
200221
var path = StreamPath.Parse(reader.ReadString());
201-
var provider = context.ServiceProvider.GetServiceByName<IStreamProvider>(path.Provider);
202-
return new StreamRef<TItem>(path, provider);
222+
var system = context.ServiceProvider.GetRequiredService<IActorSystem>();
223+
return system.StreamOf<TItem>(path);
203224
}
204225

205226
#endregion
206227

228+
internal BatchObserver CreateBatchObserver(Func<StreamMessage, Task> callback)
229+
{
230+
return new BatchObserver(this, callback, middleware);
231+
}
232+
233+
internal Observer CreateObserver(Func<StreamMessage, Task> callback)
234+
{
235+
return new Observer(this, callback, middleware);
236+
}
237+
207238
internal class BatchObserver : IAsyncBatchObserver<TItem>
208239
{
209240
readonly StreamRef<TItem> stream;
210241
readonly Func<StreamMessage, Task> callback;
242+
readonly IStreamRefMiddleware middleware;
211243

212-
public BatchObserver(StreamRef<TItem> stream, Func<StreamMessage, Task> callback)
244+
public BatchObserver(StreamRef<TItem> stream, Func<StreamMessage, Task> callback, IStreamRefMiddleware middleware)
213245
{
214246
this.stream = stream;
215247
this.callback = callback;
248+
this.middleware = middleware;
216249
}
217250

218-
public Task OnNextAsync(IList<SequentialItem<TItem>> items) =>
219-
callback(new StreamItemBatch<TItem>(stream, items));
251+
public Task OnNextAsync(IList<SequentialItem<TItem>> items) =>
252+
middleware.Receive(stream.Path, new StreamItemBatch<TItem>(stream, items), x => callback(x));
253+
254+
public Task OnCompletedAsync() =>
255+
middleware.Receive(stream.Path, new StreamCompleted(stream), x => callback(x));
220256

221-
public Task OnCompletedAsync() => callback(new StreamCompleted(stream));
222-
public Task OnErrorAsync(Exception ex) => callback(new StreamError(stream, ex));
257+
public Task OnErrorAsync(Exception ex) =>
258+
middleware.Receive(stream.Path, new StreamError(stream, ex), x => callback(x));
223259
}
224260

225261
internal class Observer : IAsyncObserver<TItem>
226262
{
227263
readonly StreamRef<TItem> stream;
228264
readonly Func<StreamMessage, Task> callback;
265+
readonly IStreamRefMiddleware middleware;
229266

230-
public Observer(StreamRef<TItem> stream, Func<StreamMessage, Task> callback)
267+
public Observer(StreamRef<TItem> stream, Func<StreamMessage, Task> callback, IStreamRefMiddleware middleware)
231268
{
232269
this.stream = stream;
233270
this.callback = callback;
271+
this.middleware = middleware;
234272
}
235273

236-
public Task OnNextAsync(TItem item, StreamSequenceToken token = null) =>
237-
callback(new StreamItem<TItem>(stream, item, token));
274+
public Task OnNextAsync(TItem item, StreamSequenceToken token = null) =>
275+
middleware.Receive(stream.Path, new StreamItem<TItem>(stream, item, token), x => callback(x));
276+
277+
public Task OnCompletedAsync() =>
278+
middleware.Receive(stream.Path, new StreamCompleted(stream), x => callback(x));
238279

239-
public Task OnCompletedAsync() => callback(new StreamCompleted(stream));
240-
public Task OnErrorAsync(Exception ex) => callback(new StreamError(stream, ex));
280+
public Task OnErrorAsync(Exception ex) =>
281+
middleware.Receive(stream.Path, new StreamError(stream, ex), x => callback(x));
241282
}
242283
}
243284
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
using System.Threading.Tasks;
2+
3+
namespace Orleankka
4+
{
5+
public interface IStreamRefMiddleware
6+
{
7+
Task Publish<TMessage>(StreamPath path, TMessage message, Receive<TMessage> receiver) where TMessage : PublishMessage;
8+
Task Receive<TMessage>(StreamPath path, TMessage message, Receive<TMessage> receiver) where TMessage : StreamMessage;
9+
}
10+
11+
public abstract class StreamRefMiddleware : IStreamRefMiddleware
12+
{
13+
readonly IStreamRefMiddleware next;
14+
15+
protected StreamRefMiddleware(IStreamRefMiddleware next = null) =>
16+
this.next = next ?? DefaultStreamRefMiddleware.Instance;
17+
18+
public virtual Task Publish<TMessage>(StreamPath path, TMessage message, Receive<TMessage> receiver) where TMessage : PublishMessage =>
19+
next.Publish(path, message, receiver);
20+
21+
public virtual Task Receive<TMessage>(StreamPath path, TMessage message, Receive<TMessage> receiver) where TMessage : StreamMessage =>
22+
next.Receive(path, message, receiver);
23+
}
24+
25+
class DefaultStreamRefMiddleware : IStreamRefMiddleware
26+
{
27+
public static readonly DefaultStreamRefMiddleware Instance = new DefaultStreamRefMiddleware();
28+
29+
public Task Publish<TMessage>(StreamPath path, TMessage message, Receive<TMessage> receiver) where TMessage : PublishMessage =>
30+
receiver(message);
31+
32+
public Task Receive<TMessage>(StreamPath path, TMessage message, Receive<TMessage> receiver) where TMessage : StreamMessage =>
33+
receiver(message);
34+
}
35+
}

Source/Orleankka/StreamSubscription.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,13 @@ public virtual async Task<StreamSubscription<TItem>> Resume<TOptions>(Func<Strea
7373

7474
async Task<StreamSubscription<TItem>> Resume(ResumeReceiveItem o)
7575
{
76-
var observer = new StreamRef<TItem>.Observer(Stream, callback);
76+
var observer = Stream.CreateObserver(callback);
7777
return new StreamSubscription<TItem>(Stream, await handle.ResumeAsync(observer, o.Token));
7878
}
7979

8080
async Task<StreamSubscription<TItem>> ResumeBatch(ResumeReceiveBatch o)
8181
{
82-
var observer = new StreamRef<TItem>.BatchObserver(Stream, callback);
82+
var observer = Stream.CreateBatchObserver(callback);
8383
return new StreamSubscription<TItem>(Stream, await handle.ResumeAsync(observer, o.Token));
8484
}
8585
}

Tests/Orleankka.Tests/Checks/StreamRefFixture.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ public void Equatable_by_path()
1010
{
1111
var path = StreamPath.From("sms", "42");
1212

13-
var ref1 = new StreamRef<string>(path, null);
14-
var ref2 = new StreamRef<string>(path, null);
13+
var ref1 = new StreamRef<string>(path, null, null);
14+
var ref2 = new StreamRef<string>(path, null, null);
1515

1616
Assert.True(ref1 == ref2);
1717
Assert.True(ref1.Equals(ref2));

Tests/Orleankka.Tests/Features/Intercepting_requests.cs

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,29 @@ public override Task<object> Receive(ActorPath actor, object message, Receive re
125125
}
126126
}
127127

128+
public class TestStreamRefMiddleware : StreamRefMiddleware
129+
{
130+
public override Task Publish<TMessage>(StreamPath path, TMessage message, Receive<TMessage> receiver)
131+
{
132+
if (message is NextItem<ItemData> nextItem && nextItem.Item.Text == "StreamRefMiddlewarePublishTest")
133+
{
134+
nextItem.Item.Text += " - it works!";
135+
}
136+
137+
return base.Publish(path, message, receiver);
138+
}
139+
140+
public override Task Receive<TMessage>(StreamPath path, TMessage message, Receive<TMessage> receiver)
141+
{
142+
if (message is StreamItem<ItemData> streamItem && streamItem.Item.Text == "StreamRefMiddlewareSubscribeTest")
143+
{
144+
streamItem.Item.Text += " - it works!";
145+
}
146+
147+
return base.Receive(path, message, receiver);
148+
}
149+
}
150+
128151
[TestFixture]
129152
[RequiresSilo]
130153
public class Tests
@@ -177,7 +200,7 @@ public async Task Intercepting_stream_messages()
177200

178201
await stream.Publish(new ItemData {Text = "foo"});
179202
await Task.Delay(TimeSpan.FromMilliseconds(10));
180-
203+
181204
var received = await actor.Ask(new GetReceivedFromStream());
182205
Assert.That(received.Count, Is.EqualTo(1));
183206
Assert.That(received[0], Is.EqualTo("foo.intercepted"));
@@ -190,6 +213,36 @@ public async Task Intercepting_actor_ref()
190213
var result = await actor.Ask<string>(new CheckRef());
191214
Assert.That(result, Is.EqualTo("it works!"));
192215
}
216+
217+
[Test]
218+
public async Task Intercepting_stream_ref_publish()
219+
{
220+
var stream = system.StreamOf<ItemData>("sms", "test-stream-ref-publish-interception");
221+
222+
var received = new List<ItemData>();
223+
await stream.Subscribe((item, _) => received.Add(item));
224+
225+
await stream.Publish(new ItemData { Text = "StreamRefMiddlewarePublishTest" });
226+
await Task.Delay(TimeSpan.FromMilliseconds(10));
227+
228+
Assert.That(received.Count, Is.EqualTo(1));
229+
Assert.That(received[0].Text, Is.EqualTo("StreamRefMiddlewarePublishTest - it works!"));
230+
}
231+
232+
[Test]
233+
public async Task Intercepting_stream_ref_subscribe()
234+
{
235+
var stream = system.StreamOf<ItemData>("sms", "test-stream-ref-subscribe-interception");
236+
237+
var received = new List<ItemData>();
238+
await stream.Subscribe((item, _) => received.Add(item));
239+
240+
await stream.Publish(new ItemData { Text = "StreamRefMiddlewareSubscribeTest" });
241+
await Task.Delay(TimeSpan.FromMilliseconds(10));
242+
243+
Assert.That(received.Count, Is.EqualTo(1));
244+
Assert.That(received[0].Text, Is.EqualTo("StreamRefMiddlewareSubscribeTest - it works!"));
245+
}
193246
}
194247
}
195248
}

Tests/Orleankka.Tests/Testing/TestActions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public override void BeforeTest(ITest test)
8282

8383
services.AddSingleton<IActorRefMiddleware>(s => new TestActorRefMiddleware());
8484
services.AddSingleton<IActorMiddleware>(s => new TestActorMiddleware());
85+
services.AddSingleton<IStreamRefMiddleware>(s => new TestStreamRefMiddleware());
8586
})
8687
.ConfigureApplicationParts(x => x
8788
.AddApplicationPart(GetType().Assembly)

0 commit comments

Comments
 (0)