Commit 65c9442

Merge pull request #78 from dharaneeshvrd/PNDA-4500
PNDA-4500: Flink application detailed summary
2 parents 30a48e5 + 8c04fa5

15 files changed: +1710 −1210 lines changed

README.md

Lines changed: 49 additions & 43 deletions
`````diff
@@ -344,77 +344,83 @@ user.name - User name to run this command as. Should have permissions to perform
 ````
 {
-    "oozie-application": {
-        "aggregate_status": "STARTED_RUNNING_WITH_ERRORS",
+    "spark-batch-py": {
+        "aggregate_status": "COMPLETED",
         "oozie-1": {
-            "status": "WARN",
-            "aggregate_status": "STARTED_RUNNING_WITH_ERRORS",
+            "status": "OK",
+            "name": "spark-batch-py-workflow",
             "actions": {
-                "workflow-1": {
-                    "status": "WARN",
-                    "oozieId": "0000004-171229054340125-oozie-oozi-W",
-                    "actions": {
-                        "subworkflow-1": {
-                            "status": "WARN",
-                            "oozieId": "0000005-171229054340125-oozie-oozi-W",
-                            "actions": {
-                                "job-2": {
-                                    "status": "ERROR",
-                                    "information": "No JSON object could be decoded",
-                                    "applicationType": "SPARK",
-                                    "name": "process",
-                                    "yarnId": "application_1514526198433_0022"
-                                },
-                                "job-1": {
-                                    "status": "OK",
-                                    "information": null,
-                                    "applicationType": "MAPREDUCE",
-                                    "name": "download",
-                                    "yarnId": "application_1514526198433_0019"
-                                }
-                            },
-                            "name": "oozie-application-subworkflow"
-                        }
-                    },
-                    "name": "oozie-application-workflow"
+                "job-1": {
+                    "status": "OK",
+                    "information": "",
+                    "yarnId": "application_1531380960927_0152",
+                    "applicationType": "spark",
+                    "name": "process"
                 }
             },
-            "oozieId": "0000003-171229054340125-oozie-oozi-C",
-            "name": "oozie-application-coordinator"
+            "componentType": "Oozie",
+            "aggregate_status": "COMPLETED",
+            "oozieId": "0000013-180712073712712-oozie-oozi-W"
         }
     }
 }
 ````
 ### Summary status in case of spark-streaming component
 ````
 {
-    "spark-streaming-application": {
-        "aggregate_status": "STARTED_RUNNING_WITH_NO_ERRORS",
+    "spark-stream": {
+        "aggregate_status": "RUNNING",
         "sparkStreaming-1": {
             "information": {
                 "stageSummary": {
                     "active": 0,
-                    "number_of_stages": 128,
-                    "complete": 128,
+                    "number_of_stages": 1404,
+                    "complete": 1000,
                     "pending": 0,
                     "failed": 0
                 },
                 "jobSummary": {
                     "unknown": 0,
-                    "number_of_jobs": 32,
+                    "number_of_jobs": 351,
                     "running": 0,
-                    "succeeded": 32,
+                    "succeeded": 351,
                     "failed": 0
                 }
             },
-            "aggregate_status": "STARTED_RUNNING_WITH_NO_ERRORS",
-            "name": "spark-streaming-application-example-job",
-            "yarnId": "application_1514526198433_0069"
+            "name": "spark-stream-example-job",
+            "yarnId": "application_1531380960927_0153",
+            "componentType": "SparkStreaming",
+            "aggregate_status": "RUNNING",
+            "tracking_url": "http://st-2-std-hadoop-mgr-2.node.dc1.pnda.local:8088/proxy/application_1531380960927_0153/"
+        }
+    }
+}
+````
+### Summary status in case of flink component
+````
+{
+    "test1": {
+        "aggregate_status": "RUNNING",
+        "flink-1": {
+            "information": {
+                "state": "OK",
+                "vertices": [
+                    {
+                        "status": "RUNNING",
+                        "name": "Source"
+                    }
+                ],
+                "flinkJid": "e7a7163fef86ad81017a0239839207cb"
+            },
+            "name": "test1-example-job",
+            "yarnId": "application_1524556418619_0205",
+            "trackingUrl": "http://rhel-hadoop-mgr-1.node.dc1.pnda.local:8088/proxy/application_1524556418619_0205/#/jobs/e7a7163fef86ad81017a0239839207cb",
+            "componentType": "Flink",
+            "aggregate_status": "RUNNING"
         }
     }
 }
 ````
-
 ### Start _application_
 ````
 POST /applications/<application>/start?user.name=<username>
`````
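Read as data rather than prose, the component summaries above share one shape: each application key carries an `aggregate_status` plus one entry per component, and each component carries its own `aggregate_status`, `componentType`, YARN identifiers, and a component-specific `information` payload. A minimal sketch (not part of this commit) of consuming the new flink payload; the embedded JSON is copied from the sample above:

````python
import json

# Walk each application, then each component, and print per-vertex status.
summary = json.loads("""
{
    "test1": {
        "aggregate_status": "RUNNING",
        "flink-1": {
            "information": {
                "state": "OK",
                "vertices": [{"status": "RUNNING", "name": "Source"}],
                "flinkJid": "e7a7163fef86ad81017a0239839207cb"
            },
            "name": "test1-example-job",
            "yarnId": "application_1524556418619_0205",
            "componentType": "Flink",
            "aggregate_status": "RUNNING"
        }
    }
}
""")

for app_name, app in summary.items():
    print('%s: %s' % (app_name, app['aggregate_status']))
    for comp_name, comp in app.items():
        if not isinstance(comp, dict):
            continue  # skip the sibling "aggregate_status" key
        for vertex in comp['information'].get('vertices', []):
            print('  %s (%s) vertex %s: %s' % (comp['name'], comp['componentType'],
                                               vertex['name'], vertex['status']))
````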
New file

Lines changed: 140 additions & 0 deletions
@@ -0,0 +1,140 @@

````python
import time
import json
import logging
import sys
from importlib import import_module
from multiprocessing import TimeoutError as ThreadTimeoutError

from summary_aggregator import ComponentSummaryAggregator
from plugins_summary.yarn_connection import YarnConnection
from async_dispatcher import AsyncDispatcher
import application_registrar
import application_summary_registrar
import deployer_utils


# constants
SUMMARY_INTERVAL = 30         # seconds between two summary generation rounds
STATUS_INTERVAL = 0.1         # seconds to block on each task before re-polling
REST_API_REQ_TIMEOUT = 5      # seconds allowed for each REST API request
MAX_APP_SUMMARY_TIMEOUT = 60  # seconds after which slow applications are logged


def milli_time():
    return int(round(time.time() * 1000))


class ApplicationDetailedSummary(object):

    def __init__(self, environment, config):
        self._environment = environment
        self._environment.update({'rest_api_req_timeout': REST_API_REQ_TIMEOUT})
        self._config = config
        self._application_registrar = application_registrar.HbaseApplicationRegistrar(environment['hbase_thrift_server'])
        self._application_summary_registrar = application_summary_registrar.HBaseAppplicationSummary(environment['hbase_thrift_server'])
        self._yarn_connection = YarnConnection(self._environment)
        self._summary_aggregator = ComponentSummaryAggregator()
        self._component_creators = {}
        self.dispatcher = AsyncDispatcher(num_threads=4)

    def generate(self):
        """
        Update the detailed summary of every registered application
        """
        applist = self._application_registrar.list_applications()
        logging.info("List of applications: %s", ', '.join(applist))
        self._application_summary_registrar.sync_with_dm(applist)
        apps_to_be_processed = {}

        for app in applist:
            apps_to_be_processed.update({app: self.generate_summary(app)})

        wait_time = 0

        # wait until every application summary task has completed
        while len(apps_to_be_processed) != 0:
            # .keys() returns a snapshot in Python 2, so deleting entries inside the loop is safe
            for app_name in apps_to_be_processed.keys():
                try:
                    apps_to_be_processed[app_name].task.get(STATUS_INTERVAL)
                    del apps_to_be_processed[app_name]
                except ThreadTimeoutError:
                    wait_time += STATUS_INTERVAL  # increase the wait time by one status interval
                    if round(wait_time, 1) % MAX_APP_SUMMARY_TIMEOUT == 0:
                        # log the applications whose wait time exceeds MAX_APP_SUMMARY_TIMEOUT,
                        # repeating on that same interval (i.e. every 60 seconds with the current value)
                        logging.error("Timeout exceeded, %s applications waiting for %d seconds",
                                      ','.join(apps_to_be_processed.keys()), int(wait_time))

    def generate_summary(self, application):
        """
        Update HBase with the recent summary of one application
        """
        def _do_generate():
            try:
                create_data = self._application_registrar.get_create_data(application)
                input_data = {}
                for component_name, component_data in create_data.iteritems():
                    input_data[component_name] = {}
                    input_data[component_name]["component_ref"] = self._load_creator(component_name)
                    input_data[component_name]["component_data"] = component_data
                app_data = self._summary_aggregator.get_application_summary(application, input_data)
                self._application_summary_registrar.post_to_hbase(app_data, application)
                logging.debug("Application: %s, Status: %s", application, app_data[application]['aggregate_status'])
            except Exception as ex:
                logging.error('%s while trying to get status of application "%s"', str(ex), application)

        return self.dispatcher.run_as_asynch(task=_do_generate)

    def _load_creator(self, component_type):
        creator = self._component_creators.get(component_type)

        if creator is None:
            # plugin classes follow the naming convention <Component_type>ComponentSummary,
            # e.g. component type "flink" resolves to plugins_summary.flink.FlinkComponentSummary
            cls = '%s%sComponentSummary' % (component_type[0].upper(), component_type[1:])
            try:
                module = import_module("plugins_summary.%s" % component_type)
                self._component_creators[component_type] = getattr(module, cls)(
                    self._environment, self._yarn_connection, self._application_summary_registrar)
                creator = self._component_creators[component_type]
            except ImportError as exception:
                logging.error('Unable to load Creator for component type "%s" [%s]',
                              component_type, exception)

        return creator


def main():
    config = None
    with open('dm-config.json', 'r') as con:
        config = json.load(con)

    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=logging.getLevelName(config['config']['log_level']),
                        stream=sys.stderr)

    deployer_utils.fill_hadoop_env(config['environment'], config['config'])

    summary = ApplicationDetailedSummary(config['environment'], config['config'])

    logging.info('Starting... Building actual status for applications')

    while True:
        # make sure a summary generation round is initiated every SUMMARY_INTERVAL seconds
        start_time_on_cur_round = milli_time()

        summary.generate()

        finish_time_on_cur_round = (milli_time() - start_time_on_cur_round) / 1000.0
        logging.info("Finished generating summary, time taken %s seconds", str(finish_time_on_cur_round))

        if finish_time_on_cur_round < SUMMARY_INTERVAL:
            # sleep only for the remaining part of the current round's interval
            time.sleep(SUMMARY_INTERVAL - finish_time_on_cur_round)


if __name__ == "__main__":
    main()
````
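One detail worth noting in `_load_creator` above: plugins are resolved purely by naming convention, so supporting a new component type like `flink` means shipping a `plugins_summary.flink` module that exposes a `FlinkComponentSummary` class constructed with `(environment, yarn_connection, summary_registrar)`. A rough sketch of such a plugin under those constraints; the module path, class name, and constructor signature follow from `_load_creator`, while the method name and REST request shape are illustrative assumptions, not this commit's actual implementation:

````python
# Hypothetical sketch of plugins_summary/flink.py, for illustration only.
import json
import urllib2


class FlinkComponentSummary(object):

    def __init__(self, environment, yarn_connection, summary_registrar):
        self._environment = environment
        self._yarn_connection = yarn_connection
        self._summary_registrar = summary_registrar

    def get_summary(self, tracking_url, flink_jid):  # hypothetical entry point
        # Flink's monitoring REST API serves /jobs/<jid> with the job "state"
        # and a "vertices" list; the fields kept here match the README example.
        response = urllib2.urlopen('%s/jobs/%s' % (tracking_url, flink_jid))
        overview = json.load(response)
        return {
            'state': overview['state'],
            'vertices': [{'name': v['name'], 'status': v['status']}
                         for v in overview.get('vertices', [])],
            'flinkJid': flink_jid,
        }
````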
