RuntimeError: Event loop is closed in multiprocessing environments (celery). #46

@artur-augustyniak

Hi,

First of all, sorry for not providing a PR; I lack the spare time.
The problem I faced is not strictly a vt-py issue, but I believe that protection against it could also be added here.
In short, I have software where Celery tasks using vt-py are processed among other tasks, and those other tasks use packages containing async code too. While trying to debug the error mentioned in the title, I came up with a PoC of the problem and a possible solution. I believe the code will be the best description.

Right now the only more or less clear solution is wrapping the async code in threads.
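
For illustration, the thread-based workaround might look roughly like the sketch below; it is separate from the PoC that follows. The names `run_coro_in_thread` and `sample_op` are hypothetical and not part of vt-py or Celery. The idea is that a worker thread gets its own event loop, so a loop closed by another task in the process's main thread does not matter.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def run_coro_in_thread(coro_factory):
    """Run a coroutine on a fresh event loop inside a worker thread.

    coro_factory is a hypothetical zero-argument callable that returns
    the coroutine to run (for example, a function wrapping a client call).
    """
    def runner():
        # asyncio.run() creates a new loop in this thread, runs the
        # coroutine to completion, and closes that loop afterwards.
        return asyncio.run(coro_factory())

    # A single-use executor keeps the sketch simple; a long-lived worker
    # thread would avoid the per-call thread start-up cost.
    with ThreadPoolExecutor(max_workers=1) as executor:
        return executor.submit(runner).result()


async def sample_op():
    # Stand-in for the real async operation.
    await asyncio.sleep(0)
    return 1
```

A task body would then call `run_coro_in_thread(sample_op)` instead of touching the process-wide event loop directly.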

#!/usr/bin/env python3


import os
from multiprocessing import Pool
import asyncio


def other_celery_task(job_id):
    '''
        A common pattern in other libraries. Note the "idiomatic" (or would-be
        idiomatic) view of the event loop as the task's own resource:
        obtain new event_loop
        try:
            do your job
        finally:
            event_loop.close()
    '''
    cpid = os.getpid()
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)

    async def some_aync_op_possibly_throwing_exception():
        print(
            "[*] async def some_aync_op_possibly_throwing_exception() start id", job_id, cpid)
        # await asyncio.sleep(0)
        print(
            "[*] async def some_aync_op_possibly_throwing_exception() done id", job_id, cpid)
        return 1
    try:
        return event_loop.run_until_complete(some_aync_op_possibly_throwing_exception())
    except Exception:
        print("Oh no, fail, let's ignore it, forget about the finally below")
    finally:
        event_loop.close()


def vt_client_related_celery_task(job_id):
    '''
        This reconstructs the flow from vt.Client.
    '''
    cpid = os.getpid()

    async def vt_client_aync_op():
        print("[*] async def vt_client_aync_op() start id", job_id, cpid)
        # await asyncio.sleep(0)
        print("[*] async def vt_client_aync_op() done id", job_id, cpid)
        return 1

    try:
        event_loop = asyncio.get_event_loop()
        print("event loop was in place id",
              job_id, cpid, event_loop.is_closed())

        '''
            Try uncommenting the 2 lines below. I assume that a closed loop does not count as a
            nonexistent loop, so get_event_loop() never raises RuntimeError here.
            When the next Celery task using vt.Client arrives, we get RuntimeError: Event loop is closed
        '''
        # if event_loop.is_closed():
        #     raise RuntimeError("other task closed our loop?")
    except RuntimeError:
        # Generate an event loop if there isn't any.
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)
        print("event loop regenerated id", job_id, cpid)

    return event_loop.run_until_complete(vt_client_aync_op())


if __name__ == '__main__':
    '''
        An oversimplification of the default Celery worker setup, which also handles other tasks.
        The pool size of 2 is intentionally small; this problem can be non-deterministic.
    '''

    with Pool(2) as pool:

        vt_round_one = pool.map_async(vt_client_related_celery_task,
                                      [i for i in range(0, 9)])
        other_round = pool.map_async(other_celery_task, [9])
        vt_round_two = pool.map_async(vt_client_related_celery_task,
                                      [i for i in range(10, 20)])
        if 20 == sum(vt_round_one.get() + other_round.get() + vt_round_two.get()):
            print("all tasks executed")
        else:
            print("yay, fail")
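
The protection suggested by the commented-out check in the PoC could also be written as a small helper. This is only a sketch under the assumption that replacing a closed loop is acceptable for the calling code; `get_usable_event_loop` is a hypothetical name, not an existing vt-py function.

```python
import asyncio


def get_usable_event_loop():
    """Return an open event loop for the current thread, creating one if needed."""
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No loop is set for this thread.
        loop = None
    # A closed loop is still returned by get_event_loop(), so check it explicitly.
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop
```

With such a helper, the task would call `get_usable_event_loop().run_until_complete(...)` and would no longer depend on whether an earlier task in the same worker process closed the loop.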

BR
Artur Augustyniak
