-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Expand file tree
/
Copy pathStreamSubscriptionAttributes.cs
More file actions
133 lines (119 loc) · 5.78 KB
/
StreamSubscriptionAttributes.cs
File metadata and controls
133 lines (119 loc) · 5.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using Microsoft.Extensions.DependencyInjection;
using Orleans.Metadata;
using Orleans.Runtime;
using Orleans.Streams;
namespace Orleans
{
/// <summary>
/// The [Orleans.ImplicitStreamSubscription] attribute is used to mark grains as implicit stream subscriptions.
/// </summary>
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
public class ImplicitStreamSubscriptionAttribute : Attribute, IGrainBindingsProviderAttribute
{
/// <summary>
/// Gets the stream namespace filter predicate.
/// </summary>
public IStreamNamespacePredicate Predicate { get; }
/// <summary>
/// Gets the name of the stream identifier mapper.
/// </summary>
/// <value>The name of the stream identifier mapper.</value>
/// <remarks>
/// This value is the name used to resolve the <see cref="IStreamIdMapper"/> registered in the dependency injection container.
/// </remarks>
public string StreamIdMapper { get; init; }
/// <summary>
/// Used to subscribe to all stream namespaces.
/// </summary>
public ImplicitStreamSubscriptionAttribute()
{
Predicate = new AllStreamNamespacesPredicate();
}
/// <summary>
/// Used to subscribe to the specified stream namespace.
/// </summary>
/// <param name="streamNamespace">The stream namespace to subscribe.</param>
/// <param name="streamIdMapper">The name of the stream identity mapper.</param>
public ImplicitStreamSubscriptionAttribute(string streamNamespace, string streamIdMapper = null)
{
Predicate = new ExactMatchStreamNamespacePredicate(streamNamespace.Trim());
StreamIdMapper = streamIdMapper;
}
/// <summary>
/// Allows to pass an arbitrary predicate type to filter stream namespaces to subscribe. The predicate type
/// must have a constructor without parameters.
/// </summary>
/// <param name="predicateType">The stream namespace predicate type.</param>
/// <param name="streamIdMapper">The name of the stream identity mapper.</param>
public ImplicitStreamSubscriptionAttribute(Type predicateType, string streamIdMapper = null)
{
Predicate = (IStreamNamespacePredicate) Activator.CreateInstance(predicateType);
StreamIdMapper = streamIdMapper;
}
/// <summary>
/// Allows to pass an instance of the stream namespace predicate. To be used mainly as an extensibility point
/// via inheriting attributes.
/// </summary>
/// <param name="predicate">The stream namespace predicate.</param>
/// <param name="streamIdMapper">The name of the stream identity mapper.</param>
public ImplicitStreamSubscriptionAttribute(IStreamNamespacePredicate predicate, string streamIdMapper = null)
{
Predicate = predicate;
StreamIdMapper = streamIdMapper;
}
/// <inheritdoc />
public IEnumerable<Dictionary<string, string>> GetBindings(IServiceProvider services, Type grainClass, GrainType grainType)
{
// Register the predicate type so the constructor provider will accept it.
foreach (var provider in services.GetServices<IStreamNamespacePredicateProvider>())
{
if (provider is ConstructorStreamNamespacePredicateProvider ctorProvider)
{
ctorProvider.RegisterPredicateType(Predicate.GetType());
break;
}
}
var binding = new Dictionary<string, string>
{
[WellKnownGrainTypeProperties.BindingTypeKey] = WellKnownGrainTypeProperties.StreamBindingTypeValue,
[WellKnownGrainTypeProperties.StreamBindingPatternKey] = this.Predicate.PredicatePattern,
[WellKnownGrainTypeProperties.StreamIdMapperKey] = this.StreamIdMapper,
};
if (LegacyGrainId.IsLegacyGrainType(grainClass))
{
string keyType;
if (typeof(IGrainWithGuidKey).IsAssignableFrom(grainClass) || typeof(IGrainWithGuidCompoundKey).IsAssignableFrom(grainClass))
keyType = nameof(Guid);
else if (typeof(IGrainWithIntegerKey).IsAssignableFrom(grainClass) || typeof(IGrainWithIntegerCompoundKey).IsAssignableFrom(grainClass))
keyType = nameof(Int64);
else // fallback to string
keyType = nameof(String);
binding[WellKnownGrainTypeProperties.LegacyGrainKeyType] = keyType;
}
if (LegacyGrainId.IsLegacyKeyExtGrainType(grainClass))
{
binding[WellKnownGrainTypeProperties.StreamBindingIncludeNamespaceKey] = "true";
}
yield return binding;
}
}
/// <summary>
/// The [Orleans.RegexImplicitStreamSubscription] attribute is used to mark grains as implicit stream
/// subscriptions by filtering stream namespaces to subscribe using a regular expression.
/// </summary>
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
public sealed class RegexImplicitStreamSubscriptionAttribute : ImplicitStreamSubscriptionAttribute
{
/// <summary>
/// Allows to pass a regular expression to filter stream namespaces to subscribe to.
/// </summary>
/// <param name="pattern">The stream namespace regular expression filter.</param>
public RegexImplicitStreamSubscriptionAttribute([StringSyntax(StringSyntaxAttribute.Regex)] string pattern)
: base(new RegexStreamNamespacePredicate(pattern))
{
}
}
}