Skip to content

Commit df98986

Browse files
authored
[RSDK-199] Add timeouts to components (#169)
1 parent 7e8e982 commit df98986

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+1191
-473
lines changed

examples/server/v1/viam.wav

-5.26 MB
Binary file not shown.

src/viam/components/arm/arm.py

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,13 @@ class Arm(ComponentBase):
1818
"""
1919

2020
@abc.abstractmethod
21-
async def get_end_position(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> Pose:
21+
async def get_end_position(
22+
self,
23+
*,
24+
extra: Optional[Dict[str, Any]] = None,
25+
timeout: Optional[float] = None,
26+
**kwargs,
27+
) -> Pose:
2228
"""
2329
Get the current position of the end of the arm expressed as a Pose.
2430
@@ -28,7 +34,13 @@ async def get_end_position(self, extra: Optional[Dict[str, Any]] = None, **kwarg
2834

2935
@abc.abstractmethod
3036
async def move_to_position(
31-
self, pose: Pose, world_state: Optional[WorldState] = None, extra: Optional[Dict[str, Any]] = None, **kwargs
37+
self,
38+
pose: Pose,
39+
world_state: Optional[WorldState] = None,
40+
*,
41+
extra: Optional[Dict[str, Any]] = None,
42+
timeout: Optional[float] = None,
43+
**kwargs,
3244
):
3345
"""
3446
Move the end of the arm to the Pose specified in `pose`.
@@ -43,7 +55,14 @@ async def move_to_position(
4355
...
4456

4557
@abc.abstractmethod
46-
async def move_to_joint_positions(self, positions: JointPositions, extra: Optional[Dict[str, Any]] = None, **kwargs):
58+
async def move_to_joint_positions(
59+
self,
60+
positions: JointPositions,
61+
*,
62+
extra: Optional[Dict[str, Any]] = None,
63+
timeout: Optional[float] = None,
64+
**kwargs,
65+
):
4766
"""
4867
Move each joint on the arm to the corresponding angle specified in `positions`.
4968
@@ -54,7 +73,13 @@ async def move_to_joint_positions(self, positions: JointPositions, extra: Option
5473
...
5574

5675
@abc.abstractmethod
57-
async def get_joint_positions(self, extra: Optional[Dict[str, Any]] = None, **kwargs) -> JointPositions:
76+
async def get_joint_positions(
77+
self,
78+
*,
79+
extra: Optional[Dict[str, Any]] = None,
80+
timeout: Optional[float] = None,
81+
**kwargs,
82+
) -> JointPositions:
5883
"""
5984
Get the JointPositions representing the current position of the arm.
6085
@@ -64,7 +89,13 @@ async def get_joint_positions(self, extra: Optional[Dict[str, Any]] = None, **kw
6489
...
6590

6691
@abc.abstractmethod
67-
async def stop(self, extra: Optional[Dict[str, Any]] = None, **kwargs):
92+
async def stop(
93+
self,
94+
*,
95+
extra: Optional[Dict[str, Any]] = None,
96+
timeout: Optional[float] = None,
97+
**kwargs,
98+
):
6899
"""
69100
Stop all motion of the arm. It is assumed that the arm stops immediately.
70101
"""

src/viam/components/arm/client.py

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,42 +31,70 @@ def __init__(self, name: str, channel: Channel):
3131
self.client = ArmServiceStub(channel)
3232
super().__init__(name)
3333

34-
async def get_end_position(self, extra: Optional[Dict[str, Any]] = None) -> Pose:
34+
async def get_end_position(
35+
self,
36+
*,
37+
extra: Optional[Dict[str, Any]] = None,
38+
timeout: Optional[float] = None,
39+
) -> Pose:
3540
if extra is None:
3641
extra = {}
3742
request = GetEndPositionRequest(name=self.name, extra=dict_to_struct(extra))
38-
response: GetEndPositionResponse = await self.client.GetEndPosition(request)
43+
response: GetEndPositionResponse = await self.client.GetEndPosition(request, timeout=timeout)
3944
return response.pose
4045

4146
async def move_to_position(
4247
self,
4348
pose: Pose,
4449
world_state: Optional[WorldState] = None,
50+
*,
4551
extra: Optional[Dict[str, Any]] = None,
52+
timeout: Optional[float] = None,
4653
):
4754
if extra is None:
4855
extra = {}
4956
request = MoveToPositionRequest(name=self.name, to=pose, world_state=world_state, extra=dict_to_struct(extra))
50-
await self.client.MoveToPosition(request)
57+
await self.client.MoveToPosition(request, timeout=timeout)
5158

52-
async def get_joint_positions(self, extra: Optional[Dict[str, Any]] = None) -> JointPositions:
59+
async def get_joint_positions(
60+
self,
61+
*,
62+
extra: Optional[Dict[str, Any]] = None,
63+
timeout: Optional[float] = None,
64+
) -> JointPositions:
5365
if extra is None:
5466
extra = {}
5567
request = GetJointPositionsRequest(name=self.name, extra=dict_to_struct(extra))
56-
response: GetJointPositionsResponse = await self.client.GetJointPositions(request)
68+
response: GetJointPositionsResponse = await self.client.GetJointPositions(request, timeout=timeout)
5769
return response.positions
5870

59-
async def move_to_joint_positions(self, positions: JointPositions, extra: Optional[Dict[str, Any]] = None):
71+
async def move_to_joint_positions(
72+
self,
73+
positions: JointPositions,
74+
*,
75+
extra: Optional[Dict[str, Any]] = None,
76+
timeout: Optional[float] = None,
77+
):
6078
if extra is None:
6179
extra = {}
6280
request = MoveToJointPositionsRequest(name=self.name, positions=positions, extra=dict_to_struct(extra))
63-
await self.client.MoveToJointPositions(request)
81+
await self.client.MoveToJointPositions(request, timeout=timeout)
6482

65-
async def stop(self, extra: Optional[Dict[str, Any]] = None):
83+
async def stop(
84+
self,
85+
*,
86+
extra: Optional[Dict[str, Any]] = None,
87+
timeout: Optional[float] = None,
88+
):
6689
if extra is None:
6790
extra = {}
6891
request = StopRequest(name=self.name, extra=dict_to_struct(extra))
69-
await self.client.Stop(request)
92+
await self.client.Stop(request, timeout=timeout)
7093

71-
async def do_command(self, command: Dict[str, Any]) -> Dict[str, Any]:
72-
return await do_command(self.channel, self.name, command)
94+
async def do_command(
95+
self,
96+
command: Dict[str, Any],
97+
*,
98+
timeout: Optional[float] = None,
99+
) -> Dict[str, Any]:
100+
return await do_command(self.channel, self.name, command, timeout=timeout)

src/viam/components/arm/service.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ async def GetEndPosition(self, stream: Stream[GetEndPositionRequest, GetEndPosit
3434
arm = self.get_component(name)
3535
except ComponentNotFoundError as e:
3636
raise e.grpc_error
37-
position = await arm.get_end_position(extra=struct_to_dict(request.extra))
37+
timeout = stream.deadline.time_remaining() if stream.deadline else None
38+
position = await arm.get_end_position(extra=struct_to_dict(request.extra), timeout=timeout)
3839
response = GetEndPositionResponse(pose=position)
3940
await stream.send_message(response)
4041

@@ -46,7 +47,8 @@ async def MoveToPosition(self, stream: Stream[MoveToPositionRequest, MoveToPosit
4647
arm = self.get_component(name)
4748
except ComponentNotFoundError as e:
4849
raise e.grpc_error
49-
await arm.move_to_position(request.to, request.world_state, extra=struct_to_dict(request.extra))
50+
timeout = stream.deadline.time_remaining() if stream.deadline else None
51+
await arm.move_to_position(request.to, request.world_state, extra=struct_to_dict(request.extra), timeout=timeout)
5052
response = MoveToPositionResponse()
5153
await stream.send_message(response)
5254

@@ -58,7 +60,8 @@ async def GetJointPositions(self, stream: Stream[GetJointPositionsRequest, GetJo
5860
arm = self.get_component(name)
5961
except ComponentNotFoundError as e:
6062
raise e.grpc_error
61-
positions = await arm.get_joint_positions(extra=struct_to_dict(request.extra))
63+
timeout = stream.deadline.time_remaining() if stream.deadline else None
64+
positions = await arm.get_joint_positions(extra=struct_to_dict(request.extra), timeout=timeout)
6265
response = GetJointPositionsResponse(positions=positions)
6366
await stream.send_message(response)
6467

@@ -70,7 +73,8 @@ async def MoveToJointPositions(self, stream: Stream[MoveToJointPositionsRequest,
7073
arm = self.get_component(name)
7174
except ComponentNotFoundError as e:
7275
raise e.grpc_error
73-
await arm.move_to_joint_positions(request.positions, extra=struct_to_dict(request.extra))
76+
timeout = stream.deadline.time_remaining() if stream.deadline else None
77+
await arm.move_to_joint_positions(request.positions, extra=struct_to_dict(request.extra), timeout=timeout)
7478
response = MoveToJointPositionsResponse()
7579
await stream.send_message(response)
7680

@@ -82,6 +86,7 @@ async def Stop(self, stream: Stream[StopRequest, StopResponse]) -> None:
8286
arm = self.get_component(name)
8387
except ComponentNotFoundError as e:
8488
raise e.grpc_error
85-
await arm.stop(extra=struct_to_dict(request.extra))
89+
timeout = stream.deadline.time_remaining() if stream.deadline else None
90+
await arm.stop(extra=struct_to_dict(request.extra), timeout=timeout)
8691
response = StopResponse()
8792
await stream.send_message(response)

src/viam/components/audio_input/audio_input.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import abc
22
from dataclasses import dataclass
33
from datetime import timedelta
4+
from typing import Optional
45

56
from google.protobuf.duration_pb2 import Duration
67
from typing_extensions import Self
@@ -50,7 +51,7 @@ def from_proto(cls, proto: PropertiesResponse) -> Self:
5051
)
5152

5253
@abc.abstractmethod
53-
async def stream(self) -> AudioStream:
54+
async def stream(self, *, timeout: Optional[float] = None, **kwargs) -> AudioStream:
5455
"""Stream audio samples from the audio input of the underlying robot
5556
5657
Returns:
@@ -59,7 +60,7 @@ async def stream(self) -> AudioStream:
5960
...
6061

6162
@abc.abstractmethod
62-
async def get_properties(self) -> Properties:
63+
async def get_properties(self, *, timeout: Optional[float] = None, **kwargs) -> Properties:
6364
"""Get the properties of the audio input of the underlying robot
6465
6566
Returns:

src/viam/components/audio_input/client.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Any, AsyncIterator, Dict, Union
1+
from typing import Any, AsyncIterator, Dict, Optional, Union
22

33
from grpclib.client import Channel
44

@@ -27,9 +27,9 @@ def __init__(self, name: str, channel: Channel):
2727
self.client = AudioInputServiceStub(channel)
2828
super().__init__(name)
2929

30-
async def stream(self) -> MediaStream[Audio]:
30+
async def stream(self, *, timeout: Optional[float] = None) -> MediaStream[Audio]:
3131
async def read() -> AsyncIterator[Audio]:
32-
async with self.client.Chunks.open() as chunks_stream:
32+
async with self.client.Chunks.open(timeout=timeout) as chunks_stream:
3333
await chunks_stream.send_message(
3434
ChunksRequest(name=self.name, sample_format=SampleFormat.SAMPLE_FORMAT_FLOAT32_INTERLEAVED), end=True
3535
)
@@ -47,10 +47,10 @@ async def read() -> AsyncIterator[Audio]:
4747

4848
return MediaStreamWithIterator(read())
4949

50-
async def get_properties(self) -> AudioInput.Properties:
50+
async def get_properties(self, *, timeout: Optional[float] = None) -> AudioInput.Properties:
5151
request = PropertiesRequest(name=self.name)
52-
response: PropertiesResponse = await self.client.Properties(request)
52+
response: PropertiesResponse = await self.client.Properties(request, timeout=timeout)
5353
return AudioInput.Properties.from_proto(response)
5454

55-
async def do_command(self, command: Dict[str, Any]) -> Dict[str, Any]:
56-
return await do_command(self.channel, self.name, command)
55+
async def do_command(self, command: Dict[str, Any], *, timeout: Optional[float] = None) -> Dict[str, Any]:
56+
return await do_command(self.channel, self.name, command, timeout=timeout)

src/viam/components/audio_input/service.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ async def Chunks(self, stream: Stream[ChunksRequest, ChunksResponse]) -> None:
3636
except ComponentNotFoundError as e:
3737
raise e.grpc_error
3838

39-
audio_stream = await audio_input.stream()
39+
timeout = stream.deadline.time_remaining() if stream.deadline else None
40+
audio_stream = await audio_input.stream(timeout=timeout)
4041
first_chunk = await audio_stream.__anext__()
4142
await stream.send_message(ChunksResponse(info=first_chunk.info))
4243
await stream.send_message(ChunksResponse(chunk=first_chunk.chunk))
@@ -51,7 +52,8 @@ async def Properties(self, stream: Stream[PropertiesRequest, PropertiesResponse]
5152
audio_input = self.get_component(request.name)
5253
except ComponentNotFoundError as e:
5354
raise e.grpc_error
54-
response = (await audio_input.get_properties()).proto
55+
timeout = stream.deadline.time_remaining() if stream.deadline else None
56+
response = (await audio_input.get_properties(timeout=timeout)).proto
5557
await stream.send_message(response)
5658

5759
async def Record(self, stream: Stream[RecordRequest, HttpBody]) -> None:

src/viam/components/base/base.py

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,15 @@ class Base(ComponentBase):
1717
"""
1818

1919
@abc.abstractmethod
20-
async def move_straight(self, distance: int, velocity: float, extra: Optional[Dict[str, Any]] = None, **kwargs):
20+
async def move_straight(
21+
self,
22+
distance: int,
23+
velocity: float,
24+
*,
25+
extra: Optional[Dict[str, Any]] = None,
26+
timeout: Optional[float] = None,
27+
**kwargs,
28+
):
2129
"""
2230
Move the base in a straight line the given `distance`, expressed in millimeters,
2331
at the given `velocity`, expressed in millimeters per second.
@@ -33,7 +41,15 @@ async def move_straight(self, distance: int, velocity: float, extra: Optional[Di
3341
...
3442

3543
@abc.abstractmethod
36-
async def spin(self, angle: float, velocity: float, extra: Optional[Dict[str, Any]] = None, **kwargs):
44+
async def spin(
45+
self,
46+
angle: float,
47+
velocity: float,
48+
*,
49+
extra: Optional[Dict[str, Any]] = None,
50+
timeout: Optional[float] = None,
51+
**kwargs,
52+
):
3753
"""
3854
Spin the base in place `angle` degrees, at the given angular `velocity`,
3955
expressed in degrees per second.
@@ -49,7 +65,15 @@ async def spin(self, angle: float, velocity: float, extra: Optional[Dict[str, An
4965
...
5066

5167
@abc.abstractmethod
52-
async def set_power(self, linear: Vector3, angular: Vector3, extra: Optional[Dict[str, Any]] = None, **kwargs):
68+
async def set_power(
69+
self,
70+
linear: Vector3,
71+
angular: Vector3,
72+
*,
73+
extra: Optional[Dict[str, Any]] = None,
74+
timeout: Optional[float] = None,
75+
**kwargs,
76+
):
5377
"""Set the linear and angular velocity of the Base
5478
When `linear` is 0, the the base will spin.
5579
When `angular` is 0, the the base will move in a straight line.
@@ -66,7 +90,15 @@ async def set_power(self, linear: Vector3, angular: Vector3, extra: Optional[Dic
6690
...
6791

6892
@abc.abstractmethod
69-
async def set_velocity(self, linear: Vector3, angular: Vector3, extra: Optional[Dict[str, Any]] = None, **kwargs):
93+
async def set_velocity(
94+
self,
95+
linear: Vector3,
96+
angular: Vector3,
97+
*,
98+
extra: Optional[Dict[str, Any]] = None,
99+
timeout: Optional[float] = None,
100+
**kwargs,
101+
):
70102
"""
71103
Set the linear and angular velocities of the base.
72104
@@ -77,7 +109,13 @@ async def set_velocity(self, linear: Vector3, angular: Vector3, extra: Optional[
77109
"""
78110

79111
@abc.abstractmethod
80-
async def stop(self, extra: Optional[Dict[str, Any]] = None, **kwargs):
112+
async def stop(
113+
self,
114+
*,
115+
extra: Optional[Dict[str, Any]] = None,
116+
timeout: Optional[float] = None,
117+
**kwargs,
118+
):
81119
"""
82120
Stop the base.
83121
"""

0 commit comments

Comments
 (0)