@@ -63,7 +63,7 @@ class Connector(object):
6363 """
6464
6565 def __init__ (self , extractor_name , extractor_info , check_message = None , process_message = None , ssl_verify = True ,
66- mounted_paths = None , clowder_url = None , max_retry = 10 ):
66+ mounted_paths = None , clowder_url = None , max_retry = 10 , extractor_key = None , clowder_email = None ):
6767 self .extractor_name = extractor_name
6868 self .extractor_info = extractor_info
6969 self .check_message = check_message
@@ -74,6 +74,10 @@ def __init__(self, extractor_name, extractor_info, check_message=None, process_m
7474 else :
7575 self .mounted_paths = mounted_paths
7676 self .clowder_url = clowder_url
77+ self .clowder_email = clowder_email
78+ self .extractor_key = extractor_key
79+ if extractor_key :
80+ self .extractor_info ["unique_key" ] = extractor_key
7781 self .max_retry = max_retry
7882
7983 filename = 'notifications.json'
@@ -625,15 +629,18 @@ class RabbitMQConnector(Connector):
625629 def __init__ (self , extractor_name , extractor_info ,
626630 rabbitmq_uri , rabbitmq_key = None , rabbitmq_queue = None ,
627631 check_message = None , process_message = None , ssl_verify = True , mounted_paths = None ,
628- heartbeat = 5 * 60 , clowder_url = None , max_retry = 10 ):
632+ heartbeat = 10 , clowder_url = None , max_retry = 10 , extractor_key = None , clowder_email = None ):
629633 super (RabbitMQConnector , self ).__init__ (extractor_name , extractor_info , check_message , process_message ,
630- ssl_verify , mounted_paths , clowder_url , max_retry )
634+ ssl_verify , mounted_paths , clowder_url , max_retry , extractor_key , clowder_email )
631635 self .rabbitmq_uri = rabbitmq_uri
632636 self .rabbitmq_key = rabbitmq_key
633637 if rabbitmq_queue is None :
634638 self .rabbitmq_queue = extractor_info ['name' ]
635639 else :
636640 self .rabbitmq_queue = rabbitmq_queue
641+ self .extractor_key = extractor_key
642+ if extractor_key :
643+ self .rabbitmq_queue = "private.%s.%s" % (extractor_key , self .rabbitmq_queue )
637644 self .channel = None
638645 self .connection = None
639646 self .consumer_tag = None
@@ -659,7 +666,7 @@ def connect(self):
659666 self .channel .queue_declare (queue = 'error.' + self .rabbitmq_queue , durable = True )
660667
661668 # start the extractor announcer
662- self .announcer = RabbitMQBroadcast (self .rabbitmq_uri , self .extractor_info , self .rabbitmq_queue , self .heartbeat )
669+ self .announcer = RabbitMQBroadcast (self .rabbitmq_uri , self .extractor_info , self .clowder_email , self . rabbitmq_queue , self .heartbeat )
663670 self .announcer .start_thread ()
664671
665672 def listen (self ):
@@ -765,10 +772,11 @@ def on_message(self, channel, method, header, body):
765772
766773
767774class RabbitMQBroadcast :
768- def __init__ (self , rabbitmq_uri , extractor_info , rabbitmq_queue , heartbeat ):
775+ def __init__ (self , rabbitmq_uri , extractor_info , clowder_email , rabbitmq_queue , heartbeat ):
769776 self .active = True
770777 self .rabbitmq_uri = rabbitmq_uri
771778 self .extractor_info = extractor_info
779+ self .clowder_email = clowder_email
772780 self .rabbitmq_queue = rabbitmq_queue
773781 self .heartbeat = heartbeat
774782 self .id = str (uuid .uuid4 ())
@@ -798,6 +806,7 @@ def send_heartbeat(self):
798806 message = {
799807 'id' : self .id ,
800808 'queue' : self .rabbitmq_queue ,
809+ 'owner' : self .clowder_email ,
801810 'extractor_info' : self .extractor_info
802811 }
803812 next_heartbeat = time .time ()
0 commit comments