Skip to content
This repository was archived by the owner on Feb 22, 2022. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ if [[ -z "${ENDPOINT_BASE_PORT}" ]]; then
ENDPOINT_BASE_PORT=55001
fi

if [[ -z "${RABBITMQ_QUEUE_TTL}" ]]; then
RABBITMQ_QUEUE_TTL_OPT=""
else
RABBITMQ_QUEUE_TTL_OPT="--rabbitmq_queue_ttl $RABBITMQ_QUEUE_TTL"
fi
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we just ripped out the equivalent for the result TTL, I'm wondering if we can remove this layer of indirection here?

I'd rather see the forwarder pick up on any env var we want directly. It removes this entrypoint blob as a thing we need to modify/maintain at all.

Ultimately, the entrypoint.sh script could boil down all the way to

#!/bin/bash -e
forwarder-service


python3 wait_for_redis.py

if [[ -z "${ADVERTISED_FORWARDER_ADDRESS}" ]]; then
Expand All @@ -25,4 +31,4 @@ if [[ -z "${ADVERTISED_FORWARDER_ADDRESS}" ]]; then
fi


forwarder-service -a $ADVERTISED_FORWARDER_ADDRESS -p 8080 --redishost $REDIS_HOST --redisport $REDIS_PORT --rabbitmquri $RABBITMQ_URI -d --endpoint-base-port ${ENDPOINT_BASE_PORT}
forwarder-service -a $ADVERTISED_FORWARDER_ADDRESS -p 8080 --redishost $REDIS_HOST --redisport $REDIS_PORT --rabbitmquri $RABBITMQ_URI -d --endpoint-base-port ${ENDPOINT_BASE_PORT} $RABBITMQ_QUEUE_TTL_OPT
19 changes: 17 additions & 2 deletions funcx_forwarder/forwarder.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ def __init__(
response_queue,
address: str,
redis_address: str,
rabbitmq_conn_params,
rabbitmq_conn_params: pika.URLParameters,
rabbitmq_queue_ttl: int,
endpoint_ports=(55001, 55002, 55003),
redis_port: int = 6379,
logging_level=logging.INFO,
Expand All @@ -131,6 +132,13 @@ def __init__(
redis_address : str
full address to connect to redis. Required

rabbitmq_conn_params : pika.URLParameters
URL Params for RabbitMQ connection

rabbitmq_queue_ttl : int
RabbitMQ queue TTL in seconds
(must match websocket service rabbitmq_queue_ttl)

endpoint_ports : (int, int, int)
A triplet of ports: (tasks_port, results_port, commands_port)
Default: (55001, 55002, 55003)
Expand Down Expand Up @@ -159,6 +167,7 @@ def __init__(
self.address = address
self.redis_url = f"{redis_address}:{redis_port}"
self.rabbitmq_conn_params = rabbitmq_conn_params
self.rabbitmq_queue_ttl = rabbitmq_queue_ttl
self.tasks_port, self.results_port, self.commands_port = endpoint_ports
self.connected_endpoints: t.Dict[str, t.Dict[str, t.Any]] = {}
self.kill_event = Event()
Expand All @@ -177,6 +186,7 @@ def __init__(
logger.info(f"Initializing forwarder v{funcx_forwarder.__version__}")
logger.info(f"Forwarder running on public address: {self.address}")
logger.info(f"REDIS url: {self.redis_url}")
logger.info(f"RabbitMQ Queue TTL: {self.rabbitmq_queue_ttl}")
logger.info(f"Log level set to {loglevels[logging_level]}")

if not os.path.exists(self.keys_dir) or not os.listdir(self.keys_dir):
Expand Down Expand Up @@ -633,10 +643,15 @@ def handle_results(self):
# and the task result will not be acked if this fails
task_group_id = task.task_group_id
if task_group_id:
# This argument expects milliseconds, so multiply by 1000
queue_args = {
"x-expires": int(self.rabbitmq_queue_ttl * 1000),
}

connection = pika.BlockingConnection(self.rabbitmq_conn_params)
channel = connection.channel()
channel.exchange_declare(exchange="tasks", exchange_type="direct")
channel.queue_declare(queue=task_group_id)
channel.queue_declare(queue=task_group_id, arguments=queue_args)
channel.queue_bind(task_group_id, "tasks")

# important: the FuncX client must be capable of receiving the same
Expand Down
9 changes: 9 additions & 0 deletions funcx_forwarder/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,14 @@ def cli_run():
parser.add_argument(
"-d", "--debug", action="store_true", help="Enables debug logging"
)
parser.add_argument(
"--rabbitmq_queue_ttl",
default=604800,
help=(
"Set RabbitMQ queue TTL in seconds "
"(must match websocket service queue TTL)"
),
)
parser.add_argument(
"-v", "--version", action="store_true", help="Print version information"
)
Expand Down Expand Up @@ -237,6 +245,7 @@ def cli_run():
args.address,
args.redishost,
rabbitmq_conn_params,
int(args.rabbitmq_queue_ttl),
endpoint_ports=range(args.endpoint_base_port, args.endpoint_base_port + 3),
logging_level=logging_level,
redis_port=args.redisport,
Expand Down