diff --git a/src/frequenz/core/sentinels.py b/src/frequenz/core/sentinels.py new file mode 100644 index 0000000..2236c74 --- /dev/null +++ b/src/frequenz/core/sentinels.py @@ -0,0 +1,185 @@ +# License: MIT +# Copyright © 2021-2022 Tal Einat +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH +# Based on: +# https://github.com/taleinat/python-stdlib-sentinels/blob/9fdf9628d7bf010f0a66c72b717802c715c7d564/sentinels/sentinels.py + +"""Create unique sentinel objects. + +This module provides a class, [`Sentinel`][frequenz.core.sentinels], which can be used +to create unique sentinel objects as specified by [`PEP +661`](https://peps.python.org/pep-0661/). +""" + + +import sys as _sys +from threading import Lock as _Lock +from typing import Self, cast + +__all__ = ["Sentinel"] + + +# Design and implementation decisions: +# +# The first implementations created a dedicated class for each instance. +# However, once it was decided to use Sentinel for type signatures, there +# was no longer a need for a dedicated class for each sentinel value on order +# to enable strict type signatures. Since class objects consume a relatively +# large amount of memory, the implementation was changed to avoid this. +# +# With this change, the mechanism used for unpickling/copying objects needed +# to be changed too, since we could no longer count on each dedicated class +# simply returning its singleton instance as before. __reduce__ can return +# a string, upon which an attribute with that name is looked up in the module +# and returned. However, that would have meant that pickling/copying support +# would depend on the "name" argument being exactly the name of the variable +# used in the module, and simply wouldn't work for sentinels created in +# functions/methods. Instead, a registry for sentinels was added, where all +# sentinel objects are stored keyed by their name + module name. This is used +# to look up existing sentinels both during normal object creation and during +# copying/unpickling. + + +class Sentinel: + """Create a unique sentinel object. + + Sentinel objects are used to represent special values, such as "no value" or "not + computed yet". They are used in place of [`None`][] to avoid ambiguity, since `None` + can be a valid value in some cases. + + For more details, please check [`PEP 661`](https://peps.python.org/pep-0661/). + + Example: + ```python + from frequenz.core.sentinels import Sentinel + from typing import assert_type + + MISSING = Sentinel('MISSING') + + def func(value: int | MISSING) -> None: + if value is MISSING: + assert_type(value, MISSING) + else: + assert_type(value, int) + ``` + """ + + _name: str + _repr: str + _bool_value: bool + _module_name: str + + def __new__( + cls, + name: str, + repr: str | None = None, # pylint: disable=redefined-builtin + bool_value: object = True, + module_name: str | None = None, + ) -> Self: + """Create a new sentinel object. + + Args: + name: The fully-qualified name of the variable to which the return value + shall be assigned. + repr: The `repr` of the sentinel object. If not provided, "" will be + used (with any leading class names removed). + bool_value: The boolean value of the sentinel object. + module_name: The fully-qualified name of the module in which the sentinel is + created. If not provided, the module name will be inferred from the call + stack. + + Returns: + A unique sentinel object. + """ + name = str(name) + repr = str(repr) if repr else f'<{name.split(".")[-1]}>' + bool_value = bool(bool_value) + if not module_name: + parent_frame = _get_parent_frame() + module_name = ( + parent_frame.f_globals.get("__name__", "__main__") + if parent_frame is not None + else __name__ + ) + + # Include the class's module and fully qualified name in the + # registry key to support sub-classing. + registry_key = _sys.intern( + f"{cls.__module__}-{cls.__qualname__}-{module_name}-{name}" + ) + sentinel = _registry.get(registry_key, None) + if sentinel is not None: + return cast(Self, sentinel) + sentinel = super().__new__(cls) + sentinel._name = name + sentinel._repr = repr + sentinel._bool_value = bool_value + sentinel._module_name = module_name + with _lock: + return cast(Self, _registry.setdefault(registry_key, sentinel)) + + def __repr__(self): + """Return a string representation of the sentinel object.""" + return self._repr + + def __bool__(self): + """Return the boolean value of the sentinel object.""" + return self._bool_value + + def __reduce__(self): + """Return the sentinel object's representation for pickling and copying.""" + return ( + self.__class__, + ( + self._name, + self._repr, + self._bool_value, + self._module_name, + ), + ) + + +# We ignore checks for the rest of the file, as this is an external implementation and +# we hope this module gets added to the Python standard library eventually. +# pylint: disable-all +# mypy: ignore-errors +# type: ignore + +_lock = _Lock() +_registry: dict[str, Sentinel] = {} + + +# The following implementation attempts to support Python +# implementations which don't support sys._getframe(2), such as +# Jython and IronPython. +# +# The version added to the stdlib may simply return sys._getframe(2), +# without the fallbacks. +# +# For reference, see the implementation of namedtuple: +# https://github.com/python/cpython/blob/67444902a0f10419a557d0a2d3b8675c31b075a9/Lib/collections/__init__.py#L503 +def _get_parent_frame(): + """Return the frame object for the caller's parent stack frame.""" + try: + # Two frames up = the parent of the function which called this. + return _sys._getframe(2) + except (AttributeError, ValueError): + global _get_parent_frame + + def _get_parent_frame(): + """Return the frame object for the caller's parent stack frame.""" + try: + raise Exception + except Exception: + try: + return _sys.exc_info()[2].tb_frame.f_back.f_back + except Exception: + global _get_parent_frame + + def _get_parent_frame(): + """Return the frame object for the caller's parent stack frame.""" + return None + + return _get_parent_frame() + + return _get_parent_frame() diff --git a/tests/test_sentinels.py b/tests/test_sentinels.py new file mode 100644 index 0000000..f39fd0e --- /dev/null +++ b/tests/test_sentinels.py @@ -0,0 +1,120 @@ +# License: MIT +# Copyright © 2021-2022 Tal Einat +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH +# Based on: +# https://github.com/taleinat/python-stdlib-sentinels/blob/9fdf9628d7bf010f0a66c72b717802c715c7d564/test/test_sentinels.py + +"""Test the `sentinels` module.""" + +import copy +import pickle +import unittest + +from frequenz.core.sentinels import Sentinel + +sent1 = Sentinel("sent1") +sent2 = Sentinel("sent2", repr="test_sentinels.sent2") + + +class TestSentinel(unittest.TestCase): + """Test the `Sentinel` class.""" + + def setUp(self) -> None: + """Set up the test case.""" + self.sent_defined_in_function = Sentinel("defined_in_function") + + def test_identity(self) -> None: + """Test that the same sentinel object is returned for the same name.""" + for sent in sent1, sent2, self.sent_defined_in_function: + with self.subTest(sent=sent): + self.assertIs(sent, sent) + self.assertEqual(sent, sent) + + def test_uniqueness(self) -> None: + """Test that different names result in different sentinel objects.""" + self.assertIsNot(sent1, sent2) + self.assertNotEqual(sent1, sent2) + self.assertIsNot(sent1, None) + self.assertNotEqual(sent1, None) + self.assertIsNot(sent1, Ellipsis) + self.assertNotEqual(sent1, Ellipsis) + self.assertIsNot(sent1, "sent1") + self.assertNotEqual(sent1, "sent1") + self.assertIsNot(sent1, "") + self.assertNotEqual(sent1, "") + + def test_same_object_in_same_module(self) -> None: + """Test that the same sentinel object is returned for the same name in the same module.""" + copy1 = Sentinel("sent1") + self.assertIs(copy1, sent1) + copy2 = Sentinel("sent1") + self.assertIs(copy2, copy1) + + def test_same_object_fake_module(self) -> None: + """Test that the same sentinel object is returned for the same name in a fake module.""" + copy1 = Sentinel("FOO", module_name="i.dont.exist") + copy2 = Sentinel("FOO", module_name="i.dont.exist") + self.assertIs(copy1, copy2) + + def test_unique_in_different_modules(self) -> None: + """Test that different modules result in different sentinel objects.""" + other_module_sent1 = Sentinel("sent1", module_name="i.dont.exist") + self.assertIsNot(other_module_sent1, sent1) + + def test_repr(self) -> None: + """Test the `repr` of sentinel objects.""" + self.assertEqual(repr(sent1), "") + self.assertEqual(repr(sent2), "test_sentinels.sent2") + + def test_type(self) -> None: + """Test the type of sentinel objects.""" + self.assertIsInstance(sent1, Sentinel) + self.assertIsInstance(sent2, Sentinel) + + def test_copy(self) -> None: + """Test that `copy` and `deepcopy` return the same object.""" + self.assertIs(sent1, copy.copy(sent1)) + self.assertIs(sent1, copy.deepcopy(sent1)) + + def test_pickle_roundtrip(self) -> None: + """Test that pickling and unpickling returns the same object.""" + self.assertIs(sent1, pickle.loads(pickle.dumps(sent1))) + + def test_bool_value(self) -> None: + """Test that the sentinel object is falsy.""" + self.assertTrue(sent1) + self.assertTrue(Sentinel("I_AM_TRUTHY")) + self.assertFalse(Sentinel("I_AM_FALSY", bool_value=False)) + + def test_automatic_module_name(self) -> None: + """Test that the module name is inferred from the call stack.""" + self.assertIs( + Sentinel("sent1", module_name=__name__), + sent1, + ) + self.assertIs( + Sentinel("defined_in_function", module_name=__name__), + self.sent_defined_in_function, + ) + + def test_subclass(self) -> None: + """Test that subclassing `Sentinel` works as expected.""" + + class FalseySentinel(Sentinel): + """A sentinel object which is falsy.""" + + def __bool__(self) -> bool: + """Return `False`.""" + return False + + subclass_sent = FalseySentinel("FOO") + self.assertIs(subclass_sent, subclass_sent) + self.assertEqual(subclass_sent, subclass_sent) + self.assertFalse(subclass_sent) + non_subclass_sent = Sentinel("FOO") + self.assertIsNot(subclass_sent, non_subclass_sent) + self.assertNotEqual(subclass_sent, non_subclass_sent) + + +if __name__ == "__main__": + unittest.main()