Skip to content

Commit fc33503

Browse files
Merge pull request #1574 from jluebbe/improve-grpc-robustness
improve gRPC robustness
2 parents acbefcc + 396af5d commit fc33503

File tree

2 files changed

+38
-8
lines changed

2 files changed

+38
-8
lines changed

labgrid/remote/coordinator.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,13 +169,22 @@ def __init__(self, request) -> None:
169169
self.request = request
170170
self.response = None
171171
self.completed = asyncio.Event()
172+
self.expired = False
172173

173174
def complete(self, response) -> None:
174175
self.response = response
175176
self.completed.set()
177+
if self.expired:
178+
logging.warning(
179+
"exporter command already expired for request %s -> response %s", self.request, self.response
180+
)
176181

177182
async def wait(self):
178-
await asyncio.wait_for(self.completed.wait(), 10)
183+
try:
184+
await asyncio.wait_for(self.completed.wait(), 10)
185+
finally:
186+
if self.response is None:
187+
self.expired = True
179188

180189

181190
class ExporterError(Exception):
@@ -592,7 +601,9 @@ async def _acquire_resource(self, place, resource):
592601
self.get_exporter_by_name(resource.path[0]).queue.put_nowait(cmd)
593602
await cmd.wait()
594603
if not cmd.response.success:
595-
raise ExporterError("failed to acquire {resource}")
604+
raise ExporterError("failed to acquire {resource} ({cmd.response.reason})")
605+
if resource.acquired != place.name:
606+
logging.warning("resource %s not acquired by this place after acquire request", resource)
596607

597608
async def _acquire_resources(self, place, resources):
598609
assert self.lock.locked()
@@ -646,7 +657,9 @@ async def _release_resources(self, place, resources, callback=True):
646657
self.get_exporter_by_name(resource.path[0]).queue.put_nowait(cmd)
647658
await cmd.wait()
648659
if not cmd.response.success:
649-
raise ExporterError(f"failed to release {resource}")
660+
raise ExporterError(f"failed to release {resource} ({cmd.response.reason})")
661+
if resource.acquired:
662+
logging.warning("resource %s still acquired after release request", resource)
650663
except (ExporterError, TimeoutError):
651664
logging.exception("failed to release %s", resource)
652665
# at least try to notify the clients

labgrid/remote/exporter.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,14 @@ class BrokenResourceError(ExporterError):
3737
pass
3838

3939

40+
class UnknownResourceError(ExporterError):
41+
pass
42+
43+
44+
class InvalidResourceRequestError(ExporterError):
45+
pass
46+
47+
4048
def log_subprocess_kernel_stack(logger, child):
4149
if child.poll() is not None: # nothing to check if no longer running
4250
return
@@ -889,8 +897,9 @@ async def message_pump(self):
889897
out_message.set_acquired_request.resource_name,
890898
)
891899
success = True
892-
except BrokenResourceError as e:
900+
except (BrokenResourceError, InvalidResourceRequestError, UnknownResourceError) as e:
893901
reason = e.args[0]
902+
logging.warning("set_acquired_request failed: %s", reason)
894903
finally:
895904
in_message = labgrid_coordinator_pb2.ExporterInMessage()
896905
in_message.response.success = success
@@ -924,8 +933,14 @@ async def message_pump(self):
924933
async def acquire(self, group_name, resource_name, place_name):
925934
resource = self.groups.get(group_name, {}).get(resource_name)
926935
if resource is None:
927-
logging.error("acquire request for unknown resource %s/%s by %s", group_name, resource_name, place_name)
928-
return
936+
raise UnknownResourceError(
937+
f"acquire request for unknown resource {group_name}/{resource_name} by {place_name}"
938+
)
939+
940+
if resource.acquired:
941+
raise InvalidResourceRequestError(
942+
f"Resource {group_name}/{resource_name} is already acquired by {resource.acquired}"
943+
)
929944

930945
try:
931946
resource.acquire(place_name)
@@ -935,8 +950,10 @@ async def acquire(self, group_name, resource_name, place_name):
935950
async def release(self, group_name, resource_name):
936951
resource = self.groups.get(group_name, {}).get(resource_name)
937952
if resource is None:
938-
logging.error("release request for unknown resource %s/%s", group_name, resource_name)
939-
return
953+
raise UnknownResourceError(f"release request for unknown resource {group_name}/{resource_name}")
954+
955+
if not resource.acquired:
956+
raise InvalidResourceRequestError(f"Resource {group_name}/{resource_name} is not acquired")
940957

941958
try:
942959
resource.release()

0 commit comments

Comments
 (0)