Commit 9463abb

PNDA-4237: Reporting failed YARN application submission
1 parent c68b6f9

File tree

3 files changed: 114 additions, 48 deletions

    api/src/main/resources/application_summary.py
    api/src/main/resources/application_summary_registrar.py
    api/src/main/resources/test_application_summary.py

api/src/main/resources/application_summary.py

Lines changed: 75 additions & 31 deletions
@@ -1,4 +1,5 @@
 import json
+import commands
 import multiprocessing
 import time
 import logging
@@ -368,43 +369,85 @@ def check_in_yarn(job_name):
         logging.debug(str(error_message))
     return run_app_info
 
-def spark_application(job_name):
+# pylint: disable=C0103
+
+def check_in_service_log(namespace, application, component_name):
+    '''
+    Check the service log in case the application failed to submit to YARN
+    '''
+    service_name = '%s-%s-%s' % (namespace, application, component_name)
+    (command, message, more_detail) = ('', '', '')
+    command = 'sudo journalctl -u %s.service' % service_name
+    out = commands.getoutput('%s -n 50' % command).split('\n')
+    more_detail = 'More details: execute "journalctl -u %s"' % service_name
+    for line in out:
+        if 'Exception:' in line:
+            message = '%s%s' % (line.split('Exception:')[0].split(' ')[-1], 'Exception')
+            break
+    if message == '':
+        message = '%s' % (more_detail)
+    else:
+        message = '%s. %s' % (message, more_detail)
+    return 'FAILED_TO_SUBMIT_TO_YARN', message
+
+def spark_yarn_handler(yarn_data):
+    '''
+    Handling Spark YARN data
+    '''
+    information = ''
+    aggregate_status = ''
+    yarnid = yarn_data['id']
+    if yarn_data['state'] == 'SUBMITTED' or yarn_data['state'] == 'ACCEPTED':
+        aggregate_status = yarn_data['state']
+        message = yarn_data['diagnostics'].split('Details :')[0].strip()
+        information = message
+    elif yarn_data['state'] == 'RUNNING':
+        spark_data = spark_job_handler(yarn_data['id'])
+        if spark_data['state'] == 'OK':
+            aggregate_status = 'RUNNING'
+        else:
+            aggregate_status = 'RUNNING_WITH_ERRORS'
+            information = spark_data['information']
+    elif yarn_data['finalStatus'] == 'SUCCEEDED':
+        aggregate_status = '%s_%s' % (yarn_data['state'], yarn_data['finalStatus'])
+    elif yarn_data['state'] == 'FINISHED' and (yarn_data['finalStatus'] == 'FAILED' or yarn_data['finalStatus'] == 'KILLED'):
+        aggregate_status = '%s_%s' % (yarn_data['state'], yarn_data['finalStatus'])
+        information = yarn_data['diagnostics']
+    elif yarn_data['finalStatus'] == 'FAILED' or yarn_data['finalStatus'] == 'KILLED':
+        aggregate_status = yarn_data['finalStatus']
+        information = yarn_data['diagnostics']
+    else:
+        aggregate_status = 'NOT_FOUND'
+        message = yarn_data.get('RemoteException', {'message': ['']}).\
+                  get('message').split(':')
+        message[0] = ''
+        information = ''.join(message).strip()
+    return aggregate_status, yarnid, information
+
+def spark_application(job_name, application, component_name):
     """
     Handling SPARK Application
     """
     ret = {}
-    yarnid = ''
-    information = ''
+    check_in_service = False
+    (aggregate_status, yarnid, information) = ('', '', '')
+    status, timestamp = _HBASE.get_status_with_timestamp(application)
     yarn_data = check_in_yarn(job_name)
-    if yarn_data != None:
-        yarnid = yarn_data['id']
-        if yarn_data['state'] == 'SUBMITTED' or yarn_data['state'] == 'ACCEPTED':
-            aggregate_status = yarn_data['state']
-            message = yarn_data['diagnostics'].split('Details :')[0].strip()
-            information = message
-        elif yarn_data['state'] == 'RUNNING':
-            spark_data = spark_job_handler(yarn_data['id'])
-            if spark_data['state'] == 'OK':
-                aggregate_status = 'RUNNING'
-            else:
-                aggregate_status = 'RUNNING_WITH_ERRORS'
-                information = spark_data['information']
-        elif yarn_data['finalStatus'] == 'SUCCEEDED':
-            aggregate_status = '%s_%s' % (yarn_data['state'], yarn_data['finalStatus'])
-        elif yarn_data['state'] == 'FINISHED' and (yarn_data['finalStatus'] == 'FAILED' or yarn_data['finalStatus'] == 'KILLED'):
-            aggregate_status = '%s_%s' % (yarn_data['state'], yarn_data['finalStatus'])
-            information = yarn_data['diagnostics']
-        elif yarn_data['finalStatus'] == 'FAILED' or yarn_data['finalStatus'] == 'KILLED':
-            aggregate_status = yarn_data['finalStatus']
-            information = yarn_data['diagnostics']
+    if status == 'CREATED':
+        if yarn_data != None:
+            aggregate_status, yarnid, information = spark_yarn_handler(yarn_data)
         else:
-            aggregate_status = 'NOT_FOUND'
-            message = yarn_data.get('RemoteException', {'message': ['']}).\
-                      get('message').split(':')
-            message[0] = ''
-            information = ''.join(message).strip()
+            aggregate_status = ApplicationState.CREATED
     else:
-        aggregate_status = ApplicationState.CREATED
+        if yarn_data != None:
+            if timestamp < yarn_data['startedTime']:
+                aggregate_status, yarnid, information = spark_yarn_handler(yarn_data)
+            else:
+                check_in_service = True
+        else:
+            check_in_service = True
+    if check_in_service:
+        aggregate_status, information = check_in_service_log(CONFIG['environment']['namespace'], application, component_name)
     ret = {
         'aggregate_status': aggregate_status,
         'yarnId': yarnid,
@@ -431,7 +474,8 @@ def get_json(component_list, queue_obj):
                     (component[application][component_name]['job_handle'])})
                 if 'sparkStreaming' in component_name:
                     ret[application].update({component_name: spark_application\
-                    (component[application][component_name]['component_job_name'])})
+                    (component[application][component_name]['component_job_name'], application,\
+                     component[application][component_name]['component_name'])})
         queue_obj.put([{process_name: ret}])
     logging.info('%s %s', 'Finished', process_name)
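
A note on the parsing in check_in_service_log: it recovers the exception class name by taking the text before 'Exception:', keeping the last whitespace-separated token, and re-appending 'Exception'. A minimal standalone sketch of that step (the journalctl line below is a made-up sample, not output from a real run):

    # Hypothetical journalctl line of the kind the function scans for.
    line = ('Mar 22 03:39:32 edge spark-submit[2475]: Exception in thread "main" '
            'java.io.FileNotFoundException: File file:/opt/app/x.egg does not exist')

    if 'Exception:' in line:
        # Text before 'Exception:' ends with the class name minus its suffix.
        message = '%s%s' % (line.split('Exception:')[0].split(' ')[-1], 'Exception')
        print(message)  # java.io.FileNotFoundException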

api/src/main/resources/application_summary_registrar.py

Lines changed: 12 additions & 0 deletions
@@ -82,6 +82,18 @@ def get_dm_data(self, key):
             connection.close()
         return data
 
+    def get_status_with_timestamp(self, key):
+        try:
+            connection = happybase.Connection(self._hbase_host)
+            table = connection.table("platform_applications")
+            row = table.row(key, columns=['cf:status'], include_timestamp=True)
+            status, timestamp = row['cf:status']
+        except TTransportException as error_message:
+            logging.error(str(error_message))
+        finally:
+            connection.close()
+        return status, timestamp
+
     def get_summary_data(self, application):
         record = {application: {}}
         dm_data = self.get_dm_data(application)
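
For context on get_status_with_timestamp: when include_timestamp=True is passed, happybase returns each cell as a (value, timestamp) tuple keyed by column name, which is what makes the single-line unpack of row['cf:status'] work. A standalone sketch of the same lookup; the defaults and the missing-row guard are illustrative additions, not part of this commit:

    import logging

    import happybase
    from thrift.transport.TTransport import TTransportException

    def get_status_with_timestamp(hbase_host, key):
        # Defaults keep the return value bound even if the read fails.
        status, timestamp = None, 0
        connection = happybase.Connection(hbase_host)
        try:
            table = connection.table('platform_applications')
            # include_timestamp=True yields {column: (value, timestamp)}.
            row = table.row(key, columns=['cf:status'], include_timestamp=True)
            if 'cf:status' in row:
                status, timestamp = row['cf:status']
        except TTransportException as error_message:
            logging.error(str(error_message))
        finally:
            connection.close()
        return status, timestamp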

api/src/main/resources/test_application_summary.py

Lines changed: 27 additions & 17 deletions
@@ -17,43 +17,42 @@ def test_process_application_data(self):
                                              {'aggregate_status': 'RUNNING_WITH_ERRORS', 'yarnId': '1234'}})
         self.assertEqual(result, 'RUNNING_WITH_ERRORS')
 
-    @patch('application_summary.check_in_yarn')
     @patch('application_summary.spark_job_handler')
-    def test_spark_application(self, spark_job_patch, yarn_check_patch):
+    def test_spark_yarn_handler(self, spark_job_patch):
         """
         Testing Spark application
         """
         #Spark application in case of Killed
-        yarn_check_patch.return_value = {'id': 'application_1234', 'state': 'KILLED', 'finalStatus': 'KILLED', \
+        input_data = {'id': 'application_1234', 'state': 'KILLED', 'finalStatus': 'KILLED', \
                       'diagnostics': 'Killed'}
-        result = application_summary.spark_application('app1-example-job')
-        self.assertEqual(result['aggregate_status'], '%s' % ('KILLED'))
+        result = application_summary.spark_yarn_handler(input_data)
+        self.assertEqual(result[0], '%s' % ('KILLED'))
 
         #Spark application in case of Failed
-        yarn_check_patch.return_value = {'id': 'application_1234', 'state': 'FINISHED', 'finalStatus': 'FAILED', \
+        input_data = {'id': 'application_1234', 'state': 'FINISHED', 'finalStatus': 'FAILED', \
                       'diagnostics': 'Failed'}
-        result = application_summary.spark_application('app1-example-job')
-        self.assertEqual(result['aggregate_status'], '%s' % ('FINISHED_FAILED'))
+        result = application_summary.spark_yarn_handler(input_data)
+        self.assertEqual(result[0], '%s' % ('FINISHED_FAILED'))
 
         #Spark application in case of Running with No errors
-        yarn_check_patch.return_value = {'id': 'application_1234', 'state': 'RUNNING', \
+        input_data = {'id': 'application_1234', 'state': 'RUNNING', \
                       'finalStatus': 'UNDEFINED'}
         spark_job_patch.return_value = {'state': 'OK', 'information': 'job_stage data'}
-        result = application_summary.spark_application('app1-example-job')
-        self.assertEqual(result['aggregate_status'], '%s' % ('RUNNING'))
+        result = application_summary.spark_yarn_handler(input_data)
+        self.assertEqual(result[0], '%s' % ('RUNNING'))
 
         #Spark application in case of Running with errors
-        yarn_check_patch.return_value = {'id': 'application_1234', 'state': 'RUNNING', \
+        input_data = {'id': 'application_1234', 'state': 'RUNNING', \
                       'finalStatus': 'UNDEFINED'}
         spark_job_patch.return_value = {'state': 'ERROR', 'information': 'job_stage data'}
-        result = application_summary.spark_application('app1-example-job')
-        self.assertEqual(result['aggregate_status'], '%s' % ('RUNNING_WITH_ERRORS'))
+        result = application_summary.spark_yarn_handler(input_data)
+        self.assertEqual(result[0], '%s' % ('RUNNING_WITH_ERRORS'))
 
         #Spark application in other states than above states
-        yarn_check_patch.return_value = {'id': 'application_1234', 'state': 'ACCEPTED', \
+        input_data = {'id': 'application_1234', 'state': 'ACCEPTED', \
                       'finalStatus': 'UNDEFINED', 'diagnostics': 'Accepted'}
-        result = application_summary.spark_application('app1-example-job')
-        self.assertEqual(result['aggregate_status'], '%s' % ('ACCEPTED'))
+        result = application_summary.spark_yarn_handler(input_data)
+        self.assertEqual(result[0], '%s' % ('ACCEPTED'))
 
     @patch('requests.get')
     def test_check_in_yarn(self, mock_req):
@@ -473,3 +472,14 @@ def test_yarn_info(self, yarn_mock_req):
             'type': 'UNKNOWN',
             'information': 'app with id application_123 not found'
             })
+
+    # pylint: disable=C0301
+
+    @patch('platform.dist')
+    @patch('commands.getoutput')
+    def test_check_in_service_log(self, cmd_patch, distro_patch):
+        distro_patch.return_value = ['redhat']
+        cmd_patch.return_value = 'Mar 22 03:39:32 rhel-cdh-hadoop-edge spark-submit[2475]: 18/03/22 03:39:32 INFO yarn.Client: Uploading resource file:/opt/platform_app/s1/example/dataplatform-raw.avsc -> hdfs://rhel-cdh-hadoop-mgr-1:8020/user/pnda/.sparkStaging/application_1521689436801_0013/dataplatform-raw.avsc\nMar 22 03:39:32 rhel-cdh-hadoop-edge spark-submit[2475]: 18/03/22 03:39:32 INFO yarn.Client: Uploading resource file:/opt/platform_app/s1/example/avro-1.8.1-py2.7.egg -> hdfs://rhel-cdh-hadoop-mgr-1:8020/user/pnda/.sparkStaging/application_1521689436801_0013/avro-1.8.1-py2.7.egg\nMar 22 03:39:32 rhel-cdh-hadoop-edge spark-submit[2475]: 18/03/22 03:39:32 INFO yarn.Client: Deleting staging directory .sparkStaging/application_1521689436801_0013\nMar 22 03:39:32 rhel-cdh-hadoop-edge spark-submit[2475]: Exception in thread "main" java.io.FileNotFoundException: File file:/opt/platform_app/s1/example/avro-1.8.1-py2.7.egg does not exist\nMar 22 03:38:01 rhel-cdh-hadoop-edge spark-submit[31045]: at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:598)\nMar 22 03:38:01 rhel-cdh-hadoop-edge spark-submit[31045]: at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:811)\nMar 22 03:38:01 rhel-cdh-hadoop-edge spark-submit[31045]: at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:588)\nMar 22 03:38:01 rhel-cdh-hadoop-edge spark-submit[31045]: at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:425)\nMar 22 03:38:01 rhel-cdh-hadoop-edge spark-submit[31045]: at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:340)Mar 22 03:38:01 rhel-cdh-hadoop-edge spark-submit[31045]: at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:292)'
+        result = application_summary.check_in_service_log('platform_app', 'application_123', 'example')
+        self.assertEqual(result[0], 'FAILED_TO_SUBMIT_TO_YARN')
+        self.assertEqual(result[1], 'java.io.FileNotFoundException. More details: execute "journalctl -u platform_app-application_123-example"')
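
Since spark_yarn_handler takes the YARN report as a plain dict and returns an (aggregate_status, yarnid, information) tuple, the tests above can call it directly instead of patching check_in_yarn. A quick usage sketch with fixture data shaped like the tests (assuming application_summary imports cleanly outside the test harness):

    import application_summary

    # Fixture-style YARN report: a finished application that failed.
    yarn_data = {'id': 'application_1234', 'state': 'FINISHED',
                 'finalStatus': 'FAILED', 'diagnostics': 'Failed'}

    aggregate_status, yarnid, information = application_summary.spark_yarn_handler(yarn_data)
    assert aggregate_status == 'FINISHED_FAILED'
    assert yarnid == 'application_1234'
    assert information == 'Failed'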
