Skip to content

Commit 05be2b1

Browse files
committed
add clowder email param
1 parent 7db6359 commit 05be2b1

File tree

2 files changed

+45
-28
lines changed

2 files changed

+45
-28
lines changed

pyclowder/connectors.py

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class Connector(object):
6666
registered_clowder = list()
6767

6868
def __init__(self, extractor_name, extractor_info, check_message=None, process_message=None, ssl_verify=True,
69-
mounted_paths=None, clowder_url=None, max_retry=10, extractor_key=None):
69+
mounted_paths=None, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None):
7070
self.extractor_name = extractor_name
7171
self.extractor_info = extractor_info
7272
self.check_message = check_message
@@ -77,6 +77,7 @@ def __init__(self, extractor_name, extractor_info, check_message=None, process_m
7777
else:
7878
self.mounted_paths = mounted_paths
7979
self.clowder_url = clowder_url
80+
self.clowder_email = clowder_email
8081
self.extractor_key = extractor_key
8182
self.max_retry = max_retry
8283

@@ -392,16 +393,10 @@ def _process_message(self, body):
392393
return
393394

394395
# register extractor
395-
if self.extractor_key is None:
396-
url = "%sapi/extractors" % source_host
397-
if url not in Connector.registered_clowder:
398-
Connector.registered_clowder.append(url)
399-
self.register_extractor("%s?key=%s" % (url, secret_key))
400-
else:
401-
url = "%sapi/extractors/private/%s" % (source_host, self.extractor_key)
402-
if url not in Connector.registered_clowder:
403-
Connector.registered_clowder.append(url)
404-
self.register_extractor("%s?key=%s" % (url, secret_key))
396+
url = "%sapi/extractors" % source_host
397+
if url not in Connector.registered_clowder:
398+
self.register_extractor("%s?key=%s" % (url, secret_key))
399+
Connector.registered_clowder.append(url)
405400

406401
# tell everybody we are starting to process the file
407402
self.status_update(pyclowder.utils.StatusMessage.start, resource, "Started processing.")
@@ -519,18 +514,25 @@ def register_extractor(self, endpoints):
519514

520515
headers = {'Content-Type': 'application/json'}
521516
data = self.extractor_info
517+
if self.extractor_key is not None and len(self.extractor_key) > 0:
518+
data["unique_key"] = self.extractor_key
519+
logger.info("Registering extractor with key "+self.extractor_key)
520+
logger.info(endpoints)
522521

523522
for url in endpoints.split(','):
524-
if url not in Connector.registered_clowder:
525-
Connector.registered_clowder.append(url)
526-
try:
527-
result = requests.post(url.strip(), headers=headers,
528-
data=json.dumps(data),
529-
verify=self.ssl_verify)
530-
result.raise_for_status()
531-
logger.debug("Registering extractor with %s : %s", url, result.text)
532-
except Exception as exc: # pylint: disable=broad-except
533-
logger.exception('Error in registering extractor: ' + str(exc))
523+
logger.info(url)
524+
logger.info("submitting...")
525+
if "unique_key" in data:
526+
if url.find("?") > -1: url += "&user=%s" % self.clowder_email
527+
else: url += "?user=%s" % self.clowder_email # TODO: This will not work, need an auth key matching email
528+
try:
529+
result = requests.post(url.strip(), headers=headers,
530+
data=json.dumps(data),
531+
verify=self.ssl_verify)
532+
result.raise_for_status()
533+
logger.info("Registering extractor as %s : %s", url, result.text)
534+
except Exception as exc: # pylint: disable=broad-except
535+
logger.exception('Error in registering extractor: ' + str(exc))
534536

535537
# pylint: disable=no-self-use
536538
def status_update(self, status, resource, message):
@@ -637,9 +639,9 @@ class RabbitMQConnector(Connector):
637639
def __init__(self, extractor_name, extractor_info,
638640
rabbitmq_uri, rabbitmq_exchange=None, rabbitmq_key=None, rabbitmq_queue=None,
639641
check_message=None, process_message=None, ssl_verify=True, mounted_paths=None,
640-
heartbeat=5*60, clowder_url=None, max_retry=10, extractor_key=None):
642+
heartbeat=10, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None):
641643
super(RabbitMQConnector, self).__init__(extractor_name, extractor_info, check_message, process_message,
642-
ssl_verify, mounted_paths, clowder_url, max_retry)
644+
ssl_verify, mounted_paths, clowder_url, max_retry, extractor_key, clowder_email)
643645
self.rabbitmq_uri = rabbitmq_uri
644646
self.rabbitmq_exchange = rabbitmq_exchange
645647
self.rabbitmq_key = rabbitmq_key
@@ -649,7 +651,7 @@ def __init__(self, extractor_name, extractor_info,
649651
self.rabbitmq_queue = rabbitmq_queue
650652
self.extractor_key = extractor_key
651653
if extractor_key is not None:
652-
self.rabbitmq_queue += extractor_key
654+
self.rabbitmq_queue += ".UK__" + extractor_key
653655
self.channel = None
654656
self.connection = None
655657
self.consumer_tag = None
@@ -697,7 +699,7 @@ def connect(self):
697699
routing_key="extractors." + self.extractor_name)
698700

