Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions appdaemon/adapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,25 +321,25 @@ async def listen_log(
pin_thread: int | None = None,
**kwargs
) -> str:
"""Registers the App to receive a callback every time an App logs a message.
"""Register a callback for whenever an app logs a message.

Args:
callback (function): Function to be called when a message is logged.
callback: Function that will be called when a message is logged. It must conform to the standard event
callback format documented `here <APPGUIDE.html#event-callbacks>`__
level (str, optional): Minimum level for logs to trigger the callback. Lower levels will be ignored. Default
is ``INFO``.
namespace (str, optional): Namespace to use for the call. Defaults to ``admin`` for log callbacks. See the
`namespace documentation <APPGUIDE.html#namespaces>`__ for more information.
log (str, optional): Name of the log to listen to, default is all logs. The name should be one of the 4
built in types ``main_log``, ``error_log``, ``diag_log`` or ``access_log`` or a user defined log entry.
pin (bool, optional): Optional setting to override the default thread pinning behavior. By default, this is
effectively ``True``, and ``pin_thread`` gets set when the app starts.
pin_thread (int, optional): Specify which thread from the worker pool will run the callback. The threads
each have an ID number. The ID numbers start at 0 and go through (number of threads - 1).
**kwargs: Arbitrary keyword parameters to be provided to the callback function when it is triggered.
**kwargs (optional): One or more keyword arguments to supply to the callback.

Returns:
A unique identifier that can be used to cancel the callback if required. Since variables created within
object methods are local to the function they are created in, and in all likelihood, the cancellation will
be invoked later in a different function, it is recommended that handles are stored in the object
namespace, e.g., self.handle.
A handle that can be used to cancel the callback.

Examples:
Listen to all ``WARNING`` log messages of the system.
Expand All @@ -359,10 +359,14 @@ async def listen_log(
>>> self.handle = self.listen_log(self.cb, "WARNING", log="my_custom_log")

"""

return await self.AD.logging.add_log_callback(
namespace, self.name, callback, level,
log=log, pin=pin, pin_thread=pin_thread,
namespace=namespace,
name=self.name,
callback=callback,
level=level,
log=log,
pin=pin,
pin_thread=pin_thread,
**kwargs
)

Expand Down
2 changes: 2 additions & 0 deletions appdaemon/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ class PinOutofRange(AppDaemonException):
pin_thread: int
total_threads: int

def __str__(self):
return f"Pin thread {self.pin_thread} out of range. Must be between 0 and {self.total_threads - 1}"

@dataclass
class BadClassSignature(AppDaemonException):
Expand Down
143 changes: 68 additions & 75 deletions appdaemon/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from collections import OrderedDict
from logging import Logger, StreamHandler
from logging.handlers import RotatingFileHandler
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union, overload
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union

import pytz

Expand Down Expand Up @@ -178,6 +178,7 @@ class Logging(metaclass=utils.Singleton):
AD: "AppDaemon"
"""Reference to the top-level AppDaemon container object
"""
name: str = "_logging"

config: Dict[str, Dict[str, Any]]

Expand Down Expand Up @@ -493,10 +494,16 @@ def is_alias(self, log):
return True
return False

@overload
async def add_log_callback(self, namespace: str, name: str, callback: Callable, level: str | int, pin: bool | None = None, pin_thread: int | None = None, **kwargs) -> list[str] | None: ...

async def add_log_callback(self, namespace: str, name: str, callback: Callable, level: str | int, **kwargs) -> list[str] | None:
async def add_log_callback(
self,
namespace: str,
name: str,
callback: Callable,
level: str | int,
pin: bool | None = None,
pin_thread: int | None = None,
**kwargs
) -> list[str] | None:
"""Adds a callback for log which is called internally by apps.

Args:
Expand All @@ -510,80 +517,66 @@ async def add_log_callback(self, namespace: str, name: str, callback: Callable,
``None`` or a list of the callback handles, 1 for each logging level above the one given

"""
if self.AD.threading.validate_pin(name, kwargs) is True:
if "pin" in kwargs:
pin_app = kwargs["pin"]
else:
pin_app = self.AD.app_management.objects[name].pin_app
pin_app, pin_thread =self.AD.threading.determine_thread(self.name, pin, pin_thread)

if "pin_thread" in kwargs:
pin_thread = kwargs["pin_thread"]
pin_app = True
else:
pin_thread = self.AD.app_management.objects[name].pin_thread

#
# Add the callback
#

async with self.AD.callbacks.callbacks_lock:
if name not in self.AD.callbacks.callbacks:
self.AD.callbacks.callbacks[name] = {}

# Add a separate callback for each log level
handles = []
for thislevel in self.log_levels:
if self.log_levels[thislevel] >= self.log_levels[level]:
handle = uuid.uuid4().hex
cb_kwargs = copy.deepcopy(kwargs)
cb_kwargs["level"] = thislevel
self.AD.callbacks.callbacks[name][handle] = {
"name": name,
"id": self.AD.app_management.objects[name].id,
"type": "log",
"function": callback,
"namespace": namespace,
"pin_app": pin_app,
"pin_thread": pin_thread,
"kwargs": cb_kwargs,
}

handles.append(handle)

#
# If we have a timeout parameter, add a scheduler entry to delete the callback later
#
if "timeout" in cb_kwargs:
exec_time = await self.AD.sched.get_now() + datetime.timedelta(seconds=int(kwargs["timeout"]))

cb_kwargs["__timeout"] = await self.AD.sched.insert_schedule(
name=name,
aware_dt=exec_time,
callback=None,
repeat=False,
type_=None,
__log_handle=handle,
)

await self.AD.state.add_entity(
"admin",
"log_callback.{}".format(handle),
"active",
{
"app": name,
"function": callback.__name__,
"pinned": pin_app,
"pinned_thread": pin_thread,
"fired": 0,
"executed": 0,
"kwargs": cb_kwargs,
},
#
# Add the callback
#
async with self.AD.callbacks.callbacks_lock:
if name not in self.AD.callbacks.callbacks:
self.AD.callbacks.callbacks[name] = {}

# Add a separate callback for each log level
handles = []
for thislevel in self.log_levels:
if self.log_levels[thislevel] >= self.log_levels[level]:
handle = uuid.uuid4().hex
cb_kwargs = copy.deepcopy(kwargs)
cb_kwargs["level"] = thislevel
self.AD.callbacks.callbacks[name][handle] = {
"name": name,
"id": self.AD.app_management.objects[name].id,
"type": "log",
"function": callback,
"namespace": namespace,
"pin_app": pin_app,
"pin_thread": pin_thread,
"kwargs": cb_kwargs,
}

handles.append(handle)

#
# If we have a timeout parameter, add a scheduler entry to delete the callback later
#
if "timeout" in cb_kwargs:
exec_time = await self.AD.sched.get_now() + datetime.timedelta(seconds=int(kwargs["timeout"]))

cb_kwargs["__timeout"] = await self.AD.sched.insert_schedule(
name=name,
aware_dt=exec_time,
callback=None,
repeat=False,
type_=None,
__log_handle=handle,
)

return handles
await self.AD.state.add_entity(
"admin",
f"log_callback.{handle}",
"active",
{
"app": name,
"function": callback.__name__,
"pinned": pin_app,
"pinned_thread": pin_thread,
"fired": 0,
"executed": 0,
"kwargs": cb_kwargs,
},
)

else:
return None
return handles

async def process_log_callbacks(self, namespace, log_data):
"""Process Log callbacks"""
Expand Down
Loading