Skip to content

Can't run multiple tests with the same instance of ReusableClient #63

@slavict

Description

@slavict

How to reproduce

import pytest
from sanic_testing.reusable import ReusableClient
from sanic import Sanic
from sanic import response


@pytest.fixture(scope="session")
def app():
    """Build the Sanic application shared by every test in the session.

    Registers three routes -- ``GET /``, ``POST /api/login`` and
    ``GET /api/resources`` -- each answering with the plain-text body "foo".

    Returns:
        The configured ``Sanic`` instance named "TestSanic".
    """
    sanic_app = Sanic("TestSanic")

    # Each handler gets its own function name. Reusing a single name for all
    # three rebinds the same local on every definition and, because Sanic
    # derives a route's default name from its handler, produces duplicate
    # route names.
    @sanic_app.get("/")
    def root(request):
        return response.text("foo")

    @sanic_app.post("/api/login")
    def login(request):
        return response.text("foo")

    @sanic_app.get("/api/resources")
    def resources(request):
        return response.text("foo")

    return sanic_app


@pytest.fixture(scope="session")
def cli(app):
    """Return a session-scoped ``ReusableClient`` wrapping ``app``."""
    return ReusableClient(app)


def test_root(cli):
    """Request ``GET /`` inside the client context and expect status 200."""
    with cli:
        _request, resp = cli.get("/")
        assert resp.status == 200


def test_login(cli):
    """Call the login and resources routes in one client context.

    Issues ``POST /api/login`` followed by ``GET /api/resources`` and expects
    status 200 from each.
    """
    with cli:
        _request, login_resp = cli.post("/api/login")
        assert login_resp.status == 200

        _request, resources_resp = cli.get("/api/resources")
        assert resources_resp.status == 200

Error I got

self = <_UnixSelectorEventLoop running=False closed=False debug=False>
future = <Task finished name='Task-18' coro=<StartupMixin.create_server() done, defined at /home/ghost/wuw2/lib/python3.10/site-packages/sanic/mixins/startup.py:347> exception=RuntimeError('cannot reuse already awaited coroutine')>

    def run_until_complete(self, future):
        """Run until the Future is done.
    
        If the argument is a coroutine, it is wrapped in a Task.
    
        WARNING: It would be disastrous to call run_until_complete()
        with the same coroutine twice -- it would wrap it in two
        different Tasks and that can't be good.
    
        Return the Future's result, or raise its exception.
        """
        self._check_closed()
        self._check_running()
    
        new_task = not futures.isfuture(future)
        future = tasks.ensure_future(future, loop=self)
        if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False
    
        future.add_done_callback(_run_until_complete_cb)
        try:
            self.run_forever()
        except:
            if new_task and future.done() and not future.cancelled():
                # The coroutine raised a BaseException. Consume the exception
                # to not log a warning, the caller doesn't have access to the
                # local task.
                future.exception()
            raise
        finally:
            future.remove_done_callback(_run_until_complete_cb)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')
    
>       return future.result()
E       RuntimeError: cannot reuse already awaited coroutine

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions