Skip to content

Commit 3362fd1

Browse files
authored
chore: black and isort format on codebase (#298)
Formatted the codebase using black and isort.
1 parent 16ab11b commit 3362fd1

File tree

15 files changed

+124
-91
lines changed

15 files changed

+124
-91
lines changed

infscale/agent/agent.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ async def update_worker_status(self, message: WorkerStatusMessage) -> None:
153153

154154
def _handle_worker(self, status_msg: WorkerStatusMessage) -> None:
155155
"""Handle status message received from worker.
156-
156+
157157
Whenever DONE, FAILED or TERMINATED is received, worker manager
158158
will remove that worker from its state.
159159
When no workers are available for the given job id, cleanup is done
@@ -334,18 +334,17 @@ def _handle_command(self, action: pb2.Action) -> None:
334334
MessageType.FINISH_JOB, WorkerStatus.DONE, action.job_id
335335
)
336336
self.worker_mgr.send(w, msg)
337-
337+
338338
case CommandAction.CHECK_LOOP:
339339
workers = self.worker_mgr.get_workers_by_job_id(action.job_id)
340340
failed_wids_str = action.manifest.decode("utf-8")
341341
failed_wids_set = ast.literal_eval(failed_wids_str)
342-
342+
343343
for w in workers.values():
344344
msg = Message(
345345
MessageType.CHECK_LOOP, failed_wids_set, action.job_id
346346
)
347347
self.worker_mgr.send(w, msg)
348-
349348

350349
async def heart_beat(self):
351350
"""Send a heart beat message periodically."""
@@ -411,7 +410,9 @@ def _terminate_workers(self, config: JobConfig) -> None:
411410
job_id, force_terminate = config.job_id, config.force_terminate
412411
stop_wrkrs = self.job_mgr.get_workers(job_id, CommandAction.STOP)
413412

414-
msg_type = MessageType.FORCE_TERMINATE if force_terminate else MessageType.TERMINATE
413+
msg_type = (
414+
MessageType.FORCE_TERMINATE if force_terminate else MessageType.TERMINATE
415+
)
415416
self.worker_mgr._signal_terminate_wrkrs(job_id, True, stop_wrkrs, msg_type)
416417

417418
async def report(self):

infscale/agent/worker_manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ def add(
7575
self._workers[worker.pipe.fileno()] = worker
7676

7777
return worker
78-
78+
7979
def has_workers_for_job(self, job_id: str) -> bool:
8080
"""Return True if there are any workers assigned to the given job ID."""
8181
return any(worker.job_id == job_id for worker in self._workers.values())
@@ -136,7 +136,7 @@ def on_read_ready(
136136
# When ConnectionResetError is raised, the pipe is already closed due to worker failure
137137
# so we only need to ignore this error.
138138
pass
139-
139+
140140
def remove_worker(self, wrk_id: str) -> None:
141141
"""Remove worker related data."""
142142
for k, v in list(self._workers.items()):

infscale/configs/job.py

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -170,13 +170,16 @@ def get_worlds_to_configure(
170170
updated_worlds = {
171171
k
172172
for k in common_keys
173-
if (curr_worlds[k].addr != new_worlds[k].addr
173+
if (
174+
curr_worlds[k].addr != new_worlds[k].addr
174175
or curr_worlds[k].data_port != new_worlds[k].data_port
175-
or curr_worlds[k].ctrl_port != new_worlds[k].ctrl_port)
176+
or curr_worlds[k].ctrl_port != new_worlds[k].ctrl_port
177+
)
176178
}
177179

178180
return deploy_worlds | updated_worlds
179-
181+
182+
180183
class ServeConfigHelper:
181184
"""Class for defining helper methods for serve config."""
182185

@@ -399,7 +402,7 @@ def is_identical(x: JobConfig, y: JobConfig) -> bool:
399402
def world_name(world_id: int) -> str:
400403
"""Return world name given a world id."""
401404
return f"w{world_id}"
402-
405+
403406
@staticmethod
404407
def get_pipeline_identifiers(new_cfg: JobConfig) -> set[str]:
405408
"""Get pipeline identifiers based on server id."""
@@ -450,30 +453,38 @@ def categorize_workers(
450453
# select workers that will be affected by workers to be started
451454
for w, world_info_list in new_config.flow_graph.items():
452455
for new_world_info in world_info_list:
453-
curr_world_info = helper.find_matching_world_info(curr_config, w, new_world_info)
454-
helper.pick_workers(update_wrkrs, start_wrkrs, w, new_world_info, curr_world_info)
456+
curr_world_info = helper.find_matching_world_info(
457+
curr_config, w, new_world_info
458+
)
459+
helper.pick_workers(
460+
update_wrkrs, start_wrkrs, w, new_world_info, curr_world_info
461+
)
455462

456463
if curr_config is None:
457464
return start_wrkrs, update_wrkrs, stop_wrkrs
458465

459466
# select workers that will be affected by workers to be stopped
460467
for w, world_info_list in curr_config.flow_graph.items():
461468
for new_world_info in world_info_list:
462-
curr_world_info = helper.find_matching_world_info(curr_config, w, new_world_info)
463-
helper.pick_workers(update_wrkrs, stop_wrkrs, w, new_world_info, curr_world_info)
469+
curr_world_info = helper.find_matching_world_info(
470+
curr_config, w, new_world_info
471+
)
472+
helper.pick_workers(
473+
update_wrkrs, stop_wrkrs, w, new_world_info, curr_world_info
474+
)
464475

465476
# due to pervious state, recover workers are included in update workers
466477
# therefore, recover workers need to be removed from the updated ones.
467478
update_wrkrs -= recover_wrkrs
468479

469480
return start_wrkrs, update_wrkrs, stop_wrkrs
470-
481+
471482
@staticmethod
472483
def get_workers_diff(a: JobConfig, b: JobConfig) -> set[str]:
473484
"""Return a set of worker ids diffs based on old and new cfg."""
474485
old_workers = {worker.id for worker in a.workers}
475486
new_workers = {worker.id for worker in b.workers}
476-
487+
477488
return old_workers - new_workers
478489

479490
@staticmethod
@@ -509,10 +520,10 @@ def remove_pipeline(config: JobConfig, workers_to_remove: set[str]) -> JobConfig
509520

510521
class JobConfigHelper:
511522
"""Class for defining helper methods for job config."""
523+
512524
def get_server_id(self, config: JobConfig) -> str:
513525
return next((w.id for w in config.workers if w.is_server), "")
514-
515-
526+
516527
def find_pipeline_nodes(
517528
self,
518529
flow_graph: dict[str, list[WorldInfo]],
@@ -577,9 +588,7 @@ def find_pipeline_nodes(
577588

578589
# everything else (except server) is removed
579590
to_remove = {
580-
wid
581-
for wid in flow_graph
582-
if wid != server_id and wid not in survivors
591+
wid for wid in flow_graph if wid != server_id and wid not in survivors
583592
}
584593

585594
return to_remove
@@ -600,11 +609,13 @@ def pick_workers(
600609
601610
The needles are workers to start or stop and the haystack is
602611
name and peers.
603-
612+
604613
Also includes peers of `name` if its connection details
605614
(`addr`, `ctrl_port`, `data_port`) differ from the previous config.
606615
"""
607-
if curr_world_info and self.has_connection_changed(curr_world_info, new_world_info):
616+
if curr_world_info and self.has_connection_changed(
617+
curr_world_info, new_world_info
618+
):
608619
for peer in new_world_info.peers:
609620
res_set.add(peer)
610621

@@ -634,15 +645,15 @@ def pick_workers(
634645
# because name is already affected by one peer
635646
# so we come out of the for-loop
636647
break
637-
648+
638649
def has_connection_changed(self, old: WorldInfo, new: WorldInfo) -> bool:
639650
"""Check if worker connection details are changed."""
640651
return (
641-
old.addr != new.addr or
642-
old.ctrl_port != new.ctrl_port or
643-
old.data_port != new.data_port
652+
old.addr != new.addr
653+
or old.ctrl_port != new.ctrl_port
654+
or old.data_port != new.data_port
644655
)
645-
656+
646657
def find_matching_world_info(
647658
self, curr_config: JobConfig | None, w: str, new_world_info: WorldInfo
648659
) -> WorldInfo | None:

infscale/controller/controller.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -243,12 +243,12 @@ async def send_command_to_agent(
243243
"""Send command to agent."""
244244
agent_context = self.agent_contexts[agent_id]
245245
context = agent_context.get_grpc_ctx()
246-
246+
247247
kwargs = {
248248
"type": action.action,
249249
"job_id": job_id,
250250
}
251-
251+
252252
# this is needed because checking against an empty set will be False
253253
if hasattr(action, "failed_wids"):
254254
kwargs["manifest"] = str(action.failed_wids).encode("utf-8")
@@ -281,7 +281,9 @@ async def job_setup(
281281
try:
282282
self.ctrl.handle_job_ports(req)
283283
except AttributeError:
284-
logger.error(f"Failed to process ports. Job: {req.job_id} Agent: {req.agent_id}")
284+
logger.error(
285+
f"Failed to process ports. Job: {req.job_id} Agent: {req.agent_id}"
286+
)
285287

286288
return empty_pb2.Empty()
287289

@@ -307,7 +309,9 @@ async def update_wrk_status(
307309
try:
308310
await self.ctrl.handle_wrk_status(request)
309311
except Exception as e:
310-
logger.error(f"Failed to process worker status. Worker: {request.worker_id} Status: {request.status} {e}")
312+
logger.error(
313+
f"Failed to process worker status. Worker: {request.worker_id} Status: {request.status} {e}"
314+
)
311315

312316
return empty_pb2.Empty()
313317

infscale/controller/ctrl_dtype.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ class CommandAction(str, Enum):
3939
UPDATE = "update" # CLI - Controller update command
4040
SETUP = "setup" # ctrl<->agent setup job, assign port numbers to workers
4141
FINISH_JOB = "finish_job" # ctrl<->agent action to notify job's completion
42-
CHECK_LOOP = "check_loop" # ctrl<->agent action for workers to check their pipeline loops
42+
CHECK_LOOP = (
43+
"check_loop" # ctrl<->agent action for workers to check their pipeline loops
44+
)
4345

4446

4547
class CommandActionModel(BaseModel):

infscale/controller/job_checker.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
"""job_checker.py."""
1818

1919
from collections import defaultdict
20+
2021
from infscale import get_logger
2122
from infscale.common.job_msg import WorkerStatus
2223
from infscale.configs.job import JobConfig

0 commit comments

Comments
 (0)