20
20
_MAX_TIME_BOUND = 60
21
21
_MAX_PROCESS_TIME = 6
22
22
COMPONENT_STATUS = dict ([("green" , "OK" ), ("amber" , "WARN" ), ("red" , "ERROR" )])
23
- ERROR_STATUS = dict ([("OK" , "WITH_NO_ERRORS" ), ("ERROR" , "WITH_ERRORS" ), ("WARN" , "WITH_ERRORS" )])
24
- FAILURE_STATUS = dict ([("OK" , "WITH_NO_FAILURES" ), ("ERROR" , "WITH_FAILURES" ), \
25
- ("WARN" , "WITH_FAILURES" )])
26
23
27
24
# pylint: disable=R0204
28
25
def spark_job_handler (app_id ):
@@ -40,7 +37,7 @@ def spark_job_handler(app_id):
40
37
spark_jobs = json .loads (spark_jobs .text )
41
38
if spark_jobs :
42
39
information = {}
43
- job_count = len ( spark_jobs )
40
+ job_count = spark_jobs [ 0 ][ 'jobId' ] + 1
44
41
job_suc_c , job_unknown_c , job_fail_c , job_run_c = (0 , 0 , 0 , 0 )
45
42
for ele in spark_jobs :
46
43
if ele ['status' ] == 'RUNNING' :
@@ -55,7 +52,7 @@ def spark_job_handler(app_id):
55
52
app_id , '/api/v1/applications/' , app_id , '/stages' )
56
53
spark_stages = requests .get (url )
57
54
spark_stages = json .loads (spark_stages .text )
58
- stage_count = len ( spark_stages )
55
+ stage_count = spark_stages [ 0 ][ 'stageId' ] + 1
59
56
stage_complete_c , stage_active_c , stage_pending_c , stage_failed_c = (0 , 0 , 0 , 0 )
60
57
for ele in spark_stages :
61
58
if ele ['status' ] == 'COMPLETE' :
@@ -143,7 +140,7 @@ def find_workflow_type(actions):
143
140
144
141
# pylint: disable=R0101
145
142
146
- def get_oozie_workflow_actions (actions ):
143
+ def oozie_action_handler (actions ):
147
144
"""
148
145
Handling OOZIE actions both Workflow and Coordinator
149
146
"""
@@ -197,7 +194,7 @@ def get_oozie_workflow_actions(actions):
197
194
key = '%s-%d' % (type_name , count )
198
195
count += 1
199
196
oozie_info = oozie_api_request (action ['externalId' ])
200
- oozie_data = get_oozie_workflow_actions (oozie_info ['actions' ])
197
+ oozie_data = oozie_action_handler (oozie_info ['actions' ])
201
198
workflowstatus = process_component_data (oozie_data )
202
199
if action ['status' ] == 'ERROR' or action ['status' ] == 'FAILED' \
203
200
or action ['status' ] == 'KILLED' :
@@ -259,27 +256,37 @@ def oozie_coordinator_handler(data):
259
256
coord_status = {}
260
257
if data ['status' ] == 'PREPSUSPENDED' :
261
258
coord_status .update ({'aggregate_status' : ApplicationState .CREATED , \
262
- 'name' : data ['coordJobName' ]})
259
+ 'oozieId' : data [ 'coordJobId' ], ' name' : data ['coordJobName' ]})
263
260
elif data ['status' ] == 'PREP' :
264
261
coord_status .update ({'aggregate_status' : ApplicationState .STARTING , \
265
- 'name' : data ['coordJobName' ]})
262
+ 'oozieId' : data [ 'coordJobId' ], ' name' : data ['coordJobName' ]})
266
263
elif data ['status' ] == 'RUNNING' :
267
- oozie_data = get_oozie_workflow_actions (data ['actions' ])
264
+ oozie_data = oozie_action_handler (data ['actions' ])
265
+ aggregate_status = process_component_data (oozie_data )
266
+ if aggregate_status == 'OK' :
267
+ status = 'RUNNING'
268
+ else :
269
+ status = 'RUNNING_WITH_ERRORS'
270
+ coord_status .update ({'actions' : oozie_data , 'status' : aggregate_status , 'aggregate_status' :status , \
271
+ 'oozieId' : data ['coordJobId' ], 'name' : data ['coordJobName' ]})
272
+ elif data ['status' ] == 'SUSPENDED' or data ['status' ] == 'KILLED' :
273
+ oozie_data = oozie_action_handler (data ['actions' ])
268
274
aggregate_status = process_component_data (oozie_data )
269
- coord_status .update ({'actions' : oozie_data , 'status' : aggregate_status , 'aggregate_status' :\
270
- '%s_%s_%s' % (ApplicationState .STARTED , 'RUNNING' , ERROR_STATUS [aggregate_status ]), \
275
+ if aggregate_status == 'OK' :
276
+ status = '%s' % (data ['status' ])
277
+ else :
278
+ status = '%s_%s' % (data ['status' ], 'WITH_FAILURES' )
279
+ coord_status .update ({'actions' : oozie_data , 'status' : aggregate_status , 'aggregate_status' : status , \
271
280
'oozieId' : data ['coordJobId' ], 'name' : data ['coordJobName' ]})
272
- elif data ['status' ] == 'SUSPENDED ' :
273
- oozie_data = get_oozie_workflow_actions (data ['actions' ])
281
+ elif data ['status' ] == 'SUCCEEDED ' :
282
+ oozie_data = oozie_action_handler (data ['actions' ])
274
283
aggregate_status = process_component_data (oozie_data )
275
- coord_status .update ({'actions' : oozie_data , 'status' : aggregate_status , 'aggregate_status' :\
276
- '%s_%s_%s' % (ApplicationState .COMPLETED , 'SUSPENDED' , FAILURE_STATUS [aggregate_status ]), \
284
+ coord_status .update ({'actions' : oozie_data , 'status' : aggregate_status , 'aggregate_status' : 'COMPLETED' , \
277
285
'oozieId' : data ['coordJobId' ], 'name' : data ['coordJobName' ]})
278
- elif data ['status' ] == 'KILLED ' :
279
- oozie_data = get_oozie_workflow_actions (data ['actions' ])
286
+ elif data ['status' ] == 'DONEWITHERROR ' :
287
+ oozie_data = oozie_action_handler (data ['actions' ])
280
288
aggregate_status = process_component_data (oozie_data )
281
- coord_status .update ({'actions' : oozie_data , 'status' : aggregate_status , 'aggregate_status' :\
282
- '%s_%s_%s' % (ApplicationState .COMPLETED , 'KILLED' , FAILURE_STATUS [aggregate_status ]), \
289
+ coord_status .update ({'actions' : oozie_data , 'status' : aggregate_status , 'aggregate_status' : 'COMPLETED_WITH_FAILURES' , \
283
290
'oozieId' : data ['coordJobId' ], 'name' : data ['coordJobName' ]})
284
291
return coord_status
285
292
@@ -290,24 +297,34 @@ def oozie_workflow_handler(data):
290
297
workflow_status = {}
291
298
if data ['status' ] == 'PREP' :
292
299
workflow_status .update ({'aggregate_status' : ApplicationState .CREATED , \
293
- 'name' : data ['appName' ]})
300
+ 'oozieId' : data [ 'id' ], ' name' : data ['appName' ]})
294
301
elif data ['status' ] == 'RUNNING' :
295
- oozie_data = get_oozie_workflow_actions (data ['actions' ])
302
+ oozie_data = oozie_action_handler (data ['actions' ])
303
+ aggregate_status = process_component_data (oozie_data )
304
+ if aggregate_status == 'OK' :
305
+ status = 'RUNNING'
306
+ else :
307
+ status = 'RUNNING_WITH_ERRORS'
308
+ workflow_status .update ({'actions' : oozie_data , 'aggregate_status' : status , \
309
+ 'oozieId' : data ['id' ], 'name' : data ['appName' ], 'status' : aggregate_status })
310
+ elif data ['status' ] == 'SUSPENDED' or data ['status' ] == 'KILLED' :
311
+ oozie_data = oozie_action_handler (data ['actions' ])
296
312
aggregate_status = process_component_data (oozie_data )
297
- workflow_status .update ({'actions' : oozie_data , 'aggregate_status' : '%s_%s_%s' % \
298
- (ApplicationState .STARTED , 'RUNNING' , ERROR_STATUS [aggregate_status ]), \
313
+ if aggregate_status == 'OK' :
314
+ status = '%s' % (data ['status' ])
315
+ else :
316
+ status = '%s_%s' % (data ['status' ], 'WITH_FAILURES' )
317
+ workflow_status .update ({'actions' : oozie_data , 'aggregate_status' : status , \
299
318
'oozieId' : data ['id' ], 'name' : data ['appName' ], 'status' : aggregate_status })
300
- elif data ['status' ] == 'SUSPENDED ' :
301
- oozie_data = get_oozie_workflow_actions (data ['actions' ])
319
+ elif data ['status' ] == 'SUCCEEDED ' :
320
+ oozie_data = oozie_action_handler (data ['actions' ])
302
321
aggregate_status = process_component_data (oozie_data )
303
- workflow_status .update ({'actions' : oozie_data , 'aggregate_status' : '%s_%s_%s' % \
304
- (ApplicationState .COMPLETED , 'SUSPENDED' , FAILURE_STATUS [aggregate_status ]), \
322
+ workflow_status .update ({'actions' : oozie_data , 'aggregate_status' : 'COMPLETED' , \
305
323
'oozieId' : data ['id' ], 'name' : data ['appName' ], 'status' : aggregate_status })
306
- elif data ['status' ] == 'KILLED ' :
307
- oozie_data = get_oozie_workflow_actions (data ['actions' ])
324
+ elif data ['status' ] == 'DONEWITHERROR ' :
325
+ oozie_data = oozie_action_handler (data ['actions' ])
308
326
aggregate_status = process_component_data (oozie_data )
309
- workflow_status .update ({'actions' : oozie_data , 'aggregate_status' : '%s_%s_%s' % \
310
- (ApplicationState .COMPLETED , 'KILLED' , FAILURE_STATUS [aggregate_status ]), \
327
+ workflow_status .update ({'actions' : oozie_data , 'aggregate_status' : 'COMPLETED_WITH_FAILURES' , \
311
328
'oozieId' : data ['id' ], 'name' : data ['appName' ], 'status' : aggregate_status })
312
329
return workflow_status
313
330
@@ -353,47 +370,45 @@ def spark_application(job_name):
353
370
"""
354
371
Handling SPARK Application
355
372
"""
356
- new_app_flag = False
357
373
ret = {}
358
- information = None
359
- app_id = check_in_yarn (job_name )
360
- if app_id != None :
361
- yarn_data = yarn_info (app_id ['id' ])
362
- if yarn_data ['yarnFinalStatus' ] == 'FAILED' :
363
- aggregate_status = '%s_%s_%s' % (ApplicationState .COMPLETED , \
364
- 'FAILED' , FAILURE_STATUS ['ERROR' ])
365
- elif yarn_data ['yarnFinalStatus' ] == 'KILLED' :
366
- aggregate_status = '%s_%s_%s' % (ApplicationState .COMPLETED , \
367
- 'KILLED' , FAILURE_STATUS ['OK' ])
368
- elif yarn_data ['yarnStatus' ] == 'RUNNING' :
369
- spark_data = spark_job_handler (app_id ['id' ])
370
- aggregate_status = '%s_%s_%s' % (ApplicationState .STARTED , \
371
- yarn_data ['yarnStatus' ], ERROR_STATUS [spark_data ['state' ]])
374
+ yarnid = ''
375
+ information = ''
376
+ yarn_data = check_in_yarn (job_name )
377
+ if yarn_data != None :
378
+ yarnid = yarn_data ['id' ]
379
+ if yarn_data ['state' ] == 'SUBMITTED' or yarn_data ['state' ] == 'ACCEPTED' :
380
+ aggregate_status = yarn_data ['state' ]
381
+ message = yarn_data ['diagnostics' ].split ('Details :' )[0 ].strip ()
382
+ information = message
383
+ elif yarn_data ['state' ] == 'RUNNING' :
384
+ spark_data = spark_job_handler (yarn_data ['id' ])
385
+ if spark_data ['state' ] == 'OK' :
386
+ aggregate_status = 'RUNNING'
387
+ else :
388
+ aggregate_status = 'RUNNING_WITH_ERRORS'
372
389
information = spark_data ['information' ]
373
- elif yarn_data ['yarnStatus' ] == 'FINISHED' and yarn_data ['yarnFinalStatus' ] == 'SUCCEEDED' :
374
- aggregate_status = '%s_%s_%s' % (ApplicationState .COMPLETED , \
375
- yarn_data ['yarnStatus' ], ERROR_STATUS ['OK' ])
376
- elif yarn_data ['yarnStatus' ] == 'NOT FOUND' :
377
- aggregate_status = '%s_%s_%s' % (ApplicationState .COMPLETED , \
378
- yarn_data ['yarnStatus' ], ERROR_STATUS ['ERROR' ])
379
- information = yarn_data ['information' ]
390
+ elif yarn_data ['finalStatus' ] == 'SUCCEEDED' :
391
+ aggregate_status = '%s_%s' % (yarn_data ['state' ], yarn_data ['finalStatus' ])
392
+ elif yarn_data ['state' ] == 'FINISHED' and (yarn_data ['finalStatus' ] == 'FAILED' or yarn_data ['finalStatus' ] == 'KILLED' ):
393
+ aggregate_status = '%s_%s' % (yarn_data ['state' ], yarn_data ['finalStatus' ])
394
+ information = yarn_data ['diagnostics' ]
395
+ elif yarn_data ['finalStatus' ] == 'FAILED' or yarn_data ['finalStatus' ] == 'KILLED' :
396
+ aggregate_status = yarn_data ['finalStatus' ]
397
+ information = yarn_data ['diagnostics' ]
380
398
else :
381
- aggregate_status = '%s_%s_%s' % (ApplicationState .STARTED , \
382
- yarn_data ['yarnStatus' ], ERROR_STATUS ['OK' ])
383
- ret = {
384
- 'aggregate_status' : aggregate_status ,
385
- 'yarnId' : app_id ['id' ],
386
- 'information' : information ,
387
- 'name' : job_name
388
- }
399
+ aggregate_status = 'NOT_FOUND'
400
+ message = yarn_data .get ('RemoteException' , {'message' : ['' ]}).\
401
+ get ('message' ).split (':' )
402
+ message [0 ] = ''
403
+ information = '' .join (message ).strip ()
389
404
else :
390
- new_app_flag = True
391
- if new_app_flag :
392
- ret = {
393
- 'aggregate_status ' : ApplicationState . CREATED ,
394
- 'information' : information ,
395
- 'name' : job_name
396
- }
405
+ aggregate_status = ApplicationState . CREATED
406
+ ret = {
407
+ 'aggregate_status' : aggregate_status ,
408
+ 'yarnId ' : yarnid ,
409
+ 'information' : information ,
410
+ 'name' : job_name
411
+ }
397
412
return ret
398
413
399
414
def get_json (component_list , queue_obj ):
@@ -477,45 +492,41 @@ def process_application_data(application):
477
492
"""
478
493
Find Application level aggregated status based on it's component's status
479
494
"""
480
- (current_agg_status_priority , current_error_status_priority , \
481
- current_failure_status_priority ) = (99 , 99 , 99 )
482
- aggregated_status_priority = dict ([(ApplicationState .CREATED , 1 ), \
483
- (ApplicationState .STARTING , 2 ), (ApplicationState .STARTED , 3 ), (ApplicationState .COMPLETED , 4 )])
484
- error_status_priority = dict ([('ERRORS' , 1 ), ('NO_ERRORS' , 2 )])
485
- failure_status_priority = dict ([('FAILURES' , 1 ), ('NO_FAILURES' , 2 )])
495
+ aggregated_status_priority = dict ([(1 , ApplicationState .CREATED ), \
496
+ (2 , ApplicationState .STARTING ), (3 , "RUNNING" ), (4 , 'RUNNING_WITH_ERRORS' ), \
497
+ (5 , 'STOPPED' ), (6 , 'STOPPED_WITH_FAILURES' ), (7 , 'KILLED' ), (8 , 'KILLED_WITH_FAILURES' ), \
498
+ (9 , 'COMPLETED' ), (10 , 'COMPLETED_WITH_FAILURES' ), (11 , 'NOT_FOUND' )])
499
+ current_agg_status_priority = 99
486
500
for component in application :
487
- if current_agg_status_priority >= aggregated_status_priority [application [component ]\
488
- ['aggregate_status' ].split ('_' )[0 ]]:
489
- current_agg_status_priority = aggregated_status_priority [application [component ]\
490
- ['aggregate_status' ].split ('_' )[0 ]]
491
- temp = application [component ]['aggregate_status' ].split ('WITH_' )[- 1 ]
492
- if 'ERRORS' in temp :
493
- if current_error_status_priority >= error_status_priority [temp ]:
494
- current_error_status_priority = error_status_priority [temp ]
495
- if 'FAILURES' in temp :
496
- if current_failure_status_priority >= failure_status_priority [temp ]:
497
- current_failure_status_priority = failure_status_priority [temp ]
498
-
499
- if current_agg_status_priority == 3 :
500
- if current_error_status_priority == 2 :
501
- if current_failure_status_priority == 1 :
502
- error_status = 'ERRORS'
503
- else :
504
- error_status = 'NO_ERRORS'
501
+ temp_comp_status = application [component ].get ('aggregate_status' , '' )
502
+ temp_status_priority = 0
503
+ if temp_comp_status == 'CREATED' :
504
+ temp_status_priority = 1
505
+ elif temp_comp_status == 'STARTING' or temp_comp_status == 'SUBMITTED' \
506
+ or temp_comp_status == 'ACCEPTED' :
507
+ temp_status_priority = 2
508
+ elif temp_comp_status == 'RUNNING' :
509
+ temp_status_priority = 3
510
+ elif temp_comp_status == 'RUNNING_WITH_ERRORS' :
511
+ temp_status_priority = 4
512
+ elif temp_comp_status == 'SUSPENDED' :
513
+ temp_status_priority = 5
514
+ elif temp_comp_status == 'SUSPENDED_WITH_FAILURES' :
515
+ temp_status_priority = 6
516
+ elif temp_comp_status == 'KILLED' or temp_comp_status == 'FINISHED_KILLED' :
517
+ temp_status_priority = 7
518
+ elif temp_comp_status == 'KILLED_WITH_FAILURES' :
519
+ temp_status_priority = 8
520
+ elif temp_comp_status == 'COMPLETED' or temp_comp_status == 'FINISHED_SUCCEEDED' :
521
+ temp_status_priority = 9
522
+ elif temp_comp_status == 'COMPLETED_WITH_FAILURES' or temp_comp_status == 'FAILED' \
523
+ or temp_comp_status == 'FINISHED_FAILED' :
524
+ temp_status_priority = 10
505
525
else :
506
- error_status = 'ERRORS'
507
- aggregated_status = '%s_%s_WITH_%s' % (ApplicationState .STARTED , 'RUNNING' , error_status )
508
- elif current_agg_status_priority == 4 :
509
- if current_failure_status_priority == 1 :
510
- failure_status = 'FAILURES'
511
- else :
512
- failure_status = 'NO_FAILURES'
513
- aggregated_status = '%s_WITH_%s' % (ApplicationState .COMPLETED , failure_status )
514
- elif current_agg_status_priority == 1 :
515
- aggregated_status = ApplicationState .CREATED
516
- else :
517
- aggregated_status = ApplicationState .STARTING
518
- return aggregated_status
526
+ temp_status_priority = 11
527
+ if temp_status_priority < current_agg_status_priority :
528
+ current_agg_status_priority = temp_status_priority
529
+ return aggregated_status_priority [current_agg_status_priority ]
519
530
520
531
def application_summary (app_list ):
521
532
"""
0 commit comments