@@ -100,8 +100,6 @@ def streaming_task(product_url: str, trusted_domains: list[str], auth: str, buck
     Raises:
         ValueError: If the streaming process fails, raises a ValueError with details of the failure.
     """
-    logger = Logging.default(__name__)
-    logger.critical(f"start streaming_task")

     try:
         s3_handler = S3StorageHandler(
@@ -117,7 +115,6 @@ def streaming_task(product_url: str, trusted_domains: list[str], auth: str, buck
         ) from e
     except KeyError as exc:
         raise ValueError(f"Cannot create s3 connector object. Reason: {exc}") from exc
-    logger.critical(f"end streaming_task")
     return s3_file


@@ -214,7 +211,7 @@ def __init__(
         self.catalog_bucket = os.environ.get("RSPY_CATALOG_BUCKET", "rs-cluster-catalog")

     # Override from BaseProcessor, execute is async in RSPYProcessor
-    def execute(
+    async def execute(
         self,
         data: dict,
         outputs: dict | None = None,  # pylint: disable=unused-argument
@@ -273,7 +270,7 @@ def execute(
         )

         # Execution section
-        if not self.check_catalog(catalog_collection, item_collection.features):
+        if not await self.check_catalog(catalog_collection, item_collection.features):
             return self.log_job_execution(
                 JobStatus.failed,
                 0,
@@ -360,7 +357,7 @@ def log_job_execution(
         self.db_process_manager.update_job(self.job_id, update_data)
         return self._get_execute_result()

-    def check_catalog(self, catalog_collection: str, features: list[Feature]) -> bool:
+    async def check_catalog(self, catalog_collection: str, features: list[Feature]) -> bool:
         """
         Method used to check RSPY catalog if a feature from input_collection is already published.

@@ -572,10 +569,8 @@ def manage_dask_tasks_results(self, client: Client, catalog_collection: str):
             catalog_collection (str): Name of the catalog collection.
         """
         self.logger.info("Tasks monitoring started")
-        self.logger.critical("start manage_dask_tasks_results")
         if not client:
             self.logger.error("The dask cluster client object is not created. Exiting")
-            self.logger.critical("end manage_dask_tasks_results 1")
             return
         for task in as_completed(self.tasks):
             try:
@@ -603,7 +598,6 @@ def manage_dask_tasks_results(self, client: Client, catalog_collection: str):
                 self.log_job_execution(JobStatus.failed, None, f"At least one of the tasks failed: {task_e}")
                 self.delete_files_from_bucket()
                 self.logger.error(f"Tasks monitoring finished with error. At least one of the tasks failed: {task_e}")
-                self.logger.critical("end manage_dask_tasks_results 2")
                 return
         # Publish all the features once processed
         published_featurs_ids: list[str] = []
@@ -619,13 +613,11 @@ def manage_dask_tasks_results(self, client: Client, catalog_collection: str):
                 self.delete_files_from_bucket()
                 # delete the published items
                 self.unpublish_rspy_features(catalog_collection, published_featurs_ids)
-                self.logger.critical("end manage_dask_tasks_results 3")
                 return
             published_featurs_ids.append(feature.id)
         # Update status once all features are processed
         self.log_job_execution(JobStatus.successful, 100, "Finished")
         self.logger.info("Tasks monitoring finished")
-        self.logger.critical("end manage_dask_tasks_results 5")

     def dask_cluster_connect(self) -> Client:
         """Connects a dask cluster scheduler
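
The monitoring loop trimmed above is the standard `dask.distributed.as_completed` idiom: iterate futures in completion order and let `result()` re-raise any exception thrown on a worker. A minimal standalone sketch of that pattern (the function name and the print-based failure handling are illustrative assumptions, not part of this commit):

```python
from dask.distributed import Client, as_completed

def monitor_tasks(client: Client, tasks: list) -> bool:
    """Return True only if every submitted future finished without raising."""
    for task in as_completed(tasks):
        try:
            task.result()  # re-raises the worker-side exception, if any
        except Exception as task_e:  # pylint: disable=broad-except
            # Mirrors the early return above: one failure aborts monitoring.
            print(f"At least one of the tasks failed: {task_e}")
            return False
    return True
```
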
@@ -714,8 +706,6 @@ def dask_cluster_connect(self) -> Client:
             clusters = gateway.list_clusters()
             self.logger.debug(f"The list of clusters: {clusters}")

-            self.logger.critical(f"Cluster names: {[cluster.options.get('cluster_name') for cluster in clusters]}")
-
             # In local mode, get the first cluster from the gateway.
             cluster_id = None
             if local_mode:
@@ -737,11 +727,7 @@ def dask_cluster_connect(self) -> Client:
             if not cluster_id:
                 raise IndexError(f"No dask cluster named '{cluster_name}' was found.")

-            self.logger.critical(
-                f"Connect to gateway {os.environ['DASK_GATEWAY__ADDRESS']} cluster_id: {cluster_id}",
-            )
             self.cluster = gateway.connect(cluster_id)
-            self.logger.critical(f"Connected: {self.cluster}")

             self.logger.info(f"Successfully connected to the {cluster_name} dask cluster")
         except KeyError as e:
@@ -817,9 +803,6 @@ def submit_tasks_to_dask_cluster(self, token: str, trusted_domains: list[str], c
         """
         # empty the list
         self.tasks = []
-
-        self.logger.critical(f"start submit_tasks_to_dask_cluster")
-
         # Submit tasks
         try:
             for asset_info in self.assets_info:
@@ -837,8 +820,6 @@ def submit_tasks_to_dask_cluster(self, token: str, trusted_domains: list[str], c
             self.logger.exception(f"Submitting task to dask cluster failed. Reason: {e}")
             raise RuntimeError(f"Submitting task to dask cluster failed. Reason: {e}") from e

-        self.logger.critical(f"end submit_tasks_to_dask_cluster")
-
     async def process_rspy_features(self, catalog_collection: str) -> tuple[str, dict]:
         """
         Method used to trigger dask distributed streaming process.
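
The submission loop elided above pairs with the monitoring sketch: each asset becomes one future on the cluster via `Client.submit`. A hedged sketch of what such a loop typically looks like; the tuple layout of `assets_info` and any `streaming_task` arguments beyond those visible in its signature above are assumptions:

```python
from dask.distributed import Client

def submit_assets(client: Client, assets_info, trusted_domains, token, bucket):
    # One future per asset; positional arguments follow the streaming_task
    # signature shown above (product_url, trusted_domains, auth, bucket, ...).
    tasks = []
    for product_url, s3_path in assets_info:  # tuple layout is an assumption
        tasks.append(client.submit(streaming_task, product_url, trusted_domains, token, bucket, s3_path))
    return tasks
```
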
@@ -855,23 +836,19 @@ async def process_rspy_features(self, catalog_collection: str) -> tuple[str, dic
             Example: ("application/json", {"running": <job_id>})
         """
         self.logger.debug("Starting main loop")
-        self.logger.critical(f"process_rspy_features")

         # Process each feature by initiating the streaming download of its assets to the final bucket.
         for feature in self.stream_list:
             if not self.prepare_streaming_tasks(catalog_collection, feature):
-                self.logger.critical(f"process_rspy_features 1")
                 return self.log_job_execution(JobStatus.failed, 0, "Unable to create tasks for the Dask cluster")
         if not self.assets_info:
             self.logger.info("There are no assets to stage. Exiting....")
-            self.logger.critical(f"process_rspy_features 2")
             return self.log_job_execution(JobStatus.successful, 100, "Finished without processing any tasks")

         # Determine the domain(s)
         domains = list({urlparse(asset[0]).hostname for asset in self.assets_info})
         self.logger.info(f"Staging from domain(s) {domains}")
         if len(domains) > 1:
-            self.logger.critical(f"process_rspy_features 3")
             return self.log_job_execution(JobStatus.failed, 0, "Staging from multiple domains is not supported yet")
         domain = domains[0]
@@ -883,7 +860,6 @@ async def process_rspy_features(self, catalog_collection: str) -> tuple[str, dic
             self.logger.error(
                 f"Failed to retrieve the token needed to connect to the external station: {http_exception}",
             )
-            self.logger.critical(f"process_rspy_features 4")
             return self.log_job_execution(
                 JobStatus.failed,
                 0,
@@ -896,7 +872,6 @@ async def process_rspy_features(self, catalog_collection: str) -> tuple[str, dic
             self.submit_tasks_to_dask_cluster(token, external_auth_config.trusted_domains, dask_client)
         except RuntimeError as re:
             self.logger.error("Failed to start the staging process")
-            self.logger.critical(f"process_rspy_features 5")
             return self.log_job_execution(JobStatus.failed, 0, f"{re}")
901876
902877 # Set the status to running for the job
@@ -913,7 +888,6 @@ async def process_rspy_features(self, catalog_collection: str) -> tuple[str, dic
         self.assets_info = []
         dask_client.close()

-        self.logger.critical(f"process_rspy_features 6")
         return self._get_execute_result()

     def publish_rspy_feature(self, catalog_collection: str, feature: Feature):
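
Because this change turns `execute` (and `check_catalog`) into coroutines, synchronous callers now need an event loop to drive them. A minimal sketch of the calling side, assuming an already-constructed processor instance; the helper name and payload shape are assumptions:

```python
import asyncio

async def run_job(processor, payload: dict) -> tuple[str, dict]:
    # `execute` must be awaited now; it awaits `check_catalog` internally
    # before submitting anything to the Dask cluster.
    return await processor.execute(payload)

# From synchronous code:
# media_type, result = asyncio.run(run_job(processor, {"inputs": {...}}))
```
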