@@ -58,7 +58,9 @@ class Connector(object):
5858
5959 registered_clowder = list ()
6060
61- def __init__ (self , extractor_info , check_message = None , process_message = None , ssl_verify = True , mounted_paths = None ):
61+ def __init__ (self , extractor_name , extractor_info , check_message = None , process_message = None , ssl_verify = True ,
62+ mounted_paths = None ):
63+ self .extractor_name = extractor_name
6264 self .extractor_info = extractor_info
6365 self .check_message = check_message
6466 self .process_message = process_message
@@ -135,13 +137,14 @@ def _build_resource(self, body, host, secret_key):
135137 resource_type = "file"
136138 elif message_type .find ("metadata.added" ) > - 1 :
137139 resource_type = "metadata"
138- elif message_type == "extractors." + self .extractor_info ['name' ]:
140+ elif message_type == "extractors." + self .extractor_name \
141+ or message_type == "extractors." + self .extractor_info ['name' ]:
139142 # This was a manually submitted extraction
140143 if datasetid == fileid :
141144 resource_type = "dataset"
142145 else :
143146 resource_type = "file"
144- elif message_type .endswith (self .extractor_info ['name' ]):
147+ elif message_type .endswith (self .extractor_info ['name' ]) or message_type . endswith ( self . extractor_name ) :
145148 # This was migrated from another queue (e.g. error queue) so use extractor default
146149 for key , value in self .extractor_info ['process' ].iteritems ():
147150 if key == "dataset" :
@@ -551,9 +554,10 @@ class RabbitMQConnector(Connector):
551554 """
552555
553556 # pylint: disable=too-many-arguments
554- def __init__ (self , extractor_info , rabbitmq_uri , rabbitmq_exchange = None , rabbitmq_key = None ,
557+ def __init__ (self , extractor_name , extractor_info , rabbitmq_uri , rabbitmq_exchange = None , rabbitmq_key = None ,
555558 check_message = None , process_message = None , ssl_verify = True , mounted_paths = None ):
556- Connector .__init__ (self , extractor_info , check_message , process_message , ssl_verify , mounted_paths )
559+ super (RabbitMQConnector , self ).__init__ (extractor_name , extractor_info , check_message , process_message ,
560+ ssl_verify , mounted_paths )
557561 self .rabbitmq_uri = rabbitmq_uri
558562 self .rabbitmq_exchange = rabbitmq_exchange
559563 self .rabbitmq_key = rabbitmq_key
@@ -576,8 +580,8 @@ def connect(self):
576580 self .channel .basic_qos (prefetch_count = 1 )
577581
578582 # declare the queue in case it does not exist
579- self .channel .queue_declare (queue = self .extractor_info [ 'name' ] , durable = True )
580- self .channel .queue_declare (queue = 'error.' + self .extractor_info [ 'name' ] , durable = True )
583+ self .channel .queue_declare (queue = self .extractor_name , durable = True )
584+ self .channel .queue_declare (queue = 'error.' + self .extractor_name , durable = True )
581585
582586 # register with an exchange
583587 if self .rabbitmq_exchange :
@@ -588,18 +592,18 @@ def connect(self):
588592 # connect queue and exchange
589593 if self .rabbitmq_key :
590594 if isinstance (self .rabbitmq_key , str ):
591- self .channel .queue_bind (queue = self .extractor_info [ 'name' ] ,
595+ self .channel .queue_bind (queue = self .extractor_name ,
592596 exchange = self .rabbitmq_exchange ,
593597 routing_key = self .rabbitmq_key )
594598 else :
595599 for key in self .rabbitmq_key :
596- self .channel .queue_bind (queue = self .extractor_info [ 'name' ] ,
600+ self .channel .queue_bind (queue = self .extractor_name ,
597601 exchange = self .rabbitmq_exchange ,
598602 routing_key = key )
599603
600- self .channel .queue_bind (queue = self .extractor_info [ 'name' ] ,
604+ self .channel .queue_bind (queue = self .extractor_name ,
601605 exchange = self .rabbitmq_exchange ,
602- routing_key = "extractors." + self .extractor_info [ 'name' ] )
606+ routing_key = "extractors." + self .extractor_name )
603607
604608 def listen (self ):
605609 """Listen for messages coming from RabbitMQ"""
@@ -609,8 +613,7 @@ def listen(self):
609613 self .connect ()
610614
611615 # create listener
612- self .consumer_tag = self .channel .basic_consume (self .on_message , queue = self .extractor_info ['name' ],
613- no_ack = False )
616+ self .consumer_tag = self .channel .basic_consume (self .on_message , queue = self .extractor_name , no_ack = False )
614617
615618 # start listening
616619 logging .getLogger (__name__ ).info ("Starting to listen for messages." )
@@ -665,8 +668,8 @@ def on_message(self, channel, method, header, body):
665668 if 'routing_key' not in json_body and method .routing_key :
666669 json_body ['routing_key' ] = method .routing_key
667670
668- self .worker = RabbitMQHandler (self .extractor_info , self .check_message , self .process_message ,
669- self .ssl_verify , self .mounted_paths , method , header , body )
671+ self .worker = RabbitMQHandler (self .extractor_name , self .extractor_info , self .check_message ,
672+ self .process_message , self . ssl_verify , self .mounted_paths , method , header , body )
670673 self .worker .start_thread (json_body )
671674
672675
@@ -677,9 +680,10 @@ class RabbitMQHandler(Connector):
677680 a queue of messages that the super- loop can access and send later.
678681 """
679682
680- def __init__ (self , extractor_info , check_message = None , process_message = None , ssl_verify = True ,
683+ def __init__ (self , extractor_name , extractor_info , check_message = None , process_message = None , ssl_verify = True ,
681684 mounted_paths = None , method = None , header = None , body = None ):
682- Connector .__init__ (self , extractor_info , check_message , process_message , ssl_verify , mounted_paths )
685+ super (RabbitMQHandler , self ).__init__ (extractor_name , extractor_info , check_message , process_message ,
686+ ssl_verify , mounted_paths )
683687 self .method = method
684688 self .header = header
685689 self .body = body
@@ -728,7 +732,7 @@ def process_messages(self, channel):
728732 elif msg ["type" ] == 'error' :
729733 properties = pika .BasicProperties (delivery_mode = 2 , reply_to = self .header .reply_to )
730734 channel .basic_publish (exchange = '' ,
731- routing_key = 'error.' + self .extractor_info [ 'name' ] ,
735+ routing_key = 'error.' + self .extractor_name ,
732736 properties = properties ,
733737 body = self .body )
734738 channel .basic_ack (self .method .delivery_tag )
@@ -737,7 +741,7 @@ def process_messages(self, channel):
737741
738742 elif msg ["type" ] == 'resubmit' :
739743 retry_count = msg ['retry_count' ]
740- queue = self .extractor_info [ 'name' ]
744+ queue = self .extractor_name
741745 properties = pika .BasicProperties (delivery_mode = 2 , reply_to = self .header .reply_to )
742746 jbody = json .loads (self .body )
743747 jbody ['retry_count' ] = retry_count
@@ -790,9 +794,10 @@ class HPCConnector(Connector):
790794 """Takes pickle files and processes them."""
791795
792796 # pylint: disable=too-many-arguments
793- def __init__ (self , extractor_info , picklefile ,
797+ def __init__ (self , extractor_name , extractor_info , picklefile ,
794798 check_message = None , process_message = None , ssl_verify = True , mounted_paths = None ):
795- Connector .__init__ (self , extractor_info , check_message , process_message , ssl_verify , mounted_paths )
799+ super (HPCConnector , self ).__init__ (extractor_name , extractor_info , check_message , process_message ,
800+ ssl_verify , mounted_paths )
796801 self .picklefile = picklefile
797802 self .logfile = None
798803
@@ -847,8 +852,8 @@ class LocalConnector(Connector):
847852
848853 """
849854
850- def __init__ (self , extractor_info , input_file_path , process_message = None , output_file_path = None ):
851- super (LocalConnector , self ).__init__ (extractor_info , process_message = process_message )
855+ def __init__ (self , extractor_name , extractor_info , input_file_path , process_message = None , output_file_path = None ):
856+ super (LocalConnector , self ).__init__ (extractor_name , extractor_info , process_message = process_message )
852857 self .input_file_path = input_file_path
853858 self .output_file_path = output_file_path
854859 self .completed_processing = False
0 commit comments