Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/48.breaking.0.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:meth:`RichComponent.get_manager` and :meth:`RichComponent.set_manager` are now class methods.
2 changes: 2 additions & 0 deletions changelog/48.breaking.1.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
:meth:`ComponentManager.register_component` and :meth:`ComponentManager.deregister_component` have been modified, and certain internal behaviour has changed.
More noticeably, :meth:`ComponentManager.deregister_component` now takes a component identifier rather than a full class.
2 changes: 2 additions & 0 deletions changelog/48.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
The :class:`ComponentManager` instance is now immediately stored on a :class:`RichComponent` when it is registered.
This should reduce the number of times you need to manually provide a manager to :meth:`RichComponent.as_ui_component` etc.
18 changes: 12 additions & 6 deletions src/disnake_compass/api/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ class RichComponent(typing.Protocol):

__slots__: typing.Sequence[str] = ()

def get_manager(self) -> ComponentManager:
@classmethod
def get_manager(cls) -> ComponentManager:
"""Get the manager that was responsible for parsing this component instance."""
...

def set_manager(self, manager: ComponentManager, /) -> None:
@classmethod
def set_manager(cls, manager: ComponentManager | None, /) -> None:
"""Set the manager that was responsible for parsing this component instance."""
...

Expand Down Expand Up @@ -262,6 +264,8 @@ def register_component(
self,
component_type: type[ComponentT],
/,
*,
identifier: str,
) -> type[ComponentT]:
r"""Register a component to this component manager.

Expand All @@ -272,7 +276,9 @@ def register_component(
----------
component_type
The component class to register.

identifier
The identifier with which to register this component class.

Returns
-------
:class:`type`\[:data:`.ComponentT`]
Expand All @@ -281,16 +287,16 @@ def register_component(
"""
...

def deregister_component(self, component_type: type[RichComponent], /) -> None:
def deregister_component(self, identifier: str, /) -> None:
"""Deregister a component from this component manager.

After deregistration, the component will no be tracked, and its
callbacks can no longer fire until it is re-registered.

Parameters
----------
component_type
The component class to deregister.
identifier
The identifier of the component class to deregister.

Returns
-------
Expand Down
20 changes: 13 additions & 7 deletions src/disnake_compass/impl/component/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,10 @@ def __new__( # noqa: PYI034
# here, but this is a false-positive, as the only non-method
# member is __slots__.
assert issubclass(cls, component_api.RichComponent) # pyright: ignore[reportGeneralTypeIssues]
factory_cls: type[component_api.ComponentFactory[typing.Any]] = (
factory_cls = (
factory_impl.NoopFactory
if typing_extensions.is_protocol(cls)
else factory_impl.ComponentFactory
else factory_impl.ComponentFactory[typing.Any]
)

cls.set_factory(factory_cls.from_component(cls))
Expand All @@ -208,17 +208,23 @@ class ComponentBase(component_api.RichComponent, typing.Protocol, metaclass=Comp
"""Overarching base class for any kind of component."""

_factory: typing.ClassVar[component_api.ComponentFactory[typing_extensions.Self]]
_manager: component_api.ComponentManager = fields.meta()
_manager: typing.ClassVar[component_api.ComponentManager | None] = None

def get_manager(self) -> component_api.ComponentManager: # noqa: D102
@classmethod
def get_manager(cls) -> component_api.ComponentManager: # noqa: D102
# <<Docstring inherited from component_api.RichComponent>>

return self._manager
if cls._manager is None:
msg = f"Component {cls.__qualname__} is not yet registered to a manager."
raise RuntimeError(msg)

return cls._manager

def set_manager(self, manager: component_api.ComponentManager) -> None: # noqa: D102
@classmethod
def set_manager(cls, manager: component_api.ComponentManager | None) -> None: # noqa: D102
# <<Docstring inherited from component_api.RichComponent>>

self._manager = manager
cls._manager = manager

@classmethod
def get_factory(cls) -> component_api.ComponentFactory[typing_extensions.Self]:
Expand Down
151 changes: 87 additions & 64 deletions src/disnake_compass/impl/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
_ROOT = sys.intern("root")
_COMPONENT_EVENT = sys.intern("on_message_interaction")
_MODAL_EVENT = sys.intern("on_modal_submit")
_IS_COMPONENTS_V2 = 1 << 15

_COMPONENT_CTX: contextvars.ContextVar[tuple[component_api.RichComponent, str]] = (
contextvars.ContextVar("_COMPONENT_CTX")
Expand Down Expand Up @@ -291,6 +292,7 @@ class ComponentManager(component_api.ComponentManager):
_count: bool | None
_counter: int
_identifiers: dict[str, str]
# TODO: Refactor module data to go somewhere else now that only the root manager is aware of it.
_module_data: dict[str, _ModuleData]
_name: str
_registrars: weakref.WeakValueDictionary[str, ComponentManager]
Expand Down Expand Up @@ -413,16 +415,21 @@ def sep(self) -> str:
return _recurse_parents_getattr(self, "_sep", _DEFAULT_SEP)

@property
def parent(self) -> ComponentManager | None: # noqa: D102
def parent(self) -> component_api.ComponentManager | None: # noqa: D102
# <<docstring inherited from api.components.ComponentManager>>

if "." not in self.name:
# Return the root manager if this is not the root manager already.
return None if self.name is _ROOT else get_manager(_ROOT)
return None if self.is_root else get_manager(_ROOT)

root, _ = self.name.rsplit(".", 1)
return get_manager(root)

@property
def is_root(self) -> bool:
"""Whether this manager is the root manager."""
return self.name is _ROOT

def config(
self,
*,
Expand Down Expand Up @@ -511,7 +518,7 @@ async def _parse_raw_component(
# Since we do not want to fire components that (to the user)
# do not exist anymore, we should remove them from the
# manager and return None.
self.deregister_component(component_type)
self.deregister_component(identifier)
return None, None

component_params = {
Expand Down Expand Up @@ -592,13 +599,18 @@ async def parse_message_components(
on the nested structure.

""" # noqa: E501
if message.flags.value & _IS_COMPONENTS_V2:
msg = "parse_message_components does not yet work with components v2."
raise RuntimeError(msg)

