Skip to content

Commit a8b6ab3

Browse files
authored
Merge pull request #1580 from jluebbe/refactor-orphaned-resources
refactor handling of orphaned resources in the coordinator
2 parents 7ee5d2f + 109ef0d commit a8b6ab3

File tree

1 file changed

+125
-42
lines changed

1 file changed

+125
-42
lines changed

labgrid/remote/coordinator.py

Lines changed: 125 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import time
99
from contextlib import contextmanager
1010
import copy
11+
import random
1112

1213
import attr
1314
import grpc
@@ -26,7 +27,7 @@
2627
from .scheduler import TagSet, schedule
2728
from .generated import labgrid_coordinator_pb2
2829
from .generated import labgrid_coordinator_pb2_grpc
29-
from ..util import atomic_replace, labgrid_version, yaml
30+
from ..util import atomic_replace, labgrid_version, yaml, Timeout
3031

3132

3233
@contextmanager
@@ -220,7 +221,7 @@ def __init__(self) -> None:
220221
self.load()
221222

222223
self.loop = asyncio.get_running_loop()
223-
for name in ["save", "reacquire", "schedule"]:
224+
for name in ["save", "sync_resources", "schedule"]:
224225
step_func = getattr(self, f"_poll_step_{name}")
225226
task = self.loop.create_task(self.poll(step_func), name=f"coordinator-poll-{name}")
226227
self.poll_tasks.append(task)
@@ -231,11 +232,11 @@ async def _poll_step_save(self):
231232
with warn_if_slow("save changes", level=logging.DEBUG):
232233
await self.save()
233234

234-
async def _poll_step_reacquire(self):
235-
# try to re-acquire orphaned resources
235+
async def _poll_step_sync_resources(self):
236+
# try to synchronize resources
236237
async with self.lock:
237-
with warn_if_slow("reacquire orphaned resources", limit=3.0):
238-
await self._reacquire_orphaned_resources()
238+
with warn_if_slow("synchronize resources", limit=3.0):
239+
await self._synchronize_resources()
239240

240241
async def _poll_step_schedule(self):
241242
# update reservations
@@ -638,6 +639,14 @@ async def _acquire_resources(self, place, resources):
638639
if resource.acquired:
639640
return False
640641

642+
for otherplace in self.places.values():
643+
for oldres in otherplace.acquired_resources:
644+
if resource.path == oldres.path:
645+
logging.info(
646+
"Conflicting orphaned resource %s for acquire request for place %s", oldres, place.name
647+
)
648+
return False
649+
641650
# acquire resources
642651
acquired = []
643652
try:
@@ -692,47 +701,124 @@ async def _release_resources(self, place, resources, callback=True):
692701
except:
693702
logging.exception("failed to publish released resource %s", resource)
694703

695-
async def _reacquire_orphaned_resources(self):
704+
async def _synchronize_resources(self):
696705
assert self.lock.locked()
697706

698-
for place in self.places.values():
699-
changed = False
707+
# fix:
708+
# - a resource is acquired for a place that is not acquired
709+
# * perhaps caused by a resource acquire timeout (during normal lock)
710+
# -> release()
711+
# - a resource is acquired for a place that still has it as orphaned
712+
# * perhaps caused by a resource acquire timeout (during reacquire)
713+
# -> replace orphaned resource
714+
# - a resource is released, but a place still has it as orphaned
715+
# * perhaps caused by a exporter restart
716+
# -> acquire() and replace orphaned resource
717+
718+
acquired_resources = {}
719+
used_resources = {}
720+
orphaned_resources = {}
721+
722+
# find acquired resources
723+
for exporter in self.exporters.values():
724+
for group in exporter.groups.values():
725+
for resource in group.values():
726+
if resource.acquired:
727+
acquired_resources[resource.path] = resource
700728

701-
for idx, resource in enumerate(place.acquired_resources):
729+
# find resources used by places
730+
for place in self.places.values():
731+
for resource in place.acquired_resources:
702732
if not resource.orphaned:
703-
continue
733+
used_resources[resource.path] = resource
734+
else:
735+
orphaned_resources[resource.path] = resource
736+
737+
timeout = Timeout(5.0)
738+
739+
# find resources to be released
740+
to_release = list(acquired_resources.keys() - used_resources.keys() - orphaned_resources.keys())
741+
if to_release:
742+
logging.info("synchronize resources: %s acquired resource(s) should be released", len(to_release))
743+
random.shuffle(to_release) # don't get stuck on a problematic resource
744+
for resource_path in to_release:
745+
if timeout.expired:
746+
continue # release the coordinator lock
747+
748+
resource = acquired_resources[resource_path]
749+
if resource.acquired == "<broken>":
750+
continue
751+
place = self.places.get(resource.acquired)
752+
print(f"should release {resource} for {place}?")
704753

