From 60eeb47a998de6f4218f2298eea0ffc6ed4e4bc5 Mon Sep 17 00:00:00 2001 From: Rahul Garg Date: Thu, 15 May 2025 14:55:07 +0530 Subject: [PATCH 1/3] Enhance Collaborator module with improved error handling and documentation. Refactor the `run` method to include detailed error logging and a new `_execute_collaborator_rounds` method for better task management. Update the `start_` function to handle exceptions during collaborator initialization and execution, ensuring critical errors are logged and communicated to the user. Additionally, improve type hints and docstrings for better code clarity and maintainability. Signed-off-by: Rahul Garg --- openfl/.DS_Store | Bin 0 -> 6148 bytes openfl/component/collaborator/collaborator.py | 226 ++++++++++-------- openfl/interface/collaborator.py | 36 +-- 3 files changed, 154 insertions(+), 108 deletions(-) create mode 100644 openfl/.DS_Store diff --git a/openfl/.DS_Store b/openfl/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..f9d3a062e959ef2cc4383acbd07e3d2ad1006f1b GIT binary patch literal 6148 zcmeHK%}T>S5T0$TO(;SS3OxqA7OZU%#Y?F51&ruHr6wk7FlMDq?V%KM))(?gd>&_Z zH)3f8Pa<{(X203_$+BOIgHjwXZp=8;S@KTe~eDu}}fQ!dZrG?bH`9H-$()p|N$*_J)1H>T5e zXTRxmyPa9nnI5!TXm|EzGuztQ-Z?xOJS2~)e9?3Y{PL7+7%bowjU_vK_J(O9(>sh7 zl|=<3Gr$Zm1FOP-IsdHMs%)6|$qX<9KW2c=2M3kVHJEEuM+Xk{`$+K$Aqm=ammqWu zx(0KN7(o%b6j7H7bHxz49Q}^Ta}DMibvX#NGCs$wEX)l>sMXQ$sB{pnMjn{~W?-Iy zvYytd{~v#U|DP}788g5P{3`}TrSJ87xFyqD7dA(|) None: + """ + Run the collaborator main loop. + + Handles experiment lifecycle, round execution, and error logging. + """ + try: + self.callbacks.on_experiment_begin() + self._execute_collaborator_rounds() + self.callbacks.on_experiment_end() + logger.info("Received shutdown signal. Exiting...") + except Exception as experiment_error: + logger.critical( + f"Critical error in collaborator execution. Error: {experiment_error}", + exc_info=True + ) + self.callbacks.on_experiment_end({"error": str(experiment_error)}) + logger.critical("Collaborator is shutting down due to critical error.") + raise RuntimeError("Collaborator execution failed") from experiment_error + + def _execute_collaborator_rounds(self) -> None: + """ + Execute rounds until a shutdown signal is received. + Each round consists of receiving tasks, executing them, and reporting results. + If any task fails, the round is aborted and the error is logged. + """ while True: tasks, round_num, sleep_time, time_to_quit = self.client.get_tasks() - if time_to_quit: break - if not tasks: sleep(sleep_time) continue + try: + logger.info("Round: %d Received Tasks: %s", round_num, tasks) + self.callbacks.on_round_begin(round_num) + logs = self._execute_round_tasks(tasks, round_num) + self.tensor_db.clean_up(self.db_store_rounds) + self.callbacks.on_round_end(round_num, logs) + except Exception as round_error: + logger.error( + f"Error during round {round_num} execution. Error: {round_error}", + exc_info=True + ) + sleep(sleep_time or 10) - # Round begin - logger.info("Round: %d Received Tasks: %s", round_num, tasks) - self.callbacks.on_round_begin(round_num) + def _execute_round_tasks(self, tasks: list, round_number: int) -> dict: + """ + Execute all tasks in a round. - # Run tasks - logs = {} - for task in tasks: - metrics = self.do_task(task, round_num) - logs.update(metrics) + Args: + tasks: List of tasks to execute. + round_number: Current round number. - # Round end - self.tensor_db.clean_up(self.db_store_rounds) - self.callbacks.on_round_end(round_num, logs) + Returns: + Dictionary of logs/metrics from task execution. - # Experiment end - self.callbacks.on_experiment_end() - logger.info("Received shutdown signal. Exiting...") + Raises: + Exception: If any task execution fails, aborts the round. + """ + logs = {} + for task in tasks: + metrics = self.do_task(task, round_number) + logs.update(metrics) + return logs def do_task(self, task, round_number) -> dict: """Perform the specified task. @@ -270,97 +304,101 @@ def do_task(self, task, round_number) -> dict: return metrics - def get_data_for_tensorkey(self, tensor_key): - """Resolve the tensor corresponding to the requested tensorkey. + def get_data_for_tensorkey(self, tensor_key) -> object: + """ + Resolve and return the tensor for the requested TensorKey. + + This function checks the local cache, previous rounds, and the aggregator as needed. Args: - tensor_key (namedtuple): Tensorkey that will be resolved locally or - remotely. May be the product of other tensors. + tensor_key: The TensorKey to resolve. Returns: - nparray: The decompressed tensor associated with the requested - tensor key. + The decompressed tensor associated with the requested tensor key. + + Raises: + Exception: If the tensor cannot be retrieved or reconstructed. """ - # try to get from the store tensor_name, origin, round_number, report, tags = tensor_key logger.debug("Attempting to retrieve tensor %s from local store", tensor_key) - nparray = self.tensor_db.get_tensor_from_cache(tensor_key) - - # if None and origin is our client, request it from the client - if nparray is None: - if origin == self.collaborator_name: - logger.info( - f"Attempting to find locally stored {tensor_name} tensor from prior round..." - ) - prior_round = round_number - 1 - while prior_round >= 0: - nparray = self.tensor_db.get_tensor_from_cache( - TensorKey(tensor_name, origin, prior_round, report, tags) + try: + nparray = self.tensor_db.get_tensor_from_cache(tensor_key) + if nparray is None: + if origin == self.collaborator_name: + logger.info( + f"Attempting to find locally stored {tensor_name} tensor from prior round..." ) - if nparray is not None: - logger.debug( - f"Found tensor {tensor_name} in local TensorDB for round {prior_round}" + prior_round = round_number - 1 + while prior_round >= 0: + nparray = self.tensor_db.get_tensor_from_cache( + TensorKey(tensor_name, origin, prior_round, report, tags) ) - return nparray - prior_round -= 1 - logger.info(f"Cannot find any prior version of tensor {tensor_name} locally...") - # Determine whether there are additional compression related - # dependencies. - # Typically, dependencies are only relevant to model layers - tensor_dependencies = self.tensor_codec.find_dependencies( - tensor_key, self.use_delta_updates - ) - logger.debug( - "Unable to get tensor from local store..." - "attempting to retrieve from client len tensor_dependencies" - f" tensor_key {tensor_key}" - ) - if len(tensor_dependencies) > 0: - # Resolve dependencies - # tensor_dependencies[0] corresponds to the prior version - # of the model. - # If it exists locally, should pull the remote delta because - # this is the least costly path - prior_model_layer = self.tensor_db.get_tensor_from_cache(tensor_dependencies[0]) - if prior_model_layer is not None: - uncompressed_delta = self.get_aggregated_tensor_from_aggregator( - tensor_dependencies[1] - ) - new_model_tk, nparray = self.tensor_codec.apply_delta( - tensor_dependencies[1], - uncompressed_delta, - prior_model_layer, - creates_model=True, + if nparray is not None: + logger.debug( + f"Found tensor {tensor_name} in local TensorDB for round {prior_round}" + ) + return nparray + prior_round -= 1 + logger.info(f"Cannot find any prior version of tensor {tensor_name} locally...") + # Determine whether there are additional compression related + # dependencies. + # Typically, dependencies are only relevant to model layers + tensor_dependencies = self.tensor_codec.find_dependencies( + tensor_key, self.use_delta_updates + ) + logger.debug( + "Unable to get tensor from local store..." + "attempting to retrieve from client len tensor_dependencies" + f" tensor_key {tensor_key}" + ) + if len(tensor_dependencies) > 0: + # Resolve dependencies + # tensor_dependencies[0] corresponds to the prior version + # of the model. + # If it exists locally, should pull the remote delta because + # this is the least costly path + prior_model_layer = self.tensor_db.get_tensor_from_cache(tensor_dependencies[0]) + if prior_model_layer is not None: + uncompressed_delta = self.get_aggregated_tensor_from_aggregator( + tensor_dependencies[1] + ) + new_model_tk, nparray = self.tensor_codec.apply_delta( + tensor_dependencies[1], + uncompressed_delta, + prior_model_layer, + creates_model=True, + ) + self.tensor_db.cache_tensor({new_model_tk: nparray}) + else: + logger.info( + "Could not find previous model layer. Fetching latest layer from aggregator" + ) + nparray = self.get_aggregated_tensor_from_aggregator( + tensor_key, require_lossless=True + ) + elif "model" in tags: + nparray = self.get_aggregated_tensor_from_aggregator( + tensor_key, require_lossless=True ) - self.tensor_db.cache_tensor({new_model_tk: nparray}) else: + tensor_name, origin, round_number, report, tags = tensor_key + tags = (self.collaborator_name,) + tags + tensor_key = (tensor_name, origin, round_number, report, tags) logger.info( - "Could not find previous model layer.Fetching latest layer from aggregator" + "Could not find previous model layer." + f"Fetching latest layer from aggregator {tensor_key}" ) - # The original model tensor should be fetched from aggregator nparray = self.get_aggregated_tensor_from_aggregator( tensor_key, require_lossless=True ) - elif "model" in tags: - # Pulling the model for the first time - nparray = self.get_aggregated_tensor_from_aggregator( - tensor_key, require_lossless=True - ) else: - # we should try fetching the tensor from aggregator - tensor_name, origin, round_number, report, tags = tensor_key - tags = (self.collaborator_name,) + tags - tensor_key = (tensor_name, origin, round_number, report, tags) - logger.info( - "Could not find previous model layer." - f"Fetching latest layer from aggregator {tensor_key}" - ) - nparray = self.get_aggregated_tensor_from_aggregator( - tensor_key, require_lossless=True - ) - else: - logger.debug("Found tensor %s in local TensorDB", tensor_key) - + logger.debug("Found tensor %s in local TensorDB", tensor_key) + except Exception as get_tensor_error: + logger.error( + f"Error retrieving tensor {tensor_key}. Error: {get_tensor_error}", + exc_info=True + ) + raise return nparray def get_aggregated_tensor_from_aggregator(self, tensor_key, require_lossless=False): diff --git a/openfl/interface/collaborator.py b/openfl/interface/collaborator.py index fe774a216b..7318152475 100644 --- a/openfl/interface/collaborator.py +++ b/openfl/interface/collaborator.py @@ -62,26 +62,34 @@ def collaborator(context): help="The certified common name of the collaborator.", ) def start_(plan, collaborator_name, data_config): - """Starts a collaborator service.""" + """ + Starts a collaborator service. + + Args: + plan: Path to the FL plan YAML file. + collaborator_name: The certified common name of the collaborator. + data_config: Path to the dataset shard configuration file. + """ if plan and is_directory_traversal(plan): echo("Federated learning plan path is out of the openfl workspace scope.") sys.exit(1) if data_config and is_directory_traversal(data_config): echo("The data set/shard configuration file path is out of the openfl workspace scope.") sys.exit(1) - - plan_obj = Plan.parse( - plan_config_path=Path(plan).absolute(), - data_config_path=Path(data_config).absolute(), - ) - - # TODO: Need to restructure data loader config file loader - logger.info(f"Data paths: {plan_obj.cols_data_paths}") - echo(f"Data = {plan_obj.cols_data_paths}") - logger.info("🧿 Starting a Collaborator Service.") - - collaborator = plan_obj.get_collaborator(collaborator_name) - collaborator.run() + try: + plan_obj = Plan.parse( + plan_config_path=Path(plan).absolute(), + data_config_path=Path(data_config).absolute(), + ) + logger.info(f"Data paths: {plan_obj.cols_data_paths}") + echo(f"Data = {plan_obj.cols_data_paths}") + logger.info("🧿 Starting a Collaborator Service.") + collaborator = plan_obj.get_collaborator(collaborator_name) + collaborator.run() + except Exception as e: + logger.critical(f"Critical error starting or running collaborator: {e}", exc_info=True) + echo(style(f"Collaborator failed with error: {e}", fg="red")) + sys.exit(1) @collaborator.command(name="ping") From dc6d3a86c4f463eb1629452fd78972f1a67af03b Mon Sep 17 00:00:00 2001 From: Rahul Garg Date: Thu, 15 May 2025 15:04:32 +0530 Subject: [PATCH 2/3] Improve error logging in Collaborator class by ensuring consistent formatting of log messages. This change enhances the clarity of critical error messages and maintains a uniform style across the logging functionality. Signed-off-by: Rahul Garg --- openfl/component/collaborator/collaborator.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/openfl/component/collaborator/collaborator.py b/openfl/component/collaborator/collaborator.py index 77da34afd9..008962bbe9 100644 --- a/openfl/component/collaborator/collaborator.py +++ b/openfl/component/collaborator/collaborator.py @@ -171,7 +171,7 @@ def run(self) -> None: except Exception as experiment_error: logger.critical( f"Critical error in collaborator execution. Error: {experiment_error}", - exc_info=True + exc_info=True, ) self.callbacks.on_experiment_end({"error": str(experiment_error)}) logger.critical("Collaborator is shutting down due to critical error.") @@ -199,8 +199,7 @@ def _execute_collaborator_rounds(self) -> None: self.callbacks.on_round_end(round_num, logs) except Exception as round_error: logger.error( - f"Error during round {round_num} execution. Error: {round_error}", - exc_info=True + f"Error during round {round_num} execution. Error: {round_error}", exc_info=True ) sleep(sleep_time or 10) @@ -326,7 +325,8 @@ def get_data_for_tensorkey(self, tensor_key) -> object: if nparray is None: if origin == self.collaborator_name: logger.info( - f"Attempting to find locally stored {tensor_name} tensor from prior round..." + f"Attempting to find locally stored {tensor_name} " + f"tensor from prior round..." ) prior_round = round_number - 1 while prior_round >= 0: @@ -335,7 +335,8 @@ def get_data_for_tensorkey(self, tensor_key) -> object: ) if nparray is not None: logger.debug( - f"Found tensor {tensor_name} in local TensorDB for round {prior_round}" + f"Found tensor {tensor_name} in local TensorDB " + f"for round {prior_round}" ) return nparray prior_round -= 1 @@ -371,7 +372,8 @@ def get_data_for_tensorkey(self, tensor_key) -> object: self.tensor_db.cache_tensor({new_model_tk: nparray}) else: logger.info( - "Could not find previous model layer. Fetching latest layer from aggregator" + "Could not find previous model layer. " + "Fetching latest layer from aggregator" ) nparray = self.get_aggregated_tensor_from_aggregator( tensor_key, require_lossless=True @@ -395,8 +397,7 @@ def get_data_for_tensorkey(self, tensor_key) -> object: logger.debug("Found tensor %s in local TensorDB", tensor_key) except Exception as get_tensor_error: logger.error( - f"Error retrieving tensor {tensor_key}. Error: {get_tensor_error}", - exc_info=True + f"Error retrieving tensor {tensor_key}. Error: {get_tensor_error}", exc_info=True ) raise return nparray From da6d1201733c413e961b31586c3a2797211ad9ee Mon Sep 17 00:00:00 2001 From: Rahul Garg Date: Thu, 15 May 2025 15:06:53 +0530 Subject: [PATCH 3/3] .DS_Store banished! --- .gitignore | 1 + openfl/.DS_Store | Bin 6148 -> 0 bytes 2 files changed, 1 insertion(+) delete mode 100644 openfl/.DS_Store diff --git a/.gitignore b/.gitignore index e13041af29..4398b46be1 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,4 @@ results/* **MNIST/ **cert/ .history/ +.DS_Store diff --git a/openfl/.DS_Store b/openfl/.DS_Store deleted file mode 100644 index f9d3a062e959ef2cc4383acbd07e3d2ad1006f1b..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 6148 zcmeHK%}T>S5T0$TO(;SS3OxqA7OZU%#Y?F51&ruHr6wk7FlMDq?V%KM))(?gd>&_Z zH)3f8Pa<{(X203_$+BOIgHjwXZp=8;S@KTe~eDu}}fQ!dZrG?bH`9H-$()p|N$*_J)1H>T5e zXTRxmyPa9nnI5!TXm|EzGuztQ-Z?xOJS2~)e9?3Y{PL7+7%bowjU_vK_J(O9(>sh7 zl|=<3Gr$Zm1FOP-IsdHMs%)6|$qX<9KW2c=2M3kVHJEEuM+Xk{`$+K$Aqm=ammqWu zx(0KN7(o%b6j7H7bHxz49Q}^Ta}DMibvX#NGCs$wEX)l>sMXQ$sB{pnMjn{~W?-Iy zvYytd{~v#U|DP}788g5P{3`}TrSJ87xFyqD7dA(|)