@@ -19,6 +19,7 @@ class RabbitMQMonitor():
1919 def __init__ (self , rabbitmq_uri , extractor_name ):
2020 self .connection = None
2121 self .channel = None
22+
2223 self .rabbitmq_uri = rabbitmq_uri
2324 self .queue_basic = extractor_name
2425 self .queue_error = "error." + extractor_name
@@ -95,6 +96,8 @@ def on_message(self, channel, method, header, body):
9596 json_body = json .loads (self ._decode_body (body ))
9697 if 'routing_key' not in json_body and method .routing_key :
9798 json_body ['routing_key' ] = method .routing_key
99+ json_body ["header" ] = header
100+ json_body ["method" ] = method
98101
99102 self .worker = threading .Thread (target = self .evaluate_message , args = (json_body ,))
100103 self .worker .start ()
@@ -114,82 +117,115 @@ def process_messages(self, channel):
114117 while self .messages :
115118 with self .lock :
116119 msg = self .messages .pop (0 )
120+ method = msg ["body" ]["method" ]
121+ header = msg ["body" ]["header" ]
122+ del msg ["body" ]["method" ]
123+ del msg ["body" ]["header" ]
117124
118125 if msg ["type" ] == 'missing' :
119- jbody = json . loads ( self . body )
120- logging . error ( "%s [%s] message removed." % ( jbody [ "resource_type" ], jbody [ "resource_id" ]) )
121- channel . basic_ack ( self . method . delivery_tag )
126+ logging . getLogger ( __name__ ). error ( "%s [%s] message removed." % ( msg [ "resource_type" ], msg [ "resource_id" ]) )
127+ channel . basic_ack ( method . delivery_tag )
128+
122129 with self .lock :
123130 self .finished = True
124131
125132 elif msg ["type" ] == 'resubmit' :
126- properties = pika .BasicProperties (delivery_mode = 2 , reply_to = self . header .reply_to )
133+ properties = pika .BasicProperties (delivery_mode = 2 , reply_to = header .reply_to )
127134 # Reset retry count to 0
128135 # TODO: If resource exists but retry count > some N, should we stop bouncing it back to main queue?
129- jbody = json .loads (self .body )
130- jbody ["retry_count" ] = 0
131- logging .error ("%s [%s] message resubmitted." % (jbody ["resource_type" ], jbody ["resource_id" ]))
136+ msg ["body" ]["retry_count" ] = 0
137+ logging .getLogger (__name__ ).error ("%s [%s] message resubmitted." % (msg ["resource_type" ], msg ["resource_id" ]))
132138 channel .basic_publish (exchange = '' ,
133139 routing_key = self .queue_basic ,
134140 properties = properties ,
135- body = json .dumps (jbody ))
136- channel .basic_ack (self . method .delivery_tag )
141+ body = json .dumps (msg [ "body" ] ))
142+ channel .basic_ack (method .delivery_tag )
137143 with self .lock :
138144 self .finished = True
139145
140146 else :
141147 logging .getLogger (__name__ ).error ("Received unknown message type [%s]." % msg ["type" ])
142148
143- def evaluate_message (self , jbody ):
149+ def evaluate_message (self , body ):
144150 # TODO: If dataset or file, check existence as necessary.
145- logging .getLogger (__name__ ).info (jbody )
146- host = jbody .get ('host' , '' )
151+ host = body .get ('host' , '' )
147152 if host == '' :
148153 return
149154 elif not host .endswith ('/' ):
150155 host += '/'
151- key = jbody .get ("secretKey" , '' )
152- res_type = jbody ["type" ]
153- res_id = jbody ["id" ]
156+ host = host .replace ("localhost" , "host.docker.internal" )
157+ key = body .get ("secretKey" , '' )
158+
159+ fileid = body .get ('id' , '' )
160+ datasetid = body .get ('datasetId' , '' )
161+
162+ # determine resource type; defaults to file
163+ resource_type = "file"
164+ message_type = body ['routing_key' ]
165+ if message_type .find (".dataset." ) > - 1 :
166+ resource_type = "dataset"
167+ elif message_type .find (".file." ) > - 1 :
168+ resource_type = "file"
169+ elif message_type .find ("metadata.added" ) > - 1 :
170+ resource_type = "metadata"
171+ elif message_type == "extractors." + self .queue_basic :
172+ # This was a manually submitted extraction
173+ if datasetid == fileid :
174+ resource_type = "dataset"
175+ else :
176+ resource_type = "file"
154177
155- logging .getLogger (__name__ ).info ("Checking %s?key=%s" % (host , key ))
156- r = self .check_existence (host , key , res_type , res_id )
178+ if resource_type == "dataset" :
179+ resource_id = datasetid
180+ else :
181+ resource_id = fileid
182+
183+ r = self .check_existence (host , key , resource_type , resource_id )
157184 resubmit = False
158185
159186 if r .status_code == 200 :
160187 # The erroneous resource exists, so resubmit to main queue
161188 resubmit = True
162189 elif r .status_code == 401 :
163190 # Unauthorized to view resource, but it exists so resubmit (extractor might use other creds)
164- logging .getLogger (__name__ ).error ("Not authorized: %s [%s]" % (res_type , res_id ))
191+ logging .getLogger (__name__ ).error ("Not authorized: %s [%s]" % (resource_type , resource_id ))
165192 resubmit = True
166193 else :
167- logging .getLogger (__name__ ).error ("%s: %s [%s]" % (r , res_type , res_id ))
194+ logging .getLogger (__name__ ).error ("%s: %s [%s]" % (r , resource_type , resource_id ))
168195 self .messages .append ({
169196 "type" : "missing" ,
170- "resource_type" : res_type ,
171- "resource_id" : res_id
197+ "resource_type" : resource_type ,
198+ "resource_id" : resource_id ,
199+ "body" : body
172200 })
173201
174202 if resubmit :
175203 self .messages .append ({
176204 "type" : "resubmit" ,
177- "resource_type" : res_type ,
178- "resource_id" : res_id
205+ "resource_type" : resource_type ,
206+ "resource_id" : resource_id ,
207+ "body" : body
179208 })
180- # Return status code of request for the resource from Clowder
209+
210+ # Return response of request for the resource from Clowder
181211 def check_existence (self , host , key , resource_type , resource_id ):
182212 # TODO: Is there a better exists URL to use?
183- clowder_url = "%sapi/%s/%s/metadata?key=%s" % (host , resource_type , resource_id , key )
213+ clowder_url = "%sapi/%ss/%s/metadata?key=%s" % (host , resource_type , resource_id , key )
214+ logging .getLogger (__name__ ).info (clowder_url )
184215 r = requests .get (clowder_url )
185- return r . status_code
216+ return r
186217
187218 def is_finished (self ):
188219 with self .lock :
189220 return self .worker and not self .worker .isAlive () and self .finished and len (self .messages ) == 0
190221
191222
192- if __name__ == '__main__' :
223+ if __name__ == "__main__" :
224+ logging .basicConfig (format = '%(asctime)-15s [%(threadName)-15s] %(levelname)-7s :'
225+ ' %(name)s - %(message)s' ,
226+ level = logging .INFO )
227+ logging .getLogger ('requests.packages.urllib3.connectionpool' ).setLevel (logging .WARN )
228+
193229 rabbitmq_uri = os .getenv ('RABBITMQ_URI' , 'amqp://guest:guest@localhost:5672/%2f' )
194230 extractor_name = os .getenv ('EXTRACTOR_QUEUE' , 'ncsa.image.preview' )
195231 monitor = RabbitMQMonitor (rabbitmq_uri , extractor_name )
0 commit comments