Skip to content

Commit 2632ca6

Browse files
Get and parse Slurm job status JSON object (flyteorg#3192)
* refactor: Parse job info using JSON obj
  Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
* refactor: Apply JSON obj parsing to SlurmFunctionAgent get
  Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
* fix: Remove the downloaded stdout file
  Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
* refactor: Remove redundant check and simplify file read/write logic
  Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
* Remove redundant tests
  Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
* Remove raise to skip stdout delay issue
  Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
* fix: Fix wrong merging operations
  Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
---------
Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
1 parent 55edf2d commit 2632ca6

File tree

2 files changed

+35
-22
lines changed

2 files changed

+35
-22
lines changed

plugins/flytekit-slurm/flytekitplugins/slurm/function/connector.py

Lines changed: 18 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,13 @@
1+
import json
12
import tempfile
23
import uuid
34
from dataclasses import dataclass
45
from typing import Dict, List, Optional, Tuple, Union
56

67
from asyncssh import SSHClientConnection
8+
from asyncssh.sftp import SFTPNoSuchFile
79

10+
from flytekit import logger
811
from flytekit.extend.backend.base_connector import AsyncConnectorBase, ConnectorRegistry, Resource, ResourceMeta
912
from flytekit.extend.backend.utils import convert_to_flyte_phase
1013
from flytekit.models.literals import LiteralMap
@@ -80,21 +83,25 @@ async def get(self, resource_meta: SlurmJobMetadata, **kwargs) -> Resource:
8083
conn = await get_ssh_conn(
8184
ssh_config=resource_meta.ssh_config, slurm_cluster_to_ssh_conn=self.slurm_cluster_to_ssh_conn
8285
)
83-
job_res = await conn.run(f"scontrol show job {resource_meta.job_id}", check=True)
86+
job_res = await conn.run(f"scontrol --json show job {resource_meta.job_id}", check=True)
87+
job_info = json.loads(job_res.stdout)["jobs"][0]
8488

8589
# Determine the current flyte phase from Slurm job state
86-
job_state = "running"
87-
msg = "No stdout available"
88-
for o in job_res.stdout.split(" "):
89-
if "JobState" in o:
90-
job_state = o.split("=")[1].strip().lower()
91-
elif "StdOut" in o:
92-
stdout_path = o.split("=")[1].strip()
93-
msg_res = await conn.run(f"cat {stdout_path}", check=True)
94-
msg = msg_res.stdout
95-
90+
job_state = job_info["job_state"][0].strip().lower()
9691
cur_phase = convert_to_flyte_phase(job_state)
9792

93+
# Read stdout of the Slurm job
94+
msg = ""
95+
async with conn.start_sftp_client() as sftp:
96+
with tempfile.NamedTemporaryFile("w+") as f:
97+
try:
98+
await sftp.get(job_info["standard_output"], f.name)
99+
100+
msg = f.read()
101+
logger.info(f"[SLURM STDOUT] {msg}")
102+
except SFTPNoSuchFile:
103+
logger.debug("Standard output file path doesn't exist on the Slurm cluster.")
104+
98105
return Resource(phase=cur_phase, message=msg)
99106

100107
async def delete(self, resource_meta: SlurmJobMetadata, **kwargs) -> None:

plugins/flytekit-slurm/flytekitplugins/slurm/script/connector.py

Lines changed: 17 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,11 @@
1+
import json
12
import tempfile
23
import uuid
34
from dataclasses import dataclass
45
from typing import Any, Dict, List, Optional, Tuple, Type
56

67
from asyncssh import SSHClientConnection
7-
from asyncssh.sftp import SFTPError
8+
from asyncssh.sftp import SFTPError, SFTPNoSuchFile
89

910
import flytekit
1011
from flytekit.core.type_engine import TypeEngine
@@ -111,20 +112,25 @@ async def get(self, resource_meta: SlurmJobMetadata, **kwargs) -> Resource:
111112
conn = await get_ssh_conn(
112113
ssh_config=resource_meta.ssh_config, slurm_cluster_to_ssh_conn=self.slurm_cluster_to_ssh_conn
113114
)
114-
job_res = await conn.run(f"scontrol show job {resource_meta.job_id}", check=True)
115+
job_res = await conn.run(f"scontrol --json show job {resource_meta.job_id}", check=True)
116+
job_info = json.loads(job_res.stdout)["jobs"][0]
115117

116118
# Determine the current flyte phase from Slurm job state
117-
msg = ""
118-
job_state = "running"
119-
for o in job_res.stdout.split(" "):
120-
if "JobState" in o:
121-
job_state = o.split("=")[1].strip().lower()
122-
elif "StdOut" in o:
123-
stdout_path = o.split("=")[1].strip()
124-
msg_res = await conn.run(f"cat {stdout_path}", check=True)
125-
msg = msg_res.stdout
119+
job_state = job_info["job_state"][0].strip().lower()
126120
cur_phase = convert_to_flyte_phase(job_state)
127121

122+
# Read stdout of the Slurm job
123+
msg = ""
124+
async with conn.start_sftp_client() as sftp:
125+
with tempfile.NamedTemporaryFile("w+") as f:
126+
try:
127+
await sftp.get(job_info["standard_output"], f.name)
128+
129+
msg = f.read()
130+
logger.info(f"[SLURM STDOUT] {msg}")
131+
except SFTPNoSuchFile:
132+
logger.debug("Standard output file path doesn't exist on the Slurm cluster.")
133+
128134
return Resource(phase=cur_phase, message=msg, outputs=resource_meta.outputs)
129135

130136
async def delete(self, resource_meta: SlurmJobMetadata, **kwargs) -> None:

0 commit comments

Comments
 (0)