Skip to content

Commit 505ecce

Browse files
authored
Merge pull request #139 from c-hydro/dev
Dev
2 parents 9032a77 + b65c844 commit 505ecce

File tree

7 files changed

+164
-133
lines changed

7 files changed

+164
-133
lines changed

pyproject.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,7 @@ dependencies = [
3838
"pyhdf>=0.10.2", #only for modis_downloader
3939
"cdsapi>=0.7.4", #only for cds_downloader
4040
"ecmwf-opendata >= 0.2.0", #only for ecmwf_opendata_downloader
41-
"paramiko>=2.9.3",
42-
"ftpretty>=0.1.0", # I would like to avoid a repetition here, paramiko and ftpretty kind of do the same thing
41+
"paramiko>=2.9.3"
4342
]
4443

4544
[project.urls]

src/door/base_downloaders.py

Lines changed: 111 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@
77
import xarray as xr
88
import os
99

10-
from .utils.io import download_http, check_download, handle_missing, download_ftp, download_sftp
10+
import paramiko
11+
import ftplib
12+
import requests
13+
14+
from .utils.io import check_download, handle_missing
1115

1216
from d3tools import spatial as sp
1317
from d3tools import timestepping as ts
@@ -164,13 +168,13 @@ def _loop_timesteps_and_save_data(self, timesteps: list[ts.TimeStep]) -> None:
164168
# the latter is more space efficient, but at times you have to download the data for several timesteps at once
165169
# so it is better to have the option to download all the data in a single folder
166170
if self.single_temp_folder:
167-
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True, prefix=os.getenv('TMP')) as tmp_path:
171+
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True, dir=os.getenv('TMP')) as tmp_path:
168172
for timestep in timesteps:
169173
self._get_and_save_data_ts(timestep, tmp_path)
170174
rm_at_exit(tmp_path)
171175
else:
172176
for timestep in timesteps:
173-
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True, prefix=os.getenv('TMP')) as tmp_path:
177+
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True, dir=os.getenv('TMP')) as tmp_path:
174178
self._get_and_save_data_ts(timestep, tmp_path)
175179
rm_at_exit(tmp_path)
176180

@@ -349,20 +353,14 @@ class URLDownloader(DOORDownloader):
349353

350354
name = "URL_Downloader"
351355

352-
def __init__(self, url_blank: str, protocol: str = 'http', host: str|None = None) -> None:
356+
def __init__(self, url_blank: str, protocol: str = 'http') -> None:
353357

354358
self.url_blank = url_blank
355-
if protocol.lower() not in ['http', 'ftp', 'sftp', 'https']:
359+
if protocol.lower() not in ['http', 'https']:
356360
raise ValueError(f'Protocol {protocol} not supported')
357361
else:
358362
self.protocol = protocol.lower()
359363

360-
if self.protocol == 'ftp' or self.protocol == 'sftp':
361-
if host is None:
362-
raise ValueError(f'FTP host must be specified')
363-
else:
364-
self.host = host
365-
366364
super().__init__()
367365

