@@ -76,8 +76,7 @@ public IConsumerBuilder SubscriptionListener(Action<IConsumerBuilder.ListenerCon
7676
7777 public IConsumerBuilder . IStreamOptions Stream ( )
7878 {
79- return new ConsumerBuilderStreamOptions ( this , _configuration . Filters ,
80- _amqpConnection . _featureFlags ) ;
79+ return new ConsumerBuilderStreamOptions ( this , _configuration . Filters ) ;
8180 }
8281
8382 public async Task < IConsumer > BuildAndStartAsync ( CancellationToken cancellationToken = default )
@@ -87,6 +86,13 @@ public async Task<IConsumer> BuildAndStartAsync(CancellationToken cancellationTo
8786 throw new ConsumerException ( "Message handler is not set" ) ;
8887 }
8988
89+ if ( _configuration . Filters [ Consts . s_sqlFilterSymbol ] is not null &&
90+ _amqpConnection . _featureFlags . IsSqlFeatureEnabled == false )
91+ {
92+ throw new ConsumerException ( "SQL filter is not supported by the connection." +
93+ "RabbitMQ 4.2.0 or later is required." ) ;
94+ }
95+
9096 AmqpConsumer consumer = new ( _amqpConnection , _configuration , _metricsReporter ) ;
9197
9298 // TODO pass cancellationToken
@@ -107,34 +113,17 @@ public abstract class StreamOptions : IConsumerBuilder.IStreamOptions
107113 private static readonly Regex s_offsetValidator = new Regex ( "^[0-9]+[YMDhms]$" ,
108114 RegexOptions . Compiled | RegexOptions . CultureInvariant ) ;
109115
110- // sql-filter
111- private const string SqlFilter = "sql-filter" ;
112- private const string RmqStreamFilter = "rabbitmq:stream-filter" ;
113- private const string RmqStreamOffsetSpec = "rabbitmq:stream-offset-spec" ;
114-
115- private const string RmqStreamMatchUnfiltered = "rabbitmq:stream-match-unfiltered" ;
116-
117- // amqp:sql-filter
118- private const string AmqpSqlFilter = "amqp:sql-filter" ;
119-
120- private static readonly Symbol s_streamFilterSymbol = new ( RmqStreamFilter ) ;
121- private static readonly Symbol s_streamOffsetSpecSymbol = new ( RmqStreamOffsetSpec ) ;
122- private static readonly Symbol s_streamMatchUnfilteredSymbol = new ( RmqStreamMatchUnfiltered ) ;
123- private static readonly Symbol s_streamSqlFilterSymbol = new ( AmqpSqlFilter ) ;
124-
125116 private readonly Map _filters ;
126- private readonly FeatureFlags _featureFlags ;
127117
128- protected StreamOptions ( Map filters , FeatureFlags featureFlags )
118+ protected StreamOptions ( Map filters )
129119 {
130120 _filters = filters ;
131- _featureFlags = featureFlags ;
132121 }
133122
134123 public IConsumerBuilder . IStreamOptions Offset ( long offset )
135124 {
136- _filters [ s_streamOffsetSpecSymbol ] =
137- new DescribedValue ( s_streamOffsetSpecSymbol , offset ) ;
125+ _filters [ Consts . s_streamOffsetSpecSymbol ] =
126+ new DescribedValue ( Consts . s_streamOffsetSpecSymbol , offset ) ;
138127 return this ;
139128 }
140129
@@ -168,42 +157,24 @@ public IConsumerBuilder.IStreamOptions Offset(string interval)
168157
169158 public IConsumerBuilder . IStreamOptions FilterValues ( params string [ ] values )
170159 {
171- _filters [ s_streamFilterSymbol ] =
172- new DescribedValue ( s_streamFilterSymbol , values . ToList ( ) ) ;
160+ _filters [ Consts . s_streamFilterSymbol ] =
161+ new DescribedValue ( Consts . s_streamFilterSymbol , values . ToList ( ) ) ;
173162 return this ;
174163 }
175164
176165 public IConsumerBuilder . IStreamOptions FilterMatchUnfiltered ( bool matchUnfiltered )
177166 {
178- _filters [ s_streamMatchUnfilteredSymbol ]
179- = new DescribedValue ( s_streamMatchUnfilteredSymbol , matchUnfiltered ) ;
167+ _filters [ Consts . s_streamMatchUnfilteredSymbol ]
168+ = new DescribedValue ( Consts . s_streamMatchUnfilteredSymbol , matchUnfiltered ) ;
180169 return this ;
181170 }
182171
183- public IConsumerBuilder . IStreamFilterOptions Sql ( string sql )
184- {
185- if ( string . IsNullOrWhiteSpace ( sql ) )
186- {
187- throw new ArgumentNullException ( nameof ( sql ) ) ;
188- }
189-
190- if ( false == _featureFlags . IsSqlFeatureEnabled )
191- {
192- throw new ConsumerException ( "SQL filter is not supported by the broker. " +
193- "The broker must be RabbitMQ 4.2 or later." ) ;
194- }
195-
196- _filters [ SqlFilter ] =
197- new DescribedValue ( s_streamSqlFilterSymbol , sql ) ;
198- return Filter ( ) ;
199- }
200-
201172 public abstract IConsumerBuilder Builder ( ) ;
202173
203174 private void SetOffsetSpecificationFilter ( object value )
204175 {
205- _filters [ s_streamOffsetSpecSymbol ]
206- = new DescribedValue ( s_streamOffsetSpecSymbol , value ) ;
176+ _filters [ Consts . s_streamOffsetSpecSymbol ]
177+ = new DescribedValue ( Consts . s_streamOffsetSpecSymbol , value ) ;
207178 }
208179
209180 public IConsumerBuilder . IStreamFilterOptions Filter ( )
@@ -226,8 +197,8 @@ public IConsumerBuilder.IStreamFilterOptions Filter()
226197 /// </summary>
227198 public class ListenerStreamOptions : StreamOptions
228199 {
229- public ListenerStreamOptions ( Map filters , FeatureFlags featureFlags )
230- : base ( filters , featureFlags )
200+ public ListenerStreamOptions ( Map filters )
201+ : base ( filters )
231202 {
232203 }
233204
@@ -249,8 +220,8 @@ public class ConsumerBuilderStreamOptions : StreamOptions
249220 private readonly IConsumerBuilder _consumerBuilder ;
250221
251222 public ConsumerBuilderStreamOptions ( IConsumerBuilder consumerBuilder ,
252- Map filters , FeatureFlags featureFlags )
253- : base ( filters , featureFlags )
223+ Map filters )
224+ : base ( filters )
254225 {
255226 _consumerBuilder = consumerBuilder ;
256227 }
@@ -267,15 +238,27 @@ public override IConsumerBuilder Builder()
267238 /// </summary>
268239 public class StreamFilterOptions : IConsumerBuilder . IStreamFilterOptions
269240 {
270- private IConsumerBuilder . IStreamOptions _streamOptions ;
271- private Map _filters ;
241+ private readonly IConsumerBuilder . IStreamOptions _streamOptions ;
242+ private readonly Map _filters ;
272243
273244 public StreamFilterOptions ( IConsumerBuilder . IStreamOptions streamOptions , Map filters )
274245 {
275246 _streamOptions = streamOptions ;
276247 _filters = filters ;
277248 }
278249
250+ public IConsumerBuilder . IStreamFilterOptions Sql ( string sql )
251+ {
252+ if ( string . IsNullOrWhiteSpace ( sql ) )
253+ {
254+ throw new ArgumentNullException ( nameof ( sql ) ) ;
255+ }
256+
257+ _filters [ Consts . s_sqlFilterSymbol ] =
258+ new DescribedValue ( Consts . s_streamSqlFilterSymbol , sql ) ;
259+ return this ;
260+ }
261+
279262 public IConsumerBuilder . IStreamOptions Stream ( )
280263 {
281264 return _streamOptions ;
@@ -328,9 +311,8 @@ public IConsumerBuilder.IStreamFilterOptions PropertySymbol(string key, string v
328311
329312 private StreamFilterOptions PropertyFilter ( string propertyKey , object propertyValue )
330313 {
331- const string AmqpPropertiesFilter = "amqp:properties-filter" ;
332314
333- DescribedValue propertiesFilterValue = Filter ( AmqpPropertiesFilter ) ;
315+ DescribedValue propertiesFilterValue = Filter ( Consts . AmqpPropertiesFilter ) ;
334316 Map propertiesFilter = ( Map ) propertiesFilterValue . Value ;
335317 // Note: you MUST use a symbol as the key
336318 propertiesFilter . Add ( new Symbol ( propertyKey ) , propertyValue ) ;
@@ -339,9 +321,8 @@ private StreamFilterOptions PropertyFilter(string propertyKey, object propertyVa
339321
340322 private StreamFilterOptions ApplicationPropertyFilter ( string propertyKey , object propertyValue )
341323 {
342- const string AmqpApplicationPropertiesFilter = "amqp:application-properties-filter" ;
343324
344- DescribedValue applicationPropertiesFilterValue = Filter ( AmqpApplicationPropertiesFilter ) ;
325+ DescribedValue applicationPropertiesFilterValue = Filter ( Consts . AmqpApplicationPropertiesFilter ) ;
345326 Map applicationPropertiesFilter = ( Map ) applicationPropertiesFilterValue . Value ;
346327 // Note: do NOT put a symbol as the key
347328 applicationPropertiesFilter . Add ( propertyKey , propertyValue ) ;
0 commit comments