import concurrent.futures
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, Future
from contextlib import suppress
from logging import getLogger
from queue import Queue
from threading import Event
from time import sleep
from typing import Dict, Optional, List, Any

import pkg_resources
import yaml
from resotoclient import Kind, Model
from resotolib.args import Namespace
from resotolib.baseplugin import BaseCollectorPlugin
from resotolib.baseresources import BaseResource
from resotolib.config import Config
from resotolib.core.actions import CoreFeedback
from resotolib.core.model_export import node_to_dict
from resotolib.json import from_json
from resotolib.proc import emergency_shutdown
from resotolib.types import Json
from rich import print as rich_print
from rich.live import Live
from sqlalchemy import Engine

from cloud2sql.show_progress import CollectInfo
from cloud2sql.sql import SqlModel, SqlUpdater

log = getLogger("cloud2sql")


def collectors(raw_config: Json, feedback: CoreFeedback) -> Dict[str, BaseCollectorPlugin]:
    """Discover all installed resoto collector plugins that are enabled in the configuration."""
    result: Dict[str, BaseCollectorPlugin] = {}
    config: Config = Config  # type: ignore
    for entry_point in pkg_resources.iter_entry_points("resoto.plugins"):
        plugin_class = entry_point.load()
        if issubclass(plugin_class, BaseCollectorPlugin) and plugin_class.cloud in raw_config:
            log.info(f"Found collector {plugin_class.cloud} ({plugin_class.__name__})")
            plugin_class.add_config(config)
            plugin = plugin_class()
            if hasattr(plugin, "core_feedback"):
                setattr(plugin, "core_feedback", feedback.with_context(plugin.cloud))
            result[plugin_class.cloud] = plugin

    Config.init_default_config()
    Config.running_config.data = {**Config.running_config.data, **Config.read_config(raw_config)}
    return result


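# For reference: collector plugins advertise themselves under the
# "resoto.plugins" entry point group that collectors() iterates above. A plugin
# package would declare this in its packaging metadata, e.g. (illustrative
# setup.py snippet; the plugin and class names are assumptions):
#
#   entry_points={"resoto.plugins": ["collector = resoto_plugin_aws:AWSCollectorPlugin"]}

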
def configure(path_to_config: Optional[str]) -> Json:
    """Load the YAML configuration file, if one was given, and return its content."""
    if path_to_config:
        with open(path_to_config) as f:
            return yaml.safe_load(f)  # type: ignore
    return {}


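# An illustrative configuration file (the keys under each cloud are assumptions
# taken from the respective plugin's config, not a complete reference). The
# top-level keys name the clouds, which is how collectors() decides which
# plugins to load:
#
#   aws:
#     access_key_id: "..."
#     secret_access_key: "..."
#   gcp:
#     service_account: ["/path/to/service-account.json"]

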
def collect(collector: BaseCollectorPlugin, engine: Engine, feedback: CoreFeedback, args: Namespace) -> None:
    """Run a single collector plugin and write the resulting graph into the database."""
    # collect the cloud data
    feedback.progress_done(collector.cloud, 0, 1)
    collector.collect()
    # read the kinds created by this collector
    kinds = [from_json(m, Kind) for m in collector.graph.export_model(walk_subclasses=False)]
    model = SqlModel(Model({k.fqn: k for k in kinds}))
    node_edge_count = len(collector.graph.nodes) + len(collector.graph.edges)
    ne_current = 0
    progress_update = 5000  # report sync progress every 5000 nodes/edges
    feedback.progress_done("sync_db", 0, node_edge_count, context=[collector.cloud])
    with engine.connect() as conn:
        # create the DDL metadata from the kinds
        model.create_schema(conn, args)
        # ingest the data
        updater = SqlUpdater(model)
        node: BaseResource
        for node in collector.graph.nodes:
            node._graph = collector.graph
            exported = node_to_dict(node)
            exported["type"] = "node"
            exported["ancestors"] = {
                "cloud": {"reported": {"id": node.cloud().name}},
                "account": {"reported": {"id": node.account().name}},
                "region": {"reported": {"id": node.region().name}},
                "zone": {"reported": {"id": node.zone().name}},
            }
            stmt = updater.insert_node(exported)
            if stmt is not None:
                conn.execute(stmt)
            ne_current += 1
            if ne_current % progress_update == 0:
                feedback.progress_done("sync_db", ne_current, node_edge_count, context=[collector.cloud])
        for from_node, to_node, _ in collector.graph.edges:
            stmt = updater.insert_node({"from": from_node.chksum, "to": to_node.chksum, "type": "edge"})
            if stmt is not None:
                conn.execute(stmt)
            ne_current += 1
            if ne_current % progress_update == 0:
                feedback.progress_done("sync_db", ne_current, node_edge_count, context=[collector.cloud])
        # commit all changes to the temporary tables
        conn.commit()
        feedback.progress_done(collector.cloud, 1, 1)


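# Design note (inferred from this module, not from SqlModel's source): the
# updater appears to write into temporary staging tables, which is why collect()
# commits them here while collect_from_plugins() swaps them into place only
# after every collector has finished; readers never see a half-synced database.

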
def show_messages(core_messages: Queue[Json], end: Event) -> None:
    """Render collector progress messages with rich until the end event is set."""
    info = CollectInfo()
    while not end.is_set():
        with Live(info.render(), auto_refresh=False, transient=True) as live:
            with suppress(Exception):
                info.handle_message(core_messages.get(timeout=1))
            live.update(info.render())
    for message in info.rendered_messages():
        rich_print(message)


def collect_from_plugins(engine: Engine, args: Namespace) -> None:
    """Run all configured collectors in parallel and sync their results into the database."""
    # the multiprocessing manager is used to share data between processes
    mp_manager = multiprocessing.Manager()
    core_messages: Queue[Json] = mp_manager.Queue()
    feedback = CoreFeedback("cloud2sql", "collect", "collect", core_messages)
    raw_config = configure(args.config)
    all_collectors = collectors(raw_config, feedback)
    end = Event()
    with ThreadPoolExecutor(max_workers=4) as executor:
        try:
            if args.show == "progress":
                executor.submit(show_messages, core_messages, end)
            futures: List[Future[Any]] = []
            for collector in all_collectors.values():
                futures.append(executor.submit(collect, collector, engine, feedback, args))
            for future in concurrent.futures.as_completed(futures):
                future.result()
            # once all collectors are done, swap the temporary tables into place
            SqlModel.swap_temp_tables(engine)
        except Exception as e:
            # set end and wait for the live view to finish, otherwise the cursor is not reset
            end.set()
            sleep(1)
            log.error("An error occurred", exc_info=e)
            print(f"Encountered an error. Giving up: {e}")
            emergency_shutdown()
        finally:
            end.set()
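

# Usage sketch (illustrative, not part of this module): cloud2sql normally
# drives this function from its CLI entry point. The engine URL and the
# argument names below are assumptions for demonstration only.
#
#   from argparse import Namespace
#   from sqlalchemy import create_engine
#
#   engine = create_engine("sqlite:///cloud2sql.db")
#   collect_from_plugins(engine, Namespace(config="config.yaml", show="progress"))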