Skip to content

Commit be720b0

Browse files
committed
Works up until uploading metadata.
Problem: the method in the extractor is not getting the secret key. Possible fix: use the token as the secret key.
1 parent 8838bb1 commit be720b0

File tree

5 files changed

+324
-127
lines changed

5 files changed

+324
-127
lines changed

pyclowder/connectors.py

Lines changed: 55 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@
5858
from dotenv import load_dotenv
5959
load_dotenv()
6060

61+
clowder_version = float(os.getenv('clowder_version'))
62+
6163

6264
class Connector(object):
6365
""" Class that will listen for messages.
@@ -415,17 +417,15 @@ def _process_message(self, body):
415417
return
416418

417419
# register extractor
418-
# TODO make work for clowder2.0
419-
if float(os.getenv('clowder_version')) == 2.0:
420-
print('do differently')
421-
registration_url = "%sapi/v2/extractors" % source_host
422-
if registration_url not in Connector.registered_clowder:
423-
Connector.registered_clowder.append(registration_url)
424-
self.register_extractor_v2(registration_url, token)
420+
if clowder_version >= 2.0:
421+
url = "%sapi/v2/extractors" % source_host
425422
else:
426423
url = "%sapi/extractors" % source_host
427-
if url not in Connector.registered_clowder:
428-
Connector.registered_clowder.append(url)
424+
if url not in Connector.registered_clowder:
425+
Connector.registered_clowder.append(url)
426+
if clowder_version >= 2.0:
427+
self.register_extractor("%s" % (url), token=token)
428+
else:
429429
self.register_extractor("%s?key=%s" % (url, secret_key))
430430

431431
# tell everybody we are starting to process the file
@@ -446,16 +446,16 @@ def _process_message(self, body):
446446
found_local = False
447447
try:
448448
if check_result != pyclowder.utils.CheckMessage.bypass:
449-
if float(os.getenv('clowder_version')) == 2.0:
450-
file_metadata = pyclowder.files.download_info_v2(self, host, token, resource["id"])
449+
if clowder_version >= 2.0:
450+
file_metadata = pyclowder.files.download_info(self, host, secret_key, resource["id"], token=token)
451451
else:
452452
file_metadata = pyclowder.files.download_info(self, host, secret_key, resource["id"])
453453
file_path = self._check_for_local_file(file_metadata)
454454
if not file_path:
455-
if float(os.getenv('clowder_version')) == 2.0:
456-
file_path = pyclowder.files.download_v2(self, host, token, resource["id"],
457-
resource["intermediate_id"],
458-
resource["file_ext"])
455+
if clowder_version >= 2.0:
456+
file_path = pyclowder.files.download(self, host, secret_key, resource["id"],
457+
resource["intermediate_id"],
458+
resource["file_ext"], token=token)
459459
else:
460460
file_path = pyclowder.files.download(self, host, secret_key, resource["id"],
461461
resource["intermediate_id"],
@@ -537,61 +537,56 @@ def _process_message(self, body):
537537
else:
538538
self.message_error(resource, message)
539539

540-
def register_extractor(self, endpoints):
540+
def register_extractor(self, endpoints, token=None):
541541
"""Register extractor info with Clowder.
542542
543543
This assumes a file called extractor_info.json to be located in either the
544544
current working directory, or the folder where the main program is started.
545545
"""
546+
if clowder_version >= 2.0:
547+
if not endpoints or endpoints == "":
548+
return
549+
550+
logger = logging.getLogger(__name__)
551+
552+
headers = {'Content-Type': 'application/json',
553+
'Authorization': 'Bearer ' + token}
554+
data = self.extractor_info
555+
556+
for url in endpoints.split(','):
557+
if url not in Connector.registered_clowder:
558+
Connector.registered_clowder.append(url)
559+
try:
560+
result = requests.post(url.strip(), headers=headers,
561+
data=json.dumps(data),
562+
verify=self.ssl_verify)
563+
result.raise_for_status()
564+
logger.debug("Registering extractor with %s : %s", url, result.text)
565+
except Exception as exc: # pylint: disable=broad-except
566+
logger.exception('Error in registering extractor: ' + str(exc))
567+
else:
568+
# don't do any work if we wont register the endpoint
569+
if not endpoints or endpoints == "":
570+
return
546571

547-
# don't do any work if we wont register the endpoint
548-
if not endpoints or endpoints == "":
549-
return
550-
551-
logger = logging.getLogger(__name__)
552-
553-
headers = {'Content-Type': 'application/json'}
554-
data = self.extractor_info
555-
556-
for url in endpoints.split(','):
557-
if url not in Connector.registered_clowder:
558-
Connector.registered_clowder.append(url)
559-
try:
560-
result = requests.post(url.strip(), headers=headers,
561-
data=json.dumps(data),
562-
verify=self.ssl_verify)
563-
result.raise_for_status()
564-
logger.debug("Registering extractor with %s : %s", url, result.text)
565-
except Exception as exc: # pylint: disable=broad-except
566-
logger.exception('Error in registering extractor: ' + str(exc))
567-
568-
def register_extractor_v2(self, endpoint, token):
569-
"""Register extractor info with Clowder.
570-
571-
This assumes a file called extractor_info.json to be located in either the
572-
current working directory, or the folder where the main program is started.
573-
"""
572+
logger = logging.getLogger(__name__)
574573

575-
# don't do any work if we wont register the endpoint
576-
if not endpoint or endpoint == "":
577-
return
574+
headers = {'Content-Type': 'application/json'}
575+
data = self.extractor_info
578576

579-
logger = logging.getLogger(__name__)
580577

581-
headers = {'Content-Type': 'application/json',
582-
'Authorization': 'Bearer ' + token}
583-
data = self.extractor_info
584578

585-
if endpoint not in Connector.registered_clowder:
586-
Connector.registered_clowder.append(endpoint)
587-
try:
588-
result = requests.post(endpoint.strip(), headers=headers,
589-
data=json.dumps(data),
590-
verify=self.ssl_verify)
591-
result.raise_for_status()
592-
logger.debug("Registering extractor with %s : %s", url, result.text)
593-
except Exception as exc: # pylint: disable=broad-except
594-
logger.exception('Error in registering extractor: ' + str(exc))
579+
for url in endpoints.split(','):
580+
if url not in Connector.registered_clowder:
581+
Connector.registered_clowder.append(url)
582+
try:
583+
result = requests.post(url.strip(), headers=headers,
584+
data=json.dumps(data),
585+
verify=self.ssl_verify)
586+
result.raise_for_status()
587+
logger.debug("Registering extractor with %s : %s", url, result.text)
588+
except Exception as exc: # pylint: disable=broad-except
589+
logger.exception('Error in registering extractor: ' + str(exc))
595590

596591
# pylint: disable=no-self-use
597592
def status_update(self, status, resource, message):

pyclowder/files.py

Lines changed: 48 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
from pyclowder.datasets import get_file_list
1616
from pyclowder.collections import get_datasets, get_child_collections
1717

18+
from dotenv import load_dotenv
19+
load_dotenv()
20+
clowder_version = float(os.getenv('clowder_version'))
21+
1822
# Some sources of urllib3 support warning suppression, but not all
1923
try:
2024
from urllib3 import disable_warnings
@@ -25,7 +29,7 @@
2529

2630

2731
# pylint: disable=too-many-arguments
28-
def download(connector, host, key, fileid, intermediatefileid=None, ext=""):
32+
def download(connector, host, key, fileid, intermediatefileid=None, ext="", token=None):
2933
"""Download file to be processed from Clowder.
3034
3135
Keyword arguments:
@@ -39,60 +43,44 @@ def download(connector, host, key, fileid, intermediatefileid=None, ext=""):
3943

