@@ -230,7 +230,6 @@ def _build_resource(self, body, host, secret_key):
230230 "type" : "dataset" ,
231231 "id" : datasetid
232232 }
233- self .status_update (pyclowder .utils .StatusMessage .error , resource , msg )
234233 self .message_error (resource )
235234 return None
236235
@@ -392,7 +391,7 @@ def _process_message(self, body):
392391 self .register_extractor ("%s?key=%s" % (url , secret_key ))
393392
394393 # tell everybody we are starting to process the file
395- self .status_update (pyclowder .utils .StatusMessage .start , resource , "Started processing" )
394+ self .status_update (pyclowder .utils .StatusMessage .start . value , resource , "Started processing. " )
396395
397396 # checks whether to process the file in this message or not
398397 # pylint: disable=too-many-nested-blocks
@@ -456,41 +455,37 @@ def _process_message(self, body):
456455 logger .exception ("Error removing temporary dataset directory" )
457456
458457 else :
459- self .status_update (pyclowder .utils .StatusMessage .processing , resource , "Skipped in check_message" )
458+ self .status_update (pyclowder .utils .StatusMessage .skip . value , resource , "Skipped in check_message" )
460459
461460 self .message_ok (resource )
462461
463462 except SystemExit as exc :
464- status = "sys.exit : " + str (exc )
465- logger .exception ("[%s] %s" , resource ['id' ], status )
466- self .status_update (pyclowder .utils .StatusMessage .error , resource , status )
467- self .message_resubmit (resource , retry_count )
463+ message = str .format ("sys.exit: {}" , str (exc ))
464+ logger .exception ("[%s] %s" , resource ['id' ], message )
465+ self .message_resubmit (resource , retry_count , message )
468466 raise
469467 except KeyboardInterrupt :
470- status = "keyboard interrupt"
471- logger .exception ("[%s] %s" , resource ['id' ], status )
472- self .status_update (pyclowder .utils .StatusMessage .error , resource , status )
473- self .message_resubmit (resource , retry_count )
468+ message = "keyboard interrupt"
469+ logger .exception ("[%s] %s" , resource ['id' ], message )
470+ self .message_resubmit (resource , retry_count , message )
474471 raise
475472 except GeneratorExit :
476- status = "generator exit"
477- logger .exception ("[%s] %s" , resource ['id' ], status )
478- self .status_update (pyclowder .utils .StatusMessage .error , resource , status )
479- self .message_resubmit (resource , retry_count )
473+ message = "generator exit"
474+ logger .exception ("[%s] %s" , resource ['id' ], message )
475+ self .message_resubmit (resource , retry_count , message )
480476 raise
481477 except subprocess .CalledProcessError as exc :
482- status = str .format ("Error processing [exit code={}]\n {}" , exc .returncode , exc .output )
483- logger .exception ("[%s] %s" , resource ['id' ], status )
484- self .status_update (pyclowder .utils .StatusMessage .error , resource , status )
485- self .message_error (resource )
478+ message = str .format ("Error in subprocess [exit code={}]:\n {}" , exc .returncode , exc .output )
479+ logger .exception ("[%s] %s" , resource ['id' ], message )
480+ self .message_error (resource , message )
486481 except Exception as exc : # pylint: disable=broad-except
487- status = "Error processing : " + str (exc )
488- logger .exception ("[%s] %s" , resource ['id' ], status )
489- self .status_update (pyclowder .utils .StatusMessage .error , resource , status )
482+ message = str (exc )
483+ logger .exception ("[%s] %s" , resource ['id' ], message )
490484 if retry_count < 10 :
491- self .message_resubmit (resource , retry_count + 1 )
485+ message = "(#%s) %s" % (retry_count + 1 , message )
486+ self .message_resubmit (resource , retry_count + 1 , message )
492487 else :
493- self .message_error (resource )
488+ self .message_error (resource , message )
494489
495490 def register_extractor (self , endpoints ):
496491 """Register extractor info with Clowder.
@@ -528,21 +523,23 @@ def status_update(self, status, resource, message):
528523 the instance know the progress of the extractor.
529524
530525 Keyword arguments:
531- status - START | PROCESSING | DONE | ERROR
526+ status - pyclowder.utils.StatusMessage value
532527 resource - descriptor object with {"type", "id"} fields
533528 message - contents of the status update
534529 """
535530 logging .getLogger (__name__ ).info ("[%s] : %s: %s" , resource ["id" ], status , message )
536531
537- def message_ok (self , resource ):
538- self .status_update (pyclowder .utils .StatusMessage .done , resource , "Done processing" )
532+ def message_ok (self , resource , message = "Done processing." ):
533+ self .status_update (pyclowder .utils .StatusMessage .done .value , resource , message )
534+
535+ def message_error (self , resource , message = "Error processing message." ):
536+ self .status_update (pyclowder .utils .StatusMessage .error .value , resource , message )
539537
540- def message_error (self , resource ):
541- self .status_update (pyclowder .utils .StatusMessage .error , resource , "Error processing message" )
538+ def message_resubmit (self , resource , retry_count , message = "Resubmitting message." ):
539+ self .status_update (pyclowder .utils .StatusMessage .retry . value , resource , message )
542540
543- def message_resubmit (self , resource , retry_count ):
544- self .status_update (pyclowder .utils .StatusMessage .processing , resource , "Resubmitting message (attempt #%s)"
545- % retry_count )
541+ def message_process (self , resource , message ):
542+ self .status_update (pyclowder .utils .StatusMessage .processing .value , resource , message )
546543
547544 def get (self , url , params = None , raise_status = True , ** kwargs ):
548545 """
@@ -871,19 +868,22 @@ def process_messages(self, channel, rabbitmq_queue):
871868 with self .lock :
872869 msg = self .messages .pop (0 )
873870
871+ # PROCESSING - Standard update message during extractor processing
874872 if msg ["type" ] == 'status' :
875873 if self .header .reply_to :
876874 properties = pika .BasicProperties (delivery_mode = 2 , correlation_id = self .header .correlation_id )
877875 channel .basic_publish (exchange = '' ,
878876 routing_key = self .header .reply_to ,
879877 properties = properties ,
880- body = json .dumps (msg ['status ' ]))
878+ body = json .dumps (msg ['payload ' ]))
881879
880+ # DONE - Extractor finished without error
882881 elif msg ["type" ] == 'ok' :
883882 channel .basic_ack (self .method .delivery_tag )
884883 with self .lock :
885884 self .finished = True
886885
886+ # ERROR - Extractor encountered error and message goes to error queue
887887 elif msg ["type" ] == 'error' :
888888 properties = pika .BasicProperties (delivery_mode = 2 , reply_to = self .header .reply_to )
889889 channel .basic_publish (exchange = '' ,
@@ -894,18 +894,18 @@ def process_messages(self, channel, rabbitmq_queue):
894894 with self .lock :
895895 self .finished = True
896896
897+ # RESUBMITTING - Extractor encountered error and message is resubmitted to same queue
897898 elif msg ["type" ] == 'resubmit' :
898- retry_count = msg ['retry_count' ]
899- queue = rabbitmq_queue
900- properties = pika .BasicProperties (delivery_mode = 2 , reply_to = self .header .reply_to )
901899 jbody = json .loads (self .body )
902- jbody ['retry_count' ] = retry_count
900+ jbody ['retry_count' ] = msg [ ' retry_count' ]
903901 if 'exchange' not in jbody and self .method .exchange :
904902 jbody ['exchange' ] = self .method .exchange
905- if 'routing_key' not in jbody and self .method .routing_key and self .method .routing_key != queue :
903+ if 'routing_key' not in jbody and self .method .routing_key and self .method .routing_key != rabbitmq_queue :
906904 jbody ['routing_key' ] = self .method .routing_key
905+
906+ properties = pika .BasicProperties (delivery_mode = 2 , reply_to = self .header .reply_to )
907907 channel .basic_publish (exchange = '' ,
908- routing_key = queue ,
908+ routing_key = rabbitmq_queue ,
909909 properties = properties ,
910910 body = json .dumps (jbody ))
911911 channel .basic_ack (self .method .delivery_tag )
@@ -917,30 +917,33 @@ def process_messages(self, channel, rabbitmq_queue):
917917
918918 def status_update (self , status , resource , message ):
919919 super (RabbitMQHandler , self ).status_update (status , resource , message )
920- status_report = dict ()
921- # TODO: Update this to check resource["type"] once Clowder better supports dataset events
922- status_report ['file_id' ] = resource ["id" ]
923- status_report ['extractor_id' ] = self .extractor_info ['name' ]
924- status_report ['status' ] = "%s: %s" % (status , message )
925- status_report ['start' ] = pyclowder .utils .iso8601time ()
926920 with self .lock :
921+ # TODO: Remove 'status' from payload later and read from message_type and message in Clowder 2.0
927922 self .messages .append ({"type" : "status" ,
928- "status" : status_report ,
929923 "resource" : resource ,
930- "message" : message })
931-
932- def message_ok (self , resource ):
933- super (RabbitMQHandler , self ).message_ok (resource )
924+ "payload" : {
925+ "file_id" : resource ["id" ],
926+ "extractor_id" : self .extractor_info ['name' ],
927+ "status" : "%s: %s" % (status , message ),
928+ "start" : pyclowder .utils .iso8601time (),
929+ "message_type" : status ,
930+ "message" : message
931+ }})
932+
933+ def message_ok (self , resource , message = "Done processing." ):
934+ super (RabbitMQHandler , self ).message_ok (resource , message )
934935 with self .lock :
935936 self .messages .append ({"type" : "ok" })
936937
937- def message_error (self , resource ):
938- super (RabbitMQHandler , self ).message_error (resource )
938+ def message_error (self , resource , message = "Error processing message." ):
939+ super (RabbitMQHandler , self ).message_error (resource , message )
939940 with self .lock :
940941 self .messages .append ({"type" : "error" })
941942
942- def message_resubmit (self , resource , retry_count ):
943- super (RabbitMQHandler , self ).message_resubmit (resource , retry_count )
943+ def message_resubmit (self , resource , retry_count , message = None ):
944+ if message is None :
945+ message = "(#%s)" % retry_count
946+ super (RabbitMQHandler , self ).message_resubmit (resource , retry_count , message )
944947 with self .lock :
945948 self .messages .append ({"type" : "resubmit" , "retry_count" : retry_count })
946949
0 commit comments