1616parameters:
1717
1818* rabbitmq_uri [REQUIRED] : the uri of the RabbitMQ server
19- * rabbitmq_exchange [OPTIONAL] : the exchange to which to bind the queue
2019* rabbitmq_key [OPTIONAL] : the key that binds the queue to the exchange
2120
2221HPCConnector
@@ -63,8 +62,6 @@ class Connector(object):
6362 that there is only one Connector per thread.
6463 """
6564
66- registered_clowder = list ()
67-
6865 def __init__ (self , extractor_name , extractor_info , check_message = None , process_message = None , ssl_verify = True ,
6966 mounted_paths = None , clowder_url = None , max_retry = 10 ):
7067 self .extractor_name = extractor_name
@@ -382,8 +379,7 @@ def _prepare_dataset(self, host, secret_key, resource):
382379 def _process_message (self , body ):
383380 """The actual processing of the message.
384381
385- This will register the extractor with the clowder instance that the message came from.
386- Next it will call check_message to see if the message should be processed and if the
382+ This will call check_message to see if the message should be processed and if the
387383 file should be downloaded. Finally it will call the actual process_message function.
388384 """
389385
@@ -408,13 +404,6 @@ def _process_message(self, body):
408404 logging .error ("No resource found, this is bad." )
409405 return
410406
411- # register extractor
412- if clowder_version != 2 :
413- url = "%sapi/extractors" % source_host
414- if url not in Connector .registered_clowder :
415- Connector .registered_clowder .append (url )
416- self .register_extractor ("%s?key=%s" % (url ,secret_key ))
417-
418407 # tell everybody we are starting to process the file
419408 self .status_update (pyclowder .utils .StatusMessage .start , resource , "Started processing." )
420409
@@ -516,32 +505,6 @@ def _process_message(self, body):
516505 else :
517506 self .message_error (resource , message )
518507
519- def register_extractor (self , endpoints ):
520- """Register extractor info with Clowder.
521-
522- This assumes a file called extractor_info.json to be located in either the
523- current working directory, or the folder where the main program is started.
524- """
525- if not endpoints or endpoints == "" :
526- return
527-
528- logger = logging .getLogger (__name__ )
529-
530- headers = {'Content-Type' : 'application/json' }
531- data = self .extractor_info
532-
533- for url in endpoints .split (',' ):
534- if url not in Connector .registered_clowder :
535- Connector .registered_clowder .append (url )
536- try :
537- result = requests .post (url .strip (), headers = headers ,
538- data = json .dumps (data ),
539- verify = self .ssl_verify )
540- result .raise_for_status ()
541- logger .debug ("Registering extractor with %s : %s" , url , result .text )
542- except Exception as exc : # pylint: disable=broad-except
543- logger .exception ('Error in registering extractor: ' + str (exc ))
544-
545508 # pylint: disable=no-self-use
546509 def status_update (self , status , resource , message ):
547510 """Sends a status message.
@@ -637,21 +600,17 @@ def delete(self, url, raise_status=True, **kwargs):
637600class RabbitMQConnector (Connector ):
638601 """Listens for messages on RabbitMQ.
639602
640- This will connect to rabbitmq and register the extractor with a queue. If the exchange
641- and key are specified it will bind the exchange to the queue. If an exchange is
642- specified it will always try to bind the special key extractors.<extractor_info[name]> to the
643- exchange and queue.
603+ This will connect to rabbitmq and register the extractor with a queue.
644604 """
645605
646606 # pylint: disable=too-many-arguments
647607 def __init__ (self , extractor_name , extractor_info ,
648- rabbitmq_uri , rabbitmq_exchange = None , rabbitmq_key = None , rabbitmq_queue = None ,
608+ rabbitmq_uri , rabbitmq_key = None , rabbitmq_queue = None ,
649609 check_message = None , process_message = None , ssl_verify = True , mounted_paths = None ,
650610 heartbeat = 5 * 60 , clowder_url = None , max_retry = 10 ):
651611 super (RabbitMQConnector , self ).__init__ (extractor_name , extractor_info , check_message , process_message ,
652612 ssl_verify , mounted_paths , clowder_url , max_retry )
653613 self .rabbitmq_uri = rabbitmq_uri
654- self .rabbitmq_exchange = rabbitmq_exchange
655614 self .rabbitmq_key = rabbitmq_key
656615 if rabbitmq_queue is None :
657616 self .rabbitmq_queue = extractor_info ['name' ]
@@ -681,28 +640,6 @@ def connect(self):
681640 self .channel .queue_declare (queue = self .rabbitmq_queue , durable = True )
682641 self .channel .queue_declare (queue = 'error.' + self .rabbitmq_queue , durable = True )
683642
684- # register with an exchange
685- if self .rabbitmq_exchange :
686- # declare the exchange in case it does not exist
687- self .channel .exchange_declare (exchange = self .rabbitmq_exchange , exchange_type = 'topic' ,
688- durable = True )
689-
690- # connect queue and exchange
691- if self .rabbitmq_key :
692- if isinstance (self .rabbitmq_key , str ):
693- self .channel .queue_bind (queue = self .rabbitmq_queue ,
694- exchange = self .rabbitmq_exchange ,
695- routing_key = self .rabbitmq_key )
696- else :
697- for key in self .rabbitmq_key :
698- self .channel .queue_bind (queue = self .rabbitmq_queue ,
699- exchange = self .rabbitmq_exchange ,
700- routing_key = key )
701-
702- self .channel .queue_bind (queue = self .rabbitmq_queue ,
703- exchange = self .rabbitmq_exchange ,
704- routing_key = self .extractor_name )
705-
706643 # start the extractor announcer
707644 self .announcer = RabbitMQBroadcast (self .rabbitmq_uri , self .extractor_info , self .rabbitmq_queue , self .heartbeat )
708645 self .announcer .start_thread ()
@@ -938,8 +875,6 @@ def process_messages(self, channel, rabbitmq_queue):
938875 elif msg ["type" ] == 'resubmit' :
939876 jbody = json .loads (self .body )
940877 jbody ['retry_count' ] = msg ['retry_count' ]
941- if 'exchange' not in jbody and self .method .exchange :
942- jbody ['exchange' ] = self .method .exchange
943878 if 'routing_key' not in jbody and self .method .routing_key and self .method .routing_key != rabbitmq_queue :
944879 jbody ['routing_key' ] = self .method .routing_key
945880
0 commit comments