Skip to content

Commit fc7b802

Browse files
jluebbe authored and Bastian-Krause committed
remote/coordinator: refactor handling of orphaned resources
If an exporter disconnects, corresponding resources that are acquired become orphaned. If the exporter reconnects, the resources are not locked. Requiring the user to unlock and lock the corresponding place again is inconvenient and would change the previous behavior. Prior to this commit, reacquiring did not work due to a logic error: ExporterCommand.complete() and ExporterCommand.wait() are both called in ExporterStream.request_task(). The blocking wait() prevents further processing of exporter messages. That also means responses for ExporterSetAcquiredRequest are not handled anymore. This leads to a state where resources can no longer be acquired/released by their place. Ultimately, this leads to an inconsistent state requiring a coordinator restart. Refactor the handling of these orphaned resources to solve this. Signed-off-by: Jan Luebbe <[email protected]>
1 parent 33126fa commit fc7b802

File tree

1 file changed

+61
-36
lines changed

1 file changed

+61
-36
lines changed

labgrid/remote/coordinator.py

Lines changed: 61 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -54,7 +54,7 @@ def set_resource(self, groupname, resourcename, resource):
5454
"""This is called when Exporters update resources or when they disconnect."""
5555
logging.info("set_resource %s %s %s", groupname, resourcename, resource)
5656
group = self.groups.setdefault(groupname, {})
57-
old = group.get(resourcename)
57+
old: ResourceImport = group.get(resourcename)
5858
if resource is not None:
5959
new = ResourceImport(
6060
data=ResourceImport.data_from_pb2(resource), path=(self.name, groupname, resource.cls, resourcename)
@@ -66,6 +66,8 @@ def set_resource(self, groupname, resourcename, resource):
6666
group[resourcename] = new
6767
else:
6868
new = None
69+
if old.acquired:
70+
old.orphaned = True
6971
try:
7072
del group[resourcename]
7173
except KeyError:
@@ -150,6 +152,7 @@ class ResourceImport(ResourceEntry):
150152
"""
151153

152154
path = attr.ib(kw_only=True, validator=attr.validators.instance_of(tuple))
155+
orphaned = attr.ib(init=False, default=False, validator=attr.validators.instance_of(bool))
153156

154157

155158
def locked(func):
@@ -181,7 +184,7 @@ class ExporterError(Exception):
181184

182185
class Coordinator(labgrid_coordinator_pb2_grpc.CoordinatorServicer):
183186
def __init__(self) -> None:
184-
self.places = {}
187+
self.places: dict[str, Place] = {}
185188
self.reservations = {}
186189
self.poll_task = None
187190
self.save_scheduled = False
@@ -200,6 +203,12 @@ async def _poll_step(self):
200203
await self.save()
201204
except Exception: # pylint: disable=broad-except
202205
traceback.print_exc()
206+
# try to re-acquire orphaned resources
207+
try:
208+
async with self.lock:
209+
await self._reacquire_orphaned_resources()
210+
except Exception: # pylint: disable=broad-except
211+
traceback.print_exc()
203212
# update reservations
204213
try:
205214
self.schedule_reservations()
@@ -334,34 +343,6 @@ def get_exporter_by_name(self, name):
334343
if exporter.name == name:
335344
return exporter
336345

337-
async def _update_acquired_places(self, action, resource):
338-
"""Update acquired places when resources are added or removed."""
339-
if action not in [Action.ADD, Action.DEL]:
340-
return # currently nothing needed for Action.UPD
341-
342-
# collect affected places
343-
places = []
344-
for place in self.places.values():
345-
if not place.acquired:
346-
continue
347-
if not place.hasmatch(resource.path):
348-
continue
349-
places.append(place)
350-
351-
if action is Action.ADD:
352-
# only add if there is no conflict
353-
if len(places) != 1:
354-
return
355-
place = places[0]
356-
await self._acquire_resources(place, [resource])
357-
self._publish_place(place)
358-
else:
359-
for place in places:
360-
# resources only disappear when exporters disconnect, so we
361-
# can't call back to the exporter
362-
await self._release_resources(place, [resource], callback=False)
363-
self._publish_place(place)
364-
365346
def _publish_place(self, place):
366347
msg = labgrid_coordinator_pb2.ClientOutMessage()
367348
msg.updates.add().place.CopyFrom(place.as_pb2())
@@ -411,15 +392,12 @@ async def request_task():
411392
logging.debug("Received startup from %s with %s", name, version)
412393
elif kind == "resource":
413394
logging.debug("Received resource from %s with %s", name, in_msg.resource)
414-
action, resource = session.set_resource(
395+
action, _ = session.set_resource(
415396
in_msg.resource.path.group_name, in_msg.resource.path.resource_name, in_msg.resource
416397
)
417398
if action is Action.ADD:
418399
async with self.lock:
419400
self._add_default_place(in_msg.resource.path.group_name)
420-
if action in (Action.ADD, Action.DEL):
421-
async with self.lock:
422-
await self._update_acquired_places(action, resource)
423401
self.save_later()
424402
else:
425403
logging.warning("received unknown kind %s from exporter %s (version %s)", kind, name, version)
@@ -453,8 +431,7 @@ async def request_task():
453431

454432
for groupname, group in session.groups.items():
455433
for resourcename in group.copy():
456-
action, resource = session.set_resource(groupname, resourcename, None)
457-
await self._update_acquired_places(action, resource)
434+
session.set_resource(groupname, resourcename, None)
458435

459436
logging.debug("exporter aborted %s, cancelled: %s", context.peer(), context.cancelled())
460437

@@ -652,6 +629,8 @@ async def _release_resources(self, place, resources, callback=True):
652629
pass
653630

654631
for resource in resources:
632+
if resource.orphaned:
633+
continue
655634
try:
656635
# this triggers an update from the exporter which is published
657636
# to the clients
@@ -673,6 +652,48 @@ async def _release_resources(self, place, resources, callback=True):
673652
except:
674653
logging.exception("failed to publish released resource %s", resource)
675654

655+
async def _reacquire_orphaned_resources(self):
656+
assert self.lock.locked()
657+
658+
for place in self.places.values():
659+
changed = False
660+
661+
for idx, resource in enumerate(place.acquired_resources):
662+
if not resource.orphaned:
663+
continue
664+
665+
# is the exporter connected again?
666+
exporter = self.get_exporter_by_name(resource.path[0])
667+
if not exporter:
668+
continue
669+
670+
# does the resource exist again?
671+
try:
672+
new_resource = exporter.groups[resource.path[1]][resource.path[3]]
673+
except KeyError:
674+
continue
675+
676+
if new_resource.acquired:
677+
# this should only happen when resources become broken
678+
logging.debug("ignoring acquired/broken resource %s for place %s", new_resource, place.name)
679+
continue
680+
681+
try:
682+
await self._acquire_resource(place, new_resource)
683+
place.acquired_resources[idx] = new_resource
684+
except Exception:
685+
logging.exception(
686+
"failed to reacquire orphaned resource %s for place %s", new_resource, place.name
687+
)
688+
break
689+
690+
logging.info("reacquired orphaned resource %s for place %s", new_resource, place.name)
691+
changed = True
692+
693+
if changed:
694+
self._publish_place(place)
695+
self.save_later()
696+
676697
@locked
677698
async def AcquirePlace(self, request, context):
678699
peer = context.peer()
@@ -693,6 +714,10 @@ async def AcquirePlace(self, request, context):
693714
res = self.reservations[place.reservation]
694715
if not res.owner == username:
695716
await context.abort(grpc.StatusCode.PERMISSION_DENIED, f"Place {name} was not reserved for {username}")
717+
718+
# First try to reacquire orphaned resources to avoid conflicts.
719+
await self._reacquire_orphaned_resources()
720+
696721
# FIXME use the session object instead? or something else which
697722
# survives disconnecting clients?
698723
place.acquired = username

0 commit comments

Comments
 (0)