5555from email .mime .multipart import MIMEMultipart
5656from string import Template
5757
58- from dotenv import load_dotenv
59- load_dotenv ()
60-
61- clowder_version = float (os .getenv ('clowder_version' , '1.0' ))
62-
6358
6459class Connector (object ):
6560 """ Class that will listen for messages.
@@ -139,7 +134,7 @@ def alive(self):
139134 """Return whether connection is still alive or not."""
140135 return True
141136
142- def _build_resource (self , body , host , secret_key ):
137+ def _build_resource (self , body , host , secret_key , clowder_version ):
143138 """Examine message body and create resource object based on message type.
144139
145140 Example FILE message -- *.file.#
@@ -242,7 +237,7 @@ def _build_resource(self, body, host, secret_key):
242237
243238 elif resource_type == "file" :
244239 ext = os .path .splitext (filename )[1 ]
245- if float ( os . getenv ( ' clowder_version' )) == 2.0 :
240+ if clowder_version == 2 :
246241 return {
247242 "type" : "file" ,
248243 "id" : fileid ,
@@ -407,22 +402,18 @@ def _process_message(self, body):
407402 if not host .endswith ('/' ): host += '/'
408403 secret_key = body .get ('secretKey' , '' )
409404 retry_count = 0 if 'retry_count' not in body else body ['retry_count' ]
410- resource = self ._build_resource (body , host , secret_key )
405+ clowder_version = int (body .get ('clowderVersion' , os .getenv ('CLOWDER_VERSION' , '1' )))
406+ resource = self ._build_resource (body , host , secret_key , clowder_version )
411407 if not resource :
412408 logging .error ("No resource found, this is bad." )
413409 return
414410
415411 # register extractor
416- if clowder_version >= 2.0 :
417- url = "%sapi/v2/extractors" % source_host
418- else :
412+ if clowder_version != 2 :
419413 url = "%sapi/extractors" % source_host
420- if url not in Connector .registered_clowder :
421- Connector .registered_clowder .append (url )
422- if clowder_version >= 2.0 :
414+ if url not in Connector .registered_clowder :
415+ Connector .registered_clowder .append (url )
423416 self .register_extractor ("%s?key=%s" % (url ,secret_key ))
424- else :
425- self .register_extractor ("%s?key=%s" % (url , secret_key ))
426417
427418 # tell everybody we are starting to process the file
428419 self .status_update (pyclowder .utils .StatusMessage .start , resource , "Started processing." )
@@ -442,18 +433,10 @@ def _process_message(self, body):
442433 found_local = False
443434 try :
444435 if check_result != pyclowder .utils .CheckMessage .bypass :
445- if clowder_version >= 2.0 :
446- file_metadata = pyclowder .files .download_info (self , host , secret_key , resource ["id" ])
447- else :
448- file_metadata = pyclowder .files .download_info (self , host , secret_key , resource ["id" ])
436+ file_metadata = pyclowder .files .download_info (self , host , secret_key , resource ["id" ])
449437 file_path = self ._check_for_local_file (file_metadata )
450438 if not file_path :
451- if clowder_version >= 2.0 :
452- file_path = pyclowder .files .download (self , host , secret_key , resource ["id" ],
453- resource ["intermediate_id" ],
454- resource ["file_ext" ])
455- else :
456- file_path = pyclowder .files .download (self , host , secret_key , resource ["id" ],
439+ file_path = pyclowder .files .download (self , host , secret_key , resource ["id" ],
457440 resource ["intermediate_id" ],
458441 resource ["file_ext" ])
459442 else :
@@ -539,48 +522,24 @@ def register_extractor(self, endpoints):
539522 This assumes a file called extractor_info.json to be located in either the
540523 current working directory, or the folder where the main program is started.
541524 """
542- if clowder_version >= 2.0 :
543- if not endpoints or endpoints == "" :
544- return
545-
546- logger = logging .getLogger (__name__ )
547-
548- headers = {'Content-Type' : 'application/json' }
549- data = self .extractor_info
550-
551- for url in endpoints .split (',' ):
552- if url not in Connector .registered_clowder :
553- Connector .registered_clowder .append (url )
554- try :
555- result = requests .post (url .strip (), headers = headers ,
556- data = json .dumps (data ),
557- verify = self .ssl_verify )
558- result .raise_for_status ()
559- logger .debug ("Registering extractor with %s : %s" , url , result .text )
560- except Exception as exc : # pylint: disable=broad-except
561- logger .exception ('Error in registering extractor: ' + str (exc ))
562- else :
563- # don't do any work if we wont register the endpoint
564- if not endpoints or endpoints == "" :
565- return
566-
567- logger = logging .getLogger (__name__ )
568-
569- headers = {'Content-Type' : 'application/json' }
570- data = self .extractor_info
525+ if not endpoints or endpoints == "" :
526+ return
571527
528+ logger = logging .getLogger (__name__ )
572529
530+ headers = {'Content-Type' : 'application/json' }
531+ data = self .extractor_info
573532
574- for url in endpoints .split (',' ):
575- if url not in Connector .registered_clowder :
576- Connector .registered_clowder .append (url )
577- try :
578- result = requests .post (url .strip (), headers = headers ,
579- data = json .dumps (data ),
580- verify = self .ssl_verify )
581- result .raise_for_status ()
582- logger .debug ("Registering extractor with %s : %s" , url , result .text )
583- except Exception as exc : # pylint: disable=broad-except
533+ for url in endpoints .split (',' ):
534+ if url not in Connector .registered_clowder :
535+ Connector .registered_clowder .append (url )
536+ try :
537+ result = requests .post (url .strip (), headers = headers ,
538+ data = json .dumps (data ),
539+ verify = self .ssl_verify )
540+ result .raise_for_status ()
541+ logger .debug ("Registering extractor with %s : %s" , url , result .text )
542+ except Exception as exc : # pylint: disable=broad-except
584543 logger .exception ('Error in registering extractor: ' + str (exc ))
585544
586545 # pylint: disable=no-self-use
@@ -742,7 +701,7 @@ def connect(self):
742701
743702 self .channel .queue_bind (queue = self .rabbitmq_queue ,
744703 exchange = self .rabbitmq_exchange ,
745- routing_key = "extractors." + self .extractor_name )
704+ routing_key = self .extractor_name )
746705
747706 # start the extractor announcer
748707 self .announcer = RabbitMQBroadcast (self .rabbitmq_uri , self .extractor_info , self .rabbitmq_queue , self .heartbeat )
0 commit comments