-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Expand file tree
/
Copy pathImplicitChannelSubscriptionAttribute.cs
More file actions
130 lines (116 loc) · 5.65 KB
/
ImplicitChannelSubscriptionAttribute.cs
File metadata and controls
130 lines (116 loc) · 5.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using Microsoft.Extensions.DependencyInjection;
using Orleans.BroadcastChannel;
using Orleans.Metadata;
using Orleans.Runtime;
namespace Orleans
{
/// <summary>
/// The [Orleans.ImplicitChannelSubscription] attribute is used to mark grain classes as implicit
/// broadcast channel subscribers. It may be applied more than once to the same class.
/// </summary>
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
public class ImplicitChannelSubscriptionAttribute : Attribute, IGrainBindingsProviderAttribute
{
    /// <summary>
    /// Gets the channel namespace filter predicate.
    /// </summary>
    public IChannelNamespacePredicate Predicate { get; }

    /// <summary>
    /// Gets the name of the channel identifier mapper.
    /// </summary>
    /// <value>
    /// The name of the channel identifier mapper, or <see langword="null"/> when none was supplied
    /// to the constructor.
    /// </value>
    public string ChannelIdMapper { get; }

    /// <summary>
    /// Used to subscribe to all channel namespaces.
    /// </summary>
    public ImplicitChannelSubscriptionAttribute()
    {
        Predicate = new AllStreamNamespacesPredicate();
    }

    /// <summary>
    /// Used to subscribe to the specified channel namespace.
    /// </summary>
    /// <param name="streamNamespace">
    /// The channel namespace to subscribe to. Leading and trailing white space is trimmed.
    /// </param>
    /// <param name="channelIdMapper">The name of the channel identifier mapper.</param>
    /// <exception cref="ArgumentNullException"><paramref name="streamNamespace"/> is <see langword="null"/>.</exception>
    public ImplicitChannelSubscriptionAttribute(string streamNamespace, string channelIdMapper = null)
    {
        // Fail with a descriptive exception instead of a NullReferenceException from Trim().
        if (streamNamespace is null)
        {
            throw new ArgumentNullException(nameof(streamNamespace));
        }

        Predicate = new ExactMatchChannelNamespacePredicate(streamNamespace.Trim());
        ChannelIdMapper = channelIdMapper;
    }

    /// <summary>
    /// Allows to pass an arbitrary predicate type to filter channel namespaces to subscribe to. The predicate
    /// type must implement <see cref="IChannelNamespacePredicate"/> and have a constructor without parameters.
    /// </summary>
    /// <param name="predicateType">The channel namespace predicate type.</param>
    /// <param name="channelIdMapper">The name of the channel identifier mapper.</param>
    /// <exception cref="ArgumentNullException"><paramref name="predicateType"/> is <see langword="null"/>.</exception>
    /// <exception cref="InvalidCastException">
    /// The created instance does not implement <see cref="IChannelNamespacePredicate"/>.
    /// </exception>
    public ImplicitChannelSubscriptionAttribute(Type predicateType, string channelIdMapper = null)
    {
        if (predicateType is null)
        {
            throw new ArgumentNullException(nameof(predicateType));
        }

        Predicate = (IChannelNamespacePredicate) Activator.CreateInstance(predicateType);
        ChannelIdMapper = channelIdMapper;
    }

    /// <summary>
    /// Allows to pass an instance of the channel namespace predicate. To be used mainly as an extensibility
    /// point via inheriting attributes.
    /// </summary>
    /// <param name="predicate">The channel namespace predicate.</param>
    /// <param name="channelIdMapper">The name of the channel identifier mapper.</param>
    /// <exception cref="ArgumentNullException"><paramref name="predicate"/> is <see langword="null"/>.</exception>
    public ImplicitChannelSubscriptionAttribute(IChannelNamespacePredicate predicate, string channelIdMapper = null)
    {
        // A null predicate would otherwise only surface later, as a NullReferenceException in GetBindings.
        if (predicate is null)
        {
            throw new ArgumentNullException(nameof(predicate));
        }

        Predicate = predicate;
        ChannelIdMapper = channelIdMapper;
    }

    /// <inheritdoc />
    public IEnumerable<Dictionary<string, string>> GetBindings(IServiceProvider services, Type grainClass, GrainType grainType)
    {
        // This method is an iterator: nothing below runs until the returned sequence is enumerated.

        // Register the predicate type so the constructor provider will accept it.
        // Only the first constructor-based provider found is updated.
        foreach (var provider in services.GetServices<IChannelNamespacePredicateProvider>())
        {
            if (provider is ConstructorChannelNamespacePredicateProvider ctorProvider)
            {
                ctorProvider.RegisterPredicateType(Predicate.GetType());
                break;
            }
        }

        var binding = new Dictionary<string, string>
        {
            [WellKnownGrainTypeProperties.BindingTypeKey] = WellKnownGrainTypeProperties.BroadcastChannelBindingTypeValue,
            [WellKnownGrainTypeProperties.BroadcastChannelBindingPatternKey] = this.Predicate.PredicatePattern,
            // The mapper key is always written, even when ChannelIdMapper is null.
            // NOTE(review): consumers presumably rely on the key being present - confirm before omitting it.
            [WellKnownGrainTypeProperties.ChannelIdMapperKey] = this.ChannelIdMapper,
        };

        if (LegacyGrainId.IsLegacyGrainType(grainClass))
        {
            binding[WellKnownGrainTypeProperties.LegacyGrainKeyType] = GetLegacyKeyTypeName(grainClass);
        }

        if (LegacyGrainId.IsLegacyKeyExtGrainType(grainClass))
        {
            binding[WellKnownGrainTypeProperties.StreamBindingIncludeNamespaceKey] = "true";
        }

        yield return binding;
    }

    /// <summary>
    /// Maps the key interface implemented by a legacy grain class to the key type name stored in the binding.
    /// </summary>
    /// <param name="grainClass">The grain class to inspect.</param>
    /// <returns>
    /// <c>"Guid"</c> for Guid-keyed grains, <c>"Int64"</c> for integer-keyed grains, and <c>"String"</c> otherwise.
    /// </returns>
    private static string GetLegacyKeyTypeName(Type grainClass)
    {
        if (typeof(IGrainWithGuidKey).IsAssignableFrom(grainClass) || typeof(IGrainWithGuidCompoundKey).IsAssignableFrom(grainClass))
        {
            return nameof(Guid);
        }

        if (typeof(IGrainWithIntegerKey).IsAssignableFrom(grainClass) || typeof(IGrainWithIntegerCompoundKey).IsAssignableFrom(grainClass))
        {
            return nameof(Int64);
        }

        // Fall back to string for every other grain class.
        return nameof(String);
    }
}
/// <summary>
/// The [Orleans.RegexImplicitChannelSubscription] attribute is used to mark grain classes as implicit
/// channel subscribers by filtering the channel namespaces to subscribe to using a regular expression.
/// It may be applied more than once to the same class.
/// </summary>
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
public sealed class RegexImplicitChannelSubscriptionAttribute : ImplicitChannelSubscriptionAttribute
{
/// <summary>
/// Allows to pass a regular expression to filter channel namespaces to subscribe to.
/// The pattern is wrapped in a <see cref="RegexChannelNamespacePredicate"/> and handed to the base
/// attribute; no channel identifier mapper name is supplied.
/// </summary>
/// <param name="pattern">The channel namespace regular expression filter.</param>
public RegexImplicitChannelSubscriptionAttribute([StringSyntax(StringSyntaxAttribute.Regex)] string pattern)
: base(new RegexChannelNamespacePredicate(pattern))
{
}
}
}