diff --git a/funcx_websocket_service/application.py b/funcx_websocket_service/application.py index b290731..47d70cf 100644 --- a/funcx_websocket_service/application.py +++ b/funcx_websocket_service/application.py @@ -35,6 +35,10 @@ def cli(): rabbitmq_host = os.environ.get("FUNCX_RABBITMQ_SERVICE_HOST", "127.0.0.1") RABBITMQ_URI = f"amqp://funcx:rabbitmq@{rabbitmq_host}/" + RABBITMQ_QUEUE_TTL = os.environ.get("RABBITMQ_QUEUE_TTL") + if not RABBITMQ_QUEUE_TTL: + RABBITMQ_QUEUE_TTL = 604800 + WEB_SERVICE_URI = os.environ.get("WEB_SERVICE_URI") if REDIS_HOST is None: @@ -49,7 +53,8 @@ def cli(): logger.info("Starting WebSocket Server") logger.debug( f"Using redis host: {REDIS_HOST}, redis port: {REDIS_PORT}, " - f"RabbitMQ uri: {RABBITMQ_URI}, web service URI: {WEB_SERVICE_URI}" + f"RabbitMQ uri: {RABBITMQ_URI}, web service URI: {WEB_SERVICE_URI}, " + f"RabbitMQ queue TTL: {RABBITMQ_QUEUE_TTL}" ) WebSocketServer( @@ -57,6 +62,7 @@ def cli(): REDIS_PORT, RABBITMQ_URI, WEB_SERVICE_URI, + int(RABBITMQ_QUEUE_TTL), ) except Exception: logger.exception("Caught exception while starting server") diff --git a/funcx_websocket_service/server.py b/funcx_websocket_service/server.py index 6681fef..2927122 100644 --- a/funcx_websocket_service/server.py +++ b/funcx_websocket_service/server.py @@ -34,6 +34,7 @@ def __init__( redis_port: str, rabbitmq_uri: str, web_service_uri: str, + rabbitmq_queue_ttl: int, ): """Initialize and run the server @@ -51,16 +52,15 @@ def __init__( web_service_uri : str Web Service URI to use, likely an internal k8s DNS name - s3_bucket_name : str - Name of S3 bucket where results could be stored - - redis_storage_threshold : int - Redis max storage threshold size for results + rabbitmq_queue_ttl : int + RabbitMQ queue TTL in seconds + (must match forwarder rabbitmq_queue_ttl) """ self.redis_host = redis_host self.redis_port = redis_port self.rabbitmq_uri = rabbitmq_uri self.funcx_service_address = f"{web_service_uri}/v2" + self.rabbitmq_queue_ttl = rabbitmq_queue_ttl logger.info(f"funcx_service_address : {self.funcx_service_address}") self.auth_client = AuthClient(self.funcx_service_address) @@ -359,11 +359,16 @@ async def mq_receive(self, ws_conn: WebSocketConnection, task_group_id: str): ) async with mq_connection: + # This argument expects milliseconds, so multiply by 1000 + queue_args = { + "x-expires": int(self.rabbitmq_queue_ttl * 1000), + } + channel = await mq_connection.channel() exchange = await channel.declare_exchange( "tasks", aio_pika.ExchangeType.DIRECT ) - queue = await channel.declare_queue(task_group_id) + queue = await channel.declare_queue(task_group_id, arguments=queue_args) await queue.bind(exchange, routing_key=task_group_id) async with queue.iterator() as queue_iter: