@@ -63,7 +63,7 @@ class Connector(object):
6363 """
6464
6565 def __init__ (self , extractor_name , extractor_info , check_message = None , process_message = None , ssl_verify = True ,
66- mounted_paths = None , clowder_url = None , max_retry = 10 ):
66+ mounted_paths = None , clowder_url = None , max_retry = 10 , extractor_key = None , clowder_email = None ):
6767 self .extractor_name = extractor_name
6868 self .extractor_info = extractor_info
6969 self .check_message = check_message
@@ -74,6 +74,8 @@ def __init__(self, extractor_name, extractor_info, check_message=None, process_m
7474 else :
7575 self .mounted_paths = mounted_paths
7676 self .clowder_url = clowder_url
77+ self .clowder_email = clowder_email
78+ self .extractor_key = extractor_key
7779 self .max_retry = max_retry
7880
7981 filename = 'notifications.json'
@@ -625,15 +627,18 @@ class RabbitMQConnector(Connector):
625627 def __init__ (self , extractor_name , extractor_info ,
626628 rabbitmq_uri , rabbitmq_key = None , rabbitmq_queue = None ,
627629 check_message = None , process_message = None , ssl_verify = True , mounted_paths = None ,
628- heartbeat = 5 * 60 , clowder_url = None , max_retry = 10 ):
630+ heartbeat = 10 , clowder_url = None , max_retry = 10 , extractor_key = None , clowder_email = None ):
629631 super (RabbitMQConnector , self ).__init__ (extractor_name , extractor_info , check_message , process_message ,
630- ssl_verify , mounted_paths , clowder_url , max_retry )
632+ ssl_verify , mounted_paths , clowder_url , max_retry , extractor_key , clowder_email )
631633 self .rabbitmq_uri = rabbitmq_uri
632634 self .rabbitmq_key = rabbitmq_key
633635 if rabbitmq_queue is None :
634636 self .rabbitmq_queue = extractor_info ['name' ]
635637 else :
636638 self .rabbitmq_queue = rabbitmq_queue
639+ self .extractor_key = extractor_key
640+ if extractor_key :
641+ self .rabbitmq_queue = "private.%s.%s" % (extractor_key , self .rabbitmq_queue )
637642 self .channel = None
638643 self .connection = None
639644 self .consumer_tag = None
@@ -659,7 +664,7 @@ def connect(self):
659664 self .channel .queue_declare (queue = 'error.' + self .rabbitmq_queue , durable = True )
660665
661666 # start the extractor announcer
662- self .announcer = RabbitMQBroadcast (self .rabbitmq_uri , self .extractor_info , self .rabbitmq_queue , self .heartbeat )
667+ self .announcer = RabbitMQBroadcast (self .rabbitmq_uri , self .extractor_info , self .clowder_email , self . rabbitmq_queue , self .heartbeat )
663668 self .announcer .start_thread ()
664669
665670 def listen (self ):
@@ -765,10 +770,11 @@ def on_message(self, channel, method, header, body):
765770
766771
767772class RabbitMQBroadcast :
768- def __init__ (self , rabbitmq_uri , extractor_info , rabbitmq_queue , heartbeat ):
773+ def __init__ (self , rabbitmq_uri , extractor_info , clowder_email , rabbitmq_queue , heartbeat ):
769774 self .active = True
770775 self .rabbitmq_uri = rabbitmq_uri
771776 self .extractor_info = extractor_info
777+ self .clowder_email = clowder_email
772778 self .rabbitmq_queue = rabbitmq_queue
773779 self .heartbeat = heartbeat
774780 self .id = str (uuid .uuid4 ())
@@ -798,6 +804,7 @@ def send_heartbeat(self):
798804 message = {
799805 'id' : self .id ,
800806 'queue' : self .rabbitmq_queue ,
807+ 'owner' : self .clowder_email ,
801808 'extractor_info' : self .extractor_info
802809 }
803810 next_heartbeat = time .time ()
0 commit comments