1010import logging
1111import json
1212import threading
13+ import requests
1314import pika
1415
1516
@@ -116,7 +117,7 @@ def process_messages(self, channel):
116117
117118 if msg ["type" ] == 'missing' :
118119 jbody = json .loads (self .body )
119- logging .error ("%s %s no longer exists ." % (jbody ["resource_type" ], jbody ["resource_id" ]))
120+ logging .error ("%s [%s] message removed ." % (jbody ["resource_type" ], jbody ["resource_id" ]))
120121 channel .basic_ack (self .method .delivery_tag )
121122 with self .lock :
122123 self .finished = True
@@ -127,7 +128,7 @@ def process_messages(self, channel):
127128 # TODO: If resource exists but retry count > some N, should we stop bouncing it back to main queue?
128129 jbody = json .loads (self .body )
129130 jbody ["retry_count" ] = 0
130- logging .error ("%s %s goes back to the main queue, precious! " % (jbody ["resource_type" ], jbody ["resource_id" ]))
131+ logging .error ("%s [%s] message resubmitted. " % (jbody ["resource_type" ], jbody ["resource_id" ]))
131132 channel .basic_publish (exchange = '' ,
132133 routing_key = self .queue_basic ,
133134 properties = properties ,
@@ -141,21 +142,56 @@ def process_messages(self, channel):
141142
142143 def evaluate_message (self , jbody ):
143144 # TODO: If dataset or file, check existence as necessary.
144- self .messages .append ({
145- "type" : "resubmit" ,
146- "resource_type" : jbody ["type" ],
147- "resource_id" : jbody ["id" ]
148- })
145+ logging .getLogger (__name__ ).info (jbody )
146+ host = jbody .get ('host' , '' )
147+ if host == '' :
148+ return
149+ elif not host .endswith ('/' ):
150+ host += '/'
151+ key = jbody .get ("secretKey" , '' )
152+ res_type = jbody ["type" ]
153+ res_id = jbody ["id" ]
154+
155+ logging .getLogger (__name__ ).info ("Checking %s?key=%s" % (host , key ))
156+ r = self .check_existence (host , key , res_type , res_id )
157+ resubmit = False
158+
159+ if r .status_code == 200 :
160+ # The erroneous resource exists, so resubmit to main queue
161+ resubmit = True
162+ elif r .status_code == 401 :
163+ # Unauthorized to view resource, but it exists so resubmit (extractor might use other creds)
164+ logging .getLogger (__name__ ).error ("Not authorized: %s [%s]" % (res_type , res_id ))
165+ resubmit = True
166+ else :
167+ logging .getLogger (__name__ ).error ("%s: %s [%s]" % (r , res_type , res_id ))
168+ self .messages .append ({
169+ "type" : "missing" ,
170+ "resource_type" : res_type ,
171+ "resource_id" : res_id
172+ })
173+
174+ if resubmit :
175+ self .messages .append ({
176+ "type" : "resubmit" ,
177+ "resource_type" : res_type ,
178+ "resource_id" : res_id
179+ })
180+ # Return status code of request for the resource from Clowder
181+ def check_existence (self , host , key , resource_type , resource_id ):
182+ # TODO: Is there a better exists URL to use?
183+ clowder_url = "%sapi/%s/%s/metadata?key=%s" % (host , resource_type , resource_id , key )
184+ r = requests .get (clowder_url )
185+ return r .status_code
149186
150187 def is_finished (self ):
151188 with self .lock :
152189 return self .worker and not self .worker .isAlive () and self .finished and len (self .messages ) == 0
153190
154191
155- logging .getLogger (__name__ ).info ("Starting to listen" )
156- print ("Starting to listen..." )
157- rabbitmq_uri = os .getenv ('RABBITMQ_URI' , 'amqp://guest:guest@localhost:5672/%2f' )
158- extractor_name = os .getenv ('EXTRACTOR_QUEUE' , 'ncsa.image.preview' )
159- monitor = RabbitMQMonitor (rabbitmq_uri , extractor_name )
160- logging .getLogger (__name__ ).info ("Starting to listen to " + extractor_name )
161- monitor .listen ()
192+ if __name__ == '__main__' :
193+ rabbitmq_uri = os .getenv ('RABBITMQ_URI' , 'amqp://guest:guest@localhost:5672/%2f' )
194+ extractor_name = os .getenv ('EXTRACTOR_QUEUE' , 'ncsa.image.preview' )
195+ monitor = RabbitMQMonitor (rabbitmq_uri , extractor_name )
196+ logging .getLogger (__name__ ).info ("Starting to listen to " + extractor_name )
197+ monitor .listen ()
0 commit comments