-
Notifications
You must be signed in to change notification settings - Fork 24
Add coroutine command support #12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 4 commits
e4d360c
8488fbc
5de62e0
3ee269a
d285fb0
70f5715
9122d07
7174dfb
decd650
1c7e4d9
ed5b106
aafde42
0bb3251
8f91b2b
71322a3
bac4080
7815111
021ce43
401aa5f
04d90d7
ffbeeca
f2cae93
6068723
d8078e7
c977711
4028320
243072c
748c84c
a2da722
cb5af47
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,6 @@ | ||
| # autogenerated by 'robotpy-build create-imports commands2.button commands2._impl.button' | ||
| from .._impl.button import Button, JoystickButton, NetworkButton, POVButton | ||
| from .button import Button | ||
| from .joystickbutton import JoystickButton | ||
| from .networkbutton import NetworkButton | ||
| from .povbutton import POVButton | ||
|
|
||
| __all__ = ["Button", "JoystickButton", "NetworkButton", "POVButton"] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| from ..trigger import Trigger | ||
|
|
||
|
|
||
| class Button(Trigger): | ||
| whenPressed = Trigger.whenActive | ||
| whenReleased = Trigger.whenInactive | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| from wpilib import Joystick | ||
|
|
||
| from .button import Button | ||
|
|
||
|
|
||
| class JoystickButton(Button): | ||
| def __init__(self, joystick: Joystick, button: int) -> None: | ||
| super().__init__(lambda: joystick.getRawButton(button)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm curious what error message you get when something that isn't an integer is passed in... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It constructs fine but fails the first time py", line 21, in <lambda>
super().__init__(lambda: joystick.getRawButton(button))
TypeError: getRawButton(): incompatible function arguments. The following argument types are supported:
1. (self: wpilib.interfaces._interfaces.GenericHID, button: int) -> bool
Invoked with: <XboxController 0>, 'qwe'which isn't an ideal error message. I'll check the type in the constructor. |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| from networktables import NetworkTable, NetworkTables, NetworkTableEntry | ||
|
|
||
| from typing import Union, overload | ||
|
|
||
| from .button import Button | ||
|
|
||
|
|
||
| class NetworkButton(Button): | ||
| @overload | ||
| def __init__(self, entry: NetworkTableEntry) -> None: | ||
| ... | ||
|
|
||
| @overload | ||
| def __init__(self, table: Union[NetworkTable, str], field: str) -> None: | ||
| ... | ||
|
|
||
| def __init__(self, *args, **kwargs) -> None: | ||
| num_args = len(args) + len(kwargs) | ||
| if num_args == 1: | ||
| entry: NetworkTableEntry = kwargs.get("entry", args[0]) | ||
TheTripleV marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| super().__init__( | ||
| lambda: NetworkTables.isConnected and entry.getBoolean(False) | ||
TheTripleV marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| ) | ||
| else: | ||
| table = kwargs.get("table", args[0]) | ||
| field = kwargs.get("field", args[-1]) | ||
TheTripleV marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
TheTripleV marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| if isinstance(table, str): | ||
| table = NetworkTables.getTable(table) | ||
|
|
||
| entry = table.getEntry(field) | ||
| self.__init__(entry) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| from wpilib import Joystick | ||
|
|
||
| from .button import Button | ||
|
|
||
|
|
||
| class POVButton(Button): | ||
| def __init__(self, joystick: Joystick, angle: int, povNumber: int = 0) -> None: | ||
| super().__init__(lambda: joystick.getPOV(povNumber) == angle) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,96 @@ | ||
| from functools import wraps | ||
| from typing import Any, Callable, Generator, List, Union, Optional | ||
| from ._impl import CommandBase, Subsystem | ||
| import inspect | ||
| from typing_extensions import TypeGuard | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. todo: Figure out the minimum version of |
||
|
|
||
| Coroutine = Generator[None, None, None] | ||
| CoroutineFunction = Callable[[], Generator[None, None, None]] | ||
| Coroutineable = Union[Callable[[], None], CoroutineFunction] | ||
|
|
||
|
|
||
| def is_coroutine(func: Any) -> TypeGuard[Coroutine]: | ||
| return inspect.isgenerator(func) | ||
|
|
||
|
|
||
| def is_coroutine_function(func: Any) -> TypeGuard[CoroutineFunction]: | ||
| return inspect.isgeneratorfunction(func) | ||
|
|
||
|
|
||
| def is_coroutineable(func: Any) -> TypeGuard[Coroutineable]: | ||
| return is_coroutine_function(func) or callable(func) | ||
|
|
||
|
|
||
| def ensure_generator_function(func: Coroutineable) -> Callable[..., Coroutine]: | ||
| if is_coroutine_function(func): | ||
| return func | ||
|
|
||
| @wraps(func) | ||
| def wrapper(*args, **kwargs): | ||
| func(*args, **kwargs) | ||
| yield | ||
|
|
||
| return wrapper | ||
|
|
||
|
|
||
| class CoroutineCommand(CommandBase): | ||
| coroutine: Optional[Coroutine] | ||
| coroutine_function: Optional[Coroutineable] | ||
| is_finished: bool | ||
|
|
||
| def __init__( | ||
| self, | ||
| coroutine: Union[Coroutine, Coroutineable], | ||
| requirements: Optional[List[Subsystem]] = None, | ||
| runs_when_disabled: bool = False, | ||
| ) -> None: | ||
| self.coroutine = None | ||
| self.coroutine_function = None | ||
| self.runsWhenDisabled = lambda: runs_when_disabled | ||
|
|
||
| if is_coroutine(coroutine): | ||
| self.coroutine = coroutine | ||
| elif is_coroutine_function(coroutine): | ||
| self.coroutine_function = coroutine | ||
| else: | ||
| raise TypeError("The coroutine must be a coroutine or a coroutine function") | ||
|
|
||
| if requirements is not None: | ||
| self.addRequirements(requirements) | ||
|
|
||
| self.is_finished = False | ||
|
|
||
| def initialize(self) -> None: | ||
| if self.coroutine_function: | ||
| self.coroutine = ensure_generator_function(self.coroutine_function)() | ||
| elif self.coroutine and self.is_finished: | ||
| RuntimeError("Generator objects cannot be reused.") | ||
TheTripleV marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| self.is_finished = False | ||
|
|
||
| def execute(self): | ||
| try: | ||
| if not self.is_finished: | ||
| if not self.coroutine: | ||
| raise TypeError("This command was not properly initialized") | ||
| next(self.coroutine) | ||
| except StopIteration: | ||
| self.is_finished = True | ||
|
|
||
| def isFinished(self): | ||
| return self.is_finished | ||
|
|
||
|
|
||
| class commandify: | ||
| def __init__(self, requirements: Optional[List[Subsystem]] = None) -> None: | ||
| self.requirements = requirements | ||
|
|
||
| def __call__(self, func: Coroutineable): | ||
| @wraps(func) | ||
| def arg_accepter(*args, **kwargs) -> CoroutineCommand: | ||
| return CoroutineCommand( | ||
| lambda: ensure_generator_function(func)(*args, **kwargs), | ||
| self.requirements, | ||
| ) | ||
|
|
||
| return arg_accepter | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,149 @@ | ||
| from typing import Callable, Optional, overload, List, Union | ||
|
|
||
| from ._impl import Command, Subsystem | ||
| from ._impl import Trigger as _Trigger | ||
TheTripleV marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| from .coroutinecommand import CoroutineCommand, Coroutineable, Coroutine | ||
|
|
||
|
|
||
| class Trigger: | ||
TheTripleV marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| """ | ||
| A button that can be pressed or released. | ||
| """ | ||
|
|
||
| def __init__(self, is_active: Callable[[], bool] = lambda: False) -> None: | ||
| self._trigger = _Trigger(is_active) | ||
|
|
||
| def __bool__(self) -> bool: | ||
| return bool(self._trigger) | ||
|
|
||
| def get(self) -> bool: | ||
| return bool(self) | ||
|
|
||
| def __call__(self) -> bool: | ||
| return bool(self) | ||
|
|
||
| def __and__(self, other: "Trigger") -> "Trigger": | ||
| return Trigger(lambda: self() and other()) | ||
|
|
||
| def __or__(self, other: "Trigger") -> "Trigger": | ||
| return Trigger(lambda: self() or other()) | ||
|
|
||
| def __not__(self) -> "Trigger": | ||
| return Trigger(lambda: not self()) | ||
|
|
||
| @overload | ||
| def whenActive(self, command: Command, /, interruptible: bool = True) -> None: | ||
| ... | ||
|
|
||
| @overload | ||
| def whenActive( | ||
| self, | ||
| coroutine: Union[Coroutine, Coroutineable], | ||
| /, | ||
| *, | ||
| interruptible: bool = True, | ||
| requirements: Optional[List[Subsystem]] = None, | ||
| runs_when_disabled: bool = False, | ||
| ) -> None: | ||
| ... | ||
|
|
||
| @overload | ||
| def whenActive( | ||
| self, | ||
| coroutine: None, | ||
| /, | ||
| *, | ||
| interruptible: bool = True, | ||
| requirements: Optional[List[Subsystem]] = None, | ||
| runs_when_disabled: bool = False, | ||
| ) -> Callable[[Coroutineable], None]: | ||
| ... | ||
|
|
||
| def whenActive( | ||
| self, | ||
| command_or_coroutine: Optional[Union[Command, Coroutine, Coroutineable]], | ||
| /, | ||
| interruptible: bool = True, | ||
| requirements: Optional[List[Subsystem]] = None, | ||
| runs_when_disabled: bool = False, | ||
| ) -> Union[None, Callable[[Coroutineable], None]]: | ||
TheTripleV marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if command_or_coroutine is None: | ||
|
|
||
| def wrapper(coroutine: Coroutineable) -> None: | ||
| self.whenActive( | ||
| coroutine, | ||
| interruptible=interruptible, | ||
| requirements=requirements, | ||
| runs_when_disabled=runs_when_disabled, | ||
| ) | ||
|
|
||
| return wrapper | ||
|
|
||
| if isinstance(command_or_coroutine, Command): | ||
| self._trigger.whenActive(command_or_coroutine, interruptible) | ||
| return | ||
|
|
||
| self._trigger.whenActive( | ||
| CoroutineCommand(command_or_coroutine, requirements, runs_when_disabled), | ||
| interruptible, | ||
| ) | ||
| return | ||
|
|
||
| @overload | ||
| def whenInactive(self, command: Command, /, interruptible: bool = True) -> None: | ||
| ... | ||
|
|
||
| @overload | ||
| def whenInactive( | ||
| self, | ||
| coroutine: Union[Coroutine, Coroutineable], | ||
| /, | ||
| *, | ||
| interruptible: bool = True, | ||
| requirements: Optional[List[Subsystem]] = None, | ||
| runs_when_disabled: bool = False, | ||
| ) -> None: | ||
| ... | ||
|
|
||
| @overload | ||
| def whenInactive( | ||
| self, | ||
| coroutine: None, | ||
| /, | ||
| *, | ||
| interruptible: bool = True, | ||
| requirements: Optional[List[Subsystem]] = None, | ||
| runs_when_disabled: bool = False, | ||
| ) -> Callable[[Coroutineable], None]: | ||
| ... | ||
|
|
||
| def whenInactive( | ||
| self, | ||
| command_or_coroutine: Optional[Union[Command, Coroutine, Coroutineable]], | ||
| /, | ||
| interruptible: bool = True, | ||
| requirements: Optional[List[Subsystem]] = None, | ||
| runs_when_disabled: bool = False, | ||
| ) -> Union[None, Callable[[Coroutineable], None]]: | ||
| if command_or_coroutine is None: | ||
|
|
||
| def wrapper(coroutine: Coroutineable) -> None: | ||
| self.whenInactive( | ||
| coroutine, | ||
| interruptible=interruptible, | ||
| requirements=requirements, | ||
| runs_when_disabled=runs_when_disabled, | ||
| ) | ||
|
|
||
| return wrapper | ||
|
|
||
| if isinstance(command_or_coroutine, Command): | ||
| self._trigger.whenInactive(command_or_coroutine, interruptible) | ||
| return | ||
|
|
||
| self._trigger.whenInactive( | ||
| CoroutineCommand(command_or_coroutine, requirements, runs_when_disabled), | ||
| interruptible, | ||
| ) | ||
| return | ||
Uh oh!
There was an error while loading. Please reload this page.