Skip to content

Commit 12e3866

Browse files
committed
SQL filter
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 38e5d50 commit 12e3866

File tree

4 files changed

+245
-6
lines changed

4 files changed

+245
-6
lines changed

docs/examples/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,5 @@
1010
- [TLS](tls) - An example of how to use TLS with the AMQP 1.0 client.
1111
- [Advanced Settings](advanced_settings) - An example of how to use the advanced connection settings of the AMQP 1.0 client.
1212
- [Broadcast](broadcast) - An example of how to use fanout to broadcast messages to multiple auto-deleted queues.
13-
- [RPC Echo](rpc_echo_server) - An example of how to implement RPC with the AMQP 1.0 client.
13+
- [RPC Echo](rpc_echo_server) - An example of how to implement RPC with the AMQP 1.0 client.
14+
- [SQL stream Filtering](sql_stream_filter) - An example of how to use SQL stream filtering with RabbitMQ Streams.
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/Azure/go-amqp"
7+
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
8+
)
9+
10+
func ptr[T any](v T) *T {
11+
return &v
12+
}
13+
14+
func main() {
15+
// see: https://www.rabbitmq.com/docs/next/stream-filtering#sql-filter-expressions
16+
queueName := "stream-sql-filter-example"
17+
18+
rmq.Info("AMQP 1.0 Client SQL Stream Filter Example")
19+
20+
env := rmq.NewEnvironment("amqp://guest:guest@localhost:5672/", nil)
21+
22+
amqpConnection, err := env.NewConnection(context.Background())
23+
if err != nil {
24+
rmq.Error("Error opening connection", err)
25+
return
26+
}
27+
28+
_, err = amqpConnection.Management().DeclareQueue(context.Background(), &rmq.StreamQueueSpecification{
29+
Name: queueName,
30+
})
31+
if err != nil {
32+
rmq.Error("Error declaring stream queue", err)
33+
return
34+
}
35+
36+
publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.QueueAddress{
37+
Queue: queueName,
38+
}, nil)
39+
if err != nil {
40+
rmq.Error("Error creating publisher", err)
41+
return
42+
}
43+
44+
// this message will be stored on the queue but not received by the consumer
45+
// because it does not match the filter
46+
msgNotIntTheFilter := amqp.NewMessage([]byte("Message that does not match the filter"))
47+
msgNotIntTheFilter.Properties = &amqp.MessageProperties{
48+
Subject: ptr("No"),
49+
}
50+
msgNotIntTheFilter.ApplicationProperties = map[string]interface{}{"keyNo": "valueNO"}
51+
52+
pr, err := publisher.Publish(context.Background(), msgNotIntTheFilter)
53+
if err != nil {
54+
rmq.Error("Error publishing message", err)
55+
return
56+
}
57+
rmq.Info("Published message that does not match the filter", "publish result", pr.Outcome)
58+
59+
// this message will be stored on the queue and received by the consumer
60+
// because it matches the filter
61+
msgInTheFilter := amqp.NewMessage([]byte("Message that matches the filter"))
62+
msgInTheFilter.Properties = &amqp.MessageProperties{
63+
Subject: ptr("Yes_I_am_in_the_filter"),
64+
To: ptr("the_id"),
65+
}
66+
msgInTheFilter.ApplicationProperties = map[string]interface{}{"keyYes": "valueYES"}
67+
68+
pr, err = publisher.Publish(context.Background(), msgInTheFilter)
69+
if err != nil {
70+
rmq.Error("Error publishing message", err)
71+
return
72+
}
73+
rmq.Info("Published message that matches the filter", "publish result", pr.Outcome)
74+
75+
consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, &rmq.StreamConsumerOptions{
76+
InitialCredits: 10,
77+
Offset: &rmq.OffsetFirst{},
78+
StreamFilterOptions: &rmq.StreamFilterOptions{
79+
// the SQL expression to filter messages
80+
SQL: "properties.subject LIKE '%Yes_I_am_in_the_filter%' AND properties.to = 'the_id' AND keyYes = 'valueYES'"},
81+
})
82+
83+
if err != nil {
84+
rmq.Error("Error creating consumer", err)
85+
return
86+
}
87+
88+
consumerContext, cancel := context.WithCancel(context.Background())
89+
defer cancel()
90+
91+
// Consume messages from the queue. It should only receive the message that matches the filter
92+
// the second
93+
deliveryContext, err := consumer.Receive(consumerContext)
94+
if err != nil {
95+
rmq.Error("[Consumer]", "Error receiving message", err)
96+
return
97+
}
98+
rmq.Info("[Consumer]", "Body",
99+
fmt.Sprintf("%s", deliveryContext.Message().Data),
100+
"Subject", *deliveryContext.Message().Properties.Subject, "To", *deliveryContext.Message().Properties.To)
101+
err = deliveryContext.Accept(context.Background())
102+
if err != nil {
103+
rmq.Error("Error accepting message", err)
104+
return
105+
}
106+
107+
err = amqpConnection.Management().DeleteQueue(context.Background(), queueName)
108+
if err != nil {
109+
rmq.Error("Error deleting stream queue", err)
110+
return
111+
}
112+
err = amqpConnection.Close(context.Background())
113+
if err != nil {
114+
rmq.Error("Error closing connection", err)
115+
return
116+
}
117+
_ = env.CloseConnections(context.Background())
118+
rmq.Info("AMQP 1.0 Client SQL Stream Filter Example Completed")
119+
120+
}

pkg/rabbitmqamqp/amqp_consumer_stream_test.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,98 @@ var _ = Describe("Consumer stream test", func() {
555555
}()
556556
})
557557

558+
It("SQL filter consumer", func() {
559+
qName := generateName("SQL filter consumer")
560+
connection, err := Dial(context.Background(), "amqp://", nil)
561+
Expect(err).To(BeNil())
562+
queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{
563+
Name: qName,
564+
})
565+
Expect(err).To(BeNil())
566+
Expect(queueInfo).NotTo(BeNil())
567+
Expect(queueInfo.name).To(Equal(qName))
568+
569+
publishMessagesWithMessageLogic(qName, "not_in_the_filter", 10, func(msg *amqp.Message) {
570+
msg.Properties = &amqp.MessageProperties{Subject: ptr("not")}
571+
})
572+
573+
publishMessagesWithMessageLogic(qName, "in_the_filter", 10, func(msg *amqp.Message) {
574+
msg.Properties = &amqp.MessageProperties{Subject: ptr("in_the_filter")}
575+
})
576+
577+
publishMessagesWithMessageLogic(qName, "in_the_filter_with_more", 10, func(msg *amqp.Message) {
578+
msg.Properties = &amqp.MessageProperties{Subject: ptr("in_the_filter_with_more")}
579+
})
580+
581+
consumer, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{
582+
InitialCredits: 200,
583+
Offset: &OffsetFirst{},
584+
StreamFilterOptions: &StreamFilterOptions{
585+
SQL: "properties.subject LIKE '%in_the_filter%'",
586+
},
587+
})
588+
Expect(err).To(BeNil())
589+
Expect(consumer).NotTo(BeNil())
590+
Expect(consumer).To(BeAssignableToTypeOf(&Consumer{}))
591+
for i := 0; i < 20; i++ {
592+
dc, err := consumer.Receive(context.Background())
593+
Expect(err).To(BeNil())
594+
Expect(dc.Message()).NotTo(BeNil())
595+
if i < 10 {
596+
Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("Message_id:%d_label:%s", i, "in_the_filter")))
597+
} else {
598+
Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("Message_id:%d_label:%s", i-10, "in_the_filter_with_more")))
599+
}
600+
Expect(dc.Accept(context.Background())).To(BeNil())
601+
}
602+
603+
Expect(consumer.Close(context.Background())).To(BeNil())
604+
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
605+
Expect(connection.Close(context.Background())).To(BeNil())
606+
})
607+
608+
It("SQL filter consumer combined to other fields", func() {
609+
qName := generateName("SQL filter consumer combined to other fields")
610+
connection, err := Dial(context.Background(), "amqp://", nil)
611+
Expect(err).To(BeNil())
612+
queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{
613+
Name: qName,
614+
})
615+
Expect(err).To(BeNil())
616+
Expect(queueInfo).NotTo(BeNil())
617+
Expect(queueInfo.name).To(Equal(qName))
618+
publishMessagesWithMessageLogic(qName, "not_in_the_filter", 10, func(msg *amqp.Message) {
619+
msg.Properties = &amqp.MessageProperties{Subject: ptr("not")}
620+
})
621+
622+
publishMessagesWithMessageLogic(qName, "in_the_filter", 10, func(msg *amqp.Message) {
623+
msg.Properties = &amqp.MessageProperties{Subject: ptr("p_in_the_filter"), To: ptr("the_id")}
624+
msg.ApplicationProperties = map[string]interface{}{"a_in_the_filter_key": "a_in_the_filter_value"}
625+
})
626+
627+
consumer, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{
628+
InitialCredits: 200,
629+
Offset: &OffsetFirst{},
630+
StreamFilterOptions: &StreamFilterOptions{
631+
SQL: "properties.subject LIKE '%in_the_filter%' AND properties.to = 'the_id' AND a_in_the_filter_key = 'a_in_the_filter_value'",
632+
},
633+
})
634+
635+
Expect(err).To(BeNil())
636+
Expect(consumer).NotTo(BeNil())
637+
Expect(consumer).To(BeAssignableToTypeOf(&Consumer{}))
638+
for i := 0; i < 10; i++ {
639+
dc, err := consumer.Receive(context.Background())
640+
Expect(err).To(BeNil())
641+
Expect(dc.Message()).NotTo(BeNil())
642+
Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("Message_id:%d_label:%s", i, "in_the_filter")))
643+
Expect(dc.Accept(context.Background())).To(BeNil())
644+
}
645+
646+
Expect(consumer.Close(context.Background())).To(BeNil())
647+
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
648+
Expect(connection.Close(context.Background())).To(BeNil())
649+
})
558650
})
559651

560652
type msgLogic = func(*amqp.Message)

pkg/rabbitmqamqp/amqp_types.go

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,13 @@ type IOffsetSpecification interface {
118118
toLinkFilter() amqp.LinkFilter
119119
}
120120

121+
// DescriptorCodeSqlFilter see:
122+
// https://github.com/rabbitmq/rabbitmq-server/blob/main/deps/amqp10_common/include/amqp10_filter.hrl
123+
// see DESCRIPTOR_CODE_SQL_FILTER in rabbitmq-server
124+
// DESCRIPTOR_CODE_SQL_FILTER is the uint64 code for amqpSqlFilter = "amqp:sql-filter"
125+
const DescriptorCodeSqlFilter = 0x120
126+
121127
const sqlFilter = "sql-filter"
122-
const amqpSqlFilter = "amqp:sql-filter"
123128
const rmqStreamFilter = "rabbitmq:stream-filter"
124129
const rmqStreamOffsetSpec = "rabbitmq:stream-offset-spec"
125130
const rmqStreamMatchUnfiltered = "rabbitmq:stream-match-unfiltered"
@@ -174,7 +179,27 @@ type StreamFilterOptions struct {
174179
// Filter the data based on Message Properties
175180
Properties *amqp.MessageProperties
176181

177-
// SQLFilter
182+
/* SQLFilter: documentation https://www.rabbitmq.com/docs/next/stream-filtering#sql-filter-expressions
183+
It requires RabbitMQ 4.2 or later
184+
185+
Example:
186+
<code>
187+
Define a message like:
188+
var msg := NewMessage([]byte(..))
189+
msg.Properties = &amqp.MessageProperties{Subject: ptr("mySubject"), To: ptr("To")}
190+
msg.ApplicationProperties = map[string]interface{}{"filter_key": "filter_value"}
191+
192+
publisher.Publish(context.Background(), msg)
193+
Then you can create a consumer with a SQL filter like:
194+
consumer, err := connection.NewConsumer(context.Background(), "myQueue", &StreamConsumerOptions{
195+
InitialCredits: 200,
196+
Offset: &OffsetFirst{},
197+
StreamFilterOptions: &StreamFilterOptions{
198+
SQL: "properties.subject LIKE '%mySubject%' AND properties.to = 'To' AND filter_key = 'filter_value'",
199+
},
200+
})
201+
</code>
202+
*/
178203
SQL string
179204
}
180205

@@ -208,9 +233,10 @@ func (sco *StreamConsumerOptions) linkFilters() []amqp.LinkFilter {
208233

209234
filters = append(filters, sco.Offset.toLinkFilter())
210235
if sco.StreamFilterOptions != nil && !isStringNilOrEmpty(&sco.StreamFilterOptions.SQL) {
211-
l := map[string]any{}
212-
l[amqpSqlFilter] = sco.StreamFilterOptions.SQL
213-
filters = append(filters, amqp.NewLinkFilter(sqlFilter, 0, l))
236+
// here we use DescriptorCodeSqlFilter as the code for the sql filter
237+
// since we need to create a simple DescribedType
238+
// see DescriptorCodeSqlFilter const for more information
239+
filters = append(filters, amqp.NewLinkFilter(sqlFilter, DescriptorCodeSqlFilter, sco.StreamFilterOptions.SQL))
214240
}
215241

216242
if sco.StreamFilterOptions != nil && sco.StreamFilterOptions.Values != nil {

0 commit comments

Comments
 (0)