new_rows: list[list[MessageComponents]] = []
rich_components: list[component_api.RichComponent] = []

current_component, current_component_id = _COMPONENT_CTX.get((None, None))
should_test = current_component is not None

for row in message.components:
assert isinstance(row, disnake.ActionRow)
new_row: list[MessageComponents] = []
new_rows.append(new_row)

Expand Down Expand Up @@ -735,57 +747,79 @@ def register_component(

"""
resolved_identifier = identifier or self.make_identifier(component_type)
module_data = _ModuleData.from_object(component_type)

root_manager = get_manager(_ROOT)
if self.is_root:
module_data = _ModuleData.from_object(component_type)

if resolved_identifier in self._components:
# NOTE:
# This occurs when a component is registered while another
# component with the same identifier already exists.
#
# We now have two options:
# - This is caused by a reload. In this case, we expect the
# module name to remain unchanged and the module id to have
# changed. We can safely overwrite the old component.
# - This is an actual user error. If we were to silently
# overwrite the old component, it would unexpectedly go
# unresponsive. Instead, we raise an exception to the user.
old_module_data = self._module_data[resolved_identifier]
if not module_data.is_reload_of(old_module_data):
message = (
"Cannot register component with duplicate identifier"
f" {identifier!r}. (Original defined in module"
f" {old_module_data.name!r}, duplicate defined in"
f" module {module_data.name!r})"
)
raise RuntimeError(message)

if resolved_identifier in root_manager._components: # noqa: SLF001
# NOTE: This occurs when a component is registered while another
# component with the same identifier already exists.
#
# We now have two options:
# - This is caused by a reload. In this case, we expect the
# module name to remain unchanged and the module id to have
# changed. We can safely overwrite the old component.
# - This is an actual user error. If we were to silently
# overwrite the old component, it would unexpectedly go
# unresponsive. Instead, we raise an exception to the user.
old_module_data = root_manager._module_data[resolved_identifier] # noqa: SLF001
if not module_data.is_reload_of(old_module_data):
message = (
"Cannot register component with duplicate identifier"
f" {identifier!r}. (Original defined in module"
f" {old_module_data.name!r}, duplicate defined in module"
f" {module_data.name!r})"
)
raise RuntimeError(message)
# TODO: Pre-emptively remove all components that were registered
# to the module that just went out-of-scope.

# Register to current manager and all parent managers.
for manager in _recurse_parents(self):
manager._components[resolved_identifier] = component_type # noqa: SLF001
manager._identifiers[component_type.__name__] = resolved_identifier # noqa: SLF001
manager._module_data[resolved_identifier] = module_data # noqa: SLF001
manager._registrars[resolved_identifier] = self # noqa: SLF001
self._module_data[resolved_identifier] = module_data

# Register to current manager and all parent managers.
# for manager in _recurse_parents(self):
self._components[resolved_identifier] = component_type
self._identifiers[component_type.__name__] = resolved_identifier
self._registrars[resolved_identifier] = self

if self.parent:
self.parent.register_component(component_type, identifier=resolved_identifier)

# This is somewhat wasteful as every parent class also calls set_manager,
# but I'm not sure if we can make this better.
# Bottom-line is, we don't spend much time registering components anyway.
component_type.set_manager(self)
return component_type

def deregister_component(self, component_type: RichComponentType, /) -> None: # noqa: D102
def deregister_component(self, identifier: str, /) -> None: # noqa: D102
# <<docstring inherited from api.components.ComponentManager>>

identifier = self.make_identifier(component_type)
registrar = self._registrars.get(identifier)
if identifier not in self.components:
msg = f"This manager is not aware of a component with identifier {identifier!r}."
raise LookupError(msg)

component_type = self.components[identifier]
registrar = component_type.get_manager()

if not registrar:
message = (
f"Component {component_type.__name__!r} is not registered to a component manager."
)
raise TypeError(message)
# Always start deregistering from the registrar down to root.
if registrar is not self:
registrar.deregister_component(identifier)
component_type.set_manager(None)
return

# Deregister from the current manager and all parent managers.
for manager in _recurse_parents(registrar):
del manager._components[identifier] # noqa: SLF001
del manager._module_data[identifier] # noqa: SLF001
del manager._registrars[identifier] # noqa: SLF001
del self._components[identifier]
del self._registrars[identifier]

if self.parent:
# Again, goofy but for the time being this works, i guess.
component_type.set_manager(self.parent)
self.parent.deregister_component(identifier)
else:
# Only root has module data now.
del self._module_data[identifier]

def add_to_client(self, client: disnake.Client, /) -> None: # noqa: D102
# <<docstring inherited from api.components.ComponentManager>>
Expand Down Expand Up @@ -1001,26 +1035,20 @@ async def _invoke_component(
# defined but we need the extra check for type-safety.
return

# Set the registrar for this component as the manager that invoked it.
registrar = self._registrars[identifier]
component.set_manager(registrar)

# We traverse the managers in reverse: root first, then child, etc.
# until we reach the component's actual manager. Therefore, we first
# store all managers in a list, so that we can call reversed() on it
# later.
# This applies only to the callback wrappers. Error handlers are called
# starting from the actual manager and propagated down to the root
# manager if the error was left unhandled.
managers = list(_recurse_parents(registrar))
# Store all managers that are aware of the invoked component in a list
# to be able to loop over them later.
manager = component.get_manager()
assert isinstance(manager, ComponentManager)
managers = list(_recurse_parents(manager))

assert interaction.component.custom_id
ctx_value = (component, interaction.component.custom_id)
component_ctx_token = _COMPONENT_CTX.set(ctx_value)

try:
async with contextlib.AsyncExitStack() as stack:
# Enter all the context managers...
# Before invocation, we wrap the callback in all parents'
# callback wrappers from root to the registrar.
for manager in reversed(managers):
await stack.enter_async_context(
manager.wrap_callback(manager, component, interaction),
Expand All @@ -1033,15 +1061,10 @@ async def _invoke_component(
# Blanket exception catching is desired here as it's meant to
# redirect all non-system errors to the error handler.

# Call all error handlers in order from registrar to root.
# Short-circuit if any handler returns True.
for manager in managers:
if await manager.handle_exception(
manager,
component,
interaction,
exception,
):
# If an error handler returns True, consider the error
# handled and skip the remaining handlers.
if await manager.handle_exception(manager, component, interaction, exception):
break

finally:
Expand Down
Loading
Loading