Skip to content

Commit 5e6f155

Browse files
committed
Add joint position streaming functionality to Arm component
1 parent 8311005 commit 5e6f155

File tree

8 files changed

+265
-63
lines changed

8 files changed

+265
-63
lines changed

src/viam/components/arm/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22
from viam.proto.component.arm import JointPositions
33
from viam.resource.registry import Registry, ResourceRegistration
44

5-
from .arm import Arm
5+
from .arm import Arm, JointPositionStream
66
from .client import ArmClient
77
from .service import ArmRPCService
88

99
__all__ = [
1010
"Arm",
1111
"JointPositions",
12+
"JointPositionStream",
1213
"KinematicsFileFormat",
1314
"Pose",
1415
]

src/viam/components/arm/arm.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,29 @@
11
import abc
2+
from dataclasses import dataclass
3+
from datetime import datetime
24
from typing import Any, Dict, Final, Optional, Tuple
35

46
from viam.resource.types import API, RESOURCE_NAMESPACE_RDK, RESOURCE_TYPE_COMPONENT
7+
from viam.streams import Stream
58

69
from ..component_base import ComponentBase
710
from . import JointPositions, KinematicsFileFormat, Pose
811

912

13+
@dataclass
14+
class JointPositionStream:
15+
"""A single frame from a joint position stream."""
16+
17+
positions: JointPositions
18+
"""Current joint positions."""
19+
20+
timestamp: datetime
21+
"""Timestamp when positions were captured from the arm hardware."""
22+
23+
sequence: int
24+
"""Sequential message number, used to detect dropped messages."""
25+
26+
1027
class Arm(ComponentBase):
1128
"""
1229
Arm represents a physical robot arm that exists in three-dimensional space.
@@ -221,3 +238,35 @@ async def get_kinematics(
221238
For more information, see `Arm component <https://docs.viam.com/dev/reference/apis/components/arm/#getkinematics>`_.
222239
"""
223240
...
241+
242+
@abc.abstractmethod
243+
async def stream_joint_positions(
244+
self,
245+
*,
246+
fps: Optional[int] = None,
247+
extra: Optional[Dict[str, Any]] = None,
248+
timeout: Optional[float] = None,
249+
**kwargs,
250+
) -> Stream[JointPositionStream]:
251+
"""
252+
Stream joint positions from the arm at the specified rate.
253+
254+
::
255+
256+
my_arm = Arm.from_robot(robot=machine, name="my_arm")
257+
258+
# Stream joint positions at 30 fps
259+
stream = await my_arm.stream_joint_positions(fps=30)
260+
async for position_frame in stream:
261+
print(f"Positions: {position_frame.positions.values}")
262+
print(f"Timestamp: {position_frame.timestamp}")
263+
264+
Args:
265+
fps: Target frames per second for the stream. If not specified, uses the arm's default rate.
266+
extra: Additional arguments to the method.
267+
timeout: Optional timeout in seconds.
268+
269+
Returns:
270+
Stream[JointPositionStream]: A stream of joint position frames containing positions, timestamps, and sequence numbers.
271+
"""
272+
...

src/viam/components/arm/client.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
from datetime import datetime, timezone
12
from typing import Any, Dict, List, Mapping, Optional, Tuple
23

34
from grpclib.client import Channel
5+
from grpclib.client import Stream as ClientStream
46

57
from viam.proto.common import DoCommandRequest, DoCommandResponse, Geometry, GetKinematicsRequest, GetKinematicsResponse
68
from viam.proto.component.arm import (
@@ -15,11 +17,15 @@
1517
MoveToJointPositionsRequest,
1618
MoveToPositionRequest,
1719
StopRequest,
20+
StreamJointPositionsRequest,
21+
StreamJointPositionsResponse,
1822
)
1923
from viam.resource.rpc_client_base import ReconfigurableResourceRPCClientBase
24+
from viam.streams import Stream, StreamWithIterator
2025
from viam.utils import ValueTypes, dict_to_struct, get_geometries, struct_to_dict
2126

2227
from . import Arm, KinematicsFileFormat, Pose
28+
from .arm import JointPositionStream
2329

2430

2531
class ArmClient(Arm, ReconfigurableResourceRPCClientBase):
@@ -122,3 +128,34 @@ async def get_kinematics(
122128
async def get_geometries(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> List[Geometry]:
123129
md = kwargs.get("metadata", self.Metadata())
124130
return await get_geometries(self.client, self.name, extra, timeout, md)
131+
132+
async def stream_joint_positions(
133+
self,
134+
*,
135+
fps: Optional[int] = None,
136+
extra: Optional[Dict[str, Any]] = None,
137+
timeout: Optional[float] = None,
138+
**kwargs,
139+
) -> Stream[JointPositionStream]:
140+
request = StreamJointPositionsRequest(
141+
name=self.name,
142+
fps=fps,
143+
extra=dict_to_struct(extra),
144+
)
145+
146+
async def read():
147+
md = kwargs.get("metadata", self.Metadata()).proto
148+
stream: ClientStream[StreamJointPositionsRequest, StreamJointPositionsResponse]
149+
async with self.client.StreamJointPositions.open(metadata=md) as stream:
150+
await stream.send_message(request, end=True)
151+
async for response in stream:
152+
yield JointPositionStream(
153+
positions=response.positions,
154+
timestamp=datetime.fromtimestamp(
155+
response.timestamp.seconds + response.timestamp.nanos / 1e9,
156+
tz=timezone.utc,
157+
),
158+
sequence=response.sequence,
159+
)
160+
161+
return StreamWithIterator(read())

src/viam/components/arm/service.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
from google.protobuf.timestamp_pb2 import Timestamp
12
from grpclib.server import Stream
3+
from h2.exceptions import StreamClosedError
24

5+
from viam.logging import getLogger
36
from viam.proto.common import (
47
DoCommandRequest,
58
DoCommandResponse,
@@ -15,19 +18,24 @@
1518
GetJointPositionsResponse,
1619
IsMovingRequest,
1720
IsMovingResponse,
21+
JointPositions,
1822
MoveToJointPositionsRequest,
1923
MoveToJointPositionsResponse,
2024
MoveToPositionRequest,
2125
MoveToPositionResponse,
2226
StopRequest,
2327
StopResponse,
28+
StreamJointPositionsRequest,
29+
StreamJointPositionsResponse,
2430
UnimplementedArmServiceBase,
2531
)
2632
from viam.resource.rpc_service_base import ResourceRPCServiceBase
2733
from viam.utils import dict_to_struct, struct_to_dict
2834

2935
from .arm import Arm
3036

37+
LOGGER = getLogger(__name__)
38+
3139

3240
class ArmRPCService(UnimplementedArmServiceBase, ResourceRPCServiceBase[Arm]):
3341
"""
@@ -121,3 +129,31 @@ async def GetGeometries(self, stream: Stream[GetGeometriesRequest, GetGeometries
121129
geometries = await arm.get_geometries(extra=struct_to_dict(request.extra), timeout=timeout)
122130
response = GetGeometriesResponse(geometries=geometries)
123131
await stream.send_message(response)
132+
133+
async def StreamJointPositions(
134+
self, stream: Stream[StreamJointPositionsRequest, StreamJointPositionsResponse]
135+
) -> None:
136+
request = await stream.recv_message()
137+
assert request is not None
138+
arm = self.get_resource(request.name)
139+
fps = request.fps if request.HasField("fps") else None
140+
position_stream = await arm.stream_joint_positions(
141+
fps=fps,
142+
extra=struct_to_dict(request.extra),
143+
metadata=stream.metadata,
144+
)
145+
async for frame in position_stream:
146+
try:
147+
timestamp = Timestamp()
148+
timestamp.FromDatetime(frame.timestamp)
149+
response = StreamJointPositionsResponse(
150+
positions=JointPositions(values=list(frame.positions.values)),
151+
timestamp=timestamp,
152+
sequence=frame.sequence,
153+
)
154+
await stream.send_message(response)
155+
except StreamClosedError:
156+
return
157+
except Exception as e:
158+
LOGGER.error(e)
159+
return

src/viam/gen/component/arm/v1/arm_grpc.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from .... import common
99
import google.api.annotations_pb2
1010
import google.protobuf.struct_pb2
11+
import google.protobuf.timestamp_pb2
1112
from .... import component
1213

1314
class ArmServiceBase(abc.ABC):
@@ -24,6 +25,10 @@ async def MoveToPosition(self, stream: 'grpclib.server.Stream[component.arm.v1.a
2425
async def GetJointPositions(self, stream: 'grpclib.server.Stream[component.arm.v1.arm_pb2.GetJointPositionsRequest, component.arm.v1.arm_pb2.GetJointPositionsResponse]') -> None:
2526
pass
2627

28+
@abc.abstractmethod
29+
async def StreamJointPositions(self, stream: 'grpclib.server.Stream[component.arm.v1.arm_pb2.StreamJointPositionsRequest, component.arm.v1.arm_pb2.StreamJointPositionsResponse]') -> None:
30+
pass
31+
2732
@abc.abstractmethod
2833
async def MoveToJointPositions(self, stream: 'grpclib.server.Stream[component.arm.v1.arm_pb2.MoveToJointPositionsRequest, component.arm.v1.arm_pb2.MoveToJointPositionsResponse]') -> None:
2934
pass
@@ -57,7 +62,7 @@ async def Get3DModels(self, stream: 'grpclib.server.Stream[common.v1.common_pb2.
5762
pass
5863

5964
def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]:
60-
return {'/viam.component.arm.v1.ArmService/GetEndPosition': grpclib.const.Handler(self.GetEndPosition, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.GetEndPositionRequest, component.arm.v1.arm_pb2.GetEndPositionResponse), '/viam.component.arm.v1.ArmService/MoveToPosition': grpclib.const.Handler(self.MoveToPosition, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.MoveToPositionRequest, component.arm.v1.arm_pb2.MoveToPositionResponse), '/viam.component.arm.v1.ArmService/GetJointPositions': grpclib.const.Handler(self.GetJointPositions, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.GetJointPositionsRequest, component.arm.v1.arm_pb2.GetJointPositionsResponse), '/viam.component.arm.v1.ArmService/MoveToJointPositions': grpclib.const.Handler(self.MoveToJointPositions, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.MoveToJointPositionsRequest, component.arm.v1.arm_pb2.MoveToJointPositionsResponse), '/viam.component.arm.v1.ArmService/MoveThroughJointPositions': grpclib.const.Handler(self.MoveThroughJointPositions, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.MoveThroughJointPositionsRequest, component.arm.v1.arm_pb2.MoveThroughJointPositionsResponse), '/viam.component.arm.v1.ArmService/Stop': grpclib.const.Handler(self.Stop, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.StopRequest, component.arm.v1.arm_pb2.StopResponse), '/viam.component.arm.v1.ArmService/IsMoving': grpclib.const.Handler(self.IsMoving, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.IsMovingRequest, component.arm.v1.arm_pb2.IsMovingResponse), '/viam.component.arm.v1.ArmService/DoCommand': grpclib.const.Handler(self.DoCommand, grpclib.const.Cardinality.UNARY_UNARY, common.v1.common_pb2.DoCommandRequest, common.v1.common_pb2.DoCommandResponse), '/viam.component.arm.v1.ArmService/GetKinematics': grpclib.const.Handler(self.GetKinematics, grpclib.const.Cardinality.UNARY_UNARY, common.v1.common_pb2.GetKinematicsRequest, common.v1.common_pb2.GetKinematicsResponse), '/viam.component.arm.v1.ArmService/GetGeometries': grpclib.const.Handler(self.GetGeometries, grpclib.const.Cardinality.UNARY_UNARY, common.v1.common_pb2.GetGeometriesRequest, common.v1.common_pb2.GetGeometriesResponse), '/viam.component.arm.v1.ArmService/Get3DModels': grpclib.const.Handler(self.Get3DModels, grpclib.const.Cardinality.UNARY_UNARY, common.v1.common_pb2.Get3DModelsRequest, common.v1.common_pb2.Get3DModelsResponse)}
65+
return {'/viam.component.arm.v1.ArmService/GetEndPosition': grpclib.const.Handler(self.GetEndPosition, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.GetEndPositionRequest, component.arm.v1.arm_pb2.GetEndPositionResponse), '/viam.component.arm.v1.ArmService/MoveToPosition': grpclib.const.Handler(self.MoveToPosition, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.MoveToPositionRequest, component.arm.v1.arm_pb2.MoveToPositionResponse), '/viam.component.arm.v1.ArmService/GetJointPositions': grpclib.const.Handler(self.GetJointPositions, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.GetJointPositionsRequest, component.arm.v1.arm_pb2.GetJointPositionsResponse), '/viam.component.arm.v1.ArmService/StreamJointPositions': grpclib.const.Handler(self.StreamJointPositions, grpclib.const.Cardinality.UNARY_STREAM, component.arm.v1.arm_pb2.StreamJointPositionsRequest, component.arm.v1.arm_pb2.StreamJointPositionsResponse), '/viam.component.arm.v1.ArmService/MoveToJointPositions': grpclib.const.Handler(self.MoveToJointPositions, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.MoveToJointPositionsRequest, component.arm.v1.arm_pb2.MoveToJointPositionsResponse), '/viam.component.arm.v1.ArmService/MoveThroughJointPositions': grpclib.const.Handler(self.MoveThroughJointPositions, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.MoveThroughJointPositionsRequest, component.arm.v1.arm_pb2.MoveThroughJointPositionsResponse), '/viam.component.arm.v1.ArmService/Stop': grpclib.const.Handler(self.Stop, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.StopRequest, component.arm.v1.arm_pb2.StopResponse), '/viam.component.arm.v1.ArmService/IsMoving': grpclib.const.Handler(self.IsMoving, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.IsMovingRequest, component.arm.v1.arm_pb2.IsMovingResponse), '/viam.component.arm.v1.ArmService/DoCommand': grpclib.const.Handler(self.DoCommand, grpclib.const.Cardinality.UNARY_UNARY, common.v1.common_pb2.DoCommandRequest, common.v1.common_pb2.DoCommandResponse), '/viam.component.arm.v1.ArmService/GetKinematics': grpclib.const.Handler(self.GetKinematics, grpclib.const.Cardinality.UNARY_UNARY, common.v1.common_pb2.GetKinematicsRequest, common.v1.common_pb2.GetKinematicsResponse), '/viam.component.arm.v1.ArmService/GetGeometries': grpclib.const.Handler(self.GetGeometries, grpclib.const.Cardinality.UNARY_UNARY, common.v1.common_pb2.GetGeometriesRequest, common.v1.common_pb2.GetGeometriesResponse), '/viam.component.arm.v1.ArmService/Get3DModels': grpclib.const.Handler(self.Get3DModels, grpclib.const.Cardinality.UNARY_UNARY, common.v1.common_pb2.Get3DModelsRequest, common.v1.common_pb2.Get3DModelsResponse)}
6166

6267
class UnimplementedArmServiceBase(ArmServiceBase):
6368

@@ -70,6 +75,9 @@ async def MoveToPosition(self, stream: 'grpclib.server.Stream[component.arm.v1.a
7075
async def GetJointPositions(self, stream: 'grpclib.server.Stream[component.arm.v1.arm_pb2.GetJointPositionsRequest, component.arm.v1.arm_pb2.GetJointPositionsResponse]') -> None:
7176
raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
7277

78+
async def StreamJointPositions(self, stream: 'grpclib.server.Stream[component.arm.v1.arm_pb2.StreamJointPositionsRequest, component.arm.v1.arm_pb2.StreamJointPositionsResponse]') -> None:
79+
raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
80+
7381
async def MoveToJointPositions(self, stream: 'grpclib.server.Stream[component.arm.v1.arm_pb2.MoveToJointPositionsRequest, component.arm.v1.arm_pb2.MoveToJointPositionsResponse]') -> None:
7482
raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
7583

@@ -100,6 +108,7 @@ def __init__(self, channel: grpclib.client.Channel) -> None:
100108
self.GetEndPosition = grpclib.client.UnaryUnaryMethod(channel, '/viam.component.arm.v1.ArmService/GetEndPosition', component.arm.v1.arm_pb2.GetEndPositionRequest, component.arm.v1.arm_pb2.GetEndPositionResponse)
101109
self.MoveToPosition = grpclib.client.UnaryUnaryMethod(channel, '/viam.component.arm.v1.ArmService/MoveToPosition', component.arm.v1.arm_pb2.MoveToPositionRequest, component.arm.v1.arm_pb2.MoveToPositionResponse)
102110
self.GetJointPositions = grpclib.client.UnaryUnaryMethod(channel, '/viam.component.arm.v1.ArmService/GetJointPositions', component.arm.v1.arm_pb2.GetJointPositionsRequest, component.arm.v1.arm_pb2.GetJointPositionsResponse)
111+
self.StreamJointPositions = grpclib.client.UnaryStreamMethod(channel, '/viam.component.arm.v1.ArmService/StreamJointPositions', component.arm.v1.arm_pb2.StreamJointPositionsRequest, component.arm.v1.arm_pb2.StreamJointPositionsResponse)
103112
self.MoveToJointPositions = grpclib.client.UnaryUnaryMethod(channel, '/viam.component.arm.v1.ArmService/MoveToJointPositions', component.arm.v1.arm_pb2.MoveToJointPositionsRequest, component.arm.v1.arm_pb2.MoveToJointPositionsResponse)
104113
self.MoveThroughJointPositions = grpclib.client.UnaryUnaryMethod(channel, '/viam.component.arm.v1.ArmService/MoveThroughJointPositions', component.arm.v1.arm_pb2.MoveThroughJointPositionsRequest, component.arm.v1.arm_pb2.MoveThroughJointPositionsResponse)
105114
self.Stop = grpclib.client.UnaryUnaryMethod(channel, '/viam.component.arm.v1.ArmService/Stop', component.arm.v1.arm_pb2.StopRequest, component.arm.v1.arm_pb2.StopResponse)

0 commit comments

Comments
 (0)