|
| 1 | +import time |
| 2 | +import json |
| 3 | +import logging |
| 4 | +import sys |
| 5 | +from importlib import import_module |
| 6 | +from multiprocessing import TimeoutError as ThreadTimeoutError |
| 7 | + |
| 8 | +from summary_aggregator import ComponentSummaryAggregator |
| 9 | +from plugins_summary.yarn_connection import YarnConnection |
| 10 | +from async_dispatcher import AsyncDispatcher |
| 11 | +import application_registrar |
| 12 | +import application_summary_registrar |
| 13 | +import deployer_utils |
| 14 | + |
| 15 | + |
# constants
SUMMARY_INTERVAL = 30          # seconds: target period of one full summary-generation round
STATUS_INTERVAL = 0.1          # seconds: per-poll timeout when waiting on each async summary task
REST_API_REQ_TIMEOUT = 5       # injected into the environment as 'rest_api_req_timeout' for summary plugins — presumably seconds; confirm against plugin REST clients
MAX_APP_SUMMARY_TIMEOUT = 60   # seconds of accumulated waiting between "slow applications" error logs
| 21 | + |
def milli_time():
    """Return the current wall-clock time as whole milliseconds since the epoch."""
    now_seconds = time.time()
    return int(round(now_seconds * 1000))
| 24 | + |
class ApplicationDetailedSummary(object):
    """
    Builds detailed runtime summaries for all registered applications and
    writes them to HBase.

    Summary generation for each application is dispatched asynchronously via
    AsyncDispatcher; generate() fans out one task per application and then
    waits for all of them to complete, periodically logging the slow ones.
    """

    def __init__(self, environment, config):
        """
        :param environment: dict of environment settings; must contain
                            'hbase_thrift_server'. Mutated in place to add
                            'rest_api_req_timeout' for the summary plugins.
        :param config: deployment-manager configuration dict (kept for later use).
        """
        self._environment = environment
        # Summary plugins share a single REST request timeout via the environment.
        self._environment.update({'rest_api_req_timeout': REST_API_REQ_TIMEOUT})
        self._config = config
        self._application_registrar = application_registrar.HbaseApplicationRegistrar(environment['hbase_thrift_server'])
        self._application_summary_registrar = application_summary_registrar.HBaseAppplicationSummary(environment['hbase_thrift_server'])
        self._yarn_connection = YarnConnection(self._environment)
        self._summary_aggregator = ComponentSummaryAggregator()
        # Cache of component-summary creator instances keyed by component type.
        self._component_creators = {}
        self.dispatcher = AsyncDispatcher(num_threads=4)

    def generate(self):
        """
        Update the detailed summary of every registered application.

        Dispatches one async task per application, then blocks until all of
        them finish. While waiting, logs the list of still-running
        applications after every MAX_APP_SUMMARY_TIMEOUT seconds of
        accumulated waiting.
        """
        applist = self._application_registrar.list_applications()
        logging.info("List of applications: %s", ', '.join(applist))
        self._application_summary_registrar.sync_with_dm(applist)

        pending_tasks = {app: self.generate_summary(app) for app in applist}

        # Count timed-out polls with an integer instead of accumulating a
        # float: STATUS_INTERVAL (0.1) is not exactly representable in binary
        # floating point, so the original
        # `round(wait_time, 1) % MAX_APP_SUMMARY_TIMEOUT == 0` check is fragile.
        timeout_polls = 0
        polls_per_alert = int(round(MAX_APP_SUMMARY_TIMEOUT / STATUS_INTERVAL))

        # Waiting block for all the applications to get completed.
        while pending_tasks:
            # Iterate over a snapshot of the keys so finished entries can be
            # deleted: removing from the live view while iterating raises
            # RuntimeError on Python 3.
            for app_name in list(pending_tasks):
                try:
                    pending_tasks[app_name].task.get(STATUS_INTERVAL)
                    del pending_tasks[app_name]
                except ThreadTimeoutError:
                    timeout_polls += 1
                    if timeout_polls % polls_per_alert == 0:
                        # Logging out the list of applications whose wait time
                        # exceeds MAX_APP_SUMMARY_TIMEOUT, on that same interval
                        # (i.e. every 60 seconds of accumulated waiting).
                        logging.error("Timeout exceeded, %s applications waiting for %d seconds",
                                      ','.join(pending_tasks.keys()),
                                      int(timeout_polls * STATUS_INTERVAL))

    def generate_summary(self, application):
        """
        Asynchronously compute and store the recent summary for one application.

        :param application: application name as known to the registrar
        :returns: the AsyncDispatcher handle of the scheduled task
        """
        def _do_generate():
            try:
                create_data = self._application_registrar.get_create_data(application)
                input_data = {}
                # .items() instead of Py2-only .iteritems() so this also runs
                # unchanged on Python 3.
                for component_name, component_data in create_data.items():
                    input_data[component_name] = {}
                    input_data[component_name]["component_ref"] = self._load_creator(component_name)
                    input_data[component_name]["component_data"] = component_data
                app_data = self._summary_aggregator.get_application_summary(application, input_data)
                self._application_summary_registrar.post_to_hbase(app_data, application)
                logging.debug("Application: %s, Status: %s", application, app_data[application]['aggregate_status'])
            except Exception as ex:
                # Top-level boundary of the worker thread: one failing
                # application must not abort the whole summary round.
                logging.error('%s while trying to get status of application "%s"', str(ex), application)

        return self.dispatcher.run_as_asynch(task=_do_generate)

    def _load_creator(self, component_type):
        """
        Return (and lazily cache) the ComponentSummary creator for a component type.

        Imports plugins_summary.<component_type> and instantiates its
        <Type>ComponentSummary class. Returns None when the plugin module
        cannot be imported (the failure is logged).
        """
        creator = self._component_creators.get(component_type)

        if creator is None:
            cls_name = '%s%sComponentSummary' % (component_type[0].upper(), component_type[1:])
            try:
                module = import_module("plugins_summary.%s" % component_type)
                creator = getattr(module, cls_name)(
                    self._environment, self._yarn_connection, self._application_summary_registrar)
                self._component_creators[component_type] = creator
            except ImportError as exception:
                logging.error(
                    'Unable to load Creator for component type "%s" [%s]',
                    component_type,
                    exception)

        return creator
| 105 | + |
def main():
    """
    Entry point: load dm-config.json, configure logging, then regenerate the
    applications' detailed summaries on a fixed SUMMARY_INTERVAL cadence, forever.
    """
    with open('dm-config.json', 'r') as config_file:
        config = json.load(config_file)

    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=logging.getLevelName(config['config']['log_level']),
                        stream=sys.stderr)

    deployer_utils.fill_hadoop_env(config['environment'], config['config'])

    summary = ApplicationDetailedSummary(config['environment'], config['config'])

    logging.info('Starting... Building actual status for applications')

    while True:
        # Each round is pinned to a SUMMARY_INTERVAL cadence: measure how long
        # generation took and sleep only for the remainder of the interval.
        round_started_at = milli_time()

        summary.generate()

        elapsed_seconds = (milli_time() - round_started_at) / 1000.0
        logging.info("Finished generating summary, time taken %s seconds", str(elapsed_seconds))

        remaining = SUMMARY_INTERVAL - elapsed_seconds
        if remaining > 0:
            time.sleep(remaining)
| 138 | + |
# Script entry point: run the summary-generation loop when executed directly.
if __name__ == "__main__":
    main()
0 commit comments