From 7d8b9f9e498b578ed11c6c8b129d833488e4fb9c Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 18 Oct 2024 16:37:05 -0600 Subject: [PATCH 1/9] Raise an exception if _globals_init() fails. --- Modules/_interpchannelsmodule.c | 3 ++- Modules/_interpqueuesmodule.c | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Modules/_interpchannelsmodule.c b/Modules/_interpchannelsmodule.c index a8b4a8d76b0eaa..c52cde6da500f7 100644 --- a/Modules/_interpchannelsmodule.c +++ b/Modules/_interpchannelsmodule.c @@ -3482,7 +3482,8 @@ The 'interpreters' module provides a more convenient interface."); static int module_exec(PyObject *mod) { - if (_globals_init() != 0) { + int err = _globals_init(); + if (handle_channel_error(err, mod, -1)) { return -1; } diff --git a/Modules/_interpqueuesmodule.c b/Modules/_interpqueuesmodule.c index 55c43199ee4d79..7fd18f65f5ec3d 100644 --- a/Modules/_interpqueuesmodule.c +++ b/Modules/_interpqueuesmodule.c @@ -1894,7 +1894,8 @@ The 'interpreters' module provides a more convenient interface."); static int module_exec(PyObject *mod) { - if (_globals_init() != 0) { + int err = _globals_init(); + if (handle_queue_error(err, mod, -1)) { return -1; } From 2579a3a77d78f28b5c8f6707135a028c98714925 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 21 Oct 2024 11:06:11 -0600 Subject: [PATCH 2/9] Free the mutex in _globals_fini(). --- Modules/_interpchannelsmodule.c | 15 +++++++++------ Modules/_interpqueuesmodule.c | 16 ++++++++++------ 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/Modules/_interpchannelsmodule.c b/Modules/_interpchannelsmodule.c index c52cde6da500f7..3abb3385c14f55 100644 --- a/Modules/_interpchannelsmodule.c +++ b/Modules/_interpchannelsmodule.c @@ -1349,6 +1349,7 @@ typedef struct _channels { static void _channels_init(_channels *channels, PyThread_type_lock mutex) { + assert(mutex != NULL); channels->mutex = mutex; channels->head = NULL; channels->numopen = 0; @@ -1356,14 +1357,12 @@ _channels_init(_channels *channels, PyThread_type_lock mutex) } static void -_channels_fini(_channels *channels) +_channels_fini(_channels *channels, PyThread_type_lock *p_mutex) { assert(channels->numopen == 0); assert(channels->head == NULL); - if (channels->mutex != NULL) { - PyThread_free_lock(channels->mutex); - channels->mutex = NULL; - } + *p_mutex = channels->mutex; + channels->mutex = NULL; } static int64_t @@ -2844,7 +2843,11 @@ _globals_fini(void) return; } - _channels_fini(&_globals.channels); + PyThread_type_lock mutex; + _channels_fini(&_globals.channels, &mutex); + if (mutex != NULL) { + PyThread_free_lock(mutex); + } } static _channels * diff --git a/Modules/_interpqueuesmodule.c b/Modules/_interpqueuesmodule.c index 7fd18f65f5ec3d..9ae92060b6b8bc 100644 --- a/Modules/_interpqueuesmodule.c +++ b/Modules/_interpqueuesmodule.c @@ -845,6 +845,7 @@ typedef struct _queues { static void _queues_init(_queues *queues, PyThread_type_lock mutex) { + assert(mutex != NULL); queues->mutex = mutex; queues->head = NULL; queues->count = 0; @@ -852,8 +853,9 @@ _queues_init(_queues *queues, PyThread_type_lock mutex) } static void -_queues_fini(_queues *queues) +_queues_fini(_queues *queues, PyThread_type_lock *p_mutex) { + assert(queues->mutex != NULL); if (queues->count > 0) { PyThread_acquire_lock(queues->mutex, WAIT_LOCK); assert((queues->count == 0) != (queues->head != NULL)); @@ -863,10 +865,8 @@ _queues_fini(_queues *queues) PyThread_release_lock(queues->mutex); _queuerefs_clear(head); } - if (queues->mutex != NULL) { - PyThread_free_lock(queues->mutex); - queues->mutex = NULL; - } + *p_mutex = queues->mutex; + queues->mutex = NULL; } static int64_t @@ -1430,7 +1430,11 @@ _globals_fini(void) return; } - _queues_fini(&_globals.queues); + PyThread_type_lock mutex; + _queues_fini(&_globals.queues, &mutex); + if (mutex != NULL) { + PyThread_free_lock(mutex); + } } static _queues * From 98d2a7d63c69a5cbec4ce44114abafd5af33e6ef Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 21 Oct 2024 11:12:11 -0600 Subject: [PATCH 3/9] Use atomic operations for the module count. --- Modules/_interpchannelsmodule.c | 10 +++------- Modules/_interpqueuesmodule.c | 10 +++------- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/Modules/_interpchannelsmodule.c b/Modules/_interpchannelsmodule.c index 3abb3385c14f55..2e8a5d946f279c 100644 --- a/Modules/_interpchannelsmodule.c +++ b/Modules/_interpchannelsmodule.c @@ -2811,16 +2811,14 @@ set_channelend_types(PyObject *mod, PyTypeObject *send, PyTypeObject *recv) the data that we need to share between interpreters, so it cannot hold PyObject values. */ static struct globals { - int module_count; + uint8_t module_count; _channels channels; } _globals = {0}; static int _globals_init(void) { - // XXX This isn't thread-safe. - _globals.module_count++; - if (_globals.module_count > 1) { + if (_Py_atomic_add_uint8(&_globals.module_count, 1) > 0) { // Already initialized. return 0; } @@ -2837,9 +2835,7 @@ _globals_init(void) static void _globals_fini(void) { - // XXX This isn't thread-safe. - _globals.module_count--; - if (_globals.module_count > 0) { + if (_Py_atomic_add_uint8(&_globals.module_count, -1) > 1) { return; } diff --git a/Modules/_interpqueuesmodule.c b/Modules/_interpqueuesmodule.c index 9ae92060b6b8bc..b4dd41324b5a06 100644 --- a/Modules/_interpqueuesmodule.c +++ b/Modules/_interpqueuesmodule.c @@ -1398,16 +1398,14 @@ _queueobj_shared(PyThreadState *tstate, PyObject *queueobj, the data that we need to share between interpreters, so it cannot hold PyObject values. */ static struct globals { - int module_count; + uint8_t module_count; _queues queues; } _globals = {0}; static int _globals_init(void) { - // XXX This isn't thread-safe. - _globals.module_count++; - if (_globals.module_count > 1) { + if (_Py_atomic_add_uint8(&_globals.module_count, 1) > 0) { // Already initialized. return 0; } @@ -1424,9 +1422,7 @@ _globals_init(void) static void _globals_fini(void) { - // XXX This isn't thread-safe. - _globals.module_count--; - if (_globals.module_count > 0) { + if (_Py_atomic_add_uint8(&_globals.module_count, -1) > 1) { return; } From 89212915d7f0226b5eab310db1d489e75c2325f3 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 21 Oct 2024 11:42:16 -0600 Subject: [PATCH 4/9] Check the mutex in queues/channels_init(). --- Modules/_interpchannelsmodule.c | 2 +- Modules/_interpqueuesmodule.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Modules/_interpchannelsmodule.c b/Modules/_interpchannelsmodule.c index 2e8a5d946f279c..658e954ac1598a 100644 --- a/Modules/_interpchannelsmodule.c +++ b/Modules/_interpchannelsmodule.c @@ -1350,6 +1350,7 @@ static void _channels_init(_channels *channels, PyThread_type_lock mutex) { assert(mutex != NULL); + assert(channels->mutex == NULL); channels->mutex = mutex; channels->head = NULL; channels->numopen = 0; @@ -2823,7 +2824,6 @@ _globals_init(void) return 0; } - assert(_globals.channels.mutex == NULL); PyThread_type_lock mutex = PyThread_allocate_lock(); if (mutex == NULL) { return ERR_CHANNELS_MUTEX_INIT; diff --git a/Modules/_interpqueuesmodule.c b/Modules/_interpqueuesmodule.c index b4dd41324b5a06..7ca7fc96ecf8af 100644 --- a/Modules/_interpqueuesmodule.c +++ b/Modules/_interpqueuesmodule.c @@ -846,6 +846,7 @@ static void _queues_init(_queues *queues, PyThread_type_lock mutex) { assert(mutex != NULL); + assert(queues->mutex == NULL); queues->mutex = mutex; queues->head = NULL; queues->count = 0; @@ -1410,7 +1411,6 @@ _globals_init(void) return 0; } - assert(_globals.queues.mutex == NULL); PyThread_type_lock mutex = PyThread_allocate_lock(); if (mutex == NULL) { return ERR_QUEUES_ALLOC; From 202b75d8d2d120c18c6ff3f388bd12e92b9849b7 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 21 Oct 2024 12:02:37 -0600 Subject: [PATCH 5/9] Use a static initializer init queues/channels_init()/_fini(). --- Modules/_interpchannelsmodule.c | 20 ++++++++++++++------ Modules/_interpqueuesmodule.c | 30 ++++++++++++++++-------------- 2 files changed, 30 insertions(+), 20 deletions(-) diff --git a/Modules/_interpchannelsmodule.c b/Modules/_interpchannelsmodule.c index 658e954ac1598a..c242d0ca9ad58e 100644 --- a/Modules/_interpchannelsmodule.c +++ b/Modules/_interpchannelsmodule.c @@ -1351,19 +1351,27 @@ _channels_init(_channels *channels, PyThread_type_lock mutex) { assert(mutex != NULL); assert(channels->mutex == NULL); - channels->mutex = mutex; - channels->head = NULL; - channels->numopen = 0; - channels->next_id = 0; + *channels = (_channels){ + .mutex = mutex, + .head = NULL, + .numopen = 0, + .next_id = 0, + }; } static void _channels_fini(_channels *channels, PyThread_type_lock *p_mutex) { + PyThread_type_lock mutex = channels->mutex; + assert(mutex != NULL); + + PyThread_acquire_lock(mutex, WAIT_LOCK); assert(channels->numopen == 0); assert(channels->head == NULL); - *p_mutex = channels->mutex; - channels->mutex = NULL; + *channels = (_channels){0}; + PyThread_release_lock(mutex); + + *p_mutex = mutex; } static int64_t diff --git a/Modules/_interpqueuesmodule.c b/Modules/_interpqueuesmodule.c index 7ca7fc96ecf8af..3b1d622470d8c3 100644 --- a/Modules/_interpqueuesmodule.c +++ b/Modules/_interpqueuesmodule.c @@ -847,27 +847,29 @@ _queues_init(_queues *queues, PyThread_type_lock mutex) { assert(mutex != NULL); assert(queues->mutex == NULL); - queues->mutex = mutex; - queues->head = NULL; - queues->count = 0; - queues->next_id = 1; + *queues = (_queues){ + .mutex = mutex, + .head = NULL, + .count = 0, + .next_id = 1, + }; } static void _queues_fini(_queues *queues, PyThread_type_lock *p_mutex) { - assert(queues->mutex != NULL); + PyThread_type_lock mutex = queues->mutex; + assert(mutex != NULL); + + PyThread_acquire_lock(mutex, WAIT_LOCK); if (queues->count > 0) { - PyThread_acquire_lock(queues->mutex, WAIT_LOCK); - assert((queues->count == 0) != (queues->head != NULL)); - _queueref *head = queues->head; - queues->head = NULL; - queues->count = 0; - PyThread_release_lock(queues->mutex); - _queuerefs_clear(head); + assert(queues->head != NULL); + _queuerefs_clear(queues->head); } - *p_mutex = queues->mutex; - queues->mutex = NULL; + *queues = (_queues){0}; + PyThread_release_lock(mutex); + + *p_mutex = mutex; } static int64_t From 2822adace0d7ab65e17a95f4e09be7980c2d51fe Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 21 Oct 2024 12:25:32 -0600 Subject: [PATCH 6/9] Use a global lock around _globals_init() and _globals_fini(). --- Modules/_interpchannelsmodule.c | 39 +++++++++++++++++++-------------- Modules/_interpqueuesmodule.c | 39 +++++++++++++++++++-------------- 2 files changed, 44 insertions(+), 34 deletions(-) diff --git a/Modules/_interpchannelsmodule.c b/Modules/_interpchannelsmodule.c index c242d0ca9ad58e..131f6af705d068 100644 --- a/Modules/_interpchannelsmodule.c +++ b/Modules/_interpchannelsmodule.c @@ -2820,38 +2820,43 @@ set_channelend_types(PyObject *mod, PyTypeObject *send, PyTypeObject *recv) the data that we need to share between interpreters, so it cannot hold PyObject values. */ static struct globals { - uint8_t module_count; + PyMutex mutex; + int module_count; _channels channels; } _globals = {0}; static int _globals_init(void) { - if (_Py_atomic_add_uint8(&_globals.module_count, 1) > 0) { - // Already initialized. - return 0; - } - - PyThread_type_lock mutex = PyThread_allocate_lock(); - if (mutex == NULL) { - return ERR_CHANNELS_MUTEX_INIT; + PyMutex_Lock(&_globals.mutex); + assert(_globals.module_count >= 0); + _globals.module_count++; + if (_globals.module_count == 1) { + // Called for the first time. + PyThread_type_lock mutex = PyThread_allocate_lock(); + if (mutex == NULL) { + PyMutex_Unlock(&_globals.mutex); + return ERR_CHANNELS_MUTEX_INIT; + } + _channels_init(&_globals.channels, mutex); } - _channels_init(&_globals.channels, mutex); + PyMutex_Unlock(&_globals.mutex); return 0; } static void _globals_fini(void) { - if (_Py_atomic_add_uint8(&_globals.module_count, -1) > 1) { - return; - } - - PyThread_type_lock mutex; - _channels_fini(&_globals.channels, &mutex); - if (mutex != NULL) { + PyMutex_Lock(&_globals.mutex); + assert(_globals.module_count > 0); + _globals.module_count--; + if (_globals.module_count == 0) { + PyThread_type_lock mutex; + _channels_fini(&_globals.channels, &mutex); + assert(mutex != NULL); PyThread_free_lock(mutex); } + PyMutex_Unlock(&_globals.mutex); } static _channels * diff --git a/Modules/_interpqueuesmodule.c b/Modules/_interpqueuesmodule.c index 3b1d622470d8c3..4d739333a75695 100644 --- a/Modules/_interpqueuesmodule.c +++ b/Modules/_interpqueuesmodule.c @@ -1401,38 +1401,43 @@ _queueobj_shared(PyThreadState *tstate, PyObject *queueobj, the data that we need to share between interpreters, so it cannot hold PyObject values. */ static struct globals { - uint8_t module_count; + PyMutex mutex; + int module_count; _queues queues; } _globals = {0}; static int _globals_init(void) { - if (_Py_atomic_add_uint8(&_globals.module_count, 1) > 0) { - // Already initialized. - return 0; - } - - PyThread_type_lock mutex = PyThread_allocate_lock(); - if (mutex == NULL) { - return ERR_QUEUES_ALLOC; + PyMutex_Lock(&_globals.mutex); + assert(_globals.module_count >= 0); + _globals.module_count++; + if (_globals.module_count == 1) { + // Called for the first time. + PyThread_type_lock mutex = PyThread_allocate_lock(); + if (mutex == NULL) { + PyMutex_Unlock(&_globals.mutex); + return ERR_QUEUES_ALLOC; + } + _queues_init(&_globals.queues, mutex); } - _queues_init(&_globals.queues, mutex); + PyMutex_Unlock(&_globals.mutex); return 0; } static void _globals_fini(void) { - if (_Py_atomic_add_uint8(&_globals.module_count, -1) > 1) { - return; - } - - PyThread_type_lock mutex; - _queues_fini(&_globals.queues, &mutex); - if (mutex != NULL) { + PyMutex_Lock(&_globals.mutex); + assert(_globals.module_count > 0); + _globals.module_count--; + if (_globals.module_count == 0) { + PyThread_type_lock mutex; + _queues_fini(&_globals.queues, &mutex); + assert(mutex != NULL); PyThread_free_lock(mutex); } + PyMutex_Unlock(&_globals.mutex); } static _queues * From 27fcb3c14f094cb42920b93ca4f087a3575ceefc Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 21 Oct 2024 12:28:43 -0600 Subject: [PATCH 7/9] Update the summary of global state. --- Modules/_interpchannelsmodule.c | 1 + 1 file changed, 1 insertion(+) diff --git a/Modules/_interpchannelsmodule.c b/Modules/_interpchannelsmodule.c index 131f6af705d068..ccfb70ac2f53eb 100644 --- a/Modules/_interpchannelsmodule.c +++ b/Modules/_interpchannelsmodule.c @@ -28,6 +28,7 @@ This module has the following process-global state: _globals (static struct globals): + mutex (PyMutex) module_count (int) channels (struct _channels): numopen (int64_t) From cf4e106db08f424758320efc6da1724bbdda7f29 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 21 Oct 2024 13:11:26 -0600 Subject: [PATCH 8/9] Change an incref to a decref. --- Modules/_interpqueuesmodule.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Modules/_interpqueuesmodule.c b/Modules/_interpqueuesmodule.c index 4d739333a75695..120ce9662ae8a2 100644 --- a/Modules/_interpqueuesmodule.c +++ b/Modules/_interpqueuesmodule.c @@ -1315,7 +1315,7 @@ _queueid_xid_new(int64_t qid) struct _queueid_xid *data = PyMem_RawMalloc(sizeof(struct _queueid_xid)); if (data == NULL) { - _queues_incref(queues, qid); + _queues_decref(queues, qid); return NULL; } data->qid = qid; From 9c4c1f2adefebd48afc9243f3b2e15a4cacda48f Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 21 Oct 2024 13:19:10 -0600 Subject: [PATCH 9/9] Decrement the module count if _globals_init() fails. --- Modules/_interpchannelsmodule.c | 1 + Modules/_interpqueuesmodule.c | 1 + 2 files changed, 2 insertions(+) diff --git a/Modules/_interpchannelsmodule.c b/Modules/_interpchannelsmodule.c index ccfb70ac2f53eb..8e6b21db76e01c 100644 --- a/Modules/_interpchannelsmodule.c +++ b/Modules/_interpchannelsmodule.c @@ -2836,6 +2836,7 @@ _globals_init(void) // Called for the first time. PyThread_type_lock mutex = PyThread_allocate_lock(); if (mutex == NULL) { + _globals.module_count--; PyMutex_Unlock(&_globals.mutex); return ERR_CHANNELS_MUTEX_INIT; } diff --git a/Modules/_interpqueuesmodule.c b/Modules/_interpqueuesmodule.c index 120ce9662ae8a2..297a1763a98ce6 100644 --- a/Modules/_interpqueuesmodule.c +++ b/Modules/_interpqueuesmodule.c @@ -1416,6 +1416,7 @@ _globals_init(void) // Called for the first time. PyThread_type_lock mutex = PyThread_allocate_lock(); if (mutex == NULL) { + _globals.module_count--; PyMutex_Unlock(&_globals.mutex); return ERR_QUEUES_ALLOC; }