Skip to content

Commit 6975f2c

Browse files
committed
remote/coordinator: handle polling tasks separately
Reacquiring orphaned resources can take some time, but we don't want to block saving or scheduling in the meantime, so split them into separate asyncio tasks. Signed-off-by: Jan Luebbe <[email protected]>
1 parent e7d1240 commit 6975f2c

File tree

1 file changed

+20
-22
lines changed

1 file changed

+20
-22
lines changed

labgrid/remote/coordinator.py

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ class Coordinator(labgrid_coordinator_pb2_grpc.CoordinatorServicer):
211211
def __init__(self) -> None:
212212
self.places: dict[str, Place] = {}
213213
self.reservations = {}
214-
self.poll_task = None
214+
self.poll_tasks = []
215215
self.save_scheduled = False
216216

217217
self.lock = asyncio.Lock()
@@ -220,35 +220,33 @@ def __init__(self) -> None:
220220
self.load()
221221

222222
self.loop = asyncio.get_running_loop()
223-
self.poll_task = self.loop.create_task(self.poll(), name="coordinator-poll")
223+
for name in ["save", "reacquire", "schedule"]:
224+
step_func = getattr(self, f"_poll_step_{name}")
225+
task = self.loop.create_task(self.poll(step_func), name=f"coordinator-poll-{name}")
226+
self.poll_tasks.append(task)
224227

225-
async def _poll_step(self):
228+
async def _poll_step_save(self):
226229
# save changes
227-
try:
228-
if self.save_scheduled:
229-
with warn_if_slow("save changes"):
230-
await self.save()
231-
except Exception: # pylint: disable=broad-except
232-
traceback.print_exc()
230+
if self.save_scheduled:
231+
with warn_if_slow("save changes"):
232+
await self.save()
233+
234+
async def _poll_step_reacquire(self):
233235
# try to re-acquire orphaned resources
234-
try:
235-
async with self.lock:
236-
with warn_if_slow("reacquire orphaned resources"):
237-
await self._reacquire_orphaned_resources()
238-
except Exception: # pylint: disable=broad-except
239-
traceback.print_exc()
236+
async with self.lock:
237+
with warn_if_slow("reacquire orphaned resources"):
238+
await self._reacquire_orphaned_resources()
239+
240+
async def _poll_step_schedule(self):
240241
# update reservations
241-
try:
242-
with warn_if_slow("schedule reservations"):
243-
self.schedule_reservations()
244-
except Exception: # pylint: disable=broad-except
245-
traceback.print_exc()
242+
with warn_if_slow("schedule reservations"):
243+
self.schedule_reservations()
246244

247-
async def poll(self):
245+
async def poll(self, step_func):
248246
while not self.loop.is_closed():
249247
try:
250248
await asyncio.sleep(15.0)
251-
await self._poll_step()
249+
await step_func()
252250
except asyncio.CancelledError:
253251
break
254252
except Exception: # pylint: disable=broad-except

0 commit comments

Comments
 (0)