1- # Alex Hancock, UCSC CGL
2- #
3- # Luigi Monitor
4-
51import boto
62import json
73import os
4+ import re
85import subprocess
96import sys
107import urllib2
118
12- from sqlalchemy import create_engine , MetaData , Table , Column , String , select , and_
9+ from sqlalchemy import select , and_
1310from datetime import datetime
1411
12+ # Add parent directory to get luigidb init
13+ sys .path .append ( os .path .dirname ( os .path .dirname ( os .path .abspath (__file__ ) ) ) )
14+ from monitordb_lib import luigiDBInit
15+
1516
1617def get_touchfile (bucket_name , touchfile_name ):
1718 s3 = boto .connect_s3 ()
1819 bucket = s3 .get_bucket (bucket_name , validate = False )
19- print "GOT S3 BUCKET"
20+ print "GOT S3 BUCKET"
2021
2122 key = bucket .new_key (touchfile_name )
2223 print "CREATED NEW S3 KEY"
@@ -87,46 +88,20 @@ def get_consonance_status(consonance_uuid):
8788 return json .loads (status_text )
8889
8990
90- # This was exported to a method to avoid duplication
91- # for both the creation and update timestamps
9291def format_consonance_timestamp (consonance_timestamp ):
92+ # This was exported to a method to avoid duplication
93+ # for both the creation and update timestamps
9394 datetime_obj = datetime .strptime (consonance_timestamp , '%Y-%m-%dT%H:%M:%S.%f+0000' )
9495 return datetime .strftime (datetime_obj , '%Y-%m-%d %H:%M' )
9596
9697
97- #
98- # Database initialization, creation if table doesn't exist
99- #
100- # Change echo to True to show SQL code... unnecessary for production
101- #
102- db = create_engine ('postgresql://{}:{}@db/{}' .format (os .getenv ("POSTGRES_USER" ), os .getenv ("POSTGRES_PASSWORD" ), os .getenv ("POSTGRES_DB" )), echo = False )
103- conn = db .connect ()
104- metadata = MetaData (db )
105- luigi = Table ('luigi' , metadata ,
106- Column ("luigi_job" , String (100 ), primary_key = True ),
107- Column ("status" , String (20 )),
108- Column ("submitter_specimen_id" , String (100 )),
109- Column ("specimen_uuid" , String (100 )),
110- Column ("workflow_name" , String (100 )),
111- Column ("center_name" , String (100 )),
112- Column ("submitter_donor_id" , String (100 )),
113- Column ("consonance_job_uuid" , String (100 )),
114- Column ("submitter_donor_primary_site" , String (100 )),
115- Column ("project" , String (100 )),
116- Column ("analysis_type" , String (100 )),
117- Column ("program" , String (100 )),
118- Column ("donor_uuid" , String (100 )),
119- Column ("submitter_sample_id" , String (100 )),
120- Column ("submitter_experimental_design" , String (100 )),
121- Column ("submitter_specimen_type" , String (100 )),
122- Column ("workflow_version" , String (100 )),
123- Column ("sample_uuid" , String (100 )),
124- Column ("start_time" , String (100 )),
125- Column ("last_updated" , String (100 )))
126-
127- if not db .dialect .has_table (db , luigi ):
128- luigi .create ()
98+ def validateConsonanceUUID (consonance_uuid ):
99+ # Return if consonance uuid only contains hex characters and "-"
100+ uuid_pattern = re .compile ("[a-f,A-F,0-9,-]+" )
101+ return bool (uuid_pattern .match (consonance_uuid ))
102+
129103
104+ monitordb_connection , monitordb_table = luigiDBInit ()
130105jobList = get_job_list ()
131106
132107for job in jobList :
@@ -177,16 +152,16 @@ def format_consonance_timestamp(consonance_timestamp):
177152 # use the Consonance job uuid instead of the Luigi job id because
178153 # Luigi sometimes reuses job ids for different runs
179154 # The Consonance job uuid is always unique
180- # select_query = select([luigi ]).where(luigi .c.luigi_job == job)
155+ # select_query = select([monitordb_table ]).where(monitordb_table .c.luigi_job == job)
181156 job_uuid = jsonMetadata ['consonance_job_uuid' ]
182157 print "QUERYING DB FOR JOB UUID:" , job_uuid
183- select_query = select ([luigi ]).where (luigi .c .consonance_job_uuid == job_uuid )
184- select_result = query_to_list (conn .execute (select_query ))
158+ select_query = select ([monitordb_table ]).where (monitordb_table .c .consonance_job_uuid == job_uuid )
159+ select_result = query_to_list (monitordb_connection .execute (select_query ))
185160 print "JOB RESULT:" , select_result
186161 if len (select_result ) == 0 :
187162 try :
188- #ins_query = luigi .insert().values(luigi_job=job,
189- ins_query = luigi .insert ().values (luigi_job = job_uuid ,
163+ #ins_query = monitordb_table .insert().values(luigi_job=job,
164+ ins_query = monitordb_table .insert ().values (luigi_job = job ,
190165 submitter_specimen_id = jsonMetadata ['submitter_specimen_id' ],
191166 specimen_uuid = jsonMetadata ['specimen_uuid' ],
192167 workflow_name = jsonMetadata ['workflow_name' ],
@@ -203,7 +178,7 @@ def format_consonance_timestamp(consonance_timestamp):
203178 submitter_specimen_type = jsonMetadata ['submitter_specimen_type' ],
204179 workflow_version = jsonMetadata ['workflow_version' ],
205180 sample_uuid = jsonMetadata ['sample_uuid' ])
206- exec_result = conn .execute (ins_query )
181+ exec_result = monitordb_connection .execute (ins_query )
207182 except Exception as e :
208183 print >> sys .stderr , e .message , e .args
209184 print "Dumping jsonMetadata to aid debug:\n " , jsonMetadata
@@ -218,21 +193,21 @@ def format_consonance_timestamp(consonance_timestamp):
218193# consonance status using job.consonance_uuid
219194# update that job using the information from status return
220195#
221- select_query = select ([luigi ])
222- select_result = conn .execute (select_query )
196+ select_query = select ([monitordb_table ])
197+ select_result = monitordb_connection .execute (select_query )
223198result_list = [dict (row ) for row in select_result ]
224199for job in result_list :
225200 try :
226201 job_name = job ['luigi_job' ]
227202 job_uuid = job ['consonance_job_uuid' ]
228203
229- if job_uuid == "no consonance id in test mode" :
204+ if not validateConsonanceUUID ( job_uuid ) :
230205 # Skip test mode Consonance ID's
231206 # and force next job
232207 print "\n Test ID, skipping"
233208
234- stmt = luigi .delete ().where (luigi .c .luigi_job == job_name )
235- exec_result = conn .execute (stmt )
209+ stmt = monitordb_table .delete ().where (monitordb_table .c .luigi_job == job_name )
210+ exec_result = monitordb_connection .execute (stmt )
236211 else :
237212 # Consonace job id is real
238213 print "\n JOB NAME:" , job_uuid
@@ -247,20 +222,20 @@ def format_consonance_timestamp(consonance_timestamp):
247222 print "CREATED:" , created
248223 print "UPDATED:" , updated
249224
250- stmt = luigi .update ().\
251- where (luigi .c .luigi_job == job_name ).\
225+ stmt = monitordb_table .update ().\
226+ where (monitordb_table .c .consonance_job_uuid == job_uuid ).\
252227 values (status = status_json ['state' ],
253228 start_time = created ,
254229 last_updated = updated )
255- exec_result = conn .execute (stmt )
230+ exec_result = monitordb_connection .execute (stmt )
256231
257232 except Exception as e :
258233 print >> sys .stderr , e .message , e .args
259234 print >> sys .stderr , "Dumping job entry:" , job
260235
261- stmt = luigi .update ().\
262- where ((and_ (luigi .c .luigi_job == job_name ,
263- luigi .c .status != 'SUCCESS' ,
264- luigi .c .status != 'FAILED' ))).\
236+ stmt = monitordb_table .update ().\
237+ where ((and_ (monitordb_table .c .luigi_job == job_name ,
238+ monitordb_table .c .status != 'SUCCESS' ,
239+ monitordb_table .c .status != 'FAILED' ))).\
265240 values (status = 'JOB NOT FOUND' )
266- exec_result = conn .execute (stmt )
241+ exec_result = monitordb_connection .execute (stmt )
0 commit comments