4044
connector.message_process({"type": "file", "id": fileid}, "Downloading file.")
4145

42-
# TODO: intermediateid doesn't really seem to be used here, can we remove entirely?
43-
if not intermediatefileid:
44-
intermediatefileid = fileid
45-
46-
url = '%sapi/files/%s?key=%s' % (host, intermediatefileid, key)
47-
result = connector.get(url, stream=True, verify=connector.ssl_verify if connector else True)
4846

49-
(inputfile, inputfilename) = tempfile.mkstemp(suffix=ext)
50-
51-
try:
52-
with os.fdopen(inputfile, "wb") as outputfile:
53-
for chunk in result.iter_content(chunk_size=10*1024):
54-
outputfile.write(chunk)
55-
return inputfilename
56-
except Exception:
57-
os.remove(inputfilename)
58-
raise
59-
60-
61-
# pylint: disable=too-many-arguments
62-
def download_v2(connector, host, token, fileid, intermediatefileid=None, ext=""):
63-
"""Download file to be processed from Clowder.
64-
65-
Keyword arguments:
66-
connector -- connector information, used to get missing parameters and send status updates
67-
host -- the clowder host, including http and port, should end with a /
68-
key -- the secret key to login to clowder
69-
fileid -- the file that is currently being processed
70-
intermediatefileid -- either same as fileid, or the intermediate file to be used
71-
ext -- the file extension, the downloaded file will end with this extension
72-
"""
73-
74-
connector.message_process({"type": "file", "id": fileid}, "Downloading file.")
7547

7648
# TODO: intermediateid doesn't really seem to be used here, can we remove entirely?
7749
if not intermediatefileid:
7850
intermediatefileid = fileid
7951