699701
# start the extractor announcer
700-
self.announcer = RabbitMQBroadcast(self.rabbitmq_uri, self.extractor_info, self.rabbitmq_queue, self.heartbeat)
702+
self.announcer = RabbitMQBroadcast(self.rabbitmq_uri, self.extractor_info, self.clowder_email, self.rabbitmq_queue, self.heartbeat)
701703
self.announcer.start_thread()
702704

703705
def listen(self):
@@ -801,10 +803,11 @@ def on_message(self, channel, method, header, body):
801803

802804

803805
class RabbitMQBroadcast:
804-
def __init__(self, rabbitmq_uri, extractor_info, rabbitmq_queue, heartbeat):
806+
def __init__(self, rabbitmq_uri, extractor_info, clowder_email, rabbitmq_queue, heartbeat):
805807
self.active = True
806808
self.rabbitmq_uri = rabbitmq_uri
807809
self.extractor_info = extractor_info
810+
self.clowder_email = clowder_email
808811
self.rabbitmq_queue = rabbitmq_queue
809812
self.heartbeat = heartbeat
810813
self.id = str(uuid.uuid4())
@@ -834,13 +837,16 @@ def send_heartbeat(self):
834837
message = {
835838
'id': self.id,
836839
'queue': self.rabbitmq_queue,
840+
'owner': self.clowder_email,
837841
'extractor_info': self.extractor_info
838842
}
839843
next_heartbeat = time.time()
840844
while self.thread:
841845
try:
842846
self.channel.connection.process_data_events()
843847
if time.time() >= next_heartbeat:
848+
logging.getLogger(__name__).info("Sending heartbeat...")
849+
logging.getLogger(__name__).info(message)
844850
self.channel.basic_publish(exchange='extractors', routing_key='', body=json.dumps(message))
845851
next_heartbeat = time.time() + self.heartbeat
846852
except SystemExit:

pyclowder/extractors.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ def __init__(self):
6767
clowder_url = os.getenv("CLOWDER_URL", "")
6868
registration_endpoints = os.getenv('REGISTRATION_ENDPOINTS', "")
6969
extractor_key = os.getenv("EXTRACTOR_KEY", "")
70+
clowder_email = os.getenv("CLOWDER_EMAIL", "")
7071
logging_config = os.getenv("LOGGING")
7172
mounted_paths = os.getenv("MOUNTED_PATHS", "{}")
7273
input_file_path = os.getenv("INPUT_FILE_PATH")
@@ -94,6 +95,9 @@ def __init__(self):
9495
self.parser.add_argument('--key', '-k', dest="extractor_key",
9596
default=extractor_key,
9697
help='Unique key to use for extractor queue ID (sets extractor to private)')
98+
self.parser.add_argument('--user', '-u', dest="clowder_email",
99+
default=clowder_email,
100+
help='Email address of Clowder user who will initially be assigned ownership (ignored if no --key provided)')
97101
self.parser.add_argument('--rabbitmqURI', nargs='?', dest='rabbitmq_uri', default=rabbitmq_uri,
98102
help='rabbitMQ URI (default=%s)' % rabbitmq_uri.replace("%", "%%"))
99103
self.parser.add_argument('--rabbitmqQUEUE', nargs='?', dest='rabbitmq_queuename',
@@ -159,7 +163,6 @@ def start(self):
159163
else:
160164
rabbitmq_key.append("*.%s.%s" % (key, mt.replace("/", ".")))
161165

162-
logger.info('Creating connector with key '+self.args.extractor_key)
163166
connector = RabbitMQConnector(self.args.rabbitmq_queuename,
164167
self.extractor_info,
165168
check_message=self.check_message,
@@ -171,9 +174,17 @@ def start(self):
171174
mounted_paths=json.loads(self.args.mounted_paths),
172175
clowder_url=self.args.clowder_url,
173176
max_retry=self.args.max_retry,
174-
extractor_key=self.args.extractor_key)
177+
extractor_key=self.args.extractor_key,
178+
clowder_email=self.args.clowder_email)
175179
connector.connect()
176180
connector.register_extractor(self.args.registration_endpoints)
181+
182+
# TODO: register extractor initially without _process_message?
183+
url = "%sapi/extractors" % self.args.clowder_url
184+
if url not in connector.registered_clowder:
185+
connector.register_extractor("%s" % (url))
186+
connector.registered_clowder.append(url)
187+
177188
threading.Thread(target=connector.listen, name="RabbitMQConnector").start()
178189

179190
elif self.args.connector == "HPC":

0 commit comments

Comments
 (0)