Skip to content
This repository was archived by the owner on Feb 22, 2022. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion funcx_websocket_service/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ def cli():
rabbitmq_host = os.environ.get("FUNCX_RABBITMQ_SERVICE_HOST", "127.0.0.1")
RABBITMQ_URI = f"amqp://funcx:rabbitmq@{rabbitmq_host}/"

RABBITMQ_QUEUE_TTL = os.environ.get("RABBITMQ_QUEUE_TTL")
if not RABBITMQ_QUEUE_TTL:
RABBITMQ_QUEUE_TTL = 604800

WEB_SERVICE_URI = os.environ.get("WEB_SERVICE_URI")

if REDIS_HOST is None:
Expand All @@ -49,14 +53,16 @@ def cli():
logger.info("Starting WebSocket Server")
logger.debug(
f"Using redis host: {REDIS_HOST}, redis port: {REDIS_PORT}, "
f"RabbitMQ uri: {RABBITMQ_URI}, web service URI: {WEB_SERVICE_URI}"
f"RabbitMQ uri: {RABBITMQ_URI}, web service URI: {WEB_SERVICE_URI}, "
f"RabbitMQ queue TTL: {RABBITMQ_QUEUE_TTL}"
)

WebSocketServer(
REDIS_HOST,
REDIS_PORT,
RABBITMQ_URI,
WEB_SERVICE_URI,
int(RABBITMQ_QUEUE_TTL),
)
except Exception:
logger.exception("Caught exception while starting server")
Expand Down
17 changes: 11 additions & 6 deletions funcx_websocket_service/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def __init__(
redis_port: str,
rabbitmq_uri: str,
web_service_uri: str,
rabbitmq_queue_ttl: int,
):
"""Initialize and run the server

Expand All @@ -51,16 +52,15 @@ def __init__(
web_service_uri : str
Web Service URI to use, likely an internal k8s DNS name

s3_bucket_name : str
Name of S3 bucket where results could be stored

redis_storage_threshold : int
Redis max storage threshold size for results
rabbitmq_queue_ttl : int
RabbitMQ queue TTL in seconds
(must match forwarder rabbitmq_queue_ttl)
"""
self.redis_host = redis_host
self.redis_port = redis_port
self.rabbitmq_uri = rabbitmq_uri
self.funcx_service_address = f"{web_service_uri}/v2"
self.rabbitmq_queue_ttl = rabbitmq_queue_ttl
logger.info(f"funcx_service_address : {self.funcx_service_address}")
self.auth_client = AuthClient(self.funcx_service_address)

Expand Down Expand Up @@ -359,11 +359,16 @@ async def mq_receive(self, ws_conn: WebSocketConnection, task_group_id: str):
)

async with mq_connection:
# This argument expects milliseconds, so multiply by 1000
queue_args = {
"x-expires": int(self.rabbitmq_queue_ttl * 1000),
}

channel = await mq_connection.channel()
exchange = await channel.declare_exchange(
"tasks", aio_pika.ExchangeType.DIRECT
)
queue = await channel.declare_queue(task_group_id)
queue = await channel.declare_queue(task_group_id, arguments=queue_args)
await queue.bind(exchange, routing_key=task_group_id)

async with queue.iterator() as queue_iter:
Expand Down