-
Notifications
You must be signed in to change notification settings - Fork 599
Expand file tree
/
Copy patharea_callback_service_utils.py
More file actions
134 lines (113 loc) · 5.72 KB
/
area_callback_service_utils.py
File metadata and controls
134 lines (113 loc) · 5.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
import logging
import time
from typing import List
import bosdyn.client
from bosdyn.api import service_fault_pb2
from bosdyn.api.graph_nav import area_callback_pb2
from bosdyn.api.service_customization_pb2 import DictParam
from bosdyn.client.directory import NonexistentServiceError
from bosdyn.client.fault import ServiceFaultAlreadyExistsError, ServiceFaultDoesNotExistError
from bosdyn.client.service_customization_helpers import dict_params_to_dict
_LOGGER = logging.getLogger(__name__)
class AreaCallbackServiceConfig:
"""Config data required to run a area callback service."""
def __init__(self, service_name: str, required_lease_resources: List[str] = (),
log_begin_callback_data: bool = False,
area_callback_information: area_callback_pb2.AreaCallbackInformation = None):
"""AreaCallbackServiceConfig constructor.
Args:
service_name (str): The name of the service, for registering with directory.
required_lease_resources (List[str]): List of required lease resources.
log_begin_callback_data (bool): Log the data field of the begin callback request.
area_callback_information (area_callback_pb2.AreaCallbackInformation): Information describing the area callback.
"""
self.service_name = service_name
self.required_lease_resources = required_lease_resources
self.log_begin_callback_data = log_begin_callback_data
self.area_callback_information = area_callback_information or \
area_callback_pb2.AreaCallbackInformation(
required_lease_resources=self.required_lease_resources)
def parse_params(self, params: DictParam):
""" Parse params and validate they agree with the spec stored in area_callback_information.
Args:
params (DictParam): The parameters being validated.
"""
return dict_params_to_dict(params, self.area_callback_information.custom_params)
# Helper to raise service faults when other services are unavailable.
def handle_service_faults(fault_client, robot_state_client, directory_client, service_name,
prereq_services):
service_fault = service_fault_pb2.ServiceFault()
service_fault.fault_id.fault_name = service_name
service_fault.fault_id.service_name = service_name
service_fault.severity = service_fault_pb2.ServiceFault.SEVERITY_CRITICAL
check_period = 0.5 # seconds.
while True:
time.sleep(check_period)
# Don't fault the service if it doesn't actually exist.
try:
registered_service = directory_client.get_entry(service_name)
except NonexistentServiceError as exc:
continue
except bosdyn.client.Error as exc:
_LOGGER.error("Failed to check if %s exists during fault handling: %s", service_name,
exc)
continue
set_fault = False
fault_exists = False
unavailable_services = []
# Need robot state to check existing faults.
try:
state = robot_state_client.get_robot_state()
except bosdyn.client.Error as exc:
_LOGGER.error("Failed to get robot state during fault handling: %s", exc)
continue
for prereq_service in prereq_services:
# Make sure the prereq service exists.
try:
registered_service = directory_client.get_entry(prereq_service)
except NonexistentServiceError as exc:
set_fault = True
unavailable_services.append(prereq_service)
continue
except bosdyn.client.Error as exc:
_LOGGER.error("Failed to check if %s exists during fault handling: %s",
prereq_service, exc)
set_fault = True
unavailable_services.append(prereq_service)
continue
# Make sure the prereq service isn't faulted.
for fault in state.service_fault_state.faults:
if fault.fault_id.service_name == prereq_service:
set_fault = True
unavailable_services.append(prereq_service)
break
# Check if the service is already faulted.
for fault in state.service_fault_state.faults:
if fault.fault_id.service_name == service_name:
fault_exists = True
break
# Fault the service if there isn't an existing fault.
if set_fault and not fault_exists:
service_fault.error_message = 'Faulted due to issues with ' + ','.join(
unavailable_services)
try:
fault_client.trigger_service_fault(service_fault)
_LOGGER.info("Triggered fault on %s", service_name)
except ServiceFaultAlreadyExistsError:
pass
except bosdyn.client.Error as exc:
_LOGGER.error("Failed to set %s fault: %s", service_name, exc)
# Otherwise, clear the fault if it exists.
elif not set_fault and fault_exists:
try:
fault_client.clear_service_fault(service_fault.fault_id)
_LOGGER.info("Cleared fault on %s", service_name)
except ServiceFaultDoesNotExistError:
pass
except bosdyn.client.Error as exc:
_LOGGER.error("Failed to clear %s fault: %s", service_name, exc)