Skip to content

Commit 50e7d98

Browse files
committed
Refactor application detail API
Add yarn status & return the status for most recent app instance, not just a running one. Use the RM REST API instead of the CLI to get this information. PNDA-2464
1 parent 9909952 commit 50e7d98

File tree

6 files changed

+233
-39
lines changed

6 files changed

+233
-39
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ All notable changes to this project will be documented in this file.
44
## [Unreleased]
55
### Changed
66
- Externalized build logic from Jenkins to shell script so it can be reused
7+
- Refactored the information returned by the Application Detail API to include the YARN application state and also to return information for jobs that have ended. Made the implementation more performant by using the YARN Resource Manager REST API instead of the CLI.
78

89
## [0.2.0] 2016-10-21
910
### Added

README.md

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -255,12 +255,19 @@ Response Codes:
255255
500 - Server Error
256256
257257
{
258-
"status": "STARTED",
259-
"name": "spark-batch-example-app-instance",
260-
"yarn-ids": [
261-
{"component":"example", "type":"oozie", "yarn-id":"application_1455877292606_0404"}
262-
]
258+
"yarn_applications": {
259+
"oozie-example": {
260+
"type": "oozie",
261+
"yarn-id": "application_1479988623709_0015",
262+
"component": "example",
263+
"yarn-start-time": 1479992520527,
264+
"yarn-state": "FINISHED"
265+
}
266+
},
267+
"status": "STARTED",
268+
"name": "spark-batch-example-app-instance"
263269
}
270+
264271
````
265272

266273
### Start _application_

api/src/main/resources/application_creator.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,11 @@ def get_application_runtime_details(self, application_name, application_create_d
134134
logging.debug("get_application_runtime_details: %s %s", application_name, application_create_data)
135135

136136
details = {}
137-
details['yarn_ids'] = []
137+
details['yarn_applications'] = {}
138138
for component_type, component_create_data in application_create_data.iteritems():
139139
creator = self._load_creator(component_type)
140140
type_details = creator.get_component_runtime_details(component_create_data)
141-
details['yarn_ids'].extend(type_details['yarn_ids'])
141+
details['yarn_applications'].update(type_details['yarn_applications'])
142142
return details
143143

144144
def _load_creator(self, component_type):

api/src/main/resources/deployer_utils.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,13 @@ def fill_hadoop_env(env):
8888
elif service.type == "YARN":
8989
for role in service.get_all_roles():
9090
if role.type == "RESOURCEMANAGER":
91-
env['yarn_resource_manager_host'] = '%s' % api.get_host(
92-
role.hostRef.hostId).hostname
93-
env['yarn_resource_manager_port'] = '8088'
94-
env['yarn_resource_manager_mr_port'] = '8032'
91+
if 'yarn_resource_manager_host' in env:
92+
rm_instance = '_backup'
93+
else:
94+
rm_instance = ''
95+
env['yarn_resource_manager_host%s' % rm_instance] = '%s' % api.get_host(role.hostRef.hostId).hostname
96+
env['yarn_resource_manager_port%s' % rm_instance] = '8088'
97+
env['yarn_resource_manager_mr_port%s' % rm_instance] = '8032'
9598
if role.type == "NODEMANAGER":
9699
if 'yarn_node_managers' in env:
97100
env['yarn_node_managers'] = '%s,%s' % (

api/src/main/resources/plugins/base_creator.py

Lines changed: 68 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import json
3737
import string
3838
import collections
39-
import subprocess
39+
import requests
4040
import hbase_descriptor
4141
import opentsdb_descriptor
4242
from deployer_utils import HDFS
@@ -59,6 +59,18 @@ def __init__(self, config, environment, namespace):
5959
environment['webhdfs_port'],
6060
'hdfs')
6161

62+
if 'yarn_resource_manager_host' in environment:
63+
self._yarn_resource_manager = "%s:%s" % (environment['yarn_resource_manager_host'],
64+
environment['yarn_resource_manager_port'])
65+
else:
66+
self._yarn_resource_manager = None
67+
68+
if 'yarn_resource_manager_host_backup' in environment:
69+
self._yarn_resource_manager_backup = "%s:%s" % (environment['yarn_resource_manager_host_backup'],
70+
environment['yarn_resource_manager_port_backup'])
71+
else:
72+
self._yarn_resource_manager_backup = None
73+
6274
def validate_component(self, components):
6375
'''
6476
Validates components of the package of given component type
@@ -126,6 +138,12 @@ def stop_component(self, application_name, stop_data):
126138
'''
127139
pass
128140

141+
def get_component_type(self):
142+
'''
143+
returns a name for the component type
144+
'''
145+
pass
146+
129147
def _instantiate_properties(self, application_name, component, property_overrides):
130148
logging.debug(
131149
"_instantiate_properties %s %s",
@@ -257,24 +275,55 @@ def validate_components(self, components):
257275

258276
def get_component_runtime_details(self, create_data):
259277
logging.debug("get_component_runtime_details: %s", create_data)
278+
260279
details = {}
261-
yarn_ids = []
262-
details['yarn_ids'] = yarn_ids
263-
for single_component_data in create_data:
264-
yarn_id = self.get_yarn_id(single_component_data['component_job_name'])
265-
if yarn_id is not None:
266-
yarn_ids.append({"component": single_component_data['component_name'],
267-
"type": self.get_component_type(),
268-
"yarn-id": yarn_id})
280+
yarn_applications = {}
281+
details['yarn_applications'] = yarn_applications
282+
283+
all_yarn_applications = self._get_yarn_applications()
284+
if all_yarn_applications is not None:
285+
for single_component_data in create_data:
286+
app_info = self._find_yarn_app_info(all_yarn_applications, single_component_data['component_job_name'])
287+
if app_info is not None:
288+
job_key = '%s-%s' % (self.get_component_type(), single_component_data['component_name'])
289+
yarn_applications[job_key] = {"component": single_component_data['component_name'],
290+
"type": self.get_component_type(),
291+
"yarn-id": app_info['id'],
292+
"yarn-start-time": app_info['startedTime'],
293+
"yarn-state": app_info['state']}
294+
else:
295+
logging.error('Failed to query application list from any resource manager')
296+
269297
return details
270298

271-
def get_yarn_id(self, job_name):
272-
out = subprocess.check_output(['yarn', 'application', '-list'])
273-
for line in out.splitlines():
274-
fields = line.split('\t')
275-
if len(fields) >= 6:
276-
logging.debug(line)
277-
app = fields[1].strip()
278-
if app == job_name:
279-
return fields[0].strip()
280-
return None
299+
def _get_yarn_applications_from_rm(self, resource_manager):
300+
result = None
301+
logging.debug('Querying list of yarn applications from %s', resource_manager)
302+
try:
303+
url = 'http://%s/ws/v1/cluster/apps' % resource_manager
304+
result = requests.get(url, headers={'Accept': 'application/json'}).json()
305+
except:
306+
logging.info('Failed to query application list from %s', url)
307+
308+
return result
309+
310+
def _get_yarn_applications(self):
311+
result = self._get_yarn_applications_from_rm(self._yarn_resource_manager)
312+
if result is None and self._yarn_resource_manager_backup is not None:
313+
result = self._get_yarn_applications_from_rm(self._yarn_resource_manager_backup)
314+
return result
315+
316+
def _get_yarn_start_time(self, app_info):
317+
try:
318+
return int(app_info['startedTime'])
319+
except:
320+
return 0
321+
322+
def _find_yarn_app_info(self, all_yarn_applications, job_name):
323+
result = None
324+
if 'apps' in all_yarn_applications and 'app' in all_yarn_applications['apps']:
325+
for app in all_yarn_applications['apps']['app']:
326+
if app['name'] == job_name:
327+
if result is None or self._get_yarn_start_time(app) > self._get_yarn_start_time(result):
328+
result = app
329+
return result

api/src/main/resources/test_application_creator.py

Lines changed: 143 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
import unittest
2525
from datetime import datetime
26-
from mock import patch, mock_open
26+
from mock import patch, mock_open, Mock
2727
from application_creator import ApplicationCreator
2828
from exceptiondef import FailedValidation, FailedCreation
2929

@@ -357,13 +357,147 @@ def test_destroy_application(self, rmdir_mock, isdir_mock, put_mock, exec_ssh_mo
357357
put_mock.assert_any_call('oozie/v1/job/someid2?action=kill')
358358

359359
# pylint: disable=line-too-long
360-
@patch('subprocess.check_output')
361-
def test_get_runtime_details(self, subprocess_mock):
362-
subprocess_mock.return_value = '''Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1
363-
Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL
364-
application_1455877292606_13009 aname-componentA-job SPARK hdfs root.hdfs RUNNING UNDEFINED 10% http://psm-cdh-dn0:51739
365-
application_1455877292606_13010 aname-componentC-job SPARK hdfs root.hdfs RUNNING UNDEFINED 10% http://psm-cdh-dn0:51739'''
360+
@patch('requests.get')
361+
def test_get_runtime_details(self, get_mock):
362+
rm_call = Mock()
363+
rm_call.json.return_value = {
364+
"apps": {
365+
"app": [{
366+
"id": "application_1455877292606_13009",
367+
"user": "hdfs",
368+
"name": "aname-componentA-job",
369+
"queue": "root.users.hdfs",
370+
"state": "FINISHED",
371+
"finalStatus": "SUCCEEDED",
372+
"progress": 100.0,
373+
"trackingUI": "History",
374+
"trackingUrl": "",
375+
"diagnostics": "",
376+
"clusterId": 1479988623709,
377+
"applicationType": "MAPREDUCE",
378+
"applicationTags": "",
379+
"startedTime": 1479996060665,
380+
"finishedTime": 1479996103786,
381+
"elapsedTime": 43121,
382+
"amContainerLogs": "",
383+
"amHostHttpAddress": "",
384+
"allocatedMB": -1,
385+
"allocatedVCores": -1,
386+
"runningContainers": -1,
387+
"memorySeconds": 30600,
388+
"vcoreSeconds": 29,
389+
"preemptedResourceMB": 0,
390+
"preemptedResourceVCores": 0,
391+
"numNonAMContainerPreempted": 0,
392+
"numAMContainerPreempted": 0,
393+
"logAggregationStatus": "DISABLED"
394+
}, {
395+
"id": "application_1455877292606_13010",
396+
"user": "hdfs",
397+
"name": "aname-componentC-job",
398+
"queue": "root.users.hdfs",
399+
"state": "FINISHED",
400+
"finalStatus": "SUCCEEDED",
401+
"progress": 100.0,
402+
"trackingUI": "History",
403+
"trackingUrl": "",
404+
"diagnostics": "",
405+
"clusterId": 1479988623709,
406+
"applicationType": "MAPREDUCE",
407+
"applicationTags": "",
408+
"startedTime": 1479996060665,
409+
"finishedTime": 1479996103786,
410+
"elapsedTime": 43121,
411+
"amContainerLogs": "",
412+
"amHostHttpAddress": "",
413+
"allocatedMB": -1,
414+
"allocatedVCores": -1,
415+
"runningContainers": -1,
416+
"memorySeconds": 30600,
417+
"vcoreSeconds": 29,
418+
"preemptedResourceMB": 0,
419+
"preemptedResourceVCores": 0,
420+
"numNonAMContainerPreempted": 0,
421+
"numAMContainerPreempted": 0,
422+
"logAggregationStatus": "DISABLED"
423+
}, {
424+
"id": "application_1455877292606_13011",
425+
"user": "hdfs",
426+
"name": "aname-componentC-job",
427+
"queue": "root.users.hdfs",
428+
"state": "RUNNING",
429+
"finalStatus": "SUCCEEDED",
430+
"progress": 100.0,
431+
"trackingUI": "History",
432+
"trackingUrl": "",
433+
"diagnostics": "",
434+
"clusterId": 1479988623709,
435+
"applicationType": "MAPREDUCE",
436+
"applicationTags": "",
437+
"startedTime": 1479996060667,
438+
"finishedTime": 1479996103786,
439+
"elapsedTime": 43121,
440+
"amContainerLogs": "",
441+
"amHostHttpAddress": "",
442+
"allocatedMB": -1,
443+
"allocatedVCores": -1,
444+
"runningContainers": -1,
445+
"memorySeconds": 30600,
446+
"vcoreSeconds": 29,
447+
"preemptedResourceMB": 0,
448+
"preemptedResourceVCores": 0,
449+
"numNonAMContainerPreempted": 0,
450+
"numAMContainerPreempted": 0,
451+
"logAggregationStatus": "DISABLED"
452+
}, {
453+
"id": "application_1455877292606_13012",
454+
"user": "hdfs",
455+
"name": "aname-componentC-job",
456+
"queue": "root.users.hdfs",
457+
"state": "NOT STARTED",
458+
"finalStatus": "SUCCEEDED",
459+
"progress": 100.0,
460+
"trackingUI": "History",
461+
"trackingUrl": "",
462+
"diagnostics": "",
463+
"clusterId": 1479988623709,
464+
"applicationType": "MAPREDUCE",
465+
"applicationTags": "",
466+
"startedTime": None,
467+
"finishedTime": 1479996103786,
468+
"elapsedTime": 43121,
469+
"amContainerLogs": "",
470+
"amHostHttpAddress": "",
471+
"allocatedMB": -1,
472+
"allocatedVCores": -1,
473+
"runningContainers": -1,
474+
"memorySeconds": 30600,
475+
"vcoreSeconds": 29,
476+
"preemptedResourceMB": 0,
477+
"preemptedResourceVCores": 0,
478+
"numNonAMContainerPreempted": 0,
479+
"numAMContainerPreempted": 0,
480+
"logAggregationStatus": "DISABLED"
481+
}]
482+
}
483+
}
484+
get_mock.return_value = rm_call
485+
366486
creator = ApplicationCreator(self.config, self.environment, self.service)
367487
result = creator.get_application_runtime_details('name', self.create_data)
368-
self.assertEqual(result, {'yarn_ids': [{'component': 'componentA', 'type': 'oozie', 'yarn-id': 'application_1455877292606_13009'},
369-
{'component': 'componentC', 'type': 'sparkStreaming', 'yarn-id': 'application_1455877292606_13010'}]})
488+
self.assertEqual(result, {"yarn_applications": {
489+
"oozie-componentA": {
490+
"type": "oozie",
491+
"yarn-id": "application_1455877292606_13009",
492+
"component": "componentA",
493+
"yarn-start-time": 1479996060665,
494+
"yarn-state": "FINISHED"
495+
},
496+
"sparkStreaming-componentC": {
497+
"type": "sparkStreaming",
498+
"yarn-id": "application_1455877292606_13011",
499+
"component": "componentC",
500+
"yarn-start-time": 1479996060667,
501+
"yarn-state": "RUNNING"
502+
}
503+
}})

0 commit comments

Comments
 (0)