80-
url = '%sapi/v2/files/%s' % (host, intermediatefileid)
81-
headers = {"Authorization": "Bearer " + token}
82-
result = connector.get(url, stream=True, verify=connector.ssl_verify if connector else True, headers=headers)
52+
if clowder_version >= 2.0:
53+
url = '%sapi/v2/files/%s' % (host, intermediatefileid)
54+
headers = {"Authorization": "Bearer " + token}
55+
result = connector.get(url, stream=True, verify=connector.ssl_verify if connector else True, headers=headers)
56+
57+
(inputfile, inputfilename) = tempfile.mkstemp(suffix=ext)
58+
59+
try:
60+
with os.fdopen(inputfile, "wb") as outputfile:
61+
for chunk in result.iter_content(chunk_size=10 * 1024):
62+
outputfile.write(chunk)
63+
return inputfilename
64+
except Exception:
65+
os.remove(inputfilename)
66+
raise
67+
else:
68+
url = '%sapi/files/%s?key=%s' % (host, intermediatefileid, key)
69+
result = connector.get(url, stream=True, verify=connector.ssl_verify if connector else True)
70+
71+
(inputfile, inputfilename) = tempfile.mkstemp(suffix=ext)
8372

84-
(inputfile, inputfilename) = tempfile.mkstemp(suffix=ext)
73+
try:
74+
with os.fdopen(inputfile, "wb") as outputfile:
75+
for chunk in result.iter_content(chunk_size=10*1024):
76+
outputfile.write(chunk)
77+
return inputfilename
78+
except Exception:
79+
os.remove(inputfilename)
80+
raise
8581

86-
try:
87-
with os.fdopen(inputfile, "wb") as outputfile:
88-
for chunk in result.iter_content(chunk_size=10*1024):
89-
outputfile.write(chunk)
90-
return inputfilename
91-
except Exception:
92-
os.remove(inputfilename)
93-
raise
9482

95-
def download_info(connector, host, key, fileid):
83+
def download_info(connector, host, key, fileid, token=None):
9684
"""Download file summary metadata from Clowder.
9785
9886
Keyword arguments:
@@ -102,30 +90,21 @@ def download_info(connector, host, key, fileid):
10290
fileid -- the file to fetch metadata of
10391
"""
10492

105-
url = '%sapi/files/%s/metadata?key=%s' % (host, fileid, key)
106-
headers = {"Authorization": "Bearer " + token}
107-
108-
# fetch data
109-
result = connector.get(url, stream=True, verify=connector.ssl_verify if connector else True)
110-
111-
return result.json()
112-
113-
def download_info_v2(connector, host, token, fileid):
114-
"""Download file summary metadata from Clowder.
93+
if clowder_version >= 2.0:
94+
url = '%sapi/v2/files/%s/metadata' % (host, fileid)
95+
headers = {"Authorization": "Bearer " + token}
96+
# fetch data
97+
result = connector.get(url, stream=True, verify=connector.ssl_verify if connector else True, headers=headers)
11598

116-
Keyword arguments:
117-
connector -- connector information, used to get missing parameters and send status updates
118-
host -- the clowder host, including http and port, should end with a /
119-
key -- the secret key to login to clowder
120-
fileid -- the file to fetch metadata of
121-
"""
99+
return result.json()
100+
else:
101+
url = '%sapi/files/%s/metadata?key=%s' % (host, fileid, key)
102+
headers = {"Authorization": "Bearer " + token}
122103

123-
url = '%sapi/v2/files/%s/metadata' % (host, fileid)
124-
headers = {"Authorization": "Bearer " + token}
125-
# fetch data
126-
result = connector.get(url, stream=True, verify=connector.ssl_verify if connector else True, headers=headers)
104+
# fetch data
105+
result = connector.get(url, stream=True, verify=connector.ssl_verify if connector else True)
127106

128-
return result.json()
107+
return result.json()
129108

130109

131110
def download_metadata(connector, host, key, fileid, extractor=None):
@@ -235,6 +214,8 @@ def upload_metadata(connector, host, key, fileid, metadata):
235214
connector.message_process({"type": "file", "id": fileid}, "Uploading file metadata.")
236215

237216
headers = {'Content-Type': 'application/json'}
217+
# TODO if version 2.0
218+
238219
url = '%sapi/files/%s/metadata.jsonld?key=%s' % (host, fileid, key)
239220
result = connector.post(url, headers=headers, data=json.dumps(metadata),
240221
verify=connector.ssl_verify if connector else True)
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
ARG PYCLOWDER_PYTHON=""
2+
FROM clowder/pyclowder${PYCLOWDER_PYTHON}:onbuild
3+
4+
ENV MAIN_SCRIPT="binary_extractor.py" \
5+
RABBITMQ_QUEUE="" \
6+
IMAGE_BINARY="" \
7+
IMAGE_TYPE="" \
8+
IMAGE_THUMBNAIL_COMMAND="" \
9+
IMAGE_PREVIEW_COMMAND="" \
10+
PREVIEW_BINARY="" \
11+
PREVIEW_TYPE="" \
12+
PREVIEW_COMMAND=""
13+
14+
ONBUILD COPY packages.* Dockerfile /home/clowder/
15+
ONBUILD RUN if [ -e packages.apt ]; then \
16+
apt-get -q -q update \
17+
&& xargs apt-get -y install --no-install-recommends < packages.apt \
18+
&& rm -rf /var/lib/apt/lists/*; \
19+
fi
20+
21+
ONBUILD COPY extractor_info.json /home/clowder/

0 commit comments

Comments
 (0)