forked from y-scope/clp
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
124 lines (108 loc) · 4.06 KB
/
utils.py
File metadata and controls
124 lines (108 loc) · 4.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import asyncio
import multiprocessing
import time
from contextlib import closing
import msgpack
from clp_py_utils.clp_config import (
Database,
QUERY_JOBS_TABLE_NAME,
)
from clp_py_utils.clp_metadata_db_utils import fetch_existing_datasets
from clp_py_utils.sql_adapter import SqlAdapter
from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType
from job_orchestration.scheduler.scheduler_data import QueryJobConfig
async def run_function_in_process(function, *args, initializer=None, init_args=None):
    """
    Runs the given function in a separate process wrapped in a *cancellable*
    asyncio task. This is necessary because asyncio's multiprocessing process
    cannot be cancelled once it's started.
    :param function: Method to run. Must be picklable since it's executed in a
        separate process.
    :param args: Arguments for the method
    :param initializer: Initializer for each process in the pool
    :param init_args: Arguments for the initializer
    :return: Return value of the method, or None if the awaiting task was
        cancelled.
    """
    # A single-worker pool gives us the async-callback API plus the ability to
    # forcefully terminate the worker if the surrounding task is cancelled.
    pool = multiprocessing.Pool(1, initializer, init_args)

    # NOTE: `asyncio.get_event_loop()` is deprecated inside coroutines since
    # Python 3.10; `get_running_loop()` is the supported replacement.
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    # The pool invokes these callbacks from its internal result-handler
    # thread, so the future must be completed via `call_soon_threadsafe`.
    def process_done_callback(obj):
        loop.call_soon_threadsafe(fut.set_result, obj)

    def process_error_callback(err):
        loop.call_soon_threadsafe(fut.set_exception, err)

    pool.apply_async(
        function, args, callback=process_done_callback, error_callback=process_error_callback
    )

    try:
        return await fut
    except asyncio.CancelledError:
        # Swallow the cancellation so the caller sees a clean return; the
        # worker process is killed in the `finally` block below.
        pass
    finally:
        # `terminate` stops the worker immediately (even mid-task), which is
        # what makes this wrapper cancellable. `join` then reaps the worker so
        # no zombie process is left behind. (The original `close()` call here
        # was a no-op: `Pool.close()` only acts on a pool still in RUN state.)
        pool.terminate()
        pool.join()
def submit_query_job(
    sql_adapter: SqlAdapter, job_config: QueryJobConfig, job_type: QueryJobType
) -> int:
    """
    Submits a query job.
    :param sql_adapter:
    :param job_config: Job configuration; serialized with msgpack before
        being stored.
    :param job_type:
    :return: The job's ID.
    """
    serialized_config = msgpack.packb(job_config.model_dump())
    insert_stmt = f"INSERT INTO `{QUERY_JOBS_TABLE_NAME}` (`job_config`, `type`) VALUES (%s, %s)"

    with (
        closing(sql_adapter.create_connection(True)) as db_conn,
        closing(db_conn.cursor(dictionary=True)) as db_cursor,
    ):
        # Insert the new job row and commit so the scheduler can pick it up
        db_cursor.execute(insert_stmt, (serialized_config, job_type))
        db_conn.commit()
        return db_cursor.lastrowid
def validate_datasets_exist(db_config: Database, datasets: list[str]) -> None:
    """
    Validates that all datasets in `datasets` exist in the metadata database.
    :param db_config:
    :param datasets:
    :raise: ValueError if any dataset doesn't exist.
    """
    sql_adapter = SqlAdapter(db_config)
    connection_params = db_config.get_clp_connection_params_and_type(True)
    table_prefix = connection_params["table_prefix"]

    with (
        closing(sql_adapter.create_connection(True)) as db_conn,
        closing(db_conn.cursor(dictionary=True)) as db_cursor,
    ):
        existing_datasets = fetch_existing_datasets(db_cursor, table_prefix)

        # Report the first requested dataset (in input order) that's missing.
        unknown = next((name for name in datasets if name not in existing_datasets), None)
        if unknown is not None:
            raise ValueError(f"Dataset `{unknown}` doesn't exist.")
def wait_for_query_job(sql_adapter: SqlAdapter, job_id: int) -> QueryJobStatus:
    """
    Waits for the query job with the given ID to complete, polling the jobs
    table every half second.
    :param sql_adapter:
    :param job_id:
    :return: The job's status on completion.
    """
    # Hoist the loop invariants: the poll query and the set of terminal states.
    poll_query = f"SELECT `status` FROM `{QUERY_JOBS_TABLE_NAME}` WHERE `id` = %s"
    terminal_statuses = (
        QueryJobStatus.SUCCEEDED,
        QueryJobStatus.FAILED,
        QueryJobStatus.CANCELLED,
        QueryJobStatus.KILLED,
    )

    with (
        closing(sql_adapter.create_connection(True)) as db_conn,
        closing(db_conn.cursor(dictionary=True)) as db_cursor,
    ):
        # Wait for the job to be marked complete
        while True:
            # Bind `job_id` as a query parameter (consistent with
            # `submit_query_job`) instead of interpolating it into the SQL
            # string, which would be an injection hazard.
            db_cursor.execute(poll_query, (job_id,))
            # There will only ever be one row since it's impossible to have more than one job with
            # the same ID
            new_status = QueryJobStatus(db_cursor.fetchall()[0]["status"])
            # Commit to end the read transaction so the next poll sees fresh data
            db_conn.commit()

            if new_status in terminal_statuses:
                return new_status

            time.sleep(0.5)