Skip to content
2 changes: 1 addition & 1 deletion Lib/test/test_warnings/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1521,7 +1521,7 @@ def test_late_resource_warning(self):
self.assertTrue(err.startswith(expected), ascii(err))


class DeprecatedTests(unittest.TestCase):
class DeprecatedTests(PyPublicAPITests):
def test_dunder_deprecated(self):
@deprecated("A will go away soon")
class A:
Expand Down
175 changes: 106 additions & 69 deletions Lib/warnings.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,23 +186,25 @@ def simplefilter(action, category=Warning, lineno=0, append=False):
_add_filter(action, None, category, None, lineno, append=append)

def _add_filter(*item, append):
# Remove possible duplicate filters, so new one will be placed
# in correct place. If append=True and duplicate exists, do nothing.
if not append:
try:
filters.remove(item)
except ValueError:
pass
filters.insert(0, item)
else:
if item not in filters:
filters.append(item)
_filters_mutated()
with _lock:
if not append:
# Remove possible duplicate filters, so new one will be placed
# in correct place. If append=True and duplicate exists, do nothing.
try:
filters.remove(item)
except ValueError:
pass
filters.insert(0, item)
else:
if item not in filters:
filters.append(item)
_filters_mutated_unlocked()

def resetwarnings():
"""Clear the list of warning filters, so that no filters are active."""
filters[:] = []
_filters_mutated()
with _lock:
filters[:] = []
_filters_mutated_unlocked()

class _OptionError(Exception):
"""Exception used by option processing helpers."""
Expand Down Expand Up @@ -353,64 +355,66 @@ def warn_explicit(message, category, filename, lineno,
module = filename or "<unknown>"
if module[-3:].lower() == ".py":
module = module[:-3] # XXX What about leading pathname?
if registry is None:
registry = {}
if registry.get('version', 0) != _filters_version:
registry.clear()
registry['version'] = _filters_version
if isinstance(message, Warning):
text = str(message)
category = message.__class__
else:
text = message
message = category(message)
key = (text, category, lineno)
# Quick test for common case
if registry.get(key):
return
# Search the filters
for item in filters:
action, msg, cat, mod, ln = item
if ((msg is None or msg.match(text)) and
issubclass(category, cat) and
(mod is None or mod.match(module)) and
(ln == 0 or lineno == ln)):
break
else:
action = defaultaction
# Early exit actions
if action == "ignore":
return
with _lock:
if registry is None:
registry = {}
if registry.get('version', 0) != _filters_version:
registry.clear()
registry['version'] = _filters_version
# Quick test for common case
if registry.get(key):
return
# Search the filters
for item in filters:
action, msg, cat, mod, ln = item
if ((msg is None or msg.match(text)) and
issubclass(category, cat) and
(mod is None or mod.match(module)) and
(ln == 0 or lineno == ln)):
break
else:
action = defaultaction
# Early exit actions
if action == "ignore":
return

if action == "error":
raise message
# Other actions
if action == "once":
registry[key] = 1
oncekey = (text, category)
if onceregistry.get(oncekey):
return
onceregistry[oncekey] = 1
elif action in {"always", "all"}:
pass
elif action == "module":
registry[key] = 1
altkey = (text, category, 0)
if registry.get(altkey):
return
registry[altkey] = 1
elif action == "default":
registry[key] = 1
else:
# Unrecognized actions are errors
raise RuntimeError(
"Unrecognized action (%r) in warnings.filters:\n %s" %
(action, item))

# Prime the linecache for formatting, in case the
# "file" is actually in a zipfile or something.
import linecache
linecache.getlines(filename, module_globals)

if action == "error":
raise message
# Other actions
if action == "once":
registry[key] = 1
oncekey = (text, category)
if onceregistry.get(oncekey):
return
onceregistry[oncekey] = 1
elif action in {"always", "all"}:
pass
elif action == "module":
registry[key] = 1
altkey = (text, category, 0)
if registry.get(altkey):
return
registry[altkey] = 1
elif action == "default":
registry[key] = 1
else:
# Unrecognized actions are errors
raise RuntimeError(
"Unrecognized action (%r) in warnings.filters:\n %s" %
(action, item))
# Print message and context
msg = WarningMessage(message, category, filename, lineno, source)
_showwarnmsg(msg)
Expand Down Expand Up @@ -488,11 +492,12 @@ def __enter__(self):
if self._entered:
raise RuntimeError("Cannot enter %r twice" % self)
self._entered = True
self._filters = self._module.filters
self._module.filters = self._filters[:]
self._module._filters_mutated()
self._showwarning = self._module.showwarning
self._showwarnmsg_impl = self._module._showwarnmsg_impl
with _lock:
self._filters = self._module.filters
self._module.filters = self._filters[:]
self._module._filters_mutated_unlocked()
self._showwarning = self._module.showwarning
self._showwarnmsg_impl = self._module._showwarnmsg_impl
if self._filter is not None:
simplefilter(*self._filter)
if self._record:
Expand All @@ -508,10 +513,11 @@ def __enter__(self):
def __exit__(self, *exc_info):
if not self._entered:
raise RuntimeError("Cannot exit %r without entering first" % self)
self._module.filters = self._filters
self._module._filters_mutated()
self._module.showwarning = self._showwarning
self._module._showwarnmsg_impl = self._showwarnmsg_impl
with _lock:
self._module.filters = self._filters
self._module._filters_mutated_unlocked()
self._module.showwarning = self._showwarning
self._module._showwarnmsg_impl = self._showwarnmsg_impl


class deprecated:
Expand Down Expand Up @@ -701,17 +707,48 @@ def extract():
# If either if the compiled regexs are None, match anything.
try:
from _warnings import (filters, _defaultaction, _onceregistry,
warn, warn_explicit, _filters_mutated)
warn, warn_explicit,
_filters_mutated_unlocked,
_acquire_lock, _release_lock,
)
defaultaction = _defaultaction
onceregistry = _onceregistry
_warnings_defaults = True

