@@ -258,7 +258,7 @@ class MyMessageHandler(AMQPMessagingHandler):
258258 self .delivery_context.accept(event)
259259
260260stream_address = AddressHelper.queue_address(" some-stream" )
261- consumer = consumer_connection .consumer(
261+ consumer = connection .consumer(
262262 stream_address,
263263 message_handler = MyMessageHandler(),
264264 # ## This Bloom filter will be evaluated server-side per chunk (Stage 1).
@@ -427,6 +427,7 @@ The filter syntax is defined in the AMQP 1.0 extension specification [AMQP Filte
427427RabbitMQ supports a subset of this specification as described below.
428428
429429AMQP filter expressions are either [** Property** Filter Expressions](https:// docs.oasis- open .org/ amqp/ filtex/ v1.0/ csd01/ filtex- v1.0- csd01.html# _Toc67929259) or [**SQL** Filter Expressions](https://docs.oasis-open.org/amqp/filtex/v1.0/csd01/filtex-v1.0-csd01.html#_Toc67929276).
430+
430431
431432:::note
432433
@@ -447,6 +448,7 @@ RabbitMQ implements:
447448
448449As described in the specification, prefix and suffix matching is supported using `& p:< prefix> ` and `& s:< suffix> ` modifiers.
449450
451+
450452# ### Example: Property Filter Expressions
451453
452454The following example causes RabbitMQ to deliver only messages for which ** all ** of the following apply:
@@ -471,6 +473,56 @@ Consumer consumer = connection.consumerBuilder()
471473```
472474< / TabItem>
473475
476+ < TabItem value = " csharp" label = " C#" >
477+ ```csharp
478+ IConsumer consumer = await connection.ConsumerBuilder().Queue(" my-queue" ).
479+ MessageHandler((context, message) = >
480+ {
481+ // process the messages
482+ }
483+ ).Stream().Offset(StreamOffsetSpecification.First).Filter()
484+ .UserId(" John" u8.ToArray())
485+ .Subject(" &p:Order" )
486+ .Property(" region" , " emea" )
487+ .Stream().Builder()
488+ .BuildAndStartAsync();
489+ ```
490+ < / TabItem>
491+
492+ < TabItem value = " go" label = " Go" >
493+ ```go
494+ var subjectPrt = " &p:Order"
495+ consumer, err := amqpConnection.NewConsumer(context.Background(), " my-queue" ,
496+ & rmq.StreamConsumerOptions{
497+ Offset: & rmq.OffsetFirst{},
498+ StreamFilterOptions: & rmq.StreamFilterOptions{
499+ Properties: & amqp.MessageProperties{Subject: & subjectPrt, UserID: []byte(" John" )},
500+ ApplicationProperties: map [string]interface{}{" region" : " emea" },
501+ },
502+ })
503+ ```
504+ < / TabItem>
505+
506+ < TabItem value = " python" label = " Python" >
507+ ```python
508+ consumer = connection.consumer(
509+ " my-queue" ,
510+ message_handler = MyMessageHandler(),
511+ consumer_options = StreamConsumerOptions(
512+ offset_specification = OffsetSpecification.first,
513+ filter_options = StreamFilterOptions(
514+ message_properties = MessageProperties(
515+ subject = " &p:Order" ,
516+ user_id = " John" .encode(" utf-8" ),
517+
518+ ),
519+ application_properties = {" region" : " emea" },
520+ ),
521+ ),
522+ )
523+ ```
524+ < / TabItem>
525+
474526< TabItem value = " Erlang" label = " Erlang" >
475527```erlang
476528Filter = # {<<"filter-name-1">> =>
@@ -725,6 +777,52 @@ Consumer consumer = connection.consumerBuilder()
725777```
726778< / TabItem>
727779
780+ < TabItem value=" csharp" label=" C#" >
781+ ```csharp
782+ IConsumer consumer = await connection.ConsumerBuilder().Queue(" my-queue" ).
783+ MessageHandler((context, message) = >
784+ {
785+ // process the messages
786+ }
787+ ).Stream().Offset(StreamOffsetSpecification.First).Filter()
788+ .Sql(" properties.user_id = 'John' AND"
789+ + " properties.subject LIKE 'Order%' AND region = 'emea'" )
790+ .Stream().Builder()
791+ .Stream().Builder()
792+ .BuildAndStartAsync();
793+ ```
794+ < / TabItem>
795+
796+ < TabItem value=" go" label=" Go" >
797+ ```go
798+ consumer, err := amqpConnection.NewConsumer(context.Background(), " my-queue" ,
799+ & rmq.StreamConsumerOptions{
800+ Offset: & rmq.OffsetFirst{},
801+ StreamFilterOptions: & rmq.StreamFilterOptions{
802+ SQL : " properties.user_id = 'John' AND "
803+ + " properties.subject LIKE 'Order%' AND region = 'emea'" ,
804+ },
805+ })
806+ ```
807+ < / TabItem>
808+
809+ < TabItem value=" python" label=" Python" >
810+ ```python
811+ consumer = connection.consumer(
812+ " my-queue" ,
813+ message_handler = MyMessageHandler(),
814+ consumer_options = StreamConsumerOptions(
815+ offset_specification = OffsetSpecification.first,
816+ filter_options = StreamFilterOptions(
817+ sql = " properties.user_id = 'John' AND "
818+ + " properties.subject LIKE 'Order%' AND region = 'emea'" ,
819+ ),
820+ ),
821+ )
822+ ```
823+ < / TabItem>
824+
825+
728826
729827< TabItem value=" Erlang" label=" Erlang" >
730828```erlang
@@ -829,6 +927,69 @@ Consumer consumer = connection.consumerBuilder()
829927```
830928< / TabItem>
831929
930+ < TabItem value = " csharp" label = " C#" >
931+ ```csharp
932+ IConsumer consumer = await connection.ConsumerBuilder().Queue(" my-queue" ).
933+ MessageHandler((context, message) = >
934+ // message processing
935+ }).Stream().Offset(StreamOffsetSpecification.First)
936+ // This Bloom filter will be evaluated server- side per chunk (Stage 1 ).
937+ .FilterValues(" order.created" )
938+ .Filter()
939+ // This complex SQL filter expression will be evaluted server- side
940+ // per message at stage 2 .
941+ .Sql(" p.subject = 'order.created' AND " +
942+ " p.creation_time > UTC() - 3600000 AND " +
943+ " region IN ('AMER', 'EMEA', 'APJ') AND " +
944+ " (h.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)" ).Stream().Builder()
945+ .BuildAndStartAsync().ConfigureAwait(false);
946+ ```
947+ < / TabItem>
948+
949+ < TabItem value = " go" label = " Go" >
950+ ```csharp
951+ consumer, err := amqpConnection.NewConsumer(context.Background(), " my-queue" ,
952+ & rmq.StreamConsumerOptions{
953+ Offset: & rmq.OffsetFirst{},
954+ StreamFilterOptions: & rmq.StreamFilterOptions{
955+ // This Bloom filter will be evaluated server- side per chunk (Stage 1 ).
956+ Values: []string{" order.created" },
957+ // This complex SQL filter expression will be evaluted server- side
958+ // per message at stage 2 .
959+ SQL :" p.subject = 'order.created' AND " +
960+ " p.creation_time > UTC() - 3600000 AND " +
961+ " region IN ('AMER', 'EMEA', 'APJ') AND " +
962+ " (h.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)" ,
963+ },
964+ })
965+ ```
966+ < / TabItem>
967+
968+ < TabItem value = " python" label = " Python" >
969+ ```python
970+ consumer = consumer_connection.consumer(
971+ " my-queue" ,
972+ message_handler = MyMessageHandler(),
973+ # the consumer will only receive messages with filter value banana and subject yellow
974+ # and application property from = italy
975+ consumer_options = StreamConsumerOptions(
976+ offset_specification = OffsetSpecification.first,
977+ filter_options = StreamFilterOptions(
978+ # This Bloom filter will be evaluated server-side per chunk (Stage 1).
979+ values = [" order.created" ],
980+ # This complex SQL filter expression will be evaluted server-side
981+ # per message at stage 2.
982+ sql = " p.subject = 'order.created' AND " +
983+ " p.creation_time > UTC() - 3600000 AND " +
984+ " region IN ('AMER', 'EMEA', 'APJ') AND " +
985+ " (h.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)" ,
986+ ),
987+ ),
988+ )
989+ ```
990+ < / TabItem>
991+
992+
832993< TabItem value = " Erlang" label = " Erlang" >
833994```erlang
834995Expression = << " p.subject = 'order.created' AND "
0 commit comments