
Commit f955121

Merge pull request #1040 from cam72cam/master
STScIDSI-31: Enable downloading MAST HST data from S3
2 parents: 702cdaf + 375103f
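
For quick reference, the feature added by this commit is used roughly as follows (a minimal sketch; the observation ID is a hypothetical placeholder, and boto3 must be installed and configured):

    from astroquery.mast import Observations

    # Opt in to pulling public HST files from the stpubdata S3 bucket.
    # Transfers are requester-pays, so the AWS account is charged.
    Observations.enable_s3_hst_dataset()

    # The observation ID below is a made-up example.
    obs = Observations.query_criteria(obs_collection="HST", obs_id="ibxl50020")
    products = Observations.get_product_list(obs)
    Observations.download_products(products)

    # Revert to downloading directly from MAST.
    Observations.disable_s3_hst_dataset()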

File tree: 1 file changed (+146, -3 lines)
astroquery/mast/core.py

Lines changed: 146 additions & 3 deletions
@@ -12,9 +12,11 @@
 import warnings
 import json
 import time
+import string
 import os
 import re
 import keyring
+import threading

 import numpy as np

@@ -28,6 +30,7 @@
 from astropy.table import Table, Row, vstack, MaskedColumn
 from astropy.extern.six.moves.urllib.parse import quote as urlencode
 from astropy.extern.six.moves.http_cookiejar import Cookie
+from astropy.utils.console import ProgressBarOrSpinner
 from astropy.utils.exceptions import AstropyWarning
 from astropy.logger import log

@@ -39,7 +42,6 @@
                           NoResultsWarning, InputWarning, AuthenticationWarning)
 from . import conf

-
 __all__ = ['Observations', 'ObservationsClass',
            'Mast', 'MastClass']

@@ -816,6 +818,13 @@ class ObservationsClass(MastClass):
     Class for querying MAST observational data.
     """

+    def __init__(self, *args, **kwargs):
+
+        super(ObservationsClass, self).__init__(*args, **kwargs)
+
+        self._boto3 = None
+        self._botocore = None
+
     def list_missions(self):
         """
         Lists data missions archived by MAST and available through `astroquery.mast`.
@@ -1290,6 +1299,132 @@ def _download_curl_script(self, products, out_dir):
                                   "URL": [url]})
         return manifest

+    def enable_s3_hst_dataset(self):
+        """
+        Attempts to enable downloading HST public files from S3 instead of MAST.
+        Requires the boto3 library to function.
+        """
+        import boto3
+        import botocore
+        self._boto3 = boto3
+        self._botocore = botocore
+
+        log.info("Using the S3 HST public dataset")
+        log.warning("Your AWS account will be charged for access to the S3 bucket")
+        log.info("See Request Pricing in https://aws.amazon.com/s3/pricing/ for details")
+        log.info("If you have not configured boto3, follow the instructions here: " +
+                 "https://boto3.readthedocs.io/en/latest/guide/configuration.html")
+
+    def disable_s3_hst_dataset(self):
+        """
+        Disables downloading HST public files from S3 instead of MAST.
+        """
+        self._boto3 = None
+        self._botocore = None
+
+    def _download_from_s3(self, dataProduct, localPath, cache=True):
+        # The following is a mishmash of BaseQuery._download_file and S3 access through boto
+
+        bkt_name = 'stpubdata'
+
+        # This is a cheap operation and does not perform any actual work yet
+        s3 = self._boto3.resource('s3')
+        s3_client = self._boto3.client('s3')
+        bkt = s3.Bucket(bkt_name)
+
+        dataUri = dataProduct['dataURI']
+        filename = dataUri.split("/")[-1]
+        obs_id = dataProduct['obs_id']
+
+        obs_id = obs_id.lower()
+
+        # This next part is a bit funky. Let me explain why:
+        # We have 2 different possible URI schemes for HST:
+        #   mast:HST/product/obs_id_filename.type (old style)
+        #   mast:HST/product/obs_id/obs_id_filename.type (new style)
+        # The first scheme was developed thinking that the obs_id in the filename
+        # would *always* match the actual obs_id folder the file was placed in.
+        # Unfortunately this assumption was false.
+        # We have been trying to switch to the new URI scheme, as it specifies the
+        # obs_id used in the folder path correctly.
+        # The cherry on top is that the obs_id in the new style URI is not always correct either!
+        # When we are looking up files we have some code which iterates through all of
+        # the possible permutations of the obs_id's last char, which can be *ANYTHING*.
+        #
+        # So in conclusion we can't trust the last char of the obs_id from the file or from the database.
+        # So with that in mind, hold your nose when reading the following:

+        info_lookup = None
+        sane_path = os.path.join("hst", "public", obs_id[:4], obs_id, filename)
+        try:
+            info_lookup = s3_client.head_object(Bucket=bkt_name, Key=sane_path, RequestPayer='requester')
+            bucketPath = sane_path
+        except self._botocore.exceptions.ClientError as e:
+            if e.response['Error']['Code'] != "404":
+                raise
+
+        if info_lookup is None:
+            # Unfortunately our file placement logic is anything but sane
+            # We put files in folders that don't make sense
+            for ch in (string.digits + string.ascii_lowercase):
+                # The last char of the obs_folder (observation id) can be any lowercase or numeric char
+                insane_obs = obs_id[:-1] + ch
+                insane_path = os.path.join("hst", "public", insane_obs[:4], insane_obs, filename)
+
+                try:
+                    info_lookup = s3_client.head_object(Bucket=bkt_name, Key=insane_path,
+                                                        RequestPayer='requester')
+                    bucketPath = insane_path
+                    break
+                except self._botocore.exceptions.ClientError as e:
+                    if e.response['Error']['Code'] != "404":
+                        raise
+
+        if info_lookup is None:
+            raise Exception("Unable to locate file!")
+
+        # Unfortunately, we can't use the file size reported in the product. STScI's backing
+        # archive database (CAOM) is frequently out of date and in many cases omits the required information.
+        # length = dataProduct["size"]
+        # Instead we ask the webserver (in this case S3) what the expected content length is and use that.
+        length = info_lookup["ContentLength"]
+
+        if cache and os.path.exists(localPath):
+            if length is not None:
+                statinfo = os.stat(localPath)
+                if statinfo.st_size != length:
+                    log.warning("Found cached file {0} with size {1} that is "
+                                "different from expected size {2}"
+                                .format(localPath, statinfo.st_size, length))
+                else:
+                    log.info("Found cached file {0} with expected size {1}."
+                             .format(localPath, statinfo.st_size))
+                    return
+
+        with ProgressBarOrSpinner(length, ('Downloading URL s3://{0}/{1} to {2} ...'.format(
+                bkt_name, bucketPath, localPath))) as pb:
+
+            # bytes_read tracks how much data has been received so far
+            # This variable will be updated in multiple threads below
+            global bytes_read
+            bytes_read = 0
+
+            progress_lock = threading.Lock()
+
+            def progress_callback(numbytes):
+                # Boto3 calls this from multiple threads pulling the data from S3
+                global bytes_read
+
+                # This callback can be called in multiple threads
+                # Access to updating the console needs to be locked
+                with progress_lock:
+                    bytes_read += numbytes
+                    pb.update(bytes_read)
+
+            bkt.download_file(bucketPath, localPath, ExtraArgs={"RequestPayer": "requester"},
+                              Callback=progress_callback)
+
     def _download_files(self, products, base_dir, cache=True):
         """
         Takes an `astropy.table.Table` of data products and downloads them into the directory given by base_dir.
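
To make the comment block in `_download_from_s3` concrete, here is a small standalone illustration of the key lookup it performs (the obs_id and filename below are made-up values, not real products):

    import os
    import string

    obs_id = "ibxl50020"              # hypothetical observation ID
    filename = "ibxl50020_drz.fits"   # hypothetical product filename

    # First choice: the "sane" key derived directly from the obs_id.
    sane_path = os.path.join("hst", "public", obs_id[:4], obs_id, filename)
    # -> hst/public/ibxl/ibxl50020/ibxl50020_drz.fits

    # Fallback: if a HEAD request on sane_path returns 404, every
    # digit/lowercase permutation of the final character is probed,
    # matching the loop in _download_from_s3.
    candidates = [os.path.join("hst", "public", insane[:4], insane, filename)
                  for insane in (obs_id[:-1] + ch
                                 for ch in string.digits + string.ascii_lowercase)]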
@@ -1312,7 +1447,7 @@ def _download_files(self, products, base_dir, cache=True):
         for dataProduct in products:

             localPath = base_dir + "/" + dataProduct['obs_collection'] + "/" + dataProduct['obs_id']
-            dataUrl = self._MAST_DOWNLOAD_URL + "?uri=" + dataProduct['dataURI']
+            dataUrl = self._MAST_DOWNLOAD_URL + "?uri=" + dataProduct["dataURI"]

             if not os.path.exists(localPath):
                 os.makedirs(localPath)
@@ -1324,7 +1459,15 @@ def _download_files(self, products, base_dir, cache=True):
             url = None

             try:
-                self._download_file(dataUrl, localPath, cache=cache)
+                if self._boto3 is not None and dataProduct["dataURI"].startswith("mast:HST/product"):
+                    try:
+                        self._download_from_s3(dataProduct, localPath, cache)
+                    except Exception as ex:
+                        log.exception("Error pulling from S3 bucket: %s" % ex)
+                        log.warn("Falling back to mast download...")
+                        self._download_file(dataUrl, localPath, cache=cache)
+                else:
+                    self._download_file(dataUrl, localPath, cache=cache)

                 # check if the file exists; also this is where we would perform md5 checking,
                 # and also check the filesize if the database reliably reported file sizes
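
Note that `enable_s3_hst_dataset` only imports boto3; credential handling is left entirely to that library. As one example of the configuration step its log message refers to, credentials can be supplied programmatically before enabling the S3 dataset (the keys below are placeholders; boto3 will equally pick up `~/.aws/credentials` or the `AWS_ACCESS_KEY_ID`/`AWS_SECRET_ACCESS_KEY` environment variables):

    import boto3

    # Placeholder credentials for illustration only.
    boto3.setup_default_session(
        aws_access_key_id="YOUR_ACCESS_KEY_ID",
        aws_secret_access_key="YOUR_SECRET_ACCESS_KEY",
    )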
