diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index c91ce30..90eaa38 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -16,6 +16,7 @@ concurrency: env: CIBW_SKIP: > pp* + PYAWAITABLE_OPTIMIZED: 1 jobs: binary-wheels-standard: diff --git a/.github/workflows/memory_leak.yml b/.github/workflows/memory_leak.yml index 343692d..f0d31f8 100644 --- a/.github/workflows/memory_leak.yml +++ b/.github/workflows/memory_leak.yml @@ -18,6 +18,7 @@ env: PYTHONUNBUFFERED: "1" FORCE_COLOR: "1" PYTHONIOENCODING: "utf8" + PYTHONMALLOC: malloc jobs: memory-leaks: diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 30d79b1..c99d8f9 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -24,6 +24,7 @@ env: PYTHONUNBUFFERED: "1" FORCE_COLOR: "1" PYTHONIOENCODING: "utf8" + PYAWAITABLE_OPTIMIZED: 1 jobs: run-tests: diff --git a/CHANGELOG.md b/CHANGELOG.md index 91a12f2..419671f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## Unreleased + +- Significantly reduced awaitable object size by dynamically allocating it. +- Reduced memory footprint by removing preallocated awaitable objects. +- Objects returned by a PyAwaitable object's `__await__` are now garbage collected (*i.e.*, they don't leak with rare circular references). +- Removed limit on number of stored callbacks or values. +- Switched some user-error messages to `RuntimeError` instead of `SystemError`. + ## [1.3.0] - 2024-10-26 - Added support for `async with` via `pyawaitable_async_with`. diff --git a/include/pyawaitable/array.h b/include/pyawaitable/array.h new file mode 100644 index 0000000..795fcda --- /dev/null +++ b/include/pyawaitable/array.h @@ -0,0 +1,258 @@ +#ifndef PYAWAITABLE_ARRAY_H +#define PYAWAITABLE_ARRAY_H + +#include +#include + +#define pyawaitable_array_DEFAULT_SIZE 16 + +/* + * Deallocator for items on a pyawaitable_array structure. A NULL pointer + * will never be given to the deallocator. + */ +typedef void (*pyawaitable_array_deallocator)(void *); + +/* + * Internal only dynamic array for CPython. + */ +typedef struct +{ + /* + * The actual items in the dynamic array. + * Don't access this field publicly to get + * items--use pyawaitable_array_GET_ITEM() instead. + */ + void **items; + /* + * The length of the actual items array allocation. + */ + Py_ssize_t capacity; + /* + * The number of items in the array. + * Don't use this field publicly--use pyawaitable_array_LENGTH() + */ + Py_ssize_t length; + /* + * The deallocator, set by one of the initializer functions. + * This may be NULL. + */ + pyawaitable_array_deallocator deallocator; +} pyawaitable_array; + + +/* Zero out the array */ +static inline void +pyawaitable_array_ZERO(pyawaitable_array *array) +{ + assert(array != NULL); + array->deallocator = NULL; + array->items = NULL; + array->length = 0; + array->capacity = 0; +} + +static inline void +pyawaitable_array_ASSERT_VALID(pyawaitable_array *array) +{ + assert(array != NULL); + assert(array->items != NULL); +} + +static inline void +pyawaitable_array_ASSERT_INDEX(pyawaitable_array *array, Py_ssize_t index) +{ + // Ensure the index is valid + assert(index < array->length); + assert(index >= 0); +} + +/* + * Initialize a dynamic array with an initial size and deallocator. + * + * If the deallocator is NULL, then nothing happens to items upon + * removal and upon array clearing. + * + * Returns -1 upon failure, 0 otherwise. + */ +int +pyawaitable_array_init_with_size( + pyawaitable_array *array, + pyawaitable_array_deallocator deallocator, + Py_ssize_t initial +); + +/* + * Append to the array. + * + * Returns -1 upon failure, 0 otherwise. + * If this fails, the deallocator is not ran on the item. + */ +int pyawaitable_array_append(pyawaitable_array *array, void *item); + +/* + * Insert an item at the target index. The index + * must currently be a valid index in the array. + * + * Returns -1 upon failure, 0 otherwise. + * If this fails, the deallocator is not ran on the item. + */ +int +pyawaitable_array_insert( + pyawaitable_array *array, + Py_ssize_t index, + void *item +); + +/* Remove all items from the array. */ +void +pyawaitable_array_clear_items(pyawaitable_array *array); + +/* + * Clear all the fields on the array. + * + * Note that this does *not* free the actual dynamic array + * structure--use pyawaitable_array_Free() for that. + * + * It's safe to call pyawaitable_array_init() or init_with_size() again + * on the array after calling this. + */ +void pyawaitable_array_clear(pyawaitable_array *array); + +/* + * Set a value at index in the array. + * + * If an item already exists at the target index, the deallocator + * is called on it, if the array has one set. + * + * This cannot fail. + */ +void +pyawaitable_array_set(pyawaitable_array *array, Py_ssize_t index, void *item); + +/* + * Remove the item at the index, and call the deallocator on it (if the array + * has one set). + * + * This cannot fail. + */ +void +pyawaitable_array_remove(pyawaitable_array *array, Py_ssize_t index); + +/* + * Remove the item at the index *without* deallocating it, and + * return the item. + * + * This cannot fail. + */ +void * +pyawaitable_array_pop(pyawaitable_array *array, Py_ssize_t index); + +/* + * Clear all the fields on a dynamic array, and then + * free the dynamic array structure itself. + * + * The array must have been created by pyawaitable_array_new() + */ +static inline void +pyawaitable_array_free(pyawaitable_array *array) +{ + pyawaitable_array_ASSERT_VALID(array); + pyawaitable_array_clear(array); + PyMem_RawFree(array); +} + +/* + * Equivalent to pyawaitable_array_init_with_size() with a default size of 16. + * + * Returns -1 upon failure, 0 otherwise. + */ +static inline int +pyawaitable_array_init( + pyawaitable_array *array, + pyawaitable_array_deallocator deallocator +) +{ + return pyawaitable_array_init_with_size( + array, + deallocator, + pyawaitable_array_DEFAULT_SIZE + ); +} + +/* + * Allocate and create a new dynamic array on the heap. + * + * The returned pointer should be freed with pyawaitable_array_free() + * If this function fails, it returns NULL. + */ +static inline pyawaitable_array * +pyawaitable_array_new_with_size( + pyawaitable_array_deallocator deallocator, + Py_ssize_t initial +) +{ + pyawaitable_array *array = PyMem_Malloc(sizeof(pyawaitable_array)); + if (array == NULL) + { + return NULL; + } + + if (pyawaitable_array_init_with_size(array, deallocator, initial) < 0) + { + PyMem_Free(array); + return NULL; + } + + pyawaitable_array_ASSERT_VALID(array); // Sanity check + return array; +} + +/* + * Equivalent to pyawaitable_array_new_with_size() with a size of 16. + * + * The returned array must be freed with pyawaitable_array_free(). + * Returns NULL on failure. + */ +static inline pyawaitable_array * +pyawaitable_array_new(pyawaitable_array_deallocator deallocator) +{ + return pyawaitable_array_new_with_size( + deallocator, + pyawaitable_array_DEFAULT_SIZE + ); +} + +/* + * Get an item from the array. This cannot fail. + * + * If the index is not valid, this is undefined behavior. + */ +static inline void * +pyawaitable_array_GET_ITEM(pyawaitable_array *array, Py_ssize_t index) +{ + pyawaitable_array_ASSERT_VALID(array); + pyawaitable_array_ASSERT_INDEX(array, index); + return array->items[index]; +} + +/* + * Get the length of the array. This cannot fail. + */ +static inline Py_ssize_t +pyawaitable_array_LENGTH(pyawaitable_array *array) +{ + pyawaitable_array_ASSERT_VALID(array); + return array->length; +} + +/* + * Pop the item at the end the array. + * This function cannot fail. + */ +static inline void * +pyawaitable_array_pop_top(pyawaitable_array *array) +{ + return pyawaitable_array_pop(array, pyawaitable_array_LENGTH(array) - 1); +} + +#endif diff --git a/include/pyawaitable/awaitableobject.h b/include/pyawaitable/awaitableobject.h index c35951c..9567d90 100644 --- a/include/pyawaitable/awaitableobject.h +++ b/include/pyawaitable/awaitableobject.h @@ -4,10 +4,10 @@ #include #include +#include + typedef int (*awaitcallback)(PyObject *, PyObject *); typedef int (*awaitcallback_err)(PyObject *, PyObject *); -#define CALLBACK_ARRAY_SIZE 128 -#define VALUE_ARRAY_SIZE 32 typedef struct _pyawaitable_callback { @@ -21,31 +21,23 @@ struct _PyAwaitableObject { PyObject_HEAD - // Callbacks - pyawaitable_callback aw_callbacks[CALLBACK_ARRAY_SIZE]; - Py_ssize_t aw_callback_index; - - // Stored Values - PyObject *aw_values[VALUE_ARRAY_SIZE]; - Py_ssize_t aw_values_index; - - // Arbitrary Values - void *aw_arb_values[VALUE_ARRAY_SIZE]; - Py_ssize_t aw_arb_values_index; - - // Integer Values - long aw_int_values[VALUE_ARRAY_SIZE]; - Py_ssize_t aw_int_values_index; + pyawaitable_array aw_callbacks; + pyawaitable_array aw_object_values; + pyawaitable_array aw_arbitrary_values; + pyawaitable_array aw_integer_values; - // Awaitable State + /* Index of current callback */ Py_ssize_t aw_state; + /* Is the awaitable done? */ bool aw_done; + /* Was the awaitable awaited? */ bool aw_awaited; - bool aw_used; - - // Misc + /* Strong reference to the result of the coroutine. */ PyObject *aw_result; + /* Strong reference to the genwrapper. */ PyObject *aw_gen; + /* Set to 1 if the object was cancelled, for introspection against callbacks */ + int aw_recently_cancelled; }; typedef struct _PyAwaitableObject PyAwaitableObject; @@ -78,9 +70,4 @@ pyawaitable_await_function_impl( ... ); -int -alloc_awaitable_pool(void); -void -dealloc_awaitable_pool(void); - #endif diff --git a/include/pyawaitable/genwrapper.h b/include/pyawaitable/genwrapper.h index 06beeb6..967eee7 100644 --- a/include/pyawaitable/genwrapper.h +++ b/include/pyawaitable/genwrapper.h @@ -18,8 +18,7 @@ genwrapper_next(PyObject *self); int genwrapper_fire_err_callback( PyObject *self, - PyObject *await, - pyawaitable_callback *cb + awaitcallback_err err_callback ); PyObject * diff --git a/setup.py b/setup.py index 7fb7ee0..ee2e1cb 100644 --- a/setup.py +++ b/setup.py @@ -1,18 +1,19 @@ from glob import glob from setuptools import Extension, setup +import os if __name__ == "__main__": setup( name="pyawaitable", license="MIT", - version="1.3.0", + version="1.4.0-dev", ext_modules=[ Extension( "_pyawaitable", glob("./src/_pyawaitable/*.c"), include_dirs=["./include/", "./src/pyawaitable/"], - extra_compile_args=["-g", "-O3"], + extra_compile_args=["-g", "-O3" if os.environ.get("PYAWAITABLE_OPTIMIZED") else "-O0"], ) ], package_dir={"": "src"}, diff --git a/src/_pyawaitable/array.c b/src/_pyawaitable/array.c new file mode 100644 index 0000000..14865c0 --- /dev/null +++ b/src/_pyawaitable/array.c @@ -0,0 +1,163 @@ +#include + +static inline void +call_deallocator_maybe(pyawaitable_array *array, Py_ssize_t index) +{ + if (array->deallocator != NULL && array->items[index] != NULL) + { + array->deallocator(array->items[index]); + array->items[index] = NULL; + } +} + +int +pyawaitable_array_init_with_size( + pyawaitable_array *array, + pyawaitable_array_deallocator deallocator, + Py_ssize_t initial +) +{ + assert(array != NULL); + assert(initial > 0); + void **items = PyMem_Calloc(sizeof(void *), initial); + if (items == NULL) + { + return -1; + } + + array->capacity = initial; + array->items = items; + array->length = 0; + array->deallocator = deallocator; + + return 0; +} + +static int +resize_if_needed(pyawaitable_array *array) +{ + if (array->length == array->capacity) + { + // Need to resize + array->capacity *= 2; + void **new_items = PyMem_Realloc( + array->items, + sizeof(void *) * array->capacity + ); + if (new_items == NULL) + { + return -1; + } + + array->items = new_items; + } + + return 0; +} + +int +pyawaitable_array_append(pyawaitable_array *array, void *item) +{ + pyawaitable_array_ASSERT_VALID(array); + array->items[array->length++] = item; + if (resize_if_needed(array) < 0) + { + array->items[--array->length] = NULL; + return -1; + } + return 0; +} + +int +pyawaitable_array_insert( + pyawaitable_array *array, + Py_ssize_t index, + void *item +) +{ + pyawaitable_array_ASSERT_VALID(array); + pyawaitable_array_ASSERT_INDEX(array, index); + ++array->length; + if (resize_if_needed(array) < 0) + { + // Grow the array beforehand, otherwise it's + // going to be a mess putting it back together if + // allocation fails. + --array->length; + return -1; + } + + for (Py_ssize_t i = array->length - 1; i > index; --i) + { + array->items[i] = array->items[i - 1]; + } + + array->items[index] = item; + return 0; +} + +void +pyawaitable_array_set(pyawaitable_array *array, Py_ssize_t index, void *item) +{ + pyawaitable_array_ASSERT_VALID(array); + pyawaitable_array_ASSERT_INDEX(array, index); + call_deallocator_maybe(array, index); + array->items[index] = item; +} + +static void +remove_no_dealloc(pyawaitable_array *array, Py_ssize_t index) +{ + for (Py_ssize_t i = index; i < array->length - 1; ++i) + { + array->items[i] = array->items[i + 1]; + } + --array->length; +} + +void +pyawaitable_array_remove(pyawaitable_array *array, Py_ssize_t index) +{ + pyawaitable_array_ASSERT_VALID(array); + pyawaitable_array_ASSERT_INDEX(array, index); + call_deallocator_maybe(array, index); + remove_no_dealloc(array, index); +} + +void * +pyawaitable_array_pop(pyawaitable_array *array, Py_ssize_t index) +{ + pyawaitable_array_ASSERT_VALID(array); + pyawaitable_array_ASSERT_INDEX(array, index); + void *item = array->items[index]; + remove_no_dealloc(array, index); + return item; +} + +void +pyawaitable_array_clear_items(pyawaitable_array *array) +{ + pyawaitable_array_ASSERT_VALID(array); + for (Py_ssize_t i = 0; i < array->length; ++i) + { + call_deallocator_maybe(array, i); + array->items[i] = NULL; + } + + array->length = 0; +} + +void +pyawaitable_array_clear(pyawaitable_array *array) +{ + pyawaitable_array_ASSERT_VALID(array); + pyawaitable_array_clear_items(array); + PyMem_Free(array->items); + + // It would be nice if others could reuse the allocation for another + // dynarray later, so clear all the fields. + array->items = NULL; + array->length = 0; + array->capacity = 0; + array->deallocator = NULL; +} diff --git a/src/_pyawaitable/awaitable.c b/src/_pyawaitable/awaitable.c index e16e19b..25bc741 100644 --- a/src/_pyawaitable/awaitable.c +++ b/src/_pyawaitable/awaitable.c @@ -1,18 +1,25 @@ #include -#include +#include + +#include #include -#include +#include #include -#include -#define AWAITABLE_POOL_SIZE 256 +#include PyDoc_STRVAR( awaitable_doc, "Awaitable transport utility for the C API." ); -static Py_ssize_t pool_index = 0; -static PyObject *pool[AWAITABLE_POOL_SIZE]; +static void +callback_dealloc(void *ptr) +{ + assert(ptr != NULL); + pyawaitable_callback *cb = (pyawaitable_callback *) ptr; + Py_CLEAR(cb->coro); + PyMem_Free(cb); +} static PyObject * awaitable_new_func(PyTypeObject *tp, PyObject *args, PyObject *kwds) @@ -27,11 +34,42 @@ awaitable_new_func(PyTypeObject *tp, PyObject *args, PyObject *kwds) } PyAwaitableObject *aw = (PyAwaitableObject *) self; - aw->aw_awaited = false; + aw->aw_gen = NULL; aw->aw_done = false; - aw->aw_used = false; + aw->aw_state = 0; + aw->aw_result = NULL; + aw->aw_recently_cancelled = 0; + + if (pyawaitable_array_init(&aw->aw_callbacks, callback_dealloc) < 0) + { + goto error; + } + + if ( + pyawaitable_array_init( + &aw->aw_object_values, + (pyawaitable_array_deallocator) Py_DecRef + ) < 0 + ) + { + goto error; + } - return (PyObject *) aw; + if (pyawaitable_array_init(&aw->aw_arbitrary_values, NULL) < 0) + { + goto error; + } + + if (pyawaitable_array_init(&aw->aw_integer_values, NULL) < 0) + { + goto error; + } + + return self; +error: + PyErr_NoMemory(); + Py_DECREF(self); + return NULL; } PyObject * @@ -56,37 +94,20 @@ static void awaitable_dealloc(PyObject *self) { PyAwaitableObject *aw = (PyAwaitableObject *)self; - for (Py_ssize_t i = 0; i < aw->aw_values_index; ++i) - { - if (!aw->aw_values[i]) - break; - Py_DECREF(aw->aw_values[i]); - } +#define CLEAR_IF_NON_NULL(array) \ + if (array.items != NULL) { \ + pyawaitable_array_clear(&array); \ + } + CLEAR_IF_NON_NULL(aw->aw_callbacks); + CLEAR_IF_NON_NULL(aw->aw_object_values); + CLEAR_IF_NON_NULL(aw->aw_arbitrary_values); + CLEAR_IF_NON_NULL(aw->aw_integer_values); +#undef CLEAR_IF_NON_NULL Py_XDECREF(aw->aw_gen); Py_XDECREF(aw->aw_result); - for (int i = 0; i < CALLBACK_ARRAY_SIZE; ++i) - { - pyawaitable_callback *cb = &aw->aw_callbacks[i]; - if (cb == NULL) - break; - - if (cb->done) - { - if (cb->coro != NULL) - { - PyErr_SetString( - PyExc_SystemError, - "sanity check: coro was not cleared" - ); - PyErr_WriteUnraisable(self); - } - } else - Py_XDECREF(cb->coro); - } - - if (!aw->aw_done && aw->aw_used) + if (!aw->aw_done) { if ( PyErr_WarnEx( @@ -104,49 +125,50 @@ awaitable_dealloc(PyObject *self) } void -pyawaitable_cancel_impl(PyObject *aw) +pyawaitable_cancel_impl(PyObject *self) { - assert(aw != NULL); - PyAwaitableObject *a = (PyAwaitableObject *) aw; - - for (int i = 0; i < CALLBACK_ARRAY_SIZE; ++i) + assert(self != NULL); + PyAwaitableObject *aw = (PyAwaitableObject *) self; + pyawaitable_array_clear_items(&aw->aw_callbacks); + aw->aw_state = 0; + if (aw->aw_gen != NULL) { - pyawaitable_callback *cb = &a->aw_callbacks[i]; - if (!cb) - break; - - // Reset the callback - Py_CLEAR(cb->coro); - cb->done = false; - cb->callback = NULL; - cb->err_callback = NULL; + GenWrapperObject *gw = (GenWrapperObject *)aw->aw_gen; + Py_CLEAR(gw->gw_current_await); } + + aw->aw_recently_cancelled = 1; } int pyawaitable_await_impl( - PyObject *aw, + PyObject *self, PyObject *coro, awaitcallback cb, awaitcallback_err err ) { - PyAwaitableObject *a = (PyAwaitableObject *) aw; - if (a->aw_callback_index == CALLBACK_ARRAY_SIZE) + PyAwaitableObject *aw = (PyAwaitableObject *) self; + + pyawaitable_callback *aw_c = PyMem_Malloc(sizeof(pyawaitable_callback)); + if (aw_c == NULL) { - PyErr_SetString( - PyExc_SystemError, - "pyawaitable: awaitable object cannot store more than 128 coroutines" - ); + PyErr_NoMemory(); return -1; } - pyawaitable_callback *aw_c = &a->aw_callbacks[a->aw_callback_index++]; aw_c->coro = Py_NewRef(coro); aw_c->callback = cb; aw_c->err_callback = err; aw_c->done = false; + if (pyawaitable_array_append(&aw->aw_callbacks, aw_c) < 0) + { + PyMem_Free(aw_c); + PyErr_NoMemory(); + return -1; + } + return 0; } @@ -161,51 +183,8 @@ pyawaitable_set_result_impl(PyObject *awaitable, PyObject *result) PyObject * pyawaitable_new_impl(void) { - if (pool_index == AWAITABLE_POOL_SIZE) - { - PyObject *aw = awaitable_new_func(&_PyAwaitableType, NULL, NULL); - ((PyAwaitableObject *) aw)->aw_used = true; - return aw; - } - - PyObject *pool_obj = pool[pool_index++]; - ((PyAwaitableObject *) pool_obj)->aw_used = true; - return pool_obj; -} - -void -dealloc_awaitable_pool(void) -{ - for (Py_ssize_t i = pool_index; i < AWAITABLE_POOL_SIZE; ++i) - { - if (Py_REFCNT(pool[i]) != 1) - { - PyErr_Format( - PyExc_SystemError, - "expected %R to have a reference count of 1", - pool[i] - ); - PyErr_WriteUnraisable(NULL); - } - Py_DECREF(pool[i]); - } -} - -int -alloc_awaitable_pool(void) -{ - for (Py_ssize_t i = 0; i < AWAITABLE_POOL_SIZE; ++i) - { - pool[i] = awaitable_new_func(&_PyAwaitableType, NULL, NULL); - if (!pool[i]) - { - for (Py_ssize_t x = 0; x < i; ++x) - Py_DECREF(pool[x]); - return -1; - } - } - - return 0; + // XXX Use a freelist? + return awaitable_new_func(&_PyAwaitableType, NULL, NULL); } int diff --git a/src/_pyawaitable/coro.c b/src/_pyawaitable/coro.c index 9d767d0..d597797 100644 --- a/src/_pyawaitable/coro.c +++ b/src/_pyawaitable/coro.c @@ -12,7 +12,9 @@ awaitable_send_with_arg(PyObject *self, PyObject *value) { PyObject *gen = awaitable_next(self); if (gen == NULL) + { return NULL; + } Py_DECREF(gen); Py_RETURN_NONE; @@ -27,7 +29,9 @@ awaitable_send(PyObject *self, PyObject *args) PyObject *value; if (!PyArg_ParseTuple(args, "O", &value)) + { return NULL; + } return awaitable_send_with_arg(self, value); } @@ -65,34 +69,45 @@ awaitable_throw(PyObject *self, PyObject *args) } if (traceback) + { if (PyException_SetTraceback(err, traceback) < 0) { Py_DECREF(err); return NULL; } + } PyErr_Restore(err, NULL, NULL); } else + { PyErr_Restore( Py_NewRef(type), Py_XNewRef(value), Py_XNewRef(traceback) ); + } PyAwaitableObject *aw = (PyAwaitableObject *)self; if ((aw->aw_gen != NULL) && (aw->aw_state != 0)) { GenWrapperObject *gw = (GenWrapperObject *)aw->aw_gen; - pyawaitable_callback *cb = &aw->aw_callbacks[aw->aw_state - 1]; + pyawaitable_callback *cb = + pyawaitable_array_GET_ITEM(&aw->aw_callbacks, aw->aw_state - 1); if (cb == NULL) + { return NULL; + } - if (genwrapper_fire_err_callback(self, gw->gw_current_await, cb) < 0) + if (genwrapper_fire_err_callback(self, cb->err_callback) < 0) + { return NULL; + } } else + { return NULL; + } - assert(NULL); + Py_UNREACHABLE(); } #if PY_MINOR_VERSION > 9 @@ -119,7 +134,6 @@ awaitable_am_send(PyObject *self, PyObject *arg, PyObject **presult) *presult = NULL; return PYGEN_ERROR; } - PyAwaitableObject *aw = (PyAwaitableObject *)self; *presult = send_res; return PYGEN_NEXT; diff --git a/src/_pyawaitable/genwrapper.c b/src/_pyawaitable/genwrapper.c index f6eed35..36fdeb5 100644 --- a/src/_pyawaitable/genwrapper.c +++ b/src/_pyawaitable/genwrapper.c @@ -12,6 +12,10 @@ aw->aw_done = true; \ Py_CLEAR(g->gw_aw); \ } while (0) +#define DONE_IF_OK(cb) \ + if (cb != NULL) { \ + DONE(cb); \ + } static PyObject * gen_new(PyTypeObject *tp, PyObject *args, PyObject *kwds) @@ -32,26 +36,29 @@ gen_new(PyTypeObject *tp, PyObject *args, PyObject *kwds) return (PyObject *) g; } +static int +genwrapper_traverse(PyObject *self, visitproc visit, void *arg) +{ + GenWrapperObject *gw = (GenWrapperObject *) self; + Py_VISIT(gw->gw_current_await); + Py_VISIT(gw->gw_aw); + return 0; +} + +static int +genwrapper_clear(PyObject *self) +{ + GenWrapperObject *gw = (GenWrapperObject *) self; + Py_CLEAR(gw->gw_current_await); + Py_CLEAR(gw->gw_aw); + return 0; +} + static void gen_dealloc(PyObject *self) { - GenWrapperObject *g = (GenWrapperObject *) self; - if (g->gw_current_await != NULL) - { - PyErr_SetString( - PyExc_SystemError, - "sanity check: gw_current_await was not cleared!" - ); - PyErr_WriteUnraisable(self); - } - if (g->gw_aw != NULL) - { - PyErr_SetString( - PyExc_SystemError, - "sanity check: gw_aw was not cleared!" - ); - PyErr_WriteUnraisable(self); - } + PyObject_GC_UnTrack(self); + (void)genwrapper_clear(self); Py_TYPE(self)->tp_free(self); } @@ -75,23 +82,20 @@ genwrapper_new(PyAwaitableObject *aw) int genwrapper_fire_err_callback( PyObject *self, - PyObject *await, - pyawaitable_callback *cb + awaitcallback_err err_callback ) { assert(PyErr_Occurred() != NULL); - if (!cb->err_callback) + if (err_callback == NULL) { - cb->done = true; return -1; } PyObject *err = PyErr_GetRaisedException(); Py_INCREF(self); - int e_res = cb->err_callback(self, err); + int e_res = err_callback(self, err); Py_DECREF(self); - cb->done = true; if (e_res < 0) { @@ -101,8 +105,9 @@ genwrapper_fire_err_callback( { PyErr_SetRaisedException(err); } else + { Py_DECREF(err); - + } return -1; } @@ -110,6 +115,15 @@ genwrapper_fire_err_callback( return 0; } +static inline pyawaitable_callback * +genwrapper_advance(GenWrapperObject *gw) +{ + return pyawaitable_array_GET_ITEM( + &gw->gw_aw->aw_callbacks, + gw->gw_aw->aw_state++ + ); +} + PyObject * genwrapper_next(PyObject *self) { @@ -119,26 +133,17 @@ genwrapper_next(PyObject *self) if (!aw) { PyErr_SetString( - PyExc_SystemError, + PyExc_RuntimeError, "pyawaitable: genwrapper used after return" ); return NULL; } pyawaitable_callback *cb; - if (aw->aw_state == CALLBACK_ARRAY_SIZE) - { - PyErr_SetString( - PyExc_SystemError, - "pyawaitable: object cannot handle more than 255 coroutines" - ); - AW_DONE(); - return NULL; - } if (g->gw_current_await == NULL) { - if (aw->aw_callbacks[aw->aw_state].coro == NULL) + if (pyawaitable_array_LENGTH(&aw->aw_callbacks) == aw->aw_state) { PyErr_SetObject( PyExc_StopIteration, @@ -148,7 +153,19 @@ genwrapper_next(PyObject *self) return NULL; } - cb = &aw->aw_callbacks[aw->aw_state++]; + cb = genwrapper_advance(g); + assert(cb != NULL); + assert(cb->done == false); + assert(cb->coro != NULL); + + if (cb->coro == NULL) + { + printf( + "len: %ld, state: %ld\n", + pyawaitable_array_LENGTH(&aw->aw_callbacks), + aw->aw_state + ); + } if ( Py_TYPE(cb->coro)->tp_as_async == NULL || @@ -173,22 +190,21 @@ genwrapper_next(PyObject *self) if ( genwrapper_fire_err_callback( (PyObject *)aw, - g->gw_current_await, - cb + cb->err_callback ) < 0 ) { - DONE(cb); + DONE_IF_OK(cb); AW_DONE(); return NULL; } - DONE(cb); + DONE_IF_OK(cb); return genwrapper_next(self); } } else { - cb = &aw->aw_callbacks[aw->aw_state - 1]; + cb = pyawaitable_array_GET_ITEM(&aw->aw_callbacks, aw->aw_state - 1); } PyObject *result = Py_TYPE( @@ -197,6 +213,7 @@ genwrapper_next(PyObject *self) if (result != NULL) { + // Yield! return result; } @@ -220,8 +237,7 @@ genwrapper_next(PyObject *self) if ( genwrapper_fire_err_callback( (PyObject *) aw, - g->gw_current_await, - cb + cb->err_callback ) < 0 ) { @@ -243,6 +259,7 @@ genwrapper_next(PyObject *self) return genwrapper_next(self); } + // Deduce the return value of the coroutine PyObject *value; if (occurred) { @@ -264,16 +281,24 @@ genwrapper_next(PyObject *self) value = Py_NewRef(Py_None); } + // Preserve the error callback in case we get cancelled + awaitcallback_err err_callback = cb->err_callback; Py_INCREF(aw); int res = cb->callback((PyObject *) aw, value); Py_DECREF(aw); Py_DECREF(value); + // If we recently cancelled, then cb is no longer valid + if (aw->aw_recently_cancelled) + { + cb = NULL; + } + if (res < -1) { // -2 or lower denotes that the error should be deferred, // regardless of whether a handler is present. - DONE(cb); + DONE_IF_OK(cb); AW_DONE(); return NULL; } @@ -283,8 +308,8 @@ genwrapper_next(PyObject *self) if (!PyErr_Occurred()) { PyErr_SetString( - PyExc_SystemError, - "pyawaitable: callback returned -1 without exception set" + PyExc_RuntimeError, + "pyawaitable: user callback returned -1 without exception set" ); DONE(cb); AW_DONE(); @@ -293,18 +318,17 @@ genwrapper_next(PyObject *self) if ( genwrapper_fire_err_callback( (PyObject *) aw, - g->gw_current_await, - cb + err_callback ) < 0 ) { - DONE(cb); + DONE_IF_OK(cb); AW_DONE(); return NULL; } } - DONE(cb); + DONE_IF_OK(cb); return genwrapper_next(self); } @@ -314,8 +338,10 @@ PyTypeObject _PyAwaitableGenWrapperType = .tp_name = "_genwrapper", .tp_basicsize = sizeof(GenWrapperObject), .tp_dealloc = gen_dealloc, - .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, .tp_iter = PyObject_SelfIter, .tp_iternext = genwrapper_next, + .tp_clear = genwrapper_clear, + .tp_traverse = genwrapper_traverse, .tp_new = gen_new, }; diff --git a/src/_pyawaitable/mod.c b/src/_pyawaitable/mod.c index c09fdc8..10e9907 100644 --- a/src/_pyawaitable/mod.c +++ b/src/_pyawaitable/mod.c @@ -58,12 +58,6 @@ static PyAwaitableABI _abi_interface = pyawaitable_async_with_impl }; -static void -close_pool(PyObject *Py_UNUSED(capsule)) -{ - dealloc_awaitable_pool(); -} - PyMODINIT_FUNC PyInit__pyawaitable(void) { @@ -73,7 +67,7 @@ PyInit__pyawaitable(void) PyObject *capsule = PyCapsule_New( &_abi_interface, "_pyawaitable.abi_v1", - close_pool + NULL ); if (!capsule) @@ -89,11 +83,5 @@ PyInit__pyawaitable(void) return NULL; } - if (alloc_awaitable_pool() < 0) - { - Py_DECREF(m); - return NULL; - } - return m; } diff --git a/src/_pyawaitable/values.c b/src/_pyawaitable/values.c index 8ba7043..df95373 100644 --- a/src/_pyawaitable/values.c +++ b/src/_pyawaitable/values.c @@ -1,90 +1,106 @@ #include +#include + +#include #include +#include #include -#include -#define UNPACK(arr, tp, err, index) \ - do { \ - assert(awaitable != NULL); \ - PyAwaitableObject *aw = (PyAwaitableObject *) awaitable; \ - Py_INCREF(awaitable); \ - if (index == 0) { \ - PyErr_SetString( \ - PyExc_ValueError, \ - "pyawaitable: awaitable object has no stored " err \ - ); \ - Py_DECREF(awaitable); \ - return -1; \ - } \ - va_list args; \ - va_start(args, awaitable); \ - for (Py_ssize_t i = 0; i < index; ++i) { \ - tp ptr = va_arg(args, tp); \ - if (ptr == NULL) \ - continue; \ - *ptr = arr[i]; \ - } \ - va_end(args); \ - Py_DECREF(awaitable); \ - return 0; \ - } while (0) - -#define SAVE_ERR(err) \ - "pyawaitable: " err " array has a capacity of 32" \ - ", so storing %ld more would overflow it" \ - -#define SAVE(arr, index, tp, err, wrap) \ - do { \ - assert(awaitable != NULL); \ - assert(nargs != 0); \ - Py_INCREF(awaitable); \ - PyAwaitableObject *aw = (PyAwaitableObject *) awaitable; \ - Py_ssize_t final_size = index + nargs; \ - if (final_size >= VALUE_ARRAY_SIZE) { \ - PyErr_Format( \ - PyExc_SystemError, \ - SAVE_ERR(err), \ - final_size \ - ); \ - return -1; \ - } \ - va_list vargs; \ - va_start(vargs, nargs); \ - for (Py_ssize_t i = index; i < final_size; ++i) { \ - arr[i] = wrap(va_arg(vargs, tp)); \ - } \ - index += nargs; \ - va_end(vargs); \ - Py_DECREF(awaitable); \ - return 0; \ - } while (0) - -#define INDEX_HEAD(arr, idx, ret) \ - PyAwaitableObject *aw = (PyAwaitableObject *) awaitable; \ - if ((index >= idx) || (index < 0)) { \ - PyErr_Format( \ - PyExc_IndexError, \ - "pyawaitable: index %ld out of range for %ld stored values", \ - index, \ - idx \ - ); \ - return ret; \ - } - #define NOTHING -/* Normal Values */ +#define SAVE(field, type, extra) \ + PyAwaitableObject *aw = (PyAwaitableObject *) awaitable; \ + pyawaitable_array *array = &aw->field; \ + va_list vargs; \ + va_start(vargs, nargs); \ + for (Py_ssize_t i = 0; i < nargs; ++i) { \ + type ptr = va_arg(vargs, type); \ + assert((void *)ptr != NULL); \ + if (pyawaitable_array_append(array, (void *)ptr) < 0) { \ + PyErr_NoMemory(); \ + return -1; \ + } \ + extra; \ + } \ + va_end(vargs); \ + return 0 + +#define UNPACK(field, type) \ + PyAwaitableObject *aw = (PyAwaitableObject *) awaitable; \ + pyawaitable_array *array = &aw->field; \ + if (pyawaitable_array_LENGTH(array) == 0) { \ + PyErr_SetString( \ + PyExc_RuntimeError, \ + "pyawaitable: object has no stored values" \ + ); \ + return -1; \ + } \ + va_list vargs; \ + va_start(vargs, awaitable); \ + for (Py_ssize_t i = 0; i < pyawaitable_array_LENGTH(array); ++i) { \ + type *ptr = va_arg(vargs, type *); \ + if (ptr == NULL) { \ + continue; \ + } \ + *ptr = (type)pyawaitable_array_GET_ITEM(array, i); \ + } \ + va_end(vargs); \ + return 0 + +#define SET(field, type) \ + assert(awaitable != NULL); \ + PyAwaitableObject *aw = (PyAwaitableObject *) awaitable; \ + pyawaitable_array *array = &aw->field; \ + if (check_index(index, array) < 0) { \ + return -1; \ + } \ + pyawaitable_array_set(array, index, (void *)(new_value)); \ + return 0 + +#define GET(field, type) \ + assert(awaitable != NULL); \ + PyAwaitableObject *aw = (PyAwaitableObject *) awaitable; \ + pyawaitable_array *array = &aw->field; \ + if (check_index(index, array) < 0) { \ + return (type)NULL; \ + } \ + return (type)pyawaitable_array_GET_ITEM(array, index) + +static int +check_index(Py_ssize_t index, pyawaitable_array *array) +{ + assert(array != NULL); + if (index < 0) + { + PyErr_SetString( + PyExc_IndexError, + "pyawaitable: cannot set negative index" + ); + return -1; + } + + if (index >= pyawaitable_array_LENGTH(array)) + { + PyErr_SetString( + PyExc_IndexError, + "pyawaitable: cannot set index that is out of bounds" + ); + return -1; + } + + return 0; +} int pyawaitable_unpack_impl(PyObject *awaitable, ...) { - UNPACK(aw->aw_values, PyObject * *, "values", aw->aw_values_index); + UNPACK(aw_object_values, PyObject *); } int pyawaitable_save_impl(PyObject *awaitable, Py_ssize_t nargs, ...) { - SAVE(aw->aw_values, aw->aw_values_index, PyObject *, "values", Py_NewRef); + SAVE(aw_object_values, PyObject *, Py_INCREF(ptr)); } int @@ -94,9 +110,7 @@ pyawaitable_set_impl( PyObject *new_value ) { - INDEX_HEAD(aw->aw_values, aw->aw_values_index, -1); - Py_SETREF(aw->aw_values[index], Py_NewRef(new_value)); - return 0; + SET(aw_object_values, Py_NewRef); } PyObject * @@ -105,8 +119,7 @@ pyawaitable_get_impl( Py_ssize_t index ) { - INDEX_HEAD(aw->aw_values, aw->aw_values_index, NULL); - return aw->aw_values[index]; + GET(aw_object_values, PyObject *); } /* Arbitrary Values */ @@ -114,24 +127,13 @@ pyawaitable_get_impl( int pyawaitable_unpack_arb_impl(PyObject *awaitable, ...) { - UNPACK( - aw->aw_arb_values, - void **, - "arbitrary values", - aw->aw_arb_values_index - ); + UNPACK(aw_arbitrary_values, void *); } int pyawaitable_save_arb_impl(PyObject *awaitable, Py_ssize_t nargs, ...) { - SAVE( - aw->aw_arb_values, - aw->aw_arb_values_index, - void *, - "arbitrary values", - NOTHING - ); + SAVE(aw_arbitrary_values, void *, NOTHING); } int @@ -141,9 +143,7 @@ pyawaitable_set_arb_impl( void *new_value ) { - INDEX_HEAD(aw->aw_arb_values, aw->aw_arb_values_index, -1); - aw->aw_arb_values[index] = new_value; - return 0; + SET(aw_arbitrary_values, void *); } void * @@ -152,8 +152,7 @@ pyawaitable_get_arb_impl( Py_ssize_t index ) { - INDEX_HEAD(aw->aw_arb_values, aw->aw_arb_values_index, NULL); - return aw->aw_arb_values[index]; + GET(aw_arbitrary_values, void *); } /* Integer Values */ @@ -161,24 +160,13 @@ pyawaitable_get_arb_impl( int pyawaitable_unpack_int_impl(PyObject *awaitable, ...) { - UNPACK( - aw->aw_int_values, - long *, - "integer values", - aw->aw_int_values_index - ); + UNPACK(aw_integer_values, long); } int pyawaitable_save_int_impl(PyObject *awaitable, Py_ssize_t nargs, ...) { - SAVE( - aw->aw_int_values, - aw->aw_int_values_index, - long, - "integer values", - NOTHING - ); + SAVE(aw_integer_values, long, NOTHING); } int @@ -188,9 +176,7 @@ pyawaitable_set_int_impl( long new_value ) { - INDEX_HEAD(aw->aw_int_values, aw->aw_int_values_index, -1); - aw->aw_int_values[index] = new_value; - return 0; + SET(aw_integer_values, long); } long @@ -199,6 +185,5 @@ pyawaitable_get_int_impl( Py_ssize_t index ) { - INDEX_HEAD(aw->aw_int_values, aw->aw_int_values_index, -1); - return aw->aw_int_values[index]; + GET(aw_integer_values, long); } diff --git a/tests/test_callbacks.py b/tests/test_callbacks.py index bea8a87..7229dd7 100644 --- a/tests/test_callbacks.py +++ b/tests/test_callbacks.py @@ -94,7 +94,7 @@ def cb(awaitable_inner: pyawaitable.PyAwaitable, result: int) -> int: add_await(awaitable, coro(), cb, awaitcallback_err(0)) - with pytest.raises(SystemError): + with pytest.raises(RuntimeError): await awaitable @@ -144,3 +144,31 @@ async def coro() -> int: await awaitable assert called is True + +@limit_leaks +@pytest.mark.asyncio +async def test_a_lot_of_coroutines(): + awaitable = abi.new() + amount = 500 + + awaited = 0 + called = 0 + + async def coro(): + await asyncio.sleep(0) + nonlocal awaited + awaited += 1 + + @awaitcallback + def callback(awaitable: pyawaitable.PyAwaitable, result: None) -> int: + assert result is None + nonlocal called + called += 1 + return 0 + + for _ in range(amount): + add_await(awaitable, coro(), callback, awaitcallback_err(0)) + + await awaitable + assert called == amount + assert awaited == amount