705-
# is the exporter connected again?
706-
exporter = self.get_exporter_by_name(resource.path[0])
707-
if not exporter:
708-
continue
754+
if place is None:
755+
logging.warning("resource %s claims to be acquired by unknown place", resource)
756+
elif not place.acquired:
757+
logging.warning("resource %s claims to be acquired by unacquired place", resource)
758+
else:
759+
continue
760+
try:
761+
await self._release_resources(place, [resource])
762+
del acquired_resources[resource_path]
763+
except Exception:
764+
logging.exception("failed to release unused resource %s", resource)
765+
break
709766

710-
# does the resource exist again?
711-
try:
712-
new_resource = exporter.groups[resource.path[1]][resource.path[3]]
713-
except KeyError:
714-
continue
767+
# find orphaned resources to be acquired
768+
to_acquire = list(orphaned_resources.keys() - acquired_resources.keys())
769+
if to_acquire:
770+
logging.info("synchronize resources: %s orphaned resource(s) should be acquired", len(to_acquire))
771+
random.shuffle(to_acquire) # don't get stuck on a problematic resource
772+
for resource_path in to_acquire:
773+
if timeout.expired:
774+
continue # release the coordinator lock
775+
776+
resource = orphaned_resources[resource_path]
777+
if resource.acquired == "<broken>":
778+
continue
779+
place = self.places.get(resource.acquired)
780+
assert place is not None
781+
assert place.acquired
782+
print(f"should acquire {resource} for {place}?")
783+
784+
# is the exporter connected again?
785+
exporter = self.get_exporter_by_name(resource.path[0])
786+
if not exporter:
787+
continue
715788

716-
if new_resource.acquired:
717-
# this should only happen when resources become broken
718-
logging.debug("ignoring acquired/broken resource %s for place %s", new_resource, place.name)
719-
continue
789+
# does the resource exist again?
790+
try:
791+
new_resource = exporter.groups[resource.path[1]][resource.path[3]]
792+
except KeyError:
793+
continue
720794

721-
try:
722-
await self._acquire_resource(place, new_resource)
723-
place.acquired_resources[idx] = new_resource
724-
except Exception:
725-
logging.exception(
726-
"failed to reacquire orphaned resource %s for place %s", new_resource, place.name
727-
)
728-
break
729-
730-
logging.info("reacquired orphaned resource %s for place %s", new_resource, place.name)
731-
changed = True
732-
733-
if changed:
734-
self._publish_place(place)
735-
self.save_later()
795+
if new_resource.acquired:
796+
# this should only happen when resources become broken
797+
logging.warning("ignoring acquired/broken resource %s for place %s", new_resource, place.name)
798+
continue
799+
800+
try:
801+
await self._acquire_resource(place, new_resource)
802+
acquired_resources[new_resource.path] = new_resource
803+
except Exception:
804+
logging.exception("failed to reacquire orphaned resource %s for place %s", new_resource, place.name)
805+
break
806+
807+
# find orphaned resources to be replaced in the places
808+
to_replace = set(orphaned_resources.keys() & acquired_resources.keys())
809+
if to_replace:
810+
logging.info("synchronize resources: %s orphaned resource(s) should be replaced", len(to_replace))
811+
for resource_path in set(orphaned_resources.keys() & acquired_resources.keys()):
812+
oldresource = orphaned_resources[resource_path]
813+
newresource = acquired_resources[resource_path]
814+
assert oldresource.acquired == newresource.acquired
815+
816+
place = self.places.get(newresource.acquired)
817+
assert place is not None
818+
assert place.acquired
819+
820+
idx = place.acquired_resources.index(oldresource)
821+
place.acquired_resources[idx] = newresource
736822

737823
@locked
738824
async def AcquirePlace(self, request, context):
@@ -755,9 +841,6 @@ async def AcquirePlace(self, request, context):
755841
if not res.owner == username:
756842
await context.abort(grpc.StatusCode.PERMISSION_DENIED, f"Place {name} was not reserved for {username}")
757843

758-
# First try to reacquire orphaned resources to avoid conflicts.
759-
await self._reacquire_orphaned_resources()
760-
761844
# FIXME use the session object instead? or something else which
762845
# survives disconnecting clients?
763846
place.acquired = username

0 commit comments

Comments
 (0)