Skip to content

Commit 289dade

Browse files
Merge pull request #1 from EarthScope/improve_nb_output
logging updates
2 parents e1eae21 + bdc191e commit 289dade

File tree

4 files changed

+179
-83
lines changed

4 files changed

+179
-83
lines changed

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ boto3
22
botocore
33
numpy
44
pandas
5+
scipy
56
julian
67
pyarrow
78
pandera
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
from .temp import DataHandler,DATA_TYPE,FILE_TYPE,DataCatalog
1+
from .temp import DataHandler,DATA_TYPE,FILE_TYPE,DataCatalog, SOURCE_MAP, TARGET_MAP

src/es_sfgtools/pipeline/temp.py

Lines changed: 147 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@
1111
import matplotlib.pyplot as plt
1212
import matplotlib.dates as mdates
1313
import seaborn
14-
import tqdm
14+
from tqdm.autonotebook import tqdm
1515
from multiprocessing import Pool, cpu_count
1616
from functools import partial
1717
import numpy as np
1818
import warnings
19-
import folium
19+
import folium
2020
import json
2121
warnings.filterwarnings("ignore")
2222
seaborn.set_theme(style="whitegrid")
@@ -36,11 +36,13 @@ class FILE_TYPE(Enum):
3636
SEABIRD = "svpavg"
3737
NOVATEL770 = "novatel770"
3838
DFPO00 = "dfpo00"
39+
OFFLOAD = "offload"
3940

4041
@classmethod
4142
def to_schema(cls):
4243
return [x.name for x in cls]
4344

45+
4446
FILE_TYPES = [x.value for x in FILE_TYPE]
4547

4648
class DATA_TYPE(Enum):
@@ -170,6 +172,7 @@ def __init__(self,working_dir:Path) -> None:
170172
self.catalog_data = DataCatalog.validate(pd.read_csv(self.catalog))
171173
else:
172174
self.catalog_data = pd.DataFrame()
175+
logger.info(f"Data Handler initialized, data will be stored in {self.working_dir}")
173176

