@@ -100,6 +100,8 @@ def streaming_task(product_url: str, trusted_domains: list[str], auth: str, buck
100100 Raises:
101101 ValueError: If the streaming process fails, raises a ValueError with details of the failure.
102102 """
103+ logger = Logging .default (__name__ )
104+ logger .critical (f"start streaming_task" )
103105
104106 try :
105107 s3_handler = S3StorageHandler (
@@ -115,6 +117,7 @@ def streaming_task(product_url: str, trusted_domains: list[str], auth: str, buck
115117 ) from e
116118 except KeyError as exc :
117119 raise ValueError (f"Cannot create s3 connector object. Reason: { exc } " ) from exc
120+ logger .critical (f"end streaming_task" )
118121 return s3_file
119122
120123
@@ -211,7 +214,7 @@ def __init__(
211214 self .catalog_bucket = os .environ .get ("RSPY_CATALOG_BUCKET" , "rs-cluster-catalog" )
212215
213216 # Override from BaseProcessor, execute is async in RSPYProcessor
214- async def execute (
217+ def execute (
215218 self ,
216219 data : dict ,
217220 outputs : dict | None = None , # pylint: disable=unused-argument
@@ -270,7 +273,7 @@ async def execute(
270273 )
271274
272275 # Execution section
273- if not await self .check_catalog (catalog_collection , item_collection .features ):
276+ if not self .check_catalog (catalog_collection , item_collection .features ):
274277 return self .log_job_execution (
275278 JobStatus .failed ,
276279 0 ,
@@ -357,7 +360,7 @@ def log_job_execution(
357360 self .db_process_manager .update_job (self .job_id , update_data )
358361 return self ._get_execute_result ()
359362
360- async def check_catalog (self , catalog_collection : str , features : list [Feature ]) -> bool :
363+ def check_catalog (self , catalog_collection : str , features : list [Feature ]) -> bool :
361364 """
362365 Method used to check RSPY catalog if a feature from input_collection is already published.
363366
@@ -569,8 +572,10 @@ def manage_dask_tasks_results(self, client: Client, catalog_collection: str):
569572 catalog_collection (str): Name of the catalog collection.
570573 """
571574 self .logger .info ("Tasks monitoring started" )
575+ self .logger .critical ("start manage_dask_tasks_results" )
572576 if not client :
573577 self .logger .error ("The dask cluster client object is not created. Exiting" )
578+ self .logger .critical ("end manage_dask_tasks_results 1" )
574579 return
575580 for task in as_completed (self .tasks ):
576581 try :
@@ -598,6 +603,7 @@ def manage_dask_tasks_results(self, client: Client, catalog_collection: str):
598603 self .log_job_execution (JobStatus .failed , None , f"At least one of the tasks failed: { task_e } " )
599604 self .delete_files_from_bucket ()
600605 self .logger .error (f"Tasks monitoring finished with error. At least one of the tasks failed: { task_e } " )
606+ self .logger .critical ("end manage_dask_tasks_results 2" )
601607 return
602608 # Publish all the features once processed
603609 published_featurs_ids : list [str ] = []
@@ -613,11 +619,13 @@ def manage_dask_tasks_results(self, client: Client, catalog_collection: str):
613619 self .delete_files_from_bucket ()
614620 # delete the published items
615621 self .unpublish_rspy_features (catalog_collection , published_featurs_ids )
622+ self .logger .critical ("end manage_dask_tasks_results 3" )
616623 return
617624 published_featurs_ids .append (feature .id )
618625 # Update status once all features are processed
619626 self .log_job_execution (JobStatus .successful , 100 , "Finished" )
620627 self .logger .info ("Tasks monitoring finished" )
628+ self .logger .critical ("end manage_dask_tasks_results 5" )
621629
622630 def dask_cluster_connect (self ) -> Client :
623631 """Connects a dask cluster scheduler
@@ -706,6 +714,8 @@ def dask_cluster_connect(self) -> Client:
706714 clusters = gateway .list_clusters ()
707715 self .logger .debug (f"The list of clusters: { clusters } " )
708716
717+ self .logger .critical (f"Cluster names: { [cluster .options .get ('cluster_name' ) for cluster in clusters ]} " )
718+
709719 # In local mode, get the first cluster from the gateway.
710720 cluster_id = None
711721 if local_mode :
@@ -727,7 +737,11 @@ def dask_cluster_connect(self) -> Client:
727737 if not cluster_id :
728738 raise IndexError (f"No dask cluster named '{ cluster_name } ' was found." )
729739
740+ self .logger .critical (
741+ f"Connect to gateway { os .environ ['DASK_GATEWAY__ADDRESS' ]} cluster_id: { cluster_id } " ,
742+ )
730743 self .cluster = gateway .connect (cluster_id )
744+ self .logger .critical (f"Connected: { self .cluster } " )
731745
732746 self .logger .info (f"Successfully connected to the { cluster_name } dask cluster" )
733747 except KeyError as e :
@@ -803,6 +817,9 @@ def submit_tasks_to_dask_cluster(self, token: str, trusted_domains: list[str], c
803817 """
804818 # empty the list
805819 self .tasks = []
820+
821+ self .logger .critical (f"start submit_tasks_to_dask_cluster" )
822+
806823 # Submit tasks
807824 try :
808825 for asset_info in self .assets_info :
@@ -820,6 +837,8 @@ def submit_tasks_to_dask_cluster(self, token: str, trusted_domains: list[str], c
820837 self .logger .exception (f"Submitting task to dask cluster failed. Reason: { e } " )
821838 raise RuntimeError (f"Submitting task to dask cluster failed. Reason: { e } " ) from e
822839
840+ self .logger .critical (f"end submit_tasks_to_dask_cluster" )
841+
823842 async def process_rspy_features (self , catalog_collection : str ) -> tuple [str , dict ]:
824843 """
825844 Method used to trigger dask distributed streaming process.
@@ -836,19 +855,23 @@ async def process_rspy_features(self, catalog_collection: str) -> tuple[str, dic
836855 Example: ("application/json", {"running": <job_id>})
837856 """
838857 self .logger .debug ("Starting main loop" )
858+ self .logger .critical (f"process_rspy_features" )
839859
840860 # Process each feature by initiating the streaming download of its assets to the final bucket.
841861 for feature in self .stream_list :
842862 if not self .prepare_streaming_tasks (catalog_collection , feature ):
863+ self .logger .critical (f"process_rspy_features 1" )
843864 return self .log_job_execution (JobStatus .failed , 0 , "Unable to create tasks for the Dask cluster" )
844865 if not self .assets_info :
845866 self .logger .info ("There are no assets to stage. Exiting...." )
867+ self .logger .critical (f"process_rspy_features 2" )
846868 return self .log_job_execution (JobStatus .successful , 100 , "Finished without processing any tasks" )
847869
848870 # Determine the domain(s)
849871 domains = list ({urlparse (asset [0 ]).hostname for asset in self .assets_info })
850872 self .logger .info ("Staging from domain(s) {domains}" )
851873 if len (domains ) > 1 :
874+ self .logger .critical (f"process_rspy_features 3" )
852875 return self .log_job_execution (JobStatus .failed , 0 , "Staging from multiple domains is not supported yet" )
853876 domain = domains [0 ]
854877
@@ -860,6 +883,7 @@ async def process_rspy_features(self, catalog_collection: str) -> tuple[str, dic
860883 self .logger .error (
861884 f"Failed to retrieve the token needed to connect to the external station: { http_exception } " ,
862885 )
886+ self .logger .critical (f"process_rspy_features 4" )
863887 return self .log_job_execution (
864888 JobStatus .failed ,
865889 0 ,
@@ -872,6 +896,7 @@ async def process_rspy_features(self, catalog_collection: str) -> tuple[str, dic
872896 self .submit_tasks_to_dask_cluster (token , external_auth_config .trusted_domains , dask_client )
873897 except RuntimeError as re :
874898 self .logger .error ("Failed to start the staging process" )
899+ self .logger .critical (f"process_rspy_features 5" )
875900 return self .log_job_execution (JobStatus .failed , 0 , f"{ re } " )
876901
877902 # Set the status to running for the job
@@ -888,6 +913,7 @@ async def process_rspy_features(self, catalog_collection: str) -> tuple[str, dic
888913 self .assets_info = []
889914 dask_client .close ()
890915
916+ self .logger .critical (f"process_rspy_features 6" )
891917 return self ._get_execute_result ()
892918
893919 def publish_rspy_feature (self , catalog_collection : str , feature : Feature ):
0 commit comments