Skip to content

Commit 9e10b99

Browse files
feat(job-orchestration): Add Spider compression task; Move Celery compression task into a dedicated submodule. (#1340)
Co-authored-by: LinZhihao-723 <[email protected]>
1 parent 52d7848 commit 9e10b99

File tree

7 files changed

+115
-19
lines changed

7 files changed

+115
-19
lines changed
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from celery import signals
2+
from celery.app.task import Task
3+
from celery.utils.log import get_task_logger
4+
from job_orchestration.executor.compress.celery import app
5+
from job_orchestration.executor.compress.compression_task import compression_entry_point
6+
7+
# Setup logging
8+
logger = get_task_logger(__name__)
9+
10+
11+
@signals.worker_shutdown.connect
def worker_shutdown_handler(signal=None, sender=None, **kwargs):
    """Log receipt of the Celery worker-shutdown signal."""
    logger.info("Shutdown signal received.")
14+
15+
16+
@app.task(bind=True)
def compress(
    self: Task,
    job_id: int,
    task_id: int,
    tag_ids,
    clp_io_config_json: str,
    paths_to_compress_json: str,
    clp_metadata_db_connection_config,
):
    """
    Celery entry point for a single compression task.

    Forwards all arguments, plus this module's logger, to the shared
    `compression_entry_point` implementation and returns its result.
    """
    entry_point_args = (
        job_id,
        task_id,
        tag_ids,
        clp_io_config_json,
        paths_to_compress_json,
        clp_metadata_db_connection_config,
        logger,
    )
    return compression_entry_point(*entry_point_args)

components/job-orchestration/job_orchestration/executor/compress/celeryconfig.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@
66
# Force workers to consume only one task at a time
77
worker_prefetch_multiplier = 1
88
imports = [
9-
"job_orchestration.executor.compress.compression_task",
9+
"job_orchestration.executor.compress.celery_compress",
1010
]
1111

1212
# Queue settings
1313
task_queue_max_priority = TASK_QUEUE_HIGHEST_PRIORITY
1414
task_routes = {
15-
"job_orchestration.executor.compress.compression_task.compress": SchedulerType.COMPRESSION,
15+
"job_orchestration.executor.compress.celery_compress.compress": SchedulerType.COMPRESSION,
1616
}
1717
task_create_missing_queues = True
1818

components/job-orchestration/job_orchestration/executor/compress/compression_task.py

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,6 @@
66
from contextlib import closing
77
from typing import Any, Dict, List, Optional, Tuple
88

9-
from celery import signals
10-
from celery.app.task import Task
11-
from celery.utils.log import get_task_logger
129
from clp_py_utils.clp_config import (
1310
CLP_DB_PASS_ENV_VAR_NAME,
1411
CLP_DB_USER_ENV_VAR_NAME,
@@ -32,7 +29,6 @@
3229
s3_put,
3330
)
3431
from clp_py_utils.sql_adapter import SQL_Adapter
35-
from job_orchestration.executor.compress.celery import app
3632
from job_orchestration.scheduler.constants import CompressionTaskStatus
3733
from job_orchestration.scheduler.job_config import (
3834
ClpIoConfig,
@@ -42,9 +38,6 @@
4238
)
4339
from job_orchestration.scheduler.scheduler_data import CompressionTaskResult
4440

45-
# Setup logging
46-
logger = get_task_logger(__name__)
47-
4841

4942
def update_compression_task_metadata(db_cursor, task_id, kv):
5043
if not len(kv):
@@ -322,6 +315,7 @@ def run_clp(
322315
paths_to_compress: PathsToCompress,
323316
sql_adapter: SQL_Adapter,
324317
clp_metadata_db_connection_config,
318+
logger,
325319
):
326320
"""
327321
Compresses logs into archives.
@@ -336,6 +330,7 @@ def run_clp(
336330
:param paths_to_compress: PathToCompress
337331
:param sql_adapter: SQL_Adapter
338332
:param clp_metadata_db_connection_config
333+
:param logger
339334
:return: tuple -- (whether compression was successful, output messages)
340335
"""
341336
instance_id_str = f"compression-job-{job_id}-task-{task_id}"
@@ -523,20 +518,14 @@ def run_clp(
523518
return CompressionTaskStatus.FAILED, worker_output
524519

525520

526-
@signals.worker_shutdown.connect
527-
def worker_shutdown_handler(signal=None, sender=None, **kwargs):
528-
logger.info("Shutdown signal received.")
529-
530-
531-
@app.task(bind=True)
532-
def compress(
533-
self: Task,
521+
def compression_entry_point(
534522
job_id: int,
535523
task_id: int,
536524
tag_ids,
537525
clp_io_config_json: str,
538526
paths_to_compress_json: str,
539-
clp_metadata_db_connection_config,
527+
clp_metadata_db_connection_config: Dict[str, Any],
528+
logger,
540529
):
541530
clp_home = pathlib.Path(os.getenv("CLP_HOME"))
542531

@@ -578,6 +567,7 @@ def compress(
578567
paths_to_compress,
579568
sql_adapter,
580569
clp_metadata_db_connection_config,
570+
logger,
581571
)
582572
duration = (datetime.datetime.now() - start_time).total_seconds()
583573
logger.info(f"[job_id={job_id} task_id={task_id}] COMPRESSION COMPLETED.")
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import json
2+
3+
from clp_py_utils.clp_logging import get_logger
4+
from job_orchestration.executor.compress.compression_task import compression_entry_point
5+
from job_orchestration.utils.spider_utils import int8_list_to_utf8_str, utf8_str_to_int8_list
6+
from spider_py import Int8, Int64, TaskContext
7+
8+
# Setup logging
9+
logger = get_logger("spider_compression")
10+
11+
12+
def compress(
    _: TaskContext,
    job_id: Int64,
    task_id: Int64,
    tag_ids: list[Int64],
    clp_io_config_json: list[Int8],
    paths_to_compress_json: list[Int8],
    clp_metadata_db_connection_config_json: list[Int8],
) -> list[Int8]:
    """
    Compresses files using the general compression entry point.

    :param _: Spider's task context. Not used in the function.
    :param job_id:
    :param task_id:
    :param tag_ids:
    :param clp_io_config_json: A JSON string representation of
        `job_orchestration.scheduler.job_config.ClpIoConfig`, UTF-8 encoded as a list of
        `spider_py.Int8`.
    :param paths_to_compress_json: A JSON string representation of
        `job_orchestration.scheduler.job_config.PathsToCompress`, UTF-8 encoded as a list of
        `spider_py.Int8`.
    :param clp_metadata_db_connection_config_json: A JSON string representation of
        `clp_py_utils.clp_config.Database`, UTF-8 encoded as a list of `spider_py.Int8`.
    :return: A JSON string representation of
        `job_orchestration.scheduler.scheduler_data.CompressionTaskResult`, encoded as a list
        of `spider_py.Int8`.
    """
    # Convert Spider's native integer/byte types into the plain Python values that
    # `compression_entry_point` expects before delegating to the shared implementation.
    result = compression_entry_point(
        int(job_id),
        int(task_id),
        [int(tag_id) for tag_id in tag_ids],
        int8_list_to_utf8_str(clp_io_config_json),
        int8_list_to_utf8_str(paths_to_compress_json),
        json.loads(int8_list_to_utf8_str(clp_metadata_db_connection_config_json)),
        logger,
    )

    # Re-encode the result so it can cross the Spider task boundary as raw bytes.
    return utf8_str_to_int8_list(json.dumps(result))

components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
from clp_py_utils.core import read_yaml_config_file
3131
from clp_py_utils.s3_utils import s3_get_object_metadata
3232
from clp_py_utils.sql_adapter import SQL_Adapter
33-
from job_orchestration.executor.compress.compression_task import compress
33+
from job_orchestration.executor.compress.celery_compress import compress
3434
from job_orchestration.scheduler.compress.partition import PathsToCompressBuffer
3535
from job_orchestration.scheduler.constants import (
3636
CompressionJobStatus,

components/job-orchestration/job_orchestration/utils/__init__.py

Whitespace-only changes.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from __future__ import annotations
2+
3+
import spider_py
4+
5+
6+
def int8_list_to_utf8_str(byte_list: list[spider_py.Int8]) -> str:
    """
    Converts a list of `spider_py.Int8` values to a UTF-8 encoded string.

    :param byte_list:
    :return: Decoded UTF-8 string constructed from the input byte list.
    """
    raw_bytes = bytes(map(int, byte_list))
    return raw_bytes.decode("utf-8")
14+
15+
16+
def utf8_str_to_int8_list(utf8_str: str) -> list[spider_py.Int8]:
    """
    Converts a UTF-8 encoded string to a list of `spider_py.Int8` values.

    :param utf8_str:
    :return: A list of `spider_py.Int8` values representing the input string.
    """
    # Iterating a `bytes` object yields plain ints, each of which is wrapped as Int8.
    return list(map(spider_py.Int8, utf8_str.encode("utf-8")))

0 commit comments

Comments
 (0)