4848
4949import pyclowder .datasets
5050import pyclowder .files
51+ import pyclowder .api .v1 .files as v1files
52+ import pyclowder .api .v2 .files as v2files
5153import pyclowder .utils
5254
5355import smtplib
5860from dotenv import load_dotenv
5961load_dotenv ()
6062
61- clowder_version = float (os .getenv ('clowder_version' , '1.0' ))
62-
6363
6464class Connector (object ):
6565 """ Class that will listen for messages.
@@ -139,7 +139,7 @@ def alive(self):
139139 """Return whether connection is still alive or not."""
140140 return True
141141
142- def _build_resource (self , body , host , secret_key ):
142+ def _build_resource (self , body , host , secret_key , clowder_version ):
143143 """Examine message body and create resource object based on message type.
144144
145145 Example FILE message -- *.file.#
@@ -242,7 +242,7 @@ def _build_resource(self, body, host, secret_key):
242242
243243 elif resource_type == "file" :
244244 ext = os .path .splitext (filename )[1 ]
245- if float ( os . getenv ( ' clowder_version' )) == 2.0 :
245+ if clowder_version == 2.0 :
246246 return {
247247 "type" : "file" ,
248248 "id" : fileid ,
@@ -407,22 +407,21 @@ def _process_message(self, body):
407407 if not host .endswith ('/' ): host += '/'
408408 secret_key = body .get ('secretKey' , '' )
409409 retry_count = 0 if 'retry_count' not in body else body ['retry_count' ]
410- resource = self ._build_resource (body , host , secret_key )
410+ clowder_version = float (body .get ('clowderVersion' , os .getenv ('clowder_version' , '1.0' )))
411+ resource = self ._build_resource (body , host , secret_key , clowder_version )
411412 if not resource :
412413 logging .error ("No resource found, this is bad." )
413414 return
414415
415416 # register extractor
416- if clowder_version >= 2.0 :
417+ if clowder_version == 2.0 :
418+ # TODO: Shouldn't heartbeat handle this?
417419 url = "%sapi/v2/extractors" % source_host
418420 else :
419421 url = "%sapi/extractors" % source_host
420422 if url not in Connector .registered_clowder :
421423 Connector .registered_clowder .append (url )
422- if clowder_version >= 2.0 :
423- self .register_extractor ("%s?key=%s" % (url ,secret_key ))
424- else :
425- self .register_extractor ("%s?key=%s" % (url , secret_key ))
424+ self .register_extractor ("%s?key=%s" % (url ,secret_key ))
426425
427426 # tell everybody we are starting to process the file
428427 self .status_update (pyclowder .utils .StatusMessage .start , resource , "Started processing." )
@@ -442,18 +441,18 @@ def _process_message(self, body):
442441 found_local = False
443442 try :
444443 if check_result != pyclowder .utils .CheckMessage .bypass :
445- if clowder_version > = 2.0 :
446- file_metadata = pyclowder . files .download_info (self , host , secret_key , resource ["id" ])
444+ if clowder_version = = 2.0 :
445+ file_metadata = v2files .download_info (self , host , secret_key , resource ["id" ])
447446 else :
448- file_metadata = pyclowder . files .download_info (self , host , secret_key , resource ["id" ])
447+ file_metadata = v1files .download_info (self , host , secret_key , resource ["id" ])
449448 file_path = self ._check_for_local_file (file_metadata )
450449 if not file_path :
451- if clowder_version > = 2.0 :
452- file_path = pyclowder . files .download (self , host , secret_key , resource ["id" ],
450+ if clowder_version = = 2.0 :
451+ file_path = v2files .download (self , host , secret_key , resource ["id" ],
453452 resource ["intermediate_id" ],
454453 resource ["file_ext" ])
455454 else :
456- file_path = pyclowder . files .download (self , host , secret_key , resource ["id" ],
455+ file_path = v1files .download (self , host , secret_key , resource ["id" ],
457456 resource ["intermediate_id" ],
458457 resource ["file_ext" ])
459458 else :
@@ -539,48 +538,24 @@ def register_extractor(self, endpoints):
539538 This assumes a file called extractor_info.json to be located in either the
540539 current working directory, or the folder where the main program is started.
541540 """
542- if clowder_version >= 2.0 :
543- if not endpoints or endpoints == "" :
544- return
545-
546- logger = logging .getLogger (__name__ )
547-
548- headers = {'Content-Type' : 'application/json' }
549- data = self .extractor_info
550-
551- for url in endpoints .split (',' ):
552- if url not in Connector .registered_clowder :
553- Connector .registered_clowder .append (url )
554- try :
555- result = requests .post (url .strip (), headers = headers ,
556- data = json .dumps (data ),
557- verify = self .ssl_verify )
558- result .raise_for_status ()
559- logger .debug ("Registering extractor with %s : %s" , url , result .text )
560- except Exception as exc : # pylint: disable=broad-except
561- logger .exception ('Error in registering extractor: ' + str (exc ))
562- else :
563- # don't do any work if we wont register the endpoint
564- if not endpoints or endpoints == "" :
565- return
566-
567- logger = logging .getLogger (__name__ )
568-
569- headers = {'Content-Type' : 'application/json' }
570- data = self .extractor_info
541+ if not endpoints or endpoints == "" :
542+ return
571543
544+ logger = logging .getLogger (__name__ )
572545
546+ headers = {'Content-Type' : 'application/json' }
547+ data = self .extractor_info
573548
574- for url in endpoints .split (',' ):
575- if url not in Connector .registered_clowder :
576- Connector .registered_clowder .append (url )
577- try :
578- result = requests .post (url .strip (), headers = headers ,
579- data = json .dumps (data ),
580- verify = self .ssl_verify )
581- result .raise_for_status ()
582- logger .debug ("Registering extractor with %s : %s" , url , result .text )
583- except Exception as exc : # pylint: disable=broad-except
549+ for url in endpoints .split (',' ):
550+ if url not in Connector .registered_clowder :
551+ Connector .registered_clowder .append (url )
552+ try :
553+ result = requests .post (url .strip (), headers = headers ,
554+ data = json .dumps (data ),
555+ verify = self .ssl_verify )
556+ result .raise_for_status ()
557+ logger .debug ("Registering extractor with %s : %s" , url , result .text )
558+ except Exception as exc : # pylint: disable=broad-except
584559 logger .exception ('Error in registering extractor: ' + str (exc ))
585560
586561 # pylint: disable=no-self-use
0 commit comments