@@ -230,7 +230,6 @@ def _build_resource(self, body, host, secret_key):
230230 "type" : "dataset" ,
231231 "id" : datasetid
232232 }
233- self .status_update (pyclowder .utils .StatusMessage .error , resource , msg )
234233 self .message_error (resource )
235234 return None
236235
@@ -392,7 +391,7 @@ def _process_message(self, body):
392391 self .register_extractor ("%s?key=%s" % (url , secret_key ))
393392
394393 # tell everybody we are starting to process the file
395- self .status_update (pyclowder .utils .StatusMessage .start , resource , "Started processing" )
394+ self .status_update (pyclowder .utils .StatusMessage .start . value , resource , "Started processing. " )
396395
397396 # checks whether to process the file in this message or not
398397 # pylint: disable=too-many-nested-blocks
@@ -456,41 +455,41 @@ def _process_message(self, body):
456455 logger .exception ("Error removing temporary dataset directory" )
457456
458457 else :
459- self .status_update (pyclowder .utils .StatusMessage .processing , resource , "Skipped in check_message" )
458+ self .status_update (pyclowder .utils .StatusMessage .skip . value , resource , "Skipped in check_message" )
460459
461460 self .message_ok (resource )
462461
463462 except SystemExit as exc :
464- status = "sys.exit : " + str (exc )
465- logger .exception ("[%s] %s" , resource ['id' ], status )
466- self .status_update (pyclowder .utils .StatusMessage .error , resource , status )
467- self .message_resubmit (resource , retry_count )
463+ message = str .format ("sys.exit: {}" , str (exc ))
464+ logger .exception ("[%s] %s" , resource ['id' ], message )
465+ self .message_resubmit (resource , retry_count , message )
468466 raise
469467 except KeyboardInterrupt :
470- status = "keyboard interrupt"
471- logger .exception ("[%s] %s" , resource ['id' ], status )
472- self .status_update (pyclowder .utils .StatusMessage .error , resource , status )
473- self .message_resubmit (resource , retry_count )
468+ message = "keyboard interrupt"
469+ logger .exception ("[%s] %s" , resource ['id' ], message )
470+ self .message_resubmit (resource , retry_count , message )
474471 raise
475472 except GeneratorExit :
476- status = "generator exit"
477- logger .exception ("[%s] %s" , resource ['id' ], status )
478- self .status_update (pyclowder .utils .StatusMessage .error , resource , status )
479- self .message_resubmit (resource , retry_count )
473+ message = "generator exit"
474+ logger .exception ("[%s] %s" , resource ['id' ], message )
475+ self .message_resubmit (resource , retry_count , message )
480476 raise
481477 except subprocess .CalledProcessError as exc :
482- status = str .format ("Error processing [exit code={}]\n {}" , exc .returncode , exc .output )
483- logger .exception ("[%s] %s" , resource ['id' ], status )
484- self .status_update (pyclowder .utils .StatusMessage .error , resource , status )
485- self .message_error (resource )
478+ message = str .format ("Error in subprocess [exit code={}]:\n {}" , exc .returncode , exc .output )
479+ logger .exception ("[%s] %s" , resource ['id' ], message )
480+ self .message_error (resource , message )
481+ except PyClowderExtractionAbort as exc :
482+ message = str .format ("Aborting message: {}" , exc .message )
483+ logger .exception ("[%s] %s" , resource ['id' ], message )
484+ self .message_error (resource , message )
486485 except Exception as exc : # pylint: disable=broad-except
487- status = "Error processing : " + str (exc )
488- logger .exception ("[%s] %s" , resource ['id' ], status )
489- self .status_update (pyclowder .utils .StatusMessage .error , resource , status )
486+ message = str (exc )
487+ logger .exception ("[%s] %s" , resource ['id' ], message )
490488 if retry_count < 10 :
491- self .message_resubmit (resource , retry_count + 1 )
489+ message = "(#%s) %s" % (retry_count + 1 , message )
490+ self .message_resubmit (resource , retry_count + 1 , message )
492491 else :
493- self .message_error (resource )
492+ self .message_error (resource , message )
494493
495494 def register_extractor (self , endpoints ):
496495 """Register extractor info with Clowder.
@@ -528,21 +527,23 @@ def status_update(self, status, resource, message):
528527 the instance know the progress of the extractor.
529528
530529 Keyword arguments:
531- status - START | PROCESSING | DONE | ERROR
530+ status - pyclowder.utils.StatusMessage value
532531 resource - descriptor object with {"type", "id"} fields
533532 message - contents of the status update
534533 """
535534 logging .getLogger (__name__ ).info ("[%s] : %s: %s" , resource ["id" ], status , message )
536535
537- def message_ok (self , resource ):
538- self .status_update (pyclowder .utils .StatusMessage .done , resource , "Done processing" )
536+ def message_ok (self , resource , message = "Done processing." ):
537+ self .status_update (pyclowder .utils .StatusMessage .done .value , resource , message )
538+
539+ def message_error (self , resource , message = "Error processing message." ):
540+ self .status_update (pyclowder .utils .StatusMessage .error .value , resource , message )
539541
540- def message_error (self , resource ):
541- self .status_update (pyclowder .utils .StatusMessage .error , resource , "Error processing message" )
542+ def message_resubmit (self , resource , retry_count , message = "Resubmitting message." ):
543+ self .status_update (pyclowder .utils .StatusMessage .retry . value , resource , message )
542544
543- def message_resubmit (self , resource , retry_count ):
544- self .status_update (pyclowder .utils .StatusMessage .processing , resource , "Resubmitting message (attempt #%s)"
545- % retry_count )
545+ def message_process (self , resource , message ):
546+ self .status_update (pyclowder .utils .StatusMessage .processing .value , resource , message )
546547
547548 def get (self , url , params = None , raise_status = True , ** kwargs ):
548549 """
@@ -877,19 +878,22 @@ def process_messages(self, channel, rabbitmq_queue):
877878 with self .lock :
878879 msg = self .messages .pop (0 )
879880
881+ # PROCESSING - Standard update message during extractor processing
880882 if msg ["type" ] == 'status' :
881883 if self .header .reply_to :
882884 properties = pika .BasicProperties (delivery_mode = 2 , correlation_id = self .header .correlation_id )
883885 channel .basic_publish (exchange = '' ,
884886 routing_key = self .header .reply_to ,
885887 properties = properties ,
886- body = json .dumps (msg ['status ' ]))
888+ body = json .dumps (msg ['payload ' ]))
887889
890+ # DONE - Extractor finished without error
888891 elif msg ["type" ] == 'ok' :
889892 channel .basic_ack (self .method .delivery_tag )
890893 with self .lock :
891894 self .finished = True
892895
896+ # ERROR - Extractor encountered error and message goes to error queue
893897 elif msg ["type" ] == 'error' :
894898 properties = pika .BasicProperties (delivery_mode = 2 , reply_to = self .header .reply_to )
895899 channel .basic_publish (exchange = '' ,
@@ -900,18 +904,18 @@ def process_messages(self, channel, rabbitmq_queue):
900904 with self .lock :
901905 self .finished = True
902906
907+ # RESUBMITTING - Extractor encountered error and message is resubmitted to same queue
903908 elif msg ["type" ] == 'resubmit' :
904- retry_count = msg ['retry_count' ]
905- queue = rabbitmq_queue
906- properties = pika .BasicProperties (delivery_mode = 2 , reply_to = self .header .reply_to )
907909 jbody = json .loads (self .body )
908- jbody ['retry_count' ] = retry_count
910+ jbody ['retry_count' ] = msg [ ' retry_count' ]
909911 if 'exchange' not in jbody and self .method .exchange :
910912 jbody ['exchange' ] = self .method .exchange
911- if 'routing_key' not in jbody and self .method .routing_key and self .method .routing_key != queue :
913+ if 'routing_key' not in jbody and self .method .routing_key and self .method .routing_key != rabbitmq_queue :
912914 jbody ['routing_key' ] = self .method .routing_key
915+
916+ properties = pika .BasicProperties (delivery_mode = 2 , reply_to = self .header .reply_to )
913917 channel .basic_publish (exchange = '' ,
914- routing_key = queue ,
918+ routing_key = rabbitmq_queue ,
915919 properties = properties ,
916920 body = json .dumps (jbody ))
917921 channel .basic_ack (self .method .delivery_tag )
@@ -923,31 +927,35 @@ def process_messages(self, channel, rabbitmq_queue):
923927
924928 def status_update (self , status , resource , message ):
925929 super (RabbitMQHandler , self ).status_update (status , resource , message )
926- status_report = dict ()
927- # TODO: Update this to check resource["type"] once Clowder better supports dataset events
928- status_report ['file_id' ] = resource ["id" ]
929- status_report ['job_id' ] = self .job_id
930- status_report ['extractor_id' ] = self .extractor_info ['name' ]
931- status_report ['status' ] = "%s: %s" % (status , message )
932- status_report ['start' ] = pyclowder .utils .iso8601time ()
930+
933931 with self .lock :
932+ # TODO: Remove 'status' from payload later and read from message_type and message in Clowder 2.0
934933 self .messages .append ({"type" : "status" ,
935- "status" : status_report ,
936934 "resource" : resource ,
937- "message" : message })
938-
939- def message_ok (self , resource ):
940- super (RabbitMQHandler , self ).message_ok (resource )
935+ "payload" : {
936+ "file_id" : resource ["id" ],
937+ "extractor_id" : self .extractor_info ['name' ],
938+ "job_id" : self .job_id ,
939+ "status" : "%s: %s" % (status , message ),
940+ "start" : pyclowder .utils .iso8601time (),
941+ "message_type" : status ,
942+ "message" : message
943+ }})
944+
945+ def message_ok (self , resource , message = "Done processing." ):
946+ super (RabbitMQHandler , self ).message_ok (resource , message )
941947 with self .lock :
942948 self .messages .append ({"type" : "ok" })
943949
944- def message_error (self , resource ):
945- super (RabbitMQHandler , self ).message_error (resource )
950+ def message_error (self , resource , message = "Error processing message." ):
951+ super (RabbitMQHandler , self ).message_error (resource , message )
946952 with self .lock :
947953 self .messages .append ({"type" : "error" })
948954
949- def message_resubmit (self , resource , retry_count ):
950- super (RabbitMQHandler , self ).message_resubmit (resource , retry_count )
955+ def message_resubmit (self , resource , retry_count , message = None ):
956+ if message is None :
957+ message = "(#%s)" % retry_count
958+ super (RabbitMQHandler , self ).message_resubmit (resource , retry_count , message )
951959 with self .lock :
952960 self .messages .append ({"type" : "resubmit" , "retry_count" : retry_count })
953961
@@ -1105,3 +1113,14 @@ def put(self, url, data=None, raise_status=True, **kwargs):
11051113 def delete (self , url , raise_status = True , ** kwargs ):
11061114 logging .getLogger (__name__ ).debug ("DELETE: " + url )
11071115 return None
1116+
1117+
1118+ class PyClowderExtractionAbort (Exception ):
1119+ """Raise exception that will not be subject to retry attempts (i.e. errors that are expected to fail again).
1120+
1121+ Attributes:
1122+ message -- explanation of the error
1123+ """
1124+
1125+ def __init__ (self , message ):
1126+ self .message = message
0 commit comments