Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "ratio1"
version = "3.2.25"
version = "3.4.112"
authors = [
{ name="Andrei Ionut Damian", email="andrei.damian@ratio1.ai" },
{ name="Cristan Bleotiu", email="cristian.bleotiu@ratio1.ai" },
Expand Down
104 changes: 71 additions & 33 deletions ratio1/base/generic_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from threading import Lock, Thread
from time import sleep
from time import time as tm
from typing import Optional, Callable, Dict, List, Any, Union

from ..base_decentra_object import BaseDecentrAIObject
from ..bc import DefaultBlockEngine, _DotDict, EE_VPN_IMPL
Expand Down Expand Up @@ -88,38 +89,40 @@ class GenericSession(BaseDecentrAIObject):

def __init__(
self, *,
host=None,
port=None,
user=None,
pwd=None,
secured=None,
subtopic=None,
name=None,
encrypt_comms=True,
config={},
filter_workers=None,
log: Logger = None,
on_payload=None,
on_notification=None,
on_heartbeat=None,
debug_silent=True,
debug=1, # TODO: debug or verbosity - fix this
verbosity=1,
silent=False,
dotenv_path=None,
show_commands=False,
blockchain_config=BLOCKCHAIN_CONFIG,
bc_engine=None,
formatter_plugins_locations=['plugins.io_formatters'],
root_topic="naeural",
local_cache_base_folder=None,
local_cache_app_folder='_local_cache',
use_home_folder=True,
eth_enabled=True,
auto_configuration=True,
run_dauth=True,
debug_env=False,
evm_network=None,
host: Optional[str] = None,
port: Optional[int] = None,
user: Optional[str] = None,
pwd: Optional[str] = None,
secured: Optional[bool] = None,
subtopic: Optional[str] = None,
name: Optional[str] = None,
encrypt_comms: bool = True,
config: Optional[Dict[str, Any]] = None,
filter_workers: Optional[List[str]] = None,
log: Optional[Logger] = None,
on_payload: Optional[Callable[['GenericSession', str, str, str, str, dict], None]] = None,
on_notification: Optional[Callable[['GenericSession', str, dict], None]] = None,
on_heartbeat: Optional[Callable[['GenericSession', str, dict], None]] = None,
on_pipeline_create: Optional[Callable[['GenericSession', 'Pipeline'], None]] = None,
on_instance_create: Optional[Callable[['GenericSession', 'Pipeline', 'Instance'], None]] = None,
debug_silent: bool = True,
debug: Union[int, bool] = 1, # TODO: debug or verbosity - fix this
verbosity: int = 1,
silent: bool = False,
dotenv_path: Optional[str] = None,
show_commands: bool = False,
blockchain_config: Optional[Dict[str, Any]] = None,
bc_engine: Optional[DefaultBlockEngine] = None,
formatter_plugins_locations: Optional[List[str]] = None,
root_topic: str = "naeural",
local_cache_base_folder: Optional[str] = None,
local_cache_app_folder: str = '_local_cache',
use_home_folder: bool = True,
eth_enabled: bool = True,
auto_configuration: bool = True,
run_dauth: bool = True,
debug_env: bool = False,
evm_network: Optional[str] = None,
**kwargs
) -> None:
"""
Expand Down Expand Up @@ -187,6 +190,18 @@ def __init__(
As arguments, it has a reference to this Session object, the node name and the heartbeat payload.
Defaults to None.

on_pipeline_create : Callable[[Session, Pipeline], None], optional
Callback that is called when a new pipeline is created or attached to.
As arguments, it has a reference to this Session object and the Pipeline object that was created.
This callback is useful for tracking pipeline lifecycle events.
Defaults to None.

on_instance_create : Callable[[Session, Pipeline, Instance], None], optional
Callback that is called when a new plugin instance is created or attached to.
As arguments, it has a reference to this Session object, the Pipeline object, and the Instance object that was created.
This callback is useful for tracking instance lifecycle events.
Defaults to None.

debug_silent : bool, optional
This flag will disable debug logs, set to 'False` for a more verbose log, by default True
Observation: Obsolete, will be removed
Expand Down Expand Up @@ -246,6 +261,12 @@ def __init__(
self.__at_least_a_netmon_received = False

# TODO: maybe read config from file?
if config is None:
config = {}
if blockchain_config is None:
blockchain_config = BLOCKCHAIN_CONFIG
if formatter_plugins_locations is None:
formatter_plugins_locations = ['plugins.io_formatters']
self._config = {**self.default_config, **config}


Expand Down Expand Up @@ -298,6 +319,8 @@ def __init__(
self.custom_on_payload = on_payload
self.custom_on_heartbeat = on_heartbeat
self.custom_on_notification = on_notification
self.custom_on_pipeline_create = on_pipeline_create
self.custom_on_instance_create = on_instance_create

self.own_pipelines = []

Expand Down Expand Up @@ -2214,6 +2237,11 @@ def create_pipeline(self, *,
**kwargs
)
self.own_pipelines.append(pipeline)

# NOTE: on_pipeline_create callback is now called when R1EN confirms the pipeline
# in Pipeline.__apply_staged_config(), not here when the local object is instantiated.
# This ensures the callback is triggered only after the pipeline is confirmed by the R1EN.

return pipeline

def get_addr_by_name(self, name):
Expand Down Expand Up @@ -2446,6 +2474,13 @@ def attach_to_pipeline(self, *,
pipeline._add_on_notification_callback(on_notification)

self.own_pipelines.append(pipeline)

# Call on_pipeline_create callback if provided (for attached pipelines)
if self.custom_on_pipeline_create is not None:
try:
self.custom_on_pipeline_create(self, pipeline)
except Exception as e:
self.log.P(f"Error in on_pipeline_create callback: {e}", color='r')

return pipeline

Expand Down Expand Up @@ -2541,7 +2576,10 @@ def create_or_attach_to_pipeline(self, *,
on_notification=on_notification,
**kwargs
)


# Note: on_pipeline_create callback is called:
# - For newly created pipelines: in Pipeline.__apply_staged_config() when R1EN confirms
# - For attached pipelines: immediately in attach_to_pipeline() since the pipeline already exists
return pipeline

def wait_for_transactions(self, transactions: list[Transaction]):
Expand Down
22 changes: 22 additions & 0 deletions ratio1/base/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,15 @@ def __init_instance(
debug=debug,
)
self.lst_plugin_instances.append(instance)

# Call on_instance_create callback if provided (only for newly created instances, not attached ones)
if not is_attached and self.session is not None and hasattr(self.session, 'custom_on_instance_create'):
if self.session.custom_on_instance_create is not None:
try:
self.session.custom_on_instance_create(self.session, self, instance)
except Exception as e:
self.log.P(f"Error in on_instance_create callback: {e}", color='r')

return instance

def __init_plugins(self, plugins, is_attached):
Expand Down Expand Up @@ -648,9 +657,22 @@ def __apply_staged_config(self, verbose=False):
self.P("Deployed pipeline <{}> on <{}>".format(self.name, self.node_addr), color="g")
self.__was_last_operation_successful = True

# Check if this is the first confirmation (pipeline was just created)
# A newly created pipeline will have an empty config before applying staged config
is_newly_created = len(self.config) == 0

self.config = {**self.config, **self.__staged_config}
self.__staged_config = None

# Call on_pipeline_create callback if this is a newly created pipeline
# This ensures the callback is triggered when R1EN confirms the pipeline, not when the local object is instantiated
if is_newly_created and self.session is not None and hasattr(self.session, 'custom_on_pipeline_create'):
if self.session.custom_on_pipeline_create is not None:
try:
self.session.custom_on_pipeline_create(self.session, self)
except Exception as e:
self.log.P(f"Error in on_pipeline_create callback: {e}", color='r')

return

def __apply_staged_instances_config(self, verbose=False):
Expand Down