Skip to content

Commit 23a05f3

Browse files
authored
feat(package): Add support for gracefully shutting down the compression scheduler and compression workers (resolves #1037). (#1169)
1 parent b20b37d commit 23a05f3

File tree

3 files changed

+38
-13
lines changed

3 files changed

+38
-13
lines changed

components/clp-package-utils/clp_package_utils/scripts/stop_clp.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,12 @@
3434
logger = logging.getLogger(__file__)
3535

3636

37-
def stop_running_container(container_name: str, already_exited_containers: List[str], force: bool):
37+
def stop_running_container(
38+
container_name: str, already_exited_containers: List[str], force: bool, timeout: int = 10
39+
):
3840
if is_container_running(container_name):
3941
logger.info(f"Stopping {container_name}...")
40-
cmd = ["docker", "stop", container_name]
42+
cmd = ["docker", "stop", "--timeout", str(timeout), container_name]
4143
subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True)
4244

4345
logger.info(f"Removing {container_name}...")
@@ -148,9 +150,6 @@ def main(argv):
148150
if target in (ALL_TARGET_NAME, QUERY_WORKER_COMPONENT_NAME):
149151
container_name = f"clp-{QUERY_WORKER_COMPONENT_NAME}-{instance_id}"
150152
stop_running_container(container_name, already_exited_containers, force)
151-
if target in (ALL_TARGET_NAME, COMPRESSION_WORKER_COMPONENT_NAME):
152-
container_name = f"clp-{COMPRESSION_WORKER_COMPONENT_NAME}-{instance_id}"
153-
stop_running_container(container_name, already_exited_containers, force)
154153
if target in (ALL_TARGET_NAME, CONTROLLER_TARGET_NAME, QUERY_SCHEDULER_COMPONENT_NAME):
155154
container_name = f"clp-{QUERY_SCHEDULER_COMPONENT_NAME}-{instance_id}"
156155
stop_running_container(container_name, already_exited_containers, force)
@@ -164,11 +163,14 @@ def main(argv):
164163
COMPRESSION_SCHEDULER_COMPONENT_NAME,
165164
):
166165
container_name = f"clp-{COMPRESSION_SCHEDULER_COMPONENT_NAME}-{instance_id}"
167-
stop_running_container(container_name, already_exited_containers, force)
166+
stop_running_container(container_name, already_exited_containers, force, timeout=300)
168167

169168
container_config_file_path = logs_dir / f"{container_name}.yml"
170169
if container_config_file_path.exists():
171170
container_config_file_path.unlink()
171+
if target in (ALL_TARGET_NAME, COMPRESSION_WORKER_COMPONENT_NAME):
172+
container_name = f"clp-{COMPRESSION_WORKER_COMPONENT_NAME}-{instance_id}"
173+
stop_running_container(container_name, already_exited_containers, force, timeout=60)
172174
if target in (ALL_TARGET_NAME, CONTROLLER_TARGET_NAME, REDIS_COMPONENT_NAME):
173175
container_name = f"clp-{REDIS_COMPONENT_NAME}-{instance_id}"
174176
stop_running_container(container_name, already_exited_containers, force)

components/job-orchestration/job_orchestration/executor/compress/compression_task.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from contextlib import closing
77
from typing import Any, Dict, List, Optional, Tuple
88

9+
from celery import signals
910
from celery.app.task import Task
1011
from celery.utils.log import get_task_logger
1112
from clp_py_utils.clp_config import (
@@ -523,6 +524,11 @@ def run_clp(
523524
return CompressionTaskStatus.FAILED, worker_output
524525

525526

527+
@signals.worker_shutdown.connect
def worker_shutdown_handler(signal=None, sender=None, **kwargs):
    """
    Logs that a shutdown signal was received.

    Connected to Celery's `worker_shutdown` signal through the decorator above.

    :param signal: Unused.
    :param sender: Unused.
    :param kwargs: Unused.
    """
    logger.info("Shutdown signal received.")
530+
531+
526532
@app.task(bind=True)
527533
def compress(
528534
self: Task,

components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import datetime
33
import logging
44
import os
5+
import signal
56
import sys
67
import time
78
from contextlib import closing
@@ -55,6 +56,14 @@
5556

5657
scheduled_jobs = {}
5758

59+
received_sigterm = False
60+
61+
62+
def sigterm_handler(signal_number, frame):
    """
    Records that the process has been asked to terminate.

    Sets the module-level `received_sigterm` flag and logs the event; it does not stop the process
    itself.

    :param signal_number: Number of the delivered signal (unused).
    :param frame: Stack frame that was interrupted (unused).
    """
    global received_sigterm

    # Only raise the flag here; whatever reads `received_sigterm` decides when to exit.
    received_sigterm = True

    # NOTE(review): the Python logging docs caution about logging from within signal handlers --
    # confirm this is acceptable for the handlers configured on `logger`.
    logger.info("Received SIGTERM.")
66+
5867

5968
def fetch_new_jobs(db_cursor):
6069
db_cursor.execute(
@@ -451,6 +460,10 @@ def poll_running_jobs(db_conn, db_cursor):
451460
for job_id in jobs_to_delete:
452461
del scheduled_jobs[job_id]
453462

463+
if received_sigterm and 0 == len(scheduled_jobs):
464+
logger.info("Recieved SIGTERM and there're no more running jobs. Exiting.")
465+
sys.exit(0)
466+
454467

455468
def main(argv):
456469
args_parser = argparse.ArgumentParser()
@@ -466,6 +479,9 @@ def main(argv):
466479
# Update logging level based on config
467480
set_logging_level(logger, os.getenv("CLP_LOGGING_LEVEL"))
468481

482+
# Register the SIGTERM handler
483+
signal.signal(signal.SIGTERM, sigterm_handler)
484+
469485
# Load configuration
470486
config_path = Path(args.config)
471487
try:
@@ -500,16 +516,17 @@ def main(argv):
500516
# Start Job Processing Loop
501517
while True:
502518
try:
503-
search_and_schedule_new_tasks(
504-
clp_config,
505-
db_conn,
506-
db_cursor,
507-
clp_metadata_db_connection_config,
508-
)
519+
if not received_sigterm:
520+
search_and_schedule_new_tasks(
521+
clp_config,
522+
db_conn,
523+
db_cursor,
524+
clp_metadata_db_connection_config,
525+
)
509526
poll_running_jobs(db_conn, db_cursor)
510527
time.sleep(clp_config.compression_scheduler.jobs_poll_delay)
511528
except KeyboardInterrupt:
512-
logger.info("Gracefully shutting down")
529+
logger.info("Forcefully shutting down")
513530
return -1
514531
except Exception:
515532
logger.exception(f"Error in scheduling.")

0 commit comments

Comments
 (0)