Given a particular RabbitMQ instance and extractor name, this will check the error queue of that extractor for messages.
- If the dataset or file in the message exists, send the message back to the main queue.
- If it does not exist, generate a log entry and delete the message from the error queue permanently.

If the CLOWDER_HOST and CLOWDER_KEY environment variables are set, they will be used to check the existence of
files and datasets instead of the host and key found in the RabbitMQ message.
811"""
912import os
1013import logging
1619
1720# Simplified version of pyClowder's RabbitMQConnector class.
1821class RabbitMQMonitor ():
19- def __init__ (self , rabbitmq_uri , extractor_name ):
22+ def __init__ (self , rabbitmq_uri , extractor_name , clowder_host , clowder_key ):
2023 self .connection = None
2124 self .channel = None
2225
2326 self .rabbitmq_uri = rabbitmq_uri
2427 self .queue_basic = extractor_name
2528 self .queue_error = "error." + extractor_name
29+ self .clowder_host = clowder_host
30+ self .clowder_key = clowder_key
2631
2732 self .worker = None
2833 self .finished = False
@@ -123,7 +128,7 @@ def process_messages(self, channel):
123128 del msg ["body" ]["header" ]
124129
125130 if msg ["type" ] == 'missing' :
126- logging .getLogger (__name__ ).error ("%s [%s] message removed." % (msg ["resource_type" ], msg ["resource_id" ]))
131+ logging .getLogger (__name__ ).info ("%s [%s] removed." % (msg ["resource_type" ], msg ["resource_id" ]))
127132 channel .basic_ack (method .delivery_tag )
128133
129134 with self .lock :
@@ -134,7 +139,7 @@ def process_messages(self, channel):
134139 # Reset retry count to 0
135140 # TODO: If resource exists but retry count > some N, should we stop bouncing it back to main queue?
136141 msg ["body" ]["retry_count" ] = 0
137- logging .getLogger (__name__ ).error ("%s [%s] message resubmitted." % (msg ["resource_type" ], msg ["resource_id" ]))
142+ logging .getLogger (__name__ ).info ("%s [%s] resubmitted." % (msg ["resource_type" ], msg ["resource_id" ]))
138143 channel .basic_publish (exchange = '' ,
139144 routing_key = self .queue_basic ,
140145 properties = properties ,
@@ -147,13 +152,11 @@ def process_messages(self, channel):
147152 logging .getLogger (__name__ ).error ("Received unknown message type [%s]." % msg ["type" ])
148153
149154 def evaluate_message (self , body ):
150- # TODO: If dataset or file, check existence as necessary.
151155 host = body .get ('host' , '' )
152156 if host == '' :
153157 return
154158 elif not host .endswith ('/' ):
155159 host += '/'
156- host = host .replace ("localhost" , "host.docker.internal" )
157160 key = body .get ("secretKey" , '' )
158161
159162 fileid = body .get ('id' , '' )
@@ -185,13 +188,14 @@ def evaluate_message(self, body):
185188
186189 if r .status_code == 200 :
187190 # The erroneous resource exists, so resubmit to main queue
191+ logging .getLogger (__name__ ).error ("%s [%s]: Resubmitting." % (resource_type , resource_id ))
188192 resubmit = True
189193 elif r .status_code == 401 :
190194 # Unauthorized to view resource, but it exists so resubmit (extractor might use other creds)
191- logging .getLogger (__name__ ).error ("Not authorized: %s [%s]" % (resource_type , resource_id ))
195+ logging .getLogger (__name__ ).error ("%s [%s]: Credentials not authorized. Resubmitting. " % (resource_type , resource_id ))
192196 resubmit = True
193197 else :
194- logging .getLogger (__name__ ).error ("%s: %s [%s] " % (r , resource_type , resource_id ))
198+ logging .getLogger (__name__ ).error ("%s [%s]: %s. Removing. " % (resource_type , resource_id , r ))
195199 self .messages .append ({
196200 "type" : "missing" ,
197201 "resource_type" : resource_type ,
@@ -209,8 +213,12 @@ def evaluate_message(self, body):
209213
210214 # Return response of request for the resource from Clowder
def check_existence(self, host, key, resource_type, resource_id):
    """Request a resource's metadata from Clowder and return the response.

    The host and key taken from the RabbitMQ message are replaced by
    ``self.clowder_host`` / ``self.clowder_key`` whenever those are set
    (non-empty).

    Args:
        host: Clowder base URL from the message; a trailing '/' is added
            if missing.
        key: Clowder secret key from the message.
        resource_type: Singular resource name; an 's' is appended to form
            the API path segment (e.g. 'file' -> 'files').
        resource_id: Identifier of the resource to look up.

    Returns:
        The ``requests`` response object for the metadata GET request.
    """
    # Perform replacements from environment variables if needed. Truthiness
    # (rather than != '') also tolerates an override that is None.
    host_url = self.clowder_host or host
    if not host_url.endswith('/'):
        host_url += '/'
    secret_key = self.clowder_key or key

    # TODO: Is there a better exists URL to use?
    clowder_url = "%sapi/%ss/%s/metadata" % (host_url, resource_type, resource_id)

    # Hand the key to requests as a query parameter so it is URL-encoded;
    # interpolating it directly breaks the query for keys containing
    # characters such as '&', '#', '+' or spaces.
    return requests.get(clowder_url, params={"key": secret_key})
216224
@@ -227,5 +235,7 @@ def is_finished(self):
227235
228236 rabbitmq_uri = os .getenv ('RABBITMQ_URI' , 'amqp://guest:guest@localhost:5672/%2f' )
229237 extractor_name = os .getenv ('EXTRACTOR_QUEUE' , 'ncsa.image.preview' )
230- monitor = RabbitMQMonitor (rabbitmq_uri , extractor_name )
238+ clowder_host = os .getenv ('CLOWDER_HOST' , '' )
239+ clowder_key = os .getenv ('CLOWDER_KEY' , '' )
240+ monitor = RabbitMQMonitor (rabbitmq_uri , extractor_name , clowder_host , clowder_key )
231241 monitor .listen ()
0 commit comments