Skip to content

Would anybody be interested if I added a graceful timer implementation? #459

@Vizonex

Description

@Vizonex

I had an idea a couple of days ago on one of my evening walks an idea for a possible New timer type and I decided not to go ahead and write a new python library just yet for it since many features would be taken straight from the Timer class object provided here. This way safer cleanup by the user could be done without task-starvation.

@types.coroutine
def _s_heartbeat():
    """
    Allows a single cycle from the event loop
    to pass by, it is equivilent to setting
    asyncio.sleep(0) but simpler and faster
    """
    yield

# Ignore the fact that I subclassed it. I'm trying to make the code smaller
# so repeated code in async_timeout doesn't have to be re-read.
class GTimer(Timeout):
    """
    Graceful Timer allows things to be ran while inside of a while-loop
    and is allowed to be exited gracefully
    example::

        async with GTimer(5) as gtimer:
            while await gtimer.is_running():
                ...
            # Somewhere after this while loop you 
            # could perform sensetive object cleanup.

    """
    
    __slots__ = ("_deadline", "_loop", "_state", "_timeout_handler", "_task")

    def __init__(
        self, deadline: Optional[float], *, loop: asyncio.AbstractEventLoop
    ) -> None:
        super().__init__(deadline, loop)
    
    
    def _do_exit(self, exc_type: Optional[BaseException]):
        # Error Silenced because GTimer is graceful
        pass


    def _on_timeout(self) -> None:
        # Instead of killing the running task allow 
        # anything inside of a while loop to complete 
        # gracefully hence the name...
        self._state = _State.TIMEOUT
        # drop the reference early
        self._timeout_handler = None

    async def is_running(self) -> bool:
        # Allow a single cycle to pass before getting the 
        # boolean incase user has a poorly configured while loop...
        # this prevents task-starving.
        await _s_heartbeat()
        return self._state == _State.TIMEOUT

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions