Skip to content

Commit 822d923

Browse files
committed
add extractor_key param
1 parent 98c1fea commit 822d923

File tree

2 files changed

+24
-7
lines changed

2 files changed

+24
-7
lines changed

pyclowder/connectors.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class Connector(object):
6666
registered_clowder = list()
6767

6868
def __init__(self, extractor_name, extractor_info, check_message=None, process_message=None, ssl_verify=True,
69-
mounted_paths=None, clowder_url=None, max_retry=10):
69+
mounted_paths=None, clowder_url=None, max_retry=10, extractor_key=None):
7070
self.extractor_name = extractor_name
7171
self.extractor_info = extractor_info
7272
self.check_message = check_message
@@ -77,6 +77,7 @@ def __init__(self, extractor_name, extractor_info, check_message=None, process_m
7777
else:
7878
self.mounted_paths = mounted_paths
7979
self.clowder_url = clowder_url
80+
self.extractor_key = extractor_key
8081
self.max_retry = max_retry
8182

8283
filename = 'notifications.json'
@@ -391,10 +392,16 @@ def _process_message(self, body):
391392
return
392393

393394
# register extractor
394-
url = "%sapi/extractors" % source_host
395-
if url not in Connector.registered_clowder:
396-
Connector.registered_clowder.append(url)
397-
self.register_extractor("%s?key=%s" % (url, secret_key))
395+
if self.extractor_key is None:
396+
url = "%sapi/extractors" % source_host
397+
if url not in Connector.registered_clowder:
398+
Connector.registered_clowder.append(url)
399+
self.register_extractor("%s?key=%s" % (url, secret_key))
400+
else:
401+
url = "%sapi/extractors/private/%s" % (source_host, self.extractor_key)
402+
if url not in Connector.registered_clowder:
403+
Connector.registered_clowder.append(url)
404+
self.register_extractor("%s?key=%s" % (url, secret_key))
398405

399406
# tell everybody we are starting to process the file
400407
self.status_update(pyclowder.utils.StatusMessage.start, resource, "Started processing.")
@@ -630,7 +637,7 @@ class RabbitMQConnector(Connector):
630637
def __init__(self, extractor_name, extractor_info,
631638
rabbitmq_uri, rabbitmq_exchange=None, rabbitmq_key=None, rabbitmq_queue=None,
632639
check_message=None, process_message=None, ssl_verify=True, mounted_paths=None,
633-
heartbeat=5*60, clowder_url=None, max_retry=10):
640+
heartbeat=5*60, clowder_url=None, max_retry=10, extractor_key=None):
634641
super(RabbitMQConnector, self).__init__(extractor_name, extractor_info, check_message, process_message,
635642
ssl_verify, mounted_paths, clowder_url, max_retry)
636643
self.rabbitmq_uri = rabbitmq_uri
@@ -640,6 +647,9 @@ def __init__(self, extractor_name, extractor_info,
640647
self.rabbitmq_queue = extractor_info['name']
641648
else:
642649
self.rabbitmq_queue = rabbitmq_queue
650+
self.extractor_key = extractor_key
651+
if extractor_key is not None:
652+
self.rabbitmq_queue += extractor_key
643653
self.channel = None
644654
self.connection = None
645655
self.consumer_tag = None

pyclowder/extractors.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ def __init__(self):
6666
rabbitmq_exchange = os.getenv('RABBITMQ_EXCHANGE', "")
6767
clowder_url = os.getenv("CLOWDER_URL", "")
6868
registration_endpoints = os.getenv('REGISTRATION_ENDPOINTS', "")
69+
extractor_key = os.getenv("EXTRACTOR_KEY", "")
6970
logging_config = os.getenv("LOGGING")
7071
mounted_paths = os.getenv("MOUNTED_PATHS", "{}")
7172
input_file_path = os.getenv("INPUT_FILE_PATH")
@@ -90,6 +91,9 @@ def __init__(self):
9091
self.parser.add_argument('--register', '-r', nargs='?', dest="registration_endpoints",
9192
default=registration_endpoints,
9293
help='Clowder registration URL (default=%s)' % registration_endpoints)
94+
self.parser.add_argument('--key', '-k', dest="extractor_key",
95+
default=extractor_key,
96+
help='Unique key to use for extractor queue ID (sets extractor to private)')
9397
self.parser.add_argument('--rabbitmqURI', nargs='?', dest='rabbitmq_uri', default=rabbitmq_uri,
9498
help='rabbitMQ URI (default=%s)' % rabbitmq_uri.replace("%", "%%"))
9599
self.parser.add_argument('--rabbitmqQUEUE', nargs='?', dest='rabbitmq_queuename',
@@ -155,6 +159,7 @@ def start(self):
155159
else:
156160
rabbitmq_key.append("*.%s.%s" % (key, mt.replace("/", ".")))
157161

162+
logger.info('Creating connector with key '+self.args.extractor_key)
158163
connector = RabbitMQConnector(self.args.rabbitmq_queuename,
159164
self.extractor_info,
160165
check_message=self.check_message,
@@ -165,8 +170,10 @@ def start(self):
165170
rabbitmq_queue=self.args.rabbitmq_queuename,
166171
mounted_paths=json.loads(self.args.mounted_paths),
167172
clowder_url=self.args.clowder_url,
168-
max_retry=self.args.max_retry)
173+
max_retry=self.args.max_retry,
174+
extractor_key=self.args.extractor_key)
169175
connector.connect()
176+
logger.info("new version check OK")
170177
connector.register_extractor(self.args.registration_endpoints)
171178
threading.Thread(target=connector.listen, name="RabbitMQConnector").start()
172179

0 commit comments

Comments
 (0)