
Commit 109ef0d

remote/coordinator: refactor handling of orphaned resources
Labgrid currently uses bidirectional streams between the coordinator and the clients/exporters. For the client side this is a good fit, since the client sends requests and the coordinator can answer them directly. For the exporter, however, there is one case where the old crossbar infrastructure relied on nested calls: re-acquiring a resource after the exporter was offline while the place stayed acquired. We call these orphaned resources. They replace the real resource on the coordinator side until the resource can be reacquired on the respective exporter after it has restarted.

With crossbar, the coordinator could react to the resource update by directly calling the exporter to acquire the resource for the specific place. This was possible because crossbar handled the RPC routing and any service connected to it could expose RPC calls. With gRPC we are more constrained: there is only a single input/output stream that multiplexes different objects, so nested calls are not directly supported, since the exporter side would still be waiting for the coordinator to answer its own request. A different approach to orphaned resource handling is required.

The coordinator now uses a poll loop that checks the orphaned resources and tries to reacquire them once the exporter reappears. This introduces another problem: the exporter can be under high load, so the acquire request from the coordinator can time out. In this case we need to abort the acquisition during a regular lock, and in the case of an orphaned resource we need to replace the orphaned resource with the resource eventually acquired by the exporter. The same poll loop also handles the case where the exporter holds an acquired resource although the place has been released in the meantime (perhaps due to a timeout on a normal place acquire).

All in all this means that the acquired-resource state for each place is not necessarily consistent on the coordinator at any given moment, but it will converge to an eventually consistent state. This should be sufficient, since exporter restarts with orphaned resources should be relatively rare.

Signed-off-by: Jan Luebbe <[email protected]>
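The reconciliation described above boils down to set arithmetic over resource paths, comparing the exporter view with the place view. As a condensed sketch (a simplification of the diff below, not the actual implementation; timeouts, error handling and the real acquire/release calls are omitted):

def categorize(exporters, places):
    # exporter view: everything the exporters currently report as acquired
    acquired = {
        resource.path: resource
        for exporter in exporters
        for group in exporter.groups.values()
        for resource in group.values()
        if resource.acquired
    }
    # place view: resources still backed by an exporter vs. orphaned ones
    used, orphaned = {}, {}
    for place in places:
        for resource in place.acquired_resources:
            (orphaned if resource.orphaned else used)[resource.path] = resource

    to_release = acquired.keys() - used.keys() - orphaned.keys()  # exporter holds it, no place references it
    to_acquire = orphaned.keys() - acquired.keys()                # a place still wants it, exporter has released it
    to_replace = orphaned.keys() & acquired.keys()                # reacquired: swap the orphan for the real resource
    return to_release, to_acquire, to_replace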
1 parent 28cab27 commit 109ef0d

File tree

1 file changed: +117 -39 lines


labgrid/remote/coordinator.py

Lines changed: 117 additions & 39 deletions
@@ -8,6 +8,7 @@
 import time
 from contextlib import contextmanager
 import copy
+import random

 import attr
 import grpc
@@ -26,7 +27,7 @@
 from .scheduler import TagSet, schedule
 from .generated import labgrid_coordinator_pb2
 from .generated import labgrid_coordinator_pb2_grpc
-from ..util import atomic_replace, labgrid_version, yaml
+from ..util import atomic_replace, labgrid_version, yaml, Timeout


 @contextmanager
@@ -220,7 +221,7 @@ def __init__(self) -> None:
         self.load()

         self.loop = asyncio.get_running_loop()
-        for name in ["save", "reacquire", "schedule"]:
+        for name in ["save", "sync_resources", "schedule"]:
             step_func = getattr(self, f"_poll_step_{name}")
             task = self.loop.create_task(self.poll(step_func), name=f"coordinator-poll-{name}")
             self.poll_tasks.append(task)
@@ -231,11 +232,11 @@ async def _poll_step_save(self):
         with warn_if_slow("save changes", level=logging.DEBUG):
             await self.save()

-    async def _poll_step_reacquire(self):
-        # try to re-acquire orphaned resources
+    async def _poll_step_sync_resources(self):
+        # try to synchronize resources
         async with self.lock:
-            with warn_if_slow("reacquire orphaned resources", limit=3.0):
-                await self._reacquire_orphaned_resources()
+            with warn_if_slow("synchronize resources", limit=3.0):
+                await self._synchronize_resources()

     async def _poll_step_schedule(self):
         # update reservations
@@ -700,47 +701,124 @@ async def _release_resources(self, place, resources, callback=True):
             except:
                 logging.exception("failed to publish released resource %s", resource)

-    async def _reacquire_orphaned_resources(self):
+    async def _synchronize_resources(self):
         assert self.lock.locked()

-        for place in self.places.values():
-            changed = False
+        # fix:
+        # - a resource is acquired for a place that is not acquired
+        #   * perhaps caused by a resource acquire timeout (during normal lock)
+        #   -> release()
+        # - a resource is acquired for a place that still has it as orphaned
+        #   * perhaps caused by a resource acquire timeout (during reacquire)
+        #   -> replace orphaned resource
+        # - a resource is released, but a place still has it as orphaned
+        #   * perhaps caused by a exporter restart
+        #   -> acquire() and replace orphaned resource
+
+        acquired_resources = {}
+        used_resources = {}
+        orphaned_resources = {}
+
+        # find acquired resources
+        for exporter in self.exporters.values():
+            for group in exporter.groups.values():
+                for resource in group.values():
+                    if resource.acquired:
+                        acquired_resources[resource.path] = resource

-            for idx, resource in enumerate(place.acquired_resources):
+        # find resources used by places
+        for place in self.places.values():
+            for resource in place.acquired_resources:
                 if not resource.orphaned:
-                    continue
+                    used_resources[resource.path] = resource
+                else:
+                    orphaned_resources[resource.path] = resource
+
+        timeout = Timeout(5.0)
+
+        # find resources to be released
+        to_release = list(acquired_resources.keys() - used_resources.keys() - orphaned_resources.keys())
+        if to_release:
+            logging.info("synchronize resources: %s acquired resource(s) should be released", len(to_release))
+            random.shuffle(to_release)  # don't get stuck on a problematic resource
+            for resource_path in to_release:
+                if timeout.expired:
+                    continue  # release the coordinator lock
+
+                resource = acquired_resources[resource_path]
+                if resource.acquired == "<broken>":
+                    continue
+                place = self.places.get(resource.acquired)
+                print(f"should release {resource} for {place}?")

-                # is the exporter connected again?
-                exporter = self.get_exporter_by_name(resource.path[0])
-                if not exporter:
-                    continue
+                if place is None:
+                    logging.warning("resource %s claims to be acquired by unknown place", resource)
+                elif not place.acquired:
+                    logging.warning("resource %s claims to be acquired by unacquired place", resource)
+                else:
+                    continue
+                try:
+                    await self._release_resources(place, [resource])
+                    del acquired_resources[resource_path]
+                except Exception:
+                    logging.exception("failed to release unused resource %s", resource)
+                    break

-                # does the resource exist again?
-                try:
-                    new_resource = exporter.groups[resource.path[1]][resource.path[3]]
-                except KeyError:
-                    continue
+        # find orphaned resources to be acquired
+        to_acquire = list(orphaned_resources.keys() - acquired_resources.keys())
+        if to_acquire:
+            logging.info("synchronize resources: %s orphaned resource(s) should be acquired", len(to_acquire))
+            random.shuffle(to_acquire)  # don't get stuck on a problematic resource
+            for resource_path in to_acquire:
+                if timeout.expired:
+                    continue  # release the coordinator lock
+
+                resource = orphaned_resources[resource_path]
+                if resource.acquired == "<broken>":
+                    continue
+                place = self.places.get(resource.acquired)
+                assert place is not None
+                assert place.acquired
+                print(f"should acquire {resource} for {place}?")
+
+                # is the exporter connected again?
+                exporter = self.get_exporter_by_name(resource.path[0])
+                if not exporter:
+                    continue

-                if new_resource.acquired:
-                    # this should only happen when resources become broken
-                    logging.debug("ignoring acquired/broken resource %s for place %s", new_resource, place.name)
-                    continue
+                # does the resource exist again?
+                try:
+                    new_resource = exporter.groups[resource.path[1]][resource.path[3]]
+                except KeyError:
+                    continue

-                try:
-                    await self._acquire_resource(place, new_resource)
-                    place.acquired_resources[idx] = new_resource
-                except Exception:
-                    logging.exception(
-                        "failed to reacquire orphaned resource %s for place %s", new_resource, place.name
-                    )
-                    break
-
-                logging.info("reacquired orphaned resource %s for place %s", new_resource, place.name)
-                changed = True
-
-            if changed:
-                self._publish_place(place)
-                self.save_later()
+                if new_resource.acquired:
+                    # this should only happen when resources become broken
+                    logging.warning("ignoring acquired/broken resource %s for place %s", new_resource, place.name)
+                    continue
+
+                try:
+                    await self._acquire_resource(place, new_resource)
+                    acquired_resources[new_resource.path] = new_resource
+                except Exception:
+                    logging.exception("failed to reacquire orphaned resource %s for place %s", new_resource, place.name)
+                    break
+
+        # find orphaned resources to be replaced in the places
+        to_replace = set(orphaned_resources.keys() & acquired_resources.keys())
+        if to_replace:
+            logging.info("synchronize resources: %s orphaned resource(s) should be replaced", len(to_replace))
+            for resource_path in set(orphaned_resources.keys() & acquired_resources.keys()):
+                oldresource = orphaned_resources[resource_path]
+                newresource = acquired_resources[resource_path]
+                assert oldresource.acquired == newresource.acquired
+
+                place = self.places.get(newresource.acquired)
+                assert place is not None
+                assert place.acquired
+
+                idx = place.acquired_resources.index(oldresource)
+                place.acquired_resources[idx] = newresource

     @locked
     async def AcquirePlace(self, request, context):
0 commit comments

Comments
 (0)