368366
def format_url(self, **kwargs) -> str:
@@ -382,14 +380,15 @@ def download(self, destination: str, min_size: float = None, missing_action: str
382380

383381
url = self.format_url(**kwargs)
384382
try:
385-
if self.protocol == 'http' or self.protocol == 'https':
386-
download_http(url, destination, kwargs["auth"])
387-
elif self.protocol == 'ftp':
388-
download_ftp(self.host, url, destination, kwargs["auth"])
389-
elif self.protocol == 'sftp':
390-
download_sftp(self.host, url, destination, kwargs["auth"])
391-
else:
392-
raise ValueError(f'Protocol {self.protocol} not supported')
383+
r = requests.get(url, kwargs["auth"])
384+
if r.status_code != 200:
385+
raise FileNotFoundError(r.text)
386+
387+
os.makedirs(os.path.dirname(destination), exist_ok=True)
388+
389+
with open(destination, 'wb') as f:
390+
f.write(r.content)
391+
393392
except Exception as e:
394393
handle_missing(missing_action, kwargs)
395394
self.log.debug(f'Error downloading {url}: {e}')
@@ -403,6 +402,98 @@ def download(self, destination: str, min_size: float = None, missing_action: str
403402

404403
return True
405404

405+
class FTPDownloader(DOORDownloader):
406+
"""
407+
Downloader for data from an FTP server via FTP or SFTP.
408+
This type of downloader is useful for data that can be downloaded from an FTP server.
409+
It allows specifying a URL template with placeholders for various parameters (as keyword arguments).
410+
"""
411+
412+
name = "FTP_Downloader"
413+
414+
def __init__(self, host: str, port: int = 21, protocol: str = 'ftp', user: str = 'anonymous', password: str = 'anonymous') -> None:
415+
if protocol.lower() not in ['ftp', 'sftp']:
416+
raise ValueError(f'Protocol {protocol} not supported')
417+
self.protocol = protocol.lower()
418+
419+
self.host = host
420+
self.port = port
421+
self.user = user
422+
self.password = password
423+
super().__init__()
424+
425+
if self.protocol == 'sftp':
426+
self.transport = paramiko.Transport((host, port))
427+
self.transport.connect(username=user, password=password)
428+
self.client = paramiko.SFTPClient.from_transport(self.transport)
429+
elif self.protocol == 'ftp':
430+
self.client = ftplib.FTP()
431+
self.client.connect(host, port)
432+
self.client.login(user, password)
433+
434+
def __del__(self):
435+
"""
436+
Close the FTP or SFTP client connection when the downloader is deleted.
437+
"""
438+
if hasattr(self, 'client'):
439+
try:
440+
self.client.close()
441+
except Exception as e:
442+
self.log.debug(f'Error closing {self.protocol} client: {e}')
443+
444+
if hasattr(self, 'transport'):
445+
try:
446+
self.transport.close()
447+
except Exception as e:
448+
self.log.debug(f'Error closing {self.protocol} transport: {e}')
449+
450+
def download(self, blank_path, destination: str, min_size: float = None, missing_action: str = 'error', **kwargs) -> bool:
451+
"""
452+
Downloads data from FTP or SFTP server.
453+
Eventually check file size to avoid empty files.
454+
"""
455+
456+
url = blank_path.format(**kwargs)
457+
458+
try:
459+
if self.protocol == 'sftp':
460+
self.client.get(url, destination)
461+
elif self.protocol == 'ftp':
462+
with open(destination, 'wb') as f:
463+
self.client.retrbinary(f'RETR {url}', f.write)
464+
except Exception as e:
465+
handle_missing(missing_action, kwargs)
466+
self.log.debug(f'Error downloading {url} via {self.protocol}: {e}')
467+
return False
468+
469+
success_flag, success_msg = check_download(destination, min_size, missing_action)
470+
if success_flag > 0:
471+
handle_missing(missing_action, kwargs)
472+
self.log.debug(f'Error downloading file from {url}: {success_msg}')
473+
return False
474+
475+
return True
476+
477+
def check_data(self, blank_path, **kwargs) -> bool:
478+
"""
479+
Check if the data is available on the FTP or SFTP server.
480+
This method can be used to check if the data is available before downloading it.
481+
"""
482+
url = blank_path.format(**kwargs)
483+
if self.protocol == 'ftp':
484+
# For FTP, we can use the 'nlst' command to check if the file exists
485+
if len(self.client.nlst(url)) > 0:
486+
return True
487+
elif self.protocol == 'sftp':
488+
try:
489+
self.client.stat(url)
490+
return True
491+
except Exception as e:
492+
pass
493+
return False
494+
495+
496+
406497
class APIDownloader(DOORDownloader):
407498
"""
408499
Downloader for data from an API.
@@ -443,4 +534,4 @@ def download(self, destination: str, min_size: float = None, missing_action: str
443534
def retrieve(self, **kwargs):
444535
return self.client.retrieve(**kwargs)
445536

446-
Downloader = DOORDownloader
537+
Downloader = DOORDownloader

src/door/data_sources/chirps/chirps_downloader.py

Lines changed: 20 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import datetime as dt
77
import requests
88

9-
from ...base_downloaders import URLDownloader
9+
from ...base_downloaders import FTPDownloader
1010

1111
from d3tools import timestepping as ts
1212
from d3tools.timestepping.timestep import TimeStep
@@ -15,15 +15,15 @@
1515

1616
from ...utils.io import decompress_gz
1717

18-
class CHIRPSDownloader(URLDownloader):
18+
class CHIRPSDownloader(FTPDownloader):
1919
source = "CHIRPS"
2020
name = "CHIRPS_downloader"
2121

2222
default_options = {
2323
'get_prelim' : True, # if True, will also download preliminary data if available
2424
}
2525

26-
host = 'ftp://ftp.chc.ucsb.edu'
26+
host = 'ftp.chc.ucsb.edu'
2727
auth = ('anonymous', '')
2828

2929
homev2 = "/pub/org/chc/products/CHIRPS-2.0/"
@@ -76,7 +76,7 @@ class CHIRPSDownloader(URLDownloader):
7676

7777
def __init__(self, product: str) -> None:
7878
self.set_product(product)
79-
super().__init__(self.url_blank, protocol = 'ftp', host = self.host)
79+
super().__init__(self.host, protocol = 'ftp')
8080

8181
def set_product(self, product: str) -> None:
8282
self.product = product
@@ -100,28 +100,27 @@ def get_last_published_ts(self, prelim = None, product = None, **kwargs) -> ts.T
100100
product = self.product
101101

102102
ts_per_year = self.available_products[product]["ts_per_year"]
103-
url = self.available_products[product]["url"] if not prelim else self.available_products[product]["prelim_url"]
103+
blank_path = self.available_products[product]["url"] if not prelim else self.available_products[product]["prelim_url"]
104104

105105
if ts_per_year == 365:
106106
TimeStep = ts.Day
107107
else:
108108
TimeStep = FixedNTimeStep.get_subclass(ts_per_year)
109109

110110
current_timestep = TimeStep.from_date(dt.datetime.now())
111-
while True:
112-
if "pentad_of_month" in url:
111+
if 'lim' in kwargs:
112+
start = TimeStep.from_date(kwargs['lim'])
113+
else:
114+
start = TimeStep.from_date(dt.datetime(2000, 1, 1))
115+
116+
while current_timestep >= start:
117+
if "pentad_of_month" in blank_path:
113118
pentad_of_month = 6 if ts_per_year == 12 else current_timestep.dekad_of_month*2
114-
current_url = url.format(timestep = current_timestep, pentad_of_month = pentad_of_month)
119+
current_url = blank_path.format(timestep = current_timestep, pentad_of_month = pentad_of_month)
115120
else:
116-
current_url = url.format(timestep = current_timestep)
121+
current_url = blank_path.format(timestep = current_timestep)
117122

118-
# send a request to the url
119-
from ftpretty import ftpretty as ftp
120-
client = ftp(self.host.replace('ftp://', ''), self.auth[0], self.auth[1])
121-
list = client.list(current_url)
122-
123-
# if the request is successful, the last published timestep is the current timestep
124-
if len(list) > 0:
123+
if self.check_data(current_url):
125124
return current_timestep
126125

127126
# if the request is not successful, move to the previous timestep
@@ -140,7 +139,7 @@ def _get_data_ts(self,
140139
tmp_filename = f'{tmp_filename_raw}.tif.gz' if self.url_blank.endswith('.gz') else f'{tmp_filename_raw}.tif'
141140
tmp_destination = os.path.join(tmp_path, tmp_filename)
142141
if "pentad_of_month" not in self.url_blank:
143-
success = self.download(tmp_destination, min_size = 200, missing_action = 'ignore', timestep = timestep, auth = self.auth)
142+
success = self.download(self.url_blank, tmp_destination, min_size = 200, missing_action = 'ignore', timestep = timestep, auth = self.auth)
144143
else:
145144
if isinstance(timestep, ts.Month):
146145
pentads = [1, 2, 3, 4, 5, 6]
@@ -151,7 +150,7 @@ def _get_data_ts(self,
151150
for pentad in pentads:
152151
tmp_filename = f'{tmp_filename_raw}_{pentad}.tif'
153152
tmp_destination = os.path.join(tmp_path, tmp_filename)
154-
success = self.download(tmp_destination, min_size = 200, missing_action = 'ignore', timestep = timestep, pentad_of_month = pentad, auth = self.auth)
153+
success = self.download(self.url_blank, tmp_destination, min_size = 200, missing_action = 'ignore', timestep = timestep, pentad_of_month = pentad, auth = self.auth)
155154
tmp_filenames.append(tmp_filename)
156155
if not success:
157156
break
@@ -171,7 +170,7 @@ def _get_data_ts(self,
171170
if "pentad_of_month" not in self.url_prelim_blank:
172171
tmp_filename = f'{tmp_filename_raw}.tif.gz' if self.url_prelim_blank.endswith('.gz') else f'{tmp_filename_raw}.tif'
173172
tmp_destination = os.path.join(tmp_path, tmp_filename)
174-
success = self.download(tmp_destination, min_size = 200, missing_action = 'ignore', timestep = timestep, prelim = True, auth = self.auth)
173+
success = self.download(self.url_prelim_blank, tmp_destination, min_size = 200, missing_action = 'ignore', timestep = timestep, prelim = True, auth = self.auth)
175174

176175
else:
177176
if isinstance(timestep, ts.Month):
@@ -183,7 +182,7 @@ def _get_data_ts(self,
183182
for pentad in pentads:
184183
tmp_filename = f'{tmp_filename_raw}_{pentad}.tif'
185184
tmp_destination = os.path.join(tmp_path, tmp_filename)
186-
success = self.download(tmp_destination, min_size = 200, missing_action = 'ignore', timestep = timestep, pentad_of_month = pentad, prelim = True, auth = self.auth)
185+
success = self.download(self.url_prelim_blank, tmp_destination, min_size = 200, missing_action = 'ignore', timestep = timestep, pentad_of_month = pentad, prelim = True, auth = self.auth)
187186
tmp_filenames.append(tmp_filename)
188187
if not success:
189188
break
@@ -215,14 +214,4 @@ def _get_data_ts(self,
215214
if isprelim:
216215
cropped.attrs['PRELIMINARY'] = 'True'
217216

218-
yield cropped, {}
219-
220-
def format_url(self, prelim = False, **kwargs):
221-
"""
222-
Format the url for the download
223-
"""
224-
if prelim:
225-
url = self.url_prelim_blank.format(**kwargs)
226-
else:
227-
url = self.url_blank.format(**kwargs)
228-
return url
217+
yield cropped, {}

0 commit comments

Comments
 (0)