-
Notifications
You must be signed in to change notification settings - Fork 599
Expand file tree
/
Copy pathaudio_visual_helpers.py
More file actions
113 lines (92 loc) · 4.42 KB
/
audio_visual_helpers.py
File metadata and controls
113 lines (92 loc) · 4.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
import logging
import threading
import time
from concurrent.futures import Future
import bosdyn.client
from bosdyn.api import audio_visual_pb2
from bosdyn.client.audio_visual import (AudioVisualClient, BehaviorExpiredError, DoesNotExistError,
InvalidClientError)
from bosdyn.util import now_sec
_LOGGER = logging.getLogger(__name__)
class AudioVisualHelper:
"""Context manager that runs an AV behavior for the duration of the context.
Use as follows:
.. code-block:: python
with AudioVisualHelper(robot, behavior_name, refresh_rate):
# Lights and sounds will play here
# Lights and sounds will stop here.
Args:
robot: Robot object for creating clients
behavior_name: Name of the desired behavior to run
refresh_rate: What rate to refresh the behavior (seconds)
"""
def __init__(self, robot, behavior_name, refresh_rate, logger=None):
self.robot = robot
self.logger = logger
self.behavior_name = behavior_name
self.refresh_rate = refresh_rate
self.av_client = None
self._behavior_running_fut = None
try:
self.av_client = robot.ensure_client(AudioVisualClient.default_service_name)
except:
_LOGGER.warning("Could not initialize AV client, skipping AudioVisualHelper.")
self.av_thread = None
self.stop_event = threading.Event()
def __enter__(self):
self._behavior_running_fut = Future()
self._behavior_running_fut.set_running_or_notify_cancel()
if self.av_client:
self.av_thread = threading.Thread(target=self._run_behavior_thread, args=())
self.av_thread.start()
else:
self._behavior_running_fut.set_result(False)
return self._behavior_running_fut
def __exit__(self, exc_type, exc_value, tb):
if self.av_thread:
self.stop_event.set()
self.av_thread.join()
def _run_behavior_thread(self):
# Check if the robot has AV hardware
if not self.robot.get_cached_hardware_hardware_configuration().has_audio_visual_system:
self._behavior_running_fut.set_result(False)
return
def set_future_result(result):
if not self._behavior_running_fut.done():
self._behavior_running_fut.set_result(result)
def set_future_exception(exc):
if not self._behavior_running_fut.done():
self._behavior_running_fut.set_exception(exc)
# Run the AV behavior until the stop_event is triggered
while not self.stop_event.wait(self.refresh_rate):
try:
end_time_secs = now_sec() + self.refresh_rate + 0.10 # add 100ms margin
result = self.av_client.run_behavior(self.behavior_name, end_time_secs)
set_future_result(
result.run_result == audio_visual_pb2.RunBehaviorResponse.RESULT_BEHAVIOR_RUN)
except DoesNotExistError as exc:
set_future_exception(exc)
_LOGGER.exception(f'Audio Visual Behavior {self.behavior_name} does not exist.')
return # Since the behavior doesn't exist, we can stop trying to run it
except BehaviorExpiredError as exc:
set_future_exception(exc)
_LOGGER.warning('Behavior was expired when received by client.')
except bosdyn.client.PersistentRpcError as exc:
set_future_exception(exc)
_LOGGER.exception('Failed to run behavior. Quitting AudioVisualHelper.')
return # A persistent error means we can't talk to the AV service, we can stop.
except bosdyn.client.RpcError:
_LOGGER.exception('Failed to run behavior. Retrying.')
except bosdyn.client.Error as exc:
set_future_exception(exc)
_LOGGER.exception('Unknown exception caught, quitting AudioVisualHelper.')
return
try:
self.av_client.stop_behavior(self.behavior_name)
except InvalidClientError:
_LOGGER.warning('Failed to stop behavior, run by a different client.')