Skip to content

Commit 6d2e1d5

Browse files
committed
Add benchmark
1 parent fcc65b3 commit 6d2e1d5

File tree

6 files changed

+12064
-8
lines changed

6 files changed

+12064
-8
lines changed

components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -911,7 +911,12 @@ def handle_pending_query_jobs(
911911
orchestration_type,
912912
spider_driver,
913913
)
914-
logger.info("Dispatched search job.")
914+
logger.info(
915+
"Dispatched search job %s at %s with %d tasks.",
916+
job.id,
917+
job.start_time.isoformat(),
918+
len(archives_for_search),
919+
)
915920

916921
return reducer_acquisition_tasks
917922

@@ -1037,20 +1042,37 @@ async def handle_finished_search_job(
10371042

10381043
# We set the status regardless of the job's previous status to handle the case where the
10391044
# job is cancelled (status = CANCELLING) while we're in this method.
1045+
end_time = datetime.datetime.now()
1046+
duration = (end_time - job.start_time).total_seconds()
10401047
if set_job_or_task_status(
10411048
db_conn,
10421049
QUERY_JOBS_TABLE_NAME,
10431050
job_id,
10441051
new_job_status,
10451052
num_tasks_completed=job.num_archives_searched,
1046-
duration=(datetime.datetime.now() - job.start_time).total_seconds(),
1053+
duration=duration,
10471054
):
10481055
if new_job_status == QueryJobStatus.SUCCEEDED:
1049-
logger.info("Completed job.")
1056+
logger.info(
1057+
"Completed job %s at %s, total duration=%.2fs.",
1058+
job_id,
1059+
end_time.isoformat(),
1060+
duration,
1061+
)
10501062
elif reducer_failed:
1051-
logger.error("Completed job with failing reducer.")
1063+
logger.error(
1064+
"Completed job %s at %s with failing reducer, total duration=%.2fs.",
1065+
job_id,
1066+
end_time.isoformat(),
1067+
duration,
1068+
)
10521069
else:
1053-
logger.info("Completed job with failing tasks.")
1070+
logger.info(
1071+
"Completed job %s at %s with failing tasks, total duration=%.2fs.",
1072+
job_id,
1073+
end_time.isoformat(),
1074+
duration,
1075+
)
10541076
del active_jobs[job_id]
10551077

10561078

components/job-orchestration/job_orchestration/scheduler/query/spider_query_scheduler.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@
1515

1616
import datetime
1717
import json
18-
from logging import getLogger
1918
from typing import Any, TYPE_CHECKING
2019

20+
from clp_py_utils.clp_logging import get_logger
21+
2122
from spider_py import Int64
2223
from spider_py.client import Channel, channel_task, Driver, group, Job, TaskGraph
2324
from spider_py.core import JobStatus
@@ -36,7 +37,7 @@
3637
if TYPE_CHECKING:
3738
from job_orchestration.scheduler.job_config import SearchJobConfig
3839

39-
logger = getLogger(__name__)
40+
logger = get_logger("search-job-handler")
4041

4142

4243
def build_search_task_graph(num_archives: int, has_aggregation: bool) -> TaskGraph:
@@ -245,11 +246,27 @@ def dispatch_search_job( # noqa: PLR0913
245246
has_aggregation=has_aggregation,
246247
)
247248

248-
logger.info("Submitting Spider job.")
249+
submit_start_time = datetime.datetime.now(tz=datetime.timezone.utc).replace(tzinfo=None)
250+
logger.info(
251+
"Submitting Spider job %s at %s with %d search tasks, aggregation=%s",
252+
job_id,
253+
submit_start_time.isoformat(),
254+
len(archives),
255+
has_aggregation,
256+
)
249257

250258
# Submit job to Spider
251259
jobs = driver.submit_jobs([task_graph], [tuple(job_inputs)])
252260

261+
submit_end_time = datetime.datetime.now(tz=datetime.timezone.utc).replace(tzinfo=None)
262+
submit_duration = (submit_end_time - submit_start_time).total_seconds()
263+
logger.info(
264+
"Submitted Spider job %s at %s, submission took %.3fs",
265+
job_id,
266+
submit_end_time.isoformat(),
267+
submit_duration,
268+
)
269+
253270
if not jobs:
254271
raise SpiderJobSubmitError(job_id)
255272

0 commit comments

Comments (0)