|
22 | 22 | from resotolib.types import Json |
23 | 23 | from rich import print as rich_print |
24 | 24 | from rich.live import Live |
25 | | -from sqlalchemy import Engine |
| 25 | +from sqlalchemy.engine import Engine |
26 | 26 |
|
27 | 27 | from cloud2sql.show_progress import CollectInfo |
28 | 28 | from cloud2sql.sql import SqlModel, SqlUpdater |
@@ -64,40 +64,41 @@ def collect(collector: BaseCollectorPlugin, engine: Engine, feedback: CoreFeedba |
64 | 64 | kinds = [from_json(m, Kind) for m in collector.graph.export_model(walk_subclasses=False)] |
65 | 65 | model = SqlModel(Model({k.fqn: k for k in kinds})) |
66 | 66 | node_edge_count = len(collector.graph.nodes) + len(collector.graph.edges) |
67 | | - ne_current = 0 |
68 | | - progress_update = 5000 |
69 | | - feedback.progress_done("sync_db", 0, node_edge_count, context=[collector.cloud]) |
| 67 | + ne_count = iter(range(0, node_edge_count)) |
| 68 | + progress_update = max(node_edge_count // 100, 50) |
| 69 | + schema = f"create temp tables {engine.dialect.name}" |
| 70 | + syncdb = f"synchronize {engine.dialect.name}" |
| 71 | + feedback.progress_done(schema, 0, 1, context=[collector.cloud]) |
| 72 | + feedback.progress_done(syncdb, 0, node_edge_count, context=[collector.cloud]) |
70 | 73 | with engine.connect() as conn: |
71 | | - # create the ddl metadata from the kinds |
72 | | - model.create_schema(conn, args) |
73 | | - # ingest the data |
74 | | - updater = SqlUpdater(model) |
75 | | - node: BaseResource |
76 | | - for node in collector.graph.nodes: |
77 | | - node._graph = collector.graph |
78 | | - exported = node_to_dict(node) |
79 | | - exported["type"] = "node" |
80 | | - exported["ancestors"] = { |
81 | | - "cloud": {"reported": {"id": node.cloud().name}}, |
82 | | - "account": {"reported": {"id": node.account().name}}, |
83 | | - "region": {"reported": {"id": node.region().name}}, |
84 | | - "zone": {"reported": {"id": node.zone().name}}, |
85 | | - } |
86 | | - stmt = updater.insert_node(exported) |
87 | | - if stmt is not None: |
88 | | - conn.execute(stmt) |
89 | | - ne_current += 1 |
90 | | - if ne_current % progress_update == 0: |
91 | | - feedback.progress_done("sync_db", ne_current, node_edge_count, context=[collector.cloud]) |
92 | | - for from_node, to_node, _ in collector.graph.edges: |
93 | | - stmt = updater.insert_node({"from": from_node.chksum, "to": to_node.chksum, "type": "edge"}) |
94 | | - if stmt is not None: |
95 | | - conn.execute(stmt) |
96 | | - ne_current += 1 |
97 | | - if ne_current % progress_update == 0: |
98 | | - feedback.progress_done("sync_db", ne_current, node_edge_count, context=[collector.cloud]) |
99 | | - # commit all the changes to the tmp tables |
100 | | - conn.commit() |
| 74 | + with conn.begin(): |
| 75 | + # create the ddl metadata from the kinds |
| 76 | + model.create_schema(conn, args) |
| 77 | + feedback.progress_done(schema, 1, 1, context=[collector.cloud]) |
| 78 | + # ingest the data |
| 79 | + updater = SqlUpdater(model) |
| 80 | + node: BaseResource |
| 81 | + for node in collector.graph.nodes: |
| 82 | + node._graph = collector.graph |
| 83 | + exported = node_to_dict(node) |
| 84 | + exported["type"] = "node" |
| 85 | + exported["ancestors"] = { |
| 86 | + "cloud": {"reported": {"id": node.cloud().name}}, |
| 87 | + "account": {"reported": {"id": node.account().name}}, |
| 88 | + "region": {"reported": {"id": node.region().name}}, |
| 89 | + "zone": {"reported": {"id": node.zone().name}}, |
| 90 | + } |
| 91 | + stmt = updater.insert_node(exported) |
| 92 | + if stmt is not None: |
| 93 | + conn.execute(stmt) |
| 94 | + if (nx := next(ne_count)) % progress_update == 0: |
| 95 | + feedback.progress_done(syncdb, nx, node_edge_count, context=[collector.cloud]) |
| 96 | + for from_node, to_node, _ in collector.graph.edges: |
| 97 | + stmt = updater.insert_node({"from": from_node.chksum, "to": to_node.chksum, "type": "edge"}) |
| 98 | + if stmt is not None: |
| 99 | + conn.execute(stmt) |
| 100 | + if (nx := next(ne_count)) % progress_update == 0: |
| 101 | + feedback.progress_done(syncdb, nx, node_edge_count, context=[collector.cloud]) |
101 | 102 | feedback.progress_done(collector.cloud, 1, 1) |
102 | 103 |
|
103 | 104 |
|
|
0 commit comments