class _Lock:
def __enter__(self):
_acquire_lock()
return self

def __exit__(self, *args):
_release_lock()

_lock = _Lock()

def _filters_mutated():
# Even though this function is part of the public API, it's used
# by a fair amount of user code.
with _lock:
_filters_mutated_unlocked()

except ImportError:
filters = []
defaultaction = "default"
onceregistry = {}

import _thread

# Note that this is a non-reentrant lock, matching what's used by
# _acquire_lock() and _release_lock(). Care must be taken to
# not deadlock.
_lock = _thread.LockType()

_filters_version = 1

def _filters_mutated_unlocked():
global _filters_version
_filters_version += 1

def _filters_mutated():
global _filters_version
_filters_version += 1
Expand Down
60 changes: 53 additions & 7 deletions Python/_warnings.c
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,47 @@ get_warnings_attr(PyInterpreterState *interp, PyObject *attr, int try_import)
return obj;
}

/*[clinic input]
_acquire_lock as warnings_acquire_lock

[clinic start generated code]*/

static PyObject *
warnings_acquire_lock_impl(PyObject *module)
/*[clinic end generated code: output=594313457d1bf8e1 input=46ec20e55acca52f]*/
{
PyInterpreterState *interp = get_current_interp();
if (interp == NULL) {
return NULL;
}

WarningsState *st = warnings_get_state(interp);
assert(st != NULL);

_PyMutex_Lock(&st->mutex);
Py_RETURN_NONE;
}

/*[clinic input]
_release_lock as warnings_release_lock

[clinic start generated code]*/

static PyObject *
warnings_release_lock_impl(PyObject *module)
/*[clinic end generated code: output=d73d5a8789396750 input=ea01bb77870c5693]*/
{
PyInterpreterState *interp = get_current_interp();
if (interp == NULL) {
return NULL;
}

WarningsState *st = warnings_get_state(interp);
assert(st != NULL);

_PyMutex_Unlock(&st->mutex);
Py_RETURN_NONE;
}

static PyObject *
get_once_registry(PyInterpreterState *interp)
Expand Down Expand Up @@ -1165,13 +1206,13 @@ warnings_warn_explicit_impl(PyObject *module, PyObject *message,
}

/*[clinic input]
_filters_mutated as warnings_filters_mutated
_filters_mutated_unlocked as warnings_filters_mutated_unlocked

[clinic start generated code]*/

static PyObject *
warnings_filters_mutated_impl(PyObject *module)
/*[clinic end generated code: output=8ce517abd12b88f4 input=35ecbf08ee2491b2]*/
warnings_filters_mutated_unlocked_impl(PyObject *module)
/*[clinic end generated code: output=05ce1f43c187b8cc input=c0488aa2a6f0f661]*/
{
PyInterpreterState *interp = get_current_interp();
if (interp == NULL) {
Expand All @@ -1181,14 +1222,17 @@ warnings_filters_mutated_impl(PyObject *module)
WarningsState *st = warnings_get_state(interp);
assert(st != NULL);

Py_BEGIN_CRITICAL_SECTION_MUT(&st->mutex);
// Note that the lock must be held by the caller.
if (!PyMutex_IsLocked(&st->mutex)) {
PyErr_SetString(PyExc_RuntimeError, "warnings mutex is not held");
return NULL;
}

st->filters_version++;
Py_END_CRITICAL_SECTION();

Py_RETURN_NONE;
}


/* Function to issue a warning message; may raise an exception. */

static int
Expand Down Expand Up @@ -1464,7 +1508,9 @@ _PyErr_WarnUnawaitedCoroutine(PyObject *coro)
static PyMethodDef warnings_functions[] = {
WARNINGS_WARN_METHODDEF
WARNINGS_WARN_EXPLICIT_METHODDEF
WARNINGS_FILTERS_MUTATED_METHODDEF
WARNINGS_FILTERS_MUTATED_UNLOCKED_METHODDEF
WARNINGS_ACQUIRE_LOCK_METHODDEF
WARNINGS_RELEASE_LOCK_METHODDEF
/* XXX(brett.cannon): add showwarning? */
/* XXX(brett.cannon): Reasonable to add formatwarning? */
{NULL, NULL} /* sentinel */
Expand Down
Loading
Loading