Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/apis.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
"func": "get_position",
"packagePath": "viam.components"
},
"switch": {
"func": "get_position",
"packagePath": "viam.components"
},
"arm": {
"func": "get_end_position",
"packagePath": "viam.components"
Expand Down
14 changes: 14 additions & 0 deletions src/viam/components/switch/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import viam.gen.component.switch.v1.switch_pb2
from viam.resource.registry import Registry, ResourceRegistration

from .client import SwitchClient
from .service import SwitchRPCService
from .switch import Switch

__all__ = ["Switch"]

Registry.register_api(
ResourceRegistration(
Switch, SwitchRPCService, lambda name, channel: SwitchClient(name, channel)
)
)
83 changes: 83 additions & 0 deletions src/viam/components/switch/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from typing import Any, Mapping, Optional

from grpclib.client import Channel

from viam.proto.common import (
DoCommandRequest,
DoCommandResponse,
)
from viam.proto.component.switch import SwitchServiceStub
from viam.gen.component.switch.v1.switch_pb2 import (
GetPositionRequest,
GetPositionResponse,
SetPositionRequest,
GetNumberOfPositionsRequest,
GetNumberOfPositionsResponse,
)
from viam.resource.rpc_client_base import ReconfigurableResourceRPCClientBase
from viam.utils import (
ValueTypes,
dict_to_struct,
struct_to_dict,
)

from .switch import Switch


class SwitchClient(Switch, ReconfigurableResourceRPCClientBase):
"""
gRPC client for Switch component
"""

def __init__(self, name: str, channel: Channel):
self.channel = channel
self.client = SwitchServiceStub(channel)
super().__init__(name)

async def get_position(
self,
*,
extra: Optional[Mapping[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
) -> int:
md = kwargs.get("metadata", self.Metadata()).proto
request = GetPositionRequest(name=self.name, extra=dict_to_struct(extra))
response: GetPositionResponse = await self.client.GetPosition(request, timeout=timeout, metadata=md)
return response.position

async def set_position(
self,
position: int,
*,
extra: Optional[Mapping[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
) -> None:
md = kwargs.get("metadata", self.Metadata()).proto
request = SetPositionRequest(name=self.name, position=position, extra=dict_to_struct(extra))
await self.client.SetPosition(request, timeout=timeout, metadata=md)

async def get_number_of_positions(
self,
*,
extra: Optional[Mapping[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
) -> int:
md = kwargs.get("metadata", self.Metadata()).proto
request = GetNumberOfPositionsRequest(name=self.name, extra=dict_to_struct(extra))
response: GetNumberOfPositionsResponse = await self.client.GetNumberOfPositions(request, timeout=timeout, metadata=md)
return response.number_of_positions

async def do_command(
self,
command: Mapping[str, ValueTypes],
*,
timeout: Optional[float] = None,
**kwargs,
) -> Mapping[str, ValueTypes]:
md = kwargs.get("metadata", self.Metadata()).proto
request = DoCommandRequest(name=self.name, command=dict_to_struct(command))
response: DoCommandResponse = await self.client.DoCommand(request, timeout=timeout, metadata=md)
return struct_to_dict(response.result)
72 changes: 72 additions & 0 deletions src/viam/components/switch/service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from grpclib.server import Stream

from viam.proto.common import (
DoCommandRequest,
DoCommandResponse,
)
from viam.gen.component.switch.v1.switch_pb2 import (
GetPositionRequest,
GetPositionResponse,
SetPositionRequest,
SetPositionResponse,
GetNumberOfPositionsRequest,
GetNumberOfPositionsResponse,
)
from viam.proto.component.switch import SwitchServiceBase
from viam.resource.rpc_service_base import ResourceRPCServiceBase
from viam.utils import dict_to_struct, struct_to_dict

from .switch import Switch


class SwitchRPCService(SwitchServiceBase, ResourceRPCServiceBase[Switch]):
"""
gRPC Service for a generic Switch
"""

RESOURCE_TYPE = Switch

async def GetPosition(self, stream: Stream[GetPositionRequest, GetPositionResponse]) -> None:
request = await stream.recv_message()
assert request is not None
name = request.name
switch = self.get_resource(name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
position = await switch.get_position(extra=struct_to_dict(request.extra), timeout=timeout, metadata=stream.metadata)
response = GetPositionResponse(position=position)
await stream.send_message(response)

async def SetPosition(self, stream: Stream[SetPositionRequest, SetPositionResponse]) -> None:
request = await stream.recv_message()
assert request is not None
name = request.name
switch = self.get_resource(name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
await switch.set_position(position=request.position, extra=struct_to_dict(request.extra), timeout=timeout, metadata=stream.metadata)
await stream.send_message(SetPositionResponse())

async def GetNumberOfPositions(self, stream: Stream[GetNumberOfPositionsRequest, GetNumberOfPositionsResponse]) -> None:
request = await stream.recv_message()
assert request is not None
name = request.name
switch = self.get_resource(name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
number_of_positions = await switch.get_number_of_positions(
extra=struct_to_dict(request.extra), timeout=timeout, metadata=stream.metadata
)
response = GetNumberOfPositionsResponse(number_of_positions=number_of_positions)
await stream.send_message(response)

async def DoCommand(self, stream: Stream[DoCommandRequest, DoCommandResponse]) -> None:
request = await stream.recv_message()
assert request is not None
name = request.name
switch = self.get_resource(name)
timeout = stream.deadline.time_remaining() if stream.deadline else None
result = await switch.do_command(
command=struct_to_dict(request.command),
timeout=timeout,
metadata=stream.metadata,
)
response = DoCommandResponse(result=dict_to_struct(result))
await stream.send_message(response)
95 changes: 95 additions & 0 deletions src/viam/components/switch/switch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import abc
from typing import Any, Final, Mapping, Optional

from viam.resource.types import API, RESOURCE_NAMESPACE_RDK, RESOURCE_TYPE_COMPONENT
from viam.components.component_base import ComponentBase


class Switch(ComponentBase):
"""
Switch represents a device with two or more finite states (or positions) than can be set and retrieved.
This acts as an abstract base class for any drivers representing specific
switch implementations. This cannot be used on its own. If the ``__init__()`` function is
overridden, it must call the ``super().__init__()`` function.
::
from viam.components.switch import Switch
For more information, see `Switch component <https://docs.viam.com/dev/reference/apis/components/switch/>`_.
"""

API: Final = API( # pyright: ignore [reportIncompatibleVariableOverride]
RESOURCE_NAMESPACE_RDK, RESOURCE_TYPE_COMPONENT, "switch"
)

@abc.abstractmethod
async def get_position(self, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> int:
"""
Get the current position of the switch
::
my_switch = Switch.from_robot(robot=machine, name="my_switch")
# Update the switch from its current position to the desired position of 1.
await my_switch.set_position(1)
# Get the current set position of the switch.
pos1 = await my_switch.get_position()
# Update the switch from its current position to the desired position.
await my_switch.set_position(0)
# Get the current set position of the switch.
pos2 = await my_switch.get_position()
Returns:
int: The current position of the switch within the range of available positions.
For more information, see `Switch component <https://docs.viam.com/dev/reference/apis/components/Switch/#getposition>`_.
"""
...

@abc.abstractmethod
async def set_position(
self, position: int, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None, **kwargs
) -> None:
"""
Sets the current position of the switch.
::
my_switch = Switch.from_robot(robot=machine, name="my_switch")
# Update the switch from its current position to the desired position of 1.
await my_switch.set_position(1)
# Update the switch from its current position to the desired position of 0.
await my_switch.set_position(0)
Args:
position (int): The position of the switch within the range of available positions.
For more information, see `Switch component <https://docs.viam.com/dev/reference/apis/components/switch/#setposition>`_.
"""
...

@abc.abstractmethod
async def get_number_of_positions(self, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> int:
"""
Get the number of available positions on the switch.
::
my_switch = Switch.from_robot(robot=machine, name="my_switch")
print(await my_switch.get_number_of_positions())
Returns:
int: The number of available positions.
For more information, see `Switch component <https://docs.viam.com/dev/reference/apis/components/switch/#getnumberofpositions>`_.
"""
...
30 changes: 30 additions & 0 deletions tests/mocks/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from viam.components.power_sensor import PowerSensor
from viam.components.sensor import Sensor
from viam.components.servo import Servo
from viam.components.switch import Switch
from viam.errors import ResourceNotFoundError
from viam.media.audio import Audio, AudioStream
from viam.media.video import CameraMimeType, NamedImage, ViamImage
Expand Down Expand Up @@ -1013,3 +1014,32 @@ async def get_geometries(self, *, extra: Optional[Dict[str, Any]] = None, timeou

async def do_command(self, command: Mapping[str, ValueTypes], *, timeout: Optional[float] = None, **kwargs) -> Mapping[str, ValueTypes]:
return {"command": command}


class MockSwitch(Switch):
def __init__(self, name: str, number_of_positions: int = 3, position: int = 0):
self.number_of_positions = number_of_positions
self.position = position
self.timeout: Optional[float] = None
self.extra: Optional[Mapping[str, Any]] = None
super().__init__(name)

async def get_position(self, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> int:
self.extra = extra
self.timeout = timeout
return self.position

async def get_number_of_positions(self, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None, **kwargs) -> int:
self.extra = extra
self.timeout = timeout
return self.number_of_positions

async def set_position(
self, position: int, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None, **kwargs
) -> None:
self.extra = extra
self.timeout = timeout
self.position = position

async def do_command(self, command: Mapping[str, ValueTypes], *, timeout: Optional[float] = None, **kwargs) -> Mapping[str, ValueTypes]:
return {"command": command}
Loading