Skip to content

Commit 4023804

Browse files
committed
implement SQL Filter
closes: #73 Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 6a08cb2 commit 4023804

File tree

3 files changed

+77
-3
lines changed

3 files changed

+77
-3
lines changed

.ci/ubuntu/gha-setup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ set -o xtrace
77
script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
88
readonly script_dir
99
echo "[INFO] script_dir: '$script_dir'"
10-
readonly rabbitmq_image=rabbitmq:4.1.0-management
10+
readonly rabbitmq_image=rabbitmq:4.2-rc-management
1111

1212

1313
readonly docker_name_prefix='rabbitmq-amqp-python-client'

rabbitmq_amqp_python_client/entities.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,6 @@ class MessageProperties:
161161
Attributes:
162162
message_id: Uniquely identifies a message within the system (int, UUID, bytes, or str).
163163
user_id: Identity of the user responsible for producing the message.
164-
to: Intended destination node of the message.
165164
subject: Summary information about the message content and purpose.
166165
reply_to: Address of the node to send replies to.
167166
correlation_id: Client-specific id for marking or identifying messages (int, UUID, bytes, or str).
@@ -176,7 +175,6 @@ class MessageProperties:
176175

177176
message_id: Optional[Union[int, str, bytes]] = None
178177
user_id: Optional[bytes] = None
179-
to: Optional[str] = None
180178
subject: Optional[str] = None
181179
reply_to: Optional[str] = None
182180
correlation_id: Optional[Union[int, str, bytes]] = None

tests/test_streams.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,82 @@ def test_stream_filter_application_properties(
536536
management.delete_queue(stream_name)
537537

538538

539+
class MyMessageHandlerSQLFilter(AMQPMessagingHandler):
540+
def __init__(
541+
self,
542+
):
543+
super().__init__()
544+
545+
def on_message(self, event: Event):
546+
self.delivery_context.accept(event)
547+
assert event.message.body == Converter.string_to_bytes("the_right_one_sql")
548+
assert event.message.subject == "something_in_the_filter"
549+
assert event.message.reply_to == "the_reply_to"
550+
assert (
551+
event.message.application_properties["a_in_the_filter_key"]
552+
== "a_in_the_filter_value"
553+
)
554+
555+
raise ConsumerTestException("consumed")
556+
557+
558+
def test_stream_filter_sql(connection: Connection, environment: Environment) -> None:
559+
consumer = None
560+
stream_name = "test_stream_filter_sql"
561+
messages_to_send = 30
562+
563+
queue_specification = StreamSpecification(
564+
name=stream_name,
565+
)
566+
management = connection.management()
567+
management.delete_queue(stream_name)
568+
management.declare_queue(queue_specification)
569+
570+
addr_queue = AddressHelper.queue_address(stream_name)
571+
sql = (
572+
"properties.subject LIKE '%in_the_filter%' AND properties.reply_to = 'the_reply_to' "
573+
"AND a_in_the_filter_key = 'a_in_the_filter_value'"
574+
)
575+
try:
576+
connection_consumer = environment.connection()
577+
connection_consumer.dial()
578+
consumer = connection_consumer.consumer(
579+
addr_queue,
580+
message_handler=MyMessageHandlerSQLFilter(),
581+
stream_consumer_options=StreamConsumerOptions(
582+
filter_options=StreamFilterOptions(sql=sql)
583+
),
584+
)
585+
publisher = connection.publisher(addr_queue)
586+
# won't match
587+
for i in range(messages_to_send):
588+
msg = Message(
589+
body=Converter.string_to_bytes("hello_{}".format(i)),
590+
)
591+
publisher.publish(msg)
592+
593+
msqMatch = Message(
594+
body=Converter.string_to_bytes("the_right_one_sql"),
595+
subject="something_in_the_filter",
596+
reply_to="the_reply_to",
597+
application_properties={"a_in_the_filter_key": "a_in_the_filter_value"},
598+
)
599+
600+
publisher.publish(msqMatch)
601+
602+
publisher.close()
603+
604+
consumer.run()
605+
# ack to terminate the consumer
606+
except ConsumerTestException:
607+
pass
608+
609+
if consumer is not None:
610+
consumer.close()
611+
612+
management.delete_queue(stream_name)
613+
614+
539615
class MyMessageHandlerMixingDifferentFilters(AMQPMessagingHandler):
540616
def __init__(
541617
self,

0 commit comments

Comments
 (0)