174177
def _get_timestamp(self,remote_prefix:str) -> pd.Timestamp:
175178
"""
@@ -189,6 +192,21 @@ def _get_timestamp(self,remote_prefix:str) -> pd.Timestamp:
189192
except:
190193
return None
191194

195+
def get_local_counts(self):
196+
try:
197+
local_files = self.catalog_data[self.catalog_data['local_location'].notnull()]
198+
data_type_counts = local_files.type.value_counts()
199+
except (AttributeError, KeyError):
200+
data_type_counts = pd.Series()
201+
return data_type_counts
202+
203+
def get_dtype_counts(self):
204+
try:
205+
data_type_counts = self.catalog_data[self.catalog_data.type.isin(FILE_TYPES)].type.value_counts()
206+
except AttributeError:
207+
data_type_counts = "No data types found"
208+
return data_type_counts
209+
192210
def add_campaign_data(self,
193211
network: str,
194212
station: str,
@@ -215,7 +233,7 @@ def add_campaign_data(self,
215233
if file_type in file.replace("_", ""):
216234
discovered_file_type = file_type
217235
break
218-
236+
219237
if discovered_file_type is None:
220238
logger.error(f"File type not recognized for {file}")
221239
continue
@@ -254,43 +272,22 @@ def add_campaign_data(self,
254272
self.catalog_data = pd.concat([self.catalog_data, incoming_df])
255273

256274
self.catalog_data.to_csv(self.catalog,index=False)
257-
# Get count of each data type in the catalog
258-
data_type_counts = self.catalog_data[self.catalog_data.type.isin(FILE_TYPES)].type.value_counts()
259-
return data_type_counts
260-
261-
def _download_https(self, remote_url: Path, destination_dir: Path, token_path='.'):
262-
"""
263-
Downloads a file from the specified https url on gage-data
264-
265-
Args:
266-
remote_url (Path): The path of the file in the gage-data storage.
267-
destination (Path): The local path where the file will be downloaded.
268-
269-
Returns:
270-
bool: True if the file was downloaded successfully, False otherwise.
271-
"""
272-
try:
273-
local_location = destination_dir / Path(remote_url).name
274-
get_file_from_gage_data(url=remote_url,
275-
dest_dir=destination_dir,
276-
token_path=token_path)
277-
logger.info(f"Downloaded {str(remote_url)} to {str(local_location)}")
278-
return True
279-
except Exception as e:
280-
response = f"Error downloading {str(remote_url)} \n {e}"
281-
response += "\n HINT: Check authentication credentials"
282-
logger.error(response)
283-
print(response)
284-
return False
285275

286-
def download_campaign_data(self,network:str,station:str,survey:str,override:bool=False,from_s3:bool=False):
276+
def download_campaign_data(self,
277+
network:str,
278+
station:str,
279+
survey:str,
280+
file_type: str,
281+
override:bool=False,
282+
from_s3:bool=False):
287283
"""
288284
Retrieves and catalogs data from the remote locations stored in the catalog.
289285
290286
Args:
291287
network (str): The network name.
292288
station (str): The station name.
293289
survey (str): The survey name.
290+
file_type (str): The type of file to download
294291
override (bool): Whether to download the data even if it already exists
295292
from_s3 (bool): Use S3 download functionality if remote resources are in an s3 bucket
296293
@@ -299,12 +296,19 @@ def download_campaign_data(self,network:str,station:str,survey:str,override:bool
299296
"""
300297
# TODO make multithreaded
301298
# Find all entries in the catalog that match the params
299+
local_counts = self.get_local_counts()
300+
try:
301+
local_files_of_type = local_counts[file_type]
302+
except KeyError:
303+
local_files_of_type = 0
304+
logger.info(f"Data directory currently contains {local_files_of_type} files of type {file_type}")
302305
entries = self.catalog_data[
303306
(self.catalog_data.network == network)
304307
& (self.catalog_data.station == station)
305308
& (self.catalog_data.survey == survey)
309+
& (self.catalog_data.type == file_type)
306310
]
307-
311+
logger.info(f"Downloading {entries.shape[0]-local_files_of_type} missing files of type {file_type}")
308312
if entries.shape[0] < 1:
309313
raise Exception('No matching data found in catalog')
310314
if from_s3:
@@ -324,7 +328,7 @@ def download_campaign_data(self,network:str,station:str,survey:str,override:bool
324328
remote_url=entry.remote_prefix,
325329
destination=local_location,
326330
)
327-
# Check if the entry is from an S3 location or gage/sage
331+
# Check if the entry is from an S3 location or gage-data
328332
else:
329333
is_download = self._download_https(
330334
remote_url=entry.remote_filepath, destination_dir=self.raw_dir
@@ -340,9 +344,9 @@ def download_campaign_data(self,network:str,station:str,survey:str,override:bool
340344
if count == 0:
341345
response = f"No files downloaded"
342346
logger.error(response)
343-
print(response)
344-
345-
logger.info(f"Downloaded {count} files to {str(self.raw_dir)}")
347+
#print(response)
348+
else:
349+
logger.info(f"Downloaded {count} files")
346350

347351
self.catalog_data.to_csv(self.catalog,index=False)
348352

@@ -412,30 +416,6 @@ def add_campaign_data_s3(self, network: str, station: str, survey: str, bucket:
412416
data_type_counts = self.catalog_data[self.catalog_data.type.isin(FILE_TYPES)].type.value_counts()
413417
return data_type_counts
414418

415-
def _download_boto(self, client: boto3.client, bucket: str, remote_url: Path, destination: Path):
416-
"""
417-
Downloads a file from the specified S3 bucket.
418-
419-
Args:
420-
client (boto3.client): The Boto3 client object for S3.
421-
bucket (str): The name of the S3 bucket.
422-
remote_url (Path): The path of the file in the S3 bucket.
423-
destination (Path): The local path where the file will be downloaded.
424-
425-
Returns:
426-
bool: True if the file was downloaded successfully, False otherwise.
427-
"""
428-
try:
429-
client.download_file(Bucket=bucket, Key=str(remote_url), Filename=str(destination))
430-
logger.info(f"Downloaded {str(remote_url)} to {str(destination)}")
431-
return True
432-
except Exception as e:
433-
response = f"Error downloading {str(remote_url)} \n {e}"
434-
response += "\n HINT: $ aws sso login"
435-
logger.error(response)
436-
print(response)
437-
return False
438-
439419
def download_campaign_data_s3(self,network:str,station:str,survey:str,override:bool=False):
440420
"""
441421
Retrieves and catalogs data from the s3 locations stored in the catalog.
@@ -480,6 +460,55 @@ def download_campaign_data_s3(self,network:str,station:str,survey:str,override:b
480460
logger.info(f"Downloaded {count} files to {str(self.raw_dir)}")
481461

482462
self.catalog_data.to_csv(self.catalog,index=False)
463+
464+
def _download_https(self, remote_url: Path, destination_dir: Path, token_path='.'):
465+
"""
466+
Downloads a file from the specified https url on gage-data
467+
468+
Args:
469+
remote_url (Path): The path of the file in the gage-data storage.
470+
destination (Path): The local path where the file will be downloaded.
471+
472+
Returns:
473+
bool: True if the file was downloaded successfully, False otherwise.
474+
"""
475+
try:
476+
#local_location = destination_dir / Path(remote_url).name
477+
get_file_from_gage_data(url=remote_url,
478+
dest_dir=destination_dir,
479+
token_path=token_path)
480+
#logger.info(f"Downloaded {str(remote_url)} to {str(local_location)}")
481+
return True
482+
except Exception as e:
483+
response = f"Error downloading {str(remote_url)} \n {e}"
484+
response += "\n HINT: Check authentication credentials"
485+
logger.error(response)
486+
print(response)
487+
return False
488+
489+
def _download_boto(self, client: boto3.client, bucket: str, remote_url: Path, destination: Path):
490+
"""
491+
Downloads a file from the specified S3 bucket.
492+
493+
Args:
494+
client (boto3.client): The Boto3 client object for S3.
495+
bucket (str): The name of the S3 bucket.
496+
remote_url (Path): The path of the file in the S3 bucket.
497+
destination (Path): The local path where the file will be downloaded.
498+
499+
Returns:
500+
bool: True if the file was downloaded successfully, False otherwise.
501+
"""
502+
try:
503+
client.download_file(Bucket=bucket, Key=str(remote_url), Filename=str(destination))
504+
logger.info(f"Downloaded {str(remote_url)} to {str(destination)}")
505+
return True
506+
except Exception as e:
507+
response = f"Error downloading {str(remote_url)} \n {e}"
508+
response += "\n HINT: $ aws sso login"
509+
logger.error(response)
510+
print(response)
511+
return False
483512

484513
def clear_raw_processed_data(self, network: str, station: str, survey: str):
485514
"""
@@ -551,6 +580,45 @@ def clear_raw_processed_data(self, network: str, station: str, survey: str):
551580

552581
pbar.close()
553582

583+
def add_entry(self, entry: dict):
584+
"""
585+
Add an entry in the catalog. This may result in duplicates, which need to be cleaned up via
586+
consolidate_entries()
587+
588+
Args:
589+
entry (dict): The new entry.
590+
591+
Returns:
592+
None
593+
"""
594+
with self.catalog.open("r") as f:
595+
keys=list(f.readline().rstrip().split(','))
596+
entry_str = "\n"
597+
for key in keys:
598+
if key in entry:
599+
entry_str += f"{str(entry[key])}"
600+
if key != keys[-1]:
601+
entry_str += ","
602+
603+
with self.catalog.open("a") as f:
604+
print(entry_str)
605+
f.write(entry_str)
606+
607+
def consolidate_entries(self):
608+
"""
609+
Remove any duplicate entries, keeping the most complete.
610+
611+
Args:
612+
None
613+
Returns:
614+
None
615+
"""
616+
df = pd.read_csv(str(self.catalog))
617+
df['count'] = pd.isnull(df).sum(1)
618+
df=df.sort_values(['count']).drop_duplicates(subset=['uuid'],keep='first').drop(labels='count',axis=1)
619+
df=df.sort_index()
620+
df.to_csv(self.catalog,index=False)
621+
554622
def update_entry(self,entry:dict):
555623
"""
556624
Replace an entry in the catalog with a new entry.
@@ -599,23 +667,24 @@ def get_parent_stack(
599667
parents: List[Union[FILE_TYPE, DATA_TYPE]] = SOURCE_MAP.get(
600668
stack[pointer], []
601669
)
602-
while parents:
603-
parent = parents.pop()
670+
#while parents:
671+
# parent = parents.pop()
672+
for parent in parents:
604673
stack.append(parent)
605674
pointer += 1
606675
return stack[::-1]
607676

608677
def _process_targeted(
609678
self, parent: dict, child_type: Union[FILE_TYPE, DATA_TYPE]
610679
) -> dict:
611-
680+
#TODO: implement multithreaded logging, had to switch to print statement below
612681
if isinstance(parent, dict):
613682
parent = pd.Series(parent)
614683

615684
# if parent.processed:
616685
# return None
617-
logger.info(
618-
f"Attempting to process {parent.uuid} of Type {parent.type} to {child_type.value}"
686+
print(
687+
f"Attempting to process {os.path.basename(parent.local_location)} ({parent.uuid}) of Type {parent.type} to {child_type.value}"
619688
)
620689
child_map = TARGET_MAP.get(FILE_TYPE(parent.type))
621690
if child_map is None:
@@ -688,7 +757,10 @@ def _process_targeted(
688757
"source_uuid": parent.uuid,
689758
"processed": is_processed,
690759
}
691-
logger.info(f"\n Successful Processing: \n{str(processed_meta)}")
760+
print(f"Successful Processing: {str(processed_meta)}")
761+
if is_processed == True:
762+
self.update_entry(processed_meta)
763+
#self.add_entry(processed_meta)
692764
return processed_meta
693765

694766
def _process_data_link(self,
@@ -743,7 +815,7 @@ def _process_data_link(self,
743815

744816
meta_data_list = []
745817
with Pool(processes=cpu_count()) as pool:
746-
for meta_data in tqdm.tqdm(
818+
for meta_data in tqdm(
747819
pool.map(
748820
process_func_partial,
749821
parent_entries_to_process.to_dict(orient="records"),
@@ -753,6 +825,7 @@ def _process_data_link(self,
753825
):
754826
if meta_data is not None:
755827
self.update_entry(meta_data)
828+
#self.add_entry(meta_data)
756829
meta_data_list.append(meta_data)
757830

758831
parent_entries_processed = parent_entries_to_process[
@@ -761,18 +834,21 @@ def _process_data_link(self,
761834
)
762835
]
763836
logger.info(f"Processed {len(meta_data_list)} Out of {parent_entries_processed.shape[0]} For {target.value} Files from {parent_entries_processed.shape[0]} Parent Files")
837+
self.consolidate_entries()
764838
return parent_entries_processed
765839

766840
def _process_data_graph(self, network: str, station: str, survey: str,child_type:Union[FILE_TYPE,DATA_TYPE],override:bool=False):
767841
processing_queue = self.get_parent_stack(child_type=child_type)
768-
842+
#logger.info(f"processing queue: {processing_queue}")
769843
while processing_queue:
770844
parent = processing_queue.pop(0)
845+
#logger.info(f"parent: {parent}")
771846
children:dict = TARGET_MAP.get(parent,{})
847+
#logger.info(f"children: {children}")
772848
children_to_process = [k for k in children.keys() if k in processing_queue]
773-
849+
#logger.info(f"children to process: {children_to_process}")
774850
for child in children_to_process:
775-
851+
#logger.info(f"child:{child}")
776852
processed_parents:pd.DataFrame = self._process_data_link(network,station,survey,target=child,source=[parent],override=override)
777853
# Check if all children of this parent have been processed
778854
if processed_parents is not None:

0 commit comments

Comments
 (0)