Skip to content

Conversation

@Ifechukwu001
Copy link
Contributor

@Ifechukwu001 Ifechukwu001 commented Apr 18, 2025

The PR implements FastStream for RabbitMQ communications.

To publish a message to RabbitMQ,

   from src.config.asgi import broker
   
   async publish_message(queue: str, message: [JSON serializable object (str, int, dict, list)]) -> None:
       await broker.publish(message, queue, persist=True)
       # Successfully published

To register a consumer from RabbitMQ,

    # src/api/services/ExampleService.py
    from faststream.rabbit import RabbitRouter
    
    EXAMPLE_ROUTER = RabbitRouter()
    
    # In a function
    @EXAMPLE_ROUTER.subscriber("queue")
    async def consume(message: [JSON serializable object (it will be checked as it uses pydantic to decode the message)]) -> None:
        print(message)
        
    # In a class
    class Example:
        
        @staticmethod
        @EXAMPLE_ROUTER.subscriber("queue")
        async def (message: [JSON serializable object (it will be checked as it uses pydantic to decode the message)]) -> None:
            print(message)

Then register the consumer in the src.api.services.external.RabbitMQRoutes as below

    # src/api/services/external/RabbitMQRoutes.py
    from src.config.asgi import broker
    
    from ..ExampleService import EXAMPLE_ROUTER
    
    
    broker.include_router(EXAMPLE_ROUTER)

@CaptainAril CaptainAril merged commit 9d19398 into Developer-s-Foundry:main Apr 19, 2025
2 checks passed
@Ifechukwu001 Ifechukwu001 deleted the feat/rabbitmq branch April 19, 2025 07:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants