Skip to content

Commit 3ddc58c

Browse files
committed
test
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 9fb4b41 commit 3ddc58c

File tree

1 file changed

+140
-0
lines changed

1 file changed

+140
-0
lines changed

tests/test_streams.py

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import time
2+
13
from rabbitmq_amqp_python_client import (
24
AddressHelper,
35
AMQPMessagingHandler,
@@ -471,3 +473,141 @@ def test_stream_filter_message_properties(
471473
consumer.close()
472474

473475
management.delete_queue(stream_name)
476+
477+
478+
class MyMessageHandlerApplicationPropertiesFilter(AMQPMessagingHandler):
479+
def __init__(
480+
self,
481+
):
482+
super().__init__()
483+
484+
def on_message(self, event: Event):
485+
self.delivery_context.accept(event)
486+
assert event.message.application_properties == {"key": "value_17"}
487+
raise ConsumerTestException("consumed")
488+
489+
490+
def test_stream_filter_application_properties(
491+
connection: Connection, environment: Environment
492+
) -> None:
493+
consumer = None
494+
stream_name = "test_stream_application_message_properties"
495+
messages_to_send = 30
496+
497+
queue_specification = StreamSpecification(
498+
name=stream_name,
499+
)
500+
management = connection.management()
501+
management.declare_queue(queue_specification)
502+
503+
addr_queue = AddressHelper.queue_address(stream_name)
504+
505+
# consume and then publish
506+
try:
507+
connection_consumer = environment.connection()
508+
connection_consumer.dial()
509+
consumer = connection_consumer.consumer(
510+
addr_queue,
511+
message_handler=MyMessageHandlerApplicationPropertiesFilter(),
512+
stream_consumer_options=StreamConsumerOptions(
513+
filter_options=StreamFilterOptions(
514+
application_properties={"key": "value_17"},
515+
)
516+
),
517+
)
518+
publisher = connection.publisher(addr_queue)
519+
for i in range(messages_to_send):
520+
msg = Message(
521+
body=Converter.string_to_bytes("hello_{}".format(i)),
522+
application_properties={"key": "value_{}".format(i)},
523+
)
524+
publisher.publish(msg)
525+
526+
publisher.close()
527+
528+
consumer.run()
529+
# ack to terminate the consumer
530+
except ConsumerTestException:
531+
pass
532+
533+
if consumer is not None:
534+
consumer.close()
535+
536+
management.delete_queue(stream_name)
537+
538+
539+
class MyMessageHandlerMixingDifferentFilters(AMQPMessagingHandler):
540+
def __init__(
541+
self,
542+
):
543+
super().__init__()
544+
545+
def on_message(self, event: Event):
546+
self.delivery_context.accept(event)
547+
assert event.message.annotations == {"x-stream-filter-value": "my_value"}
548+
assert event.message.application_properties == {"key": "value_17"}
549+
assert event.message.subject == "important_15"
550+
assert event.message.body == Converter.string_to_bytes("the_right_one")
551+
raise ConsumerTestException("consumed")
552+
553+
554+
def test_stream_filter_mixing_different(
555+
connection: Connection, environment: Environment
556+
) -> None:
557+
consumer = None
558+
stream_name = "test_stream_filter_mixing_different"
559+
messages_to_send = 30
560+
561+
queue_specification = StreamSpecification(
562+
name=stream_name,
563+
)
564+
management = connection.management()
565+
management.declare_queue(queue_specification)
566+
567+
addr_queue = AddressHelper.queue_address(stream_name)
568+
569+
# consume and then publish
570+
try:
571+
connection_consumer = environment.connection()
572+
connection_consumer.dial()
573+
consumer = connection_consumer.consumer(
574+
addr_queue,
575+
message_handler=MyMessageHandlerApplicationPropertiesFilter(),
576+
stream_consumer_options=StreamConsumerOptions(
577+
filter_options=StreamFilterOptions(
578+
values=["my_value"],
579+
application_properties={"key": "value_17"},
580+
message_properties=MessageProperties(subject="important_15"),
581+
)
582+
),
583+
)
584+
publisher = connection.publisher(addr_queue)
585+
# all these messages will be filtered out
586+
for i in range(messages_to_send):
587+
msg = Message(
588+
body=Converter.string_to_bytes("hello_{}".format(i)),
589+
)
590+
publisher.publish(msg)
591+
592+
time.sleep(
593+
0.5
594+
) # wait a bit to ensure messages are published in different chunks
595+
msg = Message(
596+
body=Converter.string_to_bytes("the_right_one"),
597+
annotations={"x-stream-filter-value": "my_value"},
598+
application_properties={"key": "value_17"},
599+
subject="important_15",
600+
)
601+
publisher.publish(msg)
602+
603+
publisher.close()
604+
605+
consumer.run()
606+
# ack to terminate the consumer
607+
except ConsumerTestException:
608+
pass
609+
610+
if consumer is not None:
611+
consumer.close()
612+
613+
management.delete_queue(stream_name)

0 commit comments

Comments
 (0)