66import os
77import re
88import subprocess
9- import sys
9+ import sys
1010import time
1111import watchtower
1212import string
1616#################################
1717
# Mount point where the S3 bucket is exposed (via s3fs) on this worker.
DATA_ROOT = '/home/ubuntu/bucket'

# Runtime configuration comes from the environment; a missing variable
# raises KeyError at startup, which is the desired fail-fast behavior.
QUEUE_URL, AWS_BUCKET, LOG_GROUP_NAME = (
    os.environ['SQS_QUEUE_URL'],
    os.environ['AWS_BUCKET'],
    os.environ['LOG_GROUP_NAME'],
)
@@ -33,7 +32,7 @@ class JobQueue():
3332 def __init__ (self , queueURL ):
3433 self .client = boto3 .client ('sqs' )
3534 self .queueURL = queueURL
36-
35+
3736 def readMessage (self ):
3837 response = self .client .receive_message (QueueUrl = self .queueURL , WaitTimeSeconds = 20 )
3938 if 'Messages' in response .keys ():
@@ -63,7 +62,7 @@ def monitorAndLog(process,logger):
6362 break
6463 if output :
6564 print (output .strip ())
66- logger .info (output )
65+ logger .info (output )
6766
6867def printandlog (text ,logger ):
6968 print (text )
@@ -82,72 +81,69 @@ def stringify_metadata_dict(mdict):
def _move_output_to_s3(localOut, remoteOut, logger):
    """Try up to three times to 'aws s3 mv' *localOut* to the S3 prefix.

    Returns True on the first attempt whose stderr is empty, False once
    all three attempts have failed.
    """
    for attempt in range(1, 4):
        try:
            printandlog('Move attempt #' + str(attempt), logger)
            cmd = 'aws s3 mv ' + localOut + ' s3://' + AWS_BUCKET + '/' + remoteOut + ' --recursive'
            subp = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            out, err = subp.communicate()
            # communicate() returns bytes on Python 3; decode before any string
            # work (the old code compared bytes to '' — never equal — and
            # concatenated bytes to str, which raises TypeError).
            out = out.decode('utf-8', 'replace')
            err = err.decode('utf-8', 'replace')
            printandlog('== OUT \n ' + out, logger)
            if err == '':
                return True
            printandlog('== ERR \n ' + err, logger)
        except Exception as exc:
            # Was a bare 'except:' that also logged a possibly-unbound 'err';
            # catch narrowly-enough and report the actual failure instead.
            printandlog('Move failed: ' + str(exc), logger)
            time.sleep(30)
    return False


def runFIJI(message):
    """Run one FIJI job described by an SQS *message* and sync output to S3.

    message: dict with keys 'Metadata', 'shared_metadata', and
        'output_file_location' (the destination prefix in AWS_BUCKET).

    Returns 'SUCCESS' when at least EXPECTED_NUMBER_FILES output files were
    produced and moved to S3, 'OUTPUT_PROBLEM' otherwise.
    """
    # List the bucket root - this prevents a strange s3fs error.
    # The result itself is not needed; the listing is the point.
    os.listdir(DATA_ROOT)

    # Configure the logs
    logger = logging.getLogger(__name__)

    # Prepare paths and parameters
    localOut = 'output'
    remoteOut = message['output_file_location']

    # Start logging now that we have a job we care about.
    # '*' is not a valid CloudWatch stream-name character, so swap it out.
    metadataID = stringify_metadata_dict(message['Metadata'])
    metadata_for_log_name = metadataID.replace('*', '.')
    watchtowerlogger = watchtower.CloudWatchLogHandler(
        log_group=LOG_GROUP_NAME,
        stream_name=metadata_for_log_name,
        create_log_group=False,
    )
    logger.addHandler(watchtowerlogger)

    try:
        # Build and run the FIJI command as an argv list (no shell parsing).
        cmd = ['Fiji.app/ImageJ-linux64', '--ij2', '--headless', '--console',
               '--run', os.path.join('Fiji.app/plugins', SCRIPT_NAME)]
        cmd.append(stringify_metadata_dict(message['shared_metadata']) + ', ' + metadataID)
        print('Running', cmd)
        logger.info(cmd)

        subp = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        monitorAndLog(subp, logger)

        # Figure out how many output files there were - thanks
        # https://stackoverflow.com/a/29769297
        print('Checking output folder size')
        filenum = sum(len(filenames) for _, _, filenames in os.walk(localOut))
        print(localOut, filenum)
        if filenum < int(EXPECTED_NUMBER_FILES):
            printandlog('OUTPUT PROBLEM. Only ' + str(filenum) +
                        ' files detected . Giving up on ' + metadataID, logger)
            return 'OUTPUT_PROBLEM'

        # Get the outputs and move them to S3.
        if _move_output_to_s3(localOut, remoteOut, logger):
            printandlog('SUCCESS', logger)
            return 'SUCCESS'
        # Previously this path fell through with no return value and the
        # CloudWatch handler still attached; report it explicitly instead.
        printandlog('OUTPUT PROBLEM. S3 move failed. Giving up on ' + metadataID, logger)
        return 'OUTPUT_PROBLEM'
    finally:
        # Always detach the per-job CloudWatch handler, on every exit path.
        logger.removeHandler(watchtowerlogger)
151147
152148#################################
153149# MAIN WORKER LOOP
@@ -179,4 +175,3 @@ def main():
179175 print ('Worker started' )
180176 main ()
181177 print ('Worker finished' )
182-
0 commit comments