Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/viam/components/arm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
from viam.proto.component.arm import JointPositions
from viam.resource.registry import Registry, ResourceRegistration

from .arm import Arm
from .arm import Arm, JointPositionStream
from .client import ArmClient
from .service import ArmRPCService

__all__ = [
"Arm",
"JointPositions",
"JointPositionStream",
"KinematicsFileFormat",
"Pose",
]
Expand Down
49 changes: 49 additions & 0 deletions src/viam/components/arm/arm.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,29 @@
import abc
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Final, Optional, Tuple

from viam.resource.types import API, RESOURCE_NAMESPACE_RDK, RESOURCE_TYPE_COMPONENT
from viam.streams import Stream

from ..component_base import ComponentBase
from . import JointPositions, KinematicsFileFormat, Pose


@dataclass
class JointPositionStream:
"""A single frame from a joint position stream."""

positions: JointPositions
"""Current joint positions."""

timestamp: datetime
"""Timestamp when positions were captured from the arm hardware."""

sequence: int
"""Sequential message number, used to detect dropped messages."""


class Arm(ComponentBase):
"""
Arm represents a physical robot arm that exists in three-dimensional space.
Expand Down Expand Up @@ -221,3 +238,35 @@ async def get_kinematics(
For more information, see `Arm component <https://docs.viam.com/dev/reference/apis/components/arm/#getkinematics>`_.
"""
...

@abc.abstractmethod
async def stream_joint_positions(
self,
*,
fps: Optional[int] = None,
extra: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
) -> Stream[JointPositionStream]:
"""
Stream joint positions from the arm at the specified rate.

::

my_arm = Arm.from_robot(robot=machine, name="my_arm")

# Stream joint positions at 30 fps
stream = await my_arm.stream_joint_positions(fps=30)
async for position_frame in stream:
print(f"Positions: {position_frame.positions.values}")
print(f"Timestamp: {position_frame.timestamp}")

Args:
fps: Target frames per second for the stream. If not specified, uses the arm's default rate.
extra: Additional arguments to the method.
timeout: Optional timeout in seconds.

Returns:
Stream[JointPositionStream]: A stream of joint position frames containing positions, timestamps, and sequence numbers.
"""
...
37 changes: 37 additions & 0 deletions src/viam/components/arm/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from datetime import datetime, timezone
from typing import Any, Dict, List, Mapping, Optional, Tuple

from grpclib.client import Channel
from grpclib.client import Stream as ClientStream

from viam.proto.common import DoCommandRequest, DoCommandResponse, Geometry, GetKinematicsRequest, GetKinematicsResponse
from viam.proto.component.arm import (
Expand All @@ -15,11 +17,15 @@
MoveToJointPositionsRequest,
MoveToPositionRequest,
StopRequest,
StreamJointPositionsRequest,
StreamJointPositionsResponse,
)
from viam.resource.rpc_client_base import ReconfigurableResourceRPCClientBase
from viam.streams import Stream, StreamWithIterator
from viam.utils import ValueTypes, dict_to_struct, get_geometries, struct_to_dict

from . import Arm, KinematicsFileFormat, Pose
from .arm import JointPositionStream


class ArmClient(Arm, ReconfigurableResourceRPCClientBase):
Expand Down Expand Up @@ -122,3 +128,34 @@ async def get_kinematics(
async def get_geometries(self, *, extra: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> List[Geometry]:
md = kwargs.get("metadata", self.Metadata())
return await get_geometries(self.client, self.name, extra, timeout, md)

async def stream_joint_positions(
self,
*,
fps: Optional[int] = None,
extra: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
) -> Stream[JointPositionStream]:
request = StreamJointPositionsRequest(
name=self.name,
fps=fps,
extra=dict_to_struct(extra),
)

async def read():
md = kwargs.get("metadata", self.Metadata()).proto
stream: ClientStream[StreamJointPositionsRequest, StreamJointPositionsResponse]
async with self.client.StreamJointPositions.open(metadata=md) as stream:
await stream.send_message(request, end=True)
async for response in stream:
yield JointPositionStream(
positions=response.positions,
timestamp=datetime.fromtimestamp(
response.timestamp.seconds + response.timestamp.nanos / 1e9,
tz=timezone.utc,
),
sequence=response.sequence,
)

return StreamWithIterator(read())
36 changes: 36 additions & 0 deletions src/viam/components/arm/service.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from google.protobuf.timestamp_pb2 import Timestamp
from grpclib.server import Stream
from h2.exceptions import StreamClosedError

from viam.logging import getLogger
from viam.proto.common import (
DoCommandRequest,
DoCommandResponse,
Expand All @@ -15,19 +18,24 @@
GetJointPositionsResponse,
IsMovingRequest,
IsMovingResponse,
JointPositions,
MoveToJointPositionsRequest,
MoveToJointPositionsResponse,
MoveToPositionRequest,
MoveToPositionResponse,
StopRequest,
StopResponse,
StreamJointPositionsRequest,
StreamJointPositionsResponse,
UnimplementedArmServiceBase,
)
from viam.resource.rpc_service_base import ResourceRPCServiceBase
from viam.utils import dict_to_struct, struct_to_dict

from .arm import Arm

LOGGER = getLogger(__name__)


class ArmRPCService(UnimplementedArmServiceBase, ResourceRPCServiceBase[Arm]):
"""
Expand Down Expand Up @@ -121,3 +129,31 @@ async def GetGeometries(self, stream: Stream[GetGeometriesRequest, GetGeometries
geometries = await arm.get_geometries(extra=struct_to_dict(request.extra), timeout=timeout)
response = GetGeometriesResponse(geometries=geometries)
await stream.send_message(response)

async def StreamJointPositions(
self, stream: Stream[StreamJointPositionsRequest, StreamJointPositionsResponse]
) -> None:
request = await stream.recv_message()
assert request is not None
arm = self.get_resource(request.name)
fps = request.fps if request.HasField("fps") else None
position_stream = await arm.stream_joint_positions(
fps=fps,
extra=struct_to_dict(request.extra),
metadata=stream.metadata,
)
async for frame in position_stream:
try:
timestamp = Timestamp()
timestamp.FromDatetime(frame.timestamp)
response = StreamJointPositionsResponse(
positions=JointPositions(values=list(frame.positions.values)),
timestamp=timestamp,
sequence=frame.sequence,
)
await stream.send_message(response)
except StreamClosedError:
return
except Exception as e:
LOGGER.error(e)
return
11 changes: 10 additions & 1 deletion src/viam/gen/component/arm/v1/arm_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from .... import common
import google.api.annotations_pb2
import google.protobuf.struct_pb2
import google.protobuf.timestamp_pb2
from .... import component

class ArmServiceBase(abc.ABC):
Expand All @@ -24,6 +25,10 @@ async def MoveToPosition(self, stream: 'grpclib.server.Stream[component.arm.v1.a
async def GetJointPositions(self, stream: 'grpclib.server.Stream[component.arm.v1.arm_pb2.GetJointPositionsRequest, component.arm.v1.arm_pb2.GetJointPositionsResponse]') -> None:
pass

@abc.abstractmethod
async def StreamJointPositions(self, stream: 'grpclib.server.Stream[component.arm.v1.arm_pb2.StreamJointPositionsRequest, component.arm.v1.arm_pb2.StreamJointPositionsResponse]') -> None:
pass

@abc.abstractmethod
async def MoveToJointPositions(self, stream: 'grpclib.server.Stream[component.arm.v1.arm_pb2.MoveToJointPositionsRequest, component.arm.v1.arm_pb2.MoveToJointPositionsResponse]') -> None:
pass
Expand Down Expand Up @@ -57,7 +62,7 @@ async def Get3DModels(self, stream: 'grpclib.server.Stream[common.v1.common_pb2.
pass

def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]:
return {'/viam.component.arm.v1.ArmService/GetEndPosition': grpclib.const.Handler(self.GetEndPosition, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.GetEndPositionRequest, component.arm.v1.arm_pb2.GetEndPositionResponse), '/viam.component.arm.v1.ArmService/MoveToPosition': grpclib.const.Handler(self.MoveToPosition, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.MoveToPositionRequest, component.arm.v1.arm_pb2.MoveToPositionResponse), '/viam.component.arm.v1.ArmService/GetJointPositions': grpclib.const.Handler(self.GetJointPositions, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.GetJointPositionsRequest, component.arm.v1.arm_pb2.GetJointPositionsResponse), '/viam.component.arm.v1.ArmService/MoveToJointPositions': grpclib.const.Handler(self.MoveToJointPositions, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.MoveToJointPositionsRequest, component.arm.v1.arm_pb2.MoveToJointPositionsResponse), '/viam.component.arm.v1.ArmService/MoveThroughJointPositions': grpclib.const.Handler(self.MoveThroughJointPositions, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.MoveThroughJointPositionsRequest, component.arm.v1.arm_pb2.MoveThroughJointPositionsResponse), '/viam.component.arm.v1.ArmService/Stop': grpclib.const.Handler(self.Stop, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.StopRequest, component.arm.v1.arm_pb2.StopResponse), '/viam.component.arm.v1.ArmService/IsMoving': grpclib.const.Handler(self.IsMoving, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.IsMovingRequest, component.arm.v1.arm_pb2.IsMovingResponse), '/viam.component.arm.v1.ArmService/DoCommand': grpclib.const.Handler(self.DoCommand, grpclib.const.Cardinality.UNARY_UNARY, common.v1.common_pb2.DoCommandRequest, common.v1.common_pb2.DoCommandResponse), '/viam.component.arm.v1.ArmService/GetKinematics': grpclib.const.Handler(self.GetKinematics, grpclib.const.Cardinality.UNARY_UNARY, common.v1.common_pb2.GetKinematicsRequest, common.v1.common_pb2.GetKinematicsResponse), '/viam.component.arm.v1.ArmService/GetGeometries': grpclib.const.Handler(self.GetGeometries, grpclib.const.Cardinality.UNARY_UNARY, common.v1.common_pb2.GetGeometriesRequest, common.v1.common_pb2.GetGeometriesResponse), '/viam.component.arm.v1.ArmService/Get3DModels': grpclib.const.Handler(self.Get3DModels, grpclib.const.Cardinality.UNARY_UNARY, common.v1.common_pb2.Get3DModelsRequest, common.v1.common_pb2.Get3DModelsResponse)}
return {'/viam.component.arm.v1.ArmService/GetEndPosition': grpclib.const.Handler(self.GetEndPosition, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.GetEndPositionRequest, component.arm.v1.arm_pb2.GetEndPositionResponse), '/viam.component.arm.v1.ArmService/MoveToPosition': grpclib.const.Handler(self.MoveToPosition, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.MoveToPositionRequest, component.arm.v1.arm_pb2.MoveToPositionResponse), '/viam.component.arm.v1.ArmService/GetJointPositions': grpclib.const.Handler(self.GetJointPositions, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.GetJointPositionsRequest, component.arm.v1.arm_pb2.GetJointPositionsResponse), '/viam.component.arm.v1.ArmService/StreamJointPositions': grpclib.const.Handler(self.StreamJointPositions, grpclib.const.Cardinality.UNARY_STREAM, component.arm.v1.arm_pb2.StreamJointPositionsRequest, component.arm.v1.arm_pb2.StreamJointPositionsResponse), '/viam.component.arm.v1.ArmService/MoveToJointPositions': grpclib.const.Handler(self.MoveToJointPositions, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.MoveToJointPositionsRequest, component.arm.v1.arm_pb2.MoveToJointPositionsResponse), '/viam.component.arm.v1.ArmService/MoveThroughJointPositions': grpclib.const.Handler(self.MoveThroughJointPositions, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.MoveThroughJointPositionsRequest, component.arm.v1.arm_pb2.MoveThroughJointPositionsResponse), '/viam.component.arm.v1.ArmService/Stop': grpclib.const.Handler(self.Stop, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.StopRequest, component.arm.v1.arm_pb2.StopResponse), '/viam.component.arm.v1.ArmService/IsMoving': grpclib.const.Handler(self.IsMoving, grpclib.const.Cardinality.UNARY_UNARY, component.arm.v1.arm_pb2.IsMovingRequest, component.arm.v1.arm_pb2.IsMovingResponse), '/viam.component.arm.v1.ArmService/DoCommand': grpclib.const.Handler(self.DoCommand, grpclib.const.Cardinality.UNARY_UNARY, common.v1.common_pb2.DoCommandRequest, common.v1.common_pb2.DoCommandResponse), '/viam.component.arm.v1.ArmService/GetKinematics': grpclib.const.Handler(self.GetKinematics, grpclib.const.Cardinality.UNARY_UNARY, common.v1.common_pb2.GetKinematicsRequest, common.v1.common_pb2.GetKinematicsResponse), '/viam.component.arm.v1.ArmService/GetGeometries': grpclib.const.Handler(self.GetGeometries, grpclib.const.Cardinality.UNARY_UNARY, common.v1.common_pb2.GetGeometriesRequest, common.v1.common_pb2.GetGeometriesResponse), '/viam.component.arm.v1.ArmService/Get3DModels': grpclib.const.Handler(self.Get3DModels, grpclib.const.Cardinality.UNARY_UNARY, common.v1.common_pb2.Get3DModelsRequest, common.v1.common_pb2.Get3DModelsResponse)}

class UnimplementedArmServiceBase(ArmServiceBase):

Expand All @@ -70,6 +75,9 @@ async def MoveToPosition(self, stream: 'grpclib.server.Stream[component.arm.v1.a
async def GetJointPositions(self, stream: 'grpclib.server.Stream[component.arm.v1.arm_pb2.GetJointPositionsRequest, component.arm.v1.arm_pb2.GetJointPositionsResponse]') -> None:
raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

async def StreamJointPositions(self, stream: 'grpclib.server.Stream[component.arm.v1.arm_pb2.StreamJointPositionsRequest, component.arm.v1.arm_pb2.StreamJointPositionsResponse]') -> None:
raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

async def MoveToJointPositions(self, stream: 'grpclib.server.Stream[component.arm.v1.arm_pb2.MoveToJointPositionsRequest, component.arm.v1.arm_pb2.MoveToJointPositionsResponse]') -> None:
raise grpclib.exceptions.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

Expand Down Expand Up @@ -100,6 +108,7 @@ def __init__(self, channel: grpclib.client.Channel) -> None:
self.GetEndPosition = grpclib.client.UnaryUnaryMethod(channel, '/viam.component.arm.v1.ArmService/GetEndPosition', component.arm.v1.arm_pb2.GetEndPositionRequest, component.arm.v1.arm_pb2.GetEndPositionResponse)
self.MoveToPosition = grpclib.client.UnaryUnaryMethod(channel, '/viam.component.arm.v1.ArmService/MoveToPosition', component.arm.v1.arm_pb2.MoveToPositionRequest, component.arm.v1.arm_pb2.MoveToPositionResponse)
self.GetJointPositions = grpclib.client.UnaryUnaryMethod(channel, '/viam.component.arm.v1.ArmService/GetJointPositions', component.arm.v1.arm_pb2.GetJointPositionsRequest, component.arm.v1.arm_pb2.GetJointPositionsResponse)
self.StreamJointPositions = grpclib.client.UnaryStreamMethod(channel, '/viam.component.arm.v1.ArmService/StreamJointPositions', component.arm.v1.arm_pb2.StreamJointPositionsRequest, component.arm.v1.arm_pb2.StreamJointPositionsResponse)
self.MoveToJointPositions = grpclib.client.UnaryUnaryMethod(channel, '/viam.component.arm.v1.ArmService/MoveToJointPositions', component.arm.v1.arm_pb2.MoveToJointPositionsRequest, component.arm.v1.arm_pb2.MoveToJointPositionsResponse)
self.MoveThroughJointPositions = grpclib.client.UnaryUnaryMethod(channel, '/viam.component.arm.v1.ArmService/MoveThroughJointPositions', component.arm.v1.arm_pb2.MoveThroughJointPositionsRequest, component.arm.v1.arm_pb2.MoveThroughJointPositionsResponse)
self.Stop = grpclib.client.UnaryUnaryMethod(channel, '/viam.component.arm.v1.ArmService/Stop', component.arm.v1.arm_pb2.StopRequest, component.arm.v1.arm_pb2.StopResponse)
Expand Down
Loading
Loading