@@ -254,7 +254,7 @@ def _build_resource(self, body, host, secret_key):
254254 "metadata" : body ['metadata' ]
255255 }
256256
257- def _check_for_local_file (self , host , secret_key , file_metadata ):
257+ def _check_for_local_file (self , file_metadata ):
258258 """ Try to get pointer to locally accessible copy of file for extractor."""
259259
260260 # first check if file is accessible locally
@@ -285,7 +285,7 @@ def _download_file_metadata(self, host, secret_key, fileid, filepath):
285285 md_dir = tempfile .mkdtemp (suffix = fileid )
286286 (fd , md_file ) = tempfile .mkstemp (suffix = md_name , dir = md_dir )
287287
288- with os .fdopen (fd , "w " ) as tmp_file :
288+ with os .fdopen (fd , "wb " ) as tmp_file :
289289 tmp_file .write (json .dumps (file_md ))
290290
291291 return (md_dir , md_file )
@@ -303,13 +303,13 @@ def _prepare_dataset(self, host, secret_key, resource):
303303 # first check if any files in dataset accessible locally
304304 ds_file_list = pyclowder .datasets .get_file_list (self , host , secret_key , resource ["id" ])
305305 for ds_file in ds_file_list :
306- file_path = self ._check_for_local_file (host , secret_key , ds_file )
306+ file_path = self ._check_for_local_file (ds_file )
307307 if not file_path :
308308 missing_files .append (ds_file )
309309 else :
310310 # Create a link to the original file if the "true" name of the file doesn't match what's on disk
311311 if not file_path .lower ().endswith (ds_file ['filename' ].lower ()):
312- ln_name = io .path .join (temp_link_dir , ds_file ['filename' ])
312+ ln_name = os .path .join (temp_link_dir , ds_file ['filename' ])
313313 os .symlink (file_path , ln_name )
314314 tmp_files_created .append (ln_name )
315315 file_path = ln_name
@@ -342,7 +342,7 @@ def _prepare_dataset(self, host, secret_key, resource):
342342 md_name = "%s_dataset_metadata.json" % resource ["id" ]
343343 md_dir = tempfile .mkdtemp (suffix = resource ["id" ])
344344 (fd , md_file ) = tempfile .mkstemp (suffix = md_name , dir = md_dir )
345- with os .fdopen (fd , "w " ) as tmp_file :
345+ with os .fdopen (fd , "wb " ) as tmp_file :
346346 tmp_file .write (json .dumps (ds_md ))
347347 located_files .append (md_file )
348348 tmp_files_created .append (md_file )
@@ -409,7 +409,7 @@ def _process_message(self, body):
409409 try :
410410 if check_result != pyclowder .utils .CheckMessage .bypass :
411411 file_metadata = pyclowder .files .download_info (self , host , secret_key , resource ["id" ])
412- file_path = self ._check_for_local_file (host , secret_key , file_metadata )
412+ file_path = self ._check_for_local_file (file_metadata )
413413 if not file_path :
414414 file_path = pyclowder .files .download (self , host , secret_key , resource ["id" ],
415415 resource ["intermediate_id" ],
@@ -460,7 +460,7 @@ def _process_message(self, body):
460460 self .message_ok (resource )
461461
462462 except SystemExit as exc :
463- status = "sys.exit : " + exc . message
463+ status = "sys.exit : " + str ( exc )
464464 logger .exception ("[%s] %s" , resource ['id' ], status )
465465 self .status_update (pyclowder .utils .StatusMessage .error , resource , status )
466466 self .message_resubmit (resource , retry_count )
@@ -483,7 +483,7 @@ def _process_message(self, body):
483483 self .status_update (pyclowder .utils .StatusMessage .error , resource , status )
484484 self .message_error (resource )
485485 except Exception as exc : # pylint: disable=broad-except
486- status = "Error processing : " + exc . message
486+ status = "Error processing : " + str ( exc )
487487 logger .exception ("[%s] %s" , resource ['id' ], status )
488488 self .status_update (pyclowder .utils .StatusMessage .error , resource , status )
489489 if retry_count < 10 :
@@ -635,6 +635,7 @@ def __init__(self, extractor_name, extractor_info,
635635 self .connection = None
636636 self .consumer_tag = None
637637 self .worker = None
638+ self .announcer = None
638639
639640 def connect (self ):
640641 """connect to rabbitmq using URL parameters"""
@@ -688,13 +689,15 @@ def listen(self):
688689 self .connect ()
689690
690691 # create listener
691- self .consumer_tag = self .channel .basic_consume (self .on_message , queue = self .rabbitmq_queue , no_ack = False )
692+ self .consumer_tag = self .channel .basic_consume (queue = self .rabbitmq_queue ,
693+ on_message_callback = self .on_message ,
694+ auto_ack = False )
692695
693696 # start listening
694697 logging .getLogger (__name__ ).info ("Starting to listen for messages." )
695698 try :
696699 # pylint: disable=protected-access
697- while self .channel and self .channel ._consumer_infos :
700+ while self .channel and self .channel .is_open and self . channel . _consumer_infos :
698701 self .channel .connection .process_data_events (time_limit = 1 ) # 1 second
699702 if self .worker :
700703 self .worker .process_messages (self .channel , self .rabbitmq_queue )
@@ -710,20 +713,18 @@ def listen(self):
710713 logging .getLogger (__name__ ).exception ("Error while consuming messages." )
711714 finally :
712715 logging .getLogger (__name__ ).info ("Stopped listening for messages." )
713- if self .channel :
716+ if self .channel and self . channel . is_open :
714717 try :
715718 self .channel .close ()
716719 except Exception :
717720 logging .getLogger (__name__ ).exception ("Error while closing channel." )
718- finally :
719- self .channel = None
720- if self .connection :
721+ self .channel = None
722+ if self .connection and self .connection .is_open :
721723 try :
722724 self .connection .close ()
723725 except Exception :
724726 logging .getLogger (__name__ ).exception ("Error while closing connection." )
725- finally :
726- self .connection = None
727+ self .connection = None
727728
728729 def stop (self ):
729730 """Tell the connector to stop listening for messages."""
@@ -734,7 +735,9 @@ def alive(self):
734735 return self .connection is not None
735736
736737 @staticmethod
737- def _decode_body (body , codecs = ['utf8' , 'iso-8859-1' ]):
738+ def _decode_body (body , codecs = None ):
739+ if not codecs :
740+ codecs = ['utf8' , 'iso-8859-1' ]
738741 # see https://stackoverflow.com/a/15918519
739742 for i in codecs :
740743 try :
@@ -778,6 +781,9 @@ def __init__(self, rabbitmq_uri, extractor_info, rabbitmq_queue, heartbeat):
778781 self .rabbitmq_queue = rabbitmq_queue
779782 self .heartbeat = heartbeat
780783 self .id = str (uuid .uuid4 ())
784+ self .connection = None
785+ self .channel = None
786+ self .thread = None
781787
782788 def start_thread (self ):
783789 parameters = pika .URLParameters (self .rabbitmq_uri )
0 commit comments