
Commit 7d01dc5

Add parquet support (#10)
1 parent 3539b34 commit 7d01dc5

10 files changed: +499 -86 lines

README.md

Lines changed: 6 additions & 0 deletions

@@ -76,6 +76,12 @@ destinations:
     key: value
 ```

+#### Apache Parquet
+
+```
+parquet:///path/to/parquet/directory
+```
+
 #### My database is not listed here

 cloud2sql uses SQLAlchemy to connect to the database. If your database is not listed here, you can check if it is supported in [SQLAlchemy Dialects](https://docs.sqlalchemy.org/en/20/dialects/index.html).
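
The new destination is consumed by collect_parquet (in cloud2sql/collect_plugins.py below) as a plain mapping with a path and a batch_size key. A minimal sketch of that parsed shape with illustrative values; whether the URI form shown above is expanded into exactly this structure is not visible in this diff:

```
# hypothetical parsed configuration as collect_parquet receives it; values are illustrative
config = {
    "destinations": {
        "parquet": {
            "path": "/path/to/parquet/directory",  # directory that receives one .parquet file per table
            "batch_size": 100_000,  # rows buffered per table before a batch is written out
        }
    }
}
```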

cloud2sql/__main__.py

Lines changed: 3 additions & 3 deletions

@@ -1,6 +1,5 @@
 from logging import getLogger
 from typing import Optional
-
 from resotolib.args import Namespace, ArgumentParser
 from resotolib.logger import setup_logger
 from sqlalchemy import create_engine
@@ -44,7 +43,7 @@ def parse_args() -> Namespace:
     return args  # type: ignore


-def collect(engine: Engine, args: Namespace, sender: AnalyticsEventSender) -> None:
+def collect(engine: Optional[Engine], args: Namespace, sender: AnalyticsEventSender) -> None:
     try:
         collect_from_plugins(engine, args, sender)
     except Exception as e:
@@ -59,7 +58,8 @@ def main() -> None:
         setup_logger("resoto.cloud2sql", level=args.log_level, force=True)
         sender = NoEventSender() if args.analytics_opt_out else PosthogEventSender()
         config = configure(args.config)
-        engine = create_engine(db_string_from_config(config))
+        is_parquet = next(iter(config["destinations"].keys()), None) == "parquet"
+        engine = None if is_parquet else create_engine(db_string_from_config(config))
         collect(engine, args, sender)
     except Exception as e:
         if args.debug:  # raise exception and show complete tracelog

cloud2sql/collect_plugins.py

Lines changed: 83 additions & 17 deletions

@@ -2,12 +2,14 @@
 import multiprocessing
 from collections import defaultdict
 from concurrent.futures import ThreadPoolExecutor, Future
+import concurrent.futures
 from contextlib import suppress
 from logging import getLogger
 from queue import Queue
 from threading import Event
 from time import sleep
-from typing import Dict, Optional, List, Any, Tuple
+from typing import Dict, Optional, List, Any, Tuple, Set
+from pathlib import Path

 import pkg_resources
 import yaml
@@ -25,9 +27,12 @@
 from rich.live import Live
 from sqlalchemy.engine import Engine

+
 from cloud2sql.analytics import AnalyticsEventSender
 from cloud2sql.show_progress import CollectInfo
 from cloud2sql.sql import SqlUpdater, sql_updater
+from cloud2sql.parquet import ParquetModel, ParquetWriter
+

 log = getLogger("resoto.cloud2sql")

@@ -65,6 +70,73 @@ def configure(path_to_config: Optional[str]) -> Json:


 def collect(
+    collector: BaseCollectorPlugin, engine: Optional[Engine], feedback: CoreFeedback, args: Namespace, config: Json
+) -> Tuple[str, int, int]:
+    if engine:
+        return collect_sql(collector, engine, feedback, args)
+    else:
+        return collect_parquet(collector, feedback, config)
+
+
+def prepare_node(node: BaseResource, collector: BaseCollectorPlugin) -> Json:
+    node._graph = collector.graph
+    exported = node_to_dict(node)
+    exported["type"] = "node"
+    exported["ancestors"] = {
+        "cloud": {"reported": {"id": node.cloud().name}},
+        "account": {"reported": {"id": node.account().name}},
+        "region": {"reported": {"id": node.region().name}},
+        "zone": {"reported": {"id": node.zone().name}},
+    }
+    return exported
+
+
+def collect_parquet(collector: BaseCollectorPlugin, feedback: CoreFeedback, config: Json) -> Tuple[str, int, int]:
+    # collect cloud data
+    feedback.progress_done(collector.cloud, 0, 1)
+    collector.collect()
+    # read the kinds created from this collector
+    kinds = [from_json(m, Kind) for m in collector.graph.export_model(walk_subclasses=False)]
+    model = ParquetModel(Model({k.fqn: k for k in kinds}))
+    node_edge_count = len(collector.graph.nodes) + len(collector.graph.edges)
+    ne_current = 0
+    progress_update = node_edge_count // 100
+    feedback.progress_done("sync_db", 0, node_edge_count, context=[collector.cloud])
+
+    # group all edges by kind of from/to
+    edges_by_kind: Set[Tuple[str, str]] = set()
+    for from_node, to_node, key in collector.graph.edges:
+        if key.edge_type == EdgeType.default:
+            edges_by_kind.add((from_node.kind, to_node.kind))
+    # create the ddl metadata from the kinds
+    model.create_schema(list(edges_by_kind))
+    # ingest the data
+    parquet_conf = config.get("destinations", {}).get("parquet")
+    assert parquet_conf
+    parquet_path = Path(parquet_conf["path"])
+    parquet_batch_size = int(parquet_conf["batch_size"])
+    writer = ParquetWriter(model, parquet_path, parquet_batch_size)
+    node: BaseResource
+    for node in sorted(collector.graph.nodes, key=lambda n: n.kind):
+        exported = prepare_node(node, collector)
+        writer.insert_node(exported)
+        ne_current += 1
+        if ne_current % progress_update == 0:
+            feedback.progress_done("sync_db", ne_current, node_edge_count, context=[collector.cloud])
+    for from_node, to_node, key in collector.graph.edges:
+        if key.edge_type == EdgeType.default:
+            writer.insert_node({"from": from_node.chksum, "to": to_node.chksum, "type": "edge"})
+        ne_current += 1
+        if ne_current % progress_update == 0:
+            feedback.progress_done("sync_db", ne_current, node_edge_count, context=[collector.cloud])

+    writer.close()
+
+    feedback.progress_done(collector.cloud, 1, 1)
+    return collector.cloud, len(collector.graph.nodes), len(collector.graph.edges)
+
+
+def collect_sql(
     collector: BaseCollectorPlugin, engine: Engine, feedback: CoreFeedback, args: Namespace
 ) -> Tuple[str, int, int]:
     # collect cloud data
@@ -75,16 +147,8 @@ def collect(
     nodes_by_kind: Dict[str, List[Json]] = defaultdict(list)
     node: BaseResource
     for node in collector.graph.nodes:
-        node._graph = collector.graph
         # create an exported node with the same scheme as resotocore
-        exported = node_to_dict(node)
-        exported["type"] = "node"
-        exported["ancestors"] = {
-            "cloud": {"reported": {"id": node.cloud().name}},
-            "account": {"reported": {"id": node.account().name}},
-            "region": {"reported": {"id": node.region().name}},
-            "zone": {"reported": {"id": node.zone().name}},
-        }
+        exported = prepare_node(node, collector)
         nodes_by_kind[node.kind].append(exported)

     # group all edges by kind of from/to
@@ -138,33 +202,35 @@ def show_messages(core_messages: Queue[Json], end: Event) -> None:
         rich_print(message)


-def collect_from_plugins(engine: Engine, args: Namespace, sender: AnalyticsEventSender) -> None:
+def collect_from_plugins(engine: Optional[Engine], args: Namespace, sender: AnalyticsEventSender) -> None:
     # the multiprocessing manager is used to share data between processes
     mp_manager = multiprocessing.Manager()
     core_messages: Queue[Json] = mp_manager.Queue()
     feedback = CoreFeedback("cloud2sql", "collect", "collect", core_messages)
     raw_config = configure(args.config)
     sources = raw_config["sources"]
     all_collectors = collectors(sources, feedback)
-    analytics = {"total": len(all_collectors), "engine": engine.dialect.name} | {name: 1 for name in all_collectors}
+    engine_name = engine.dialect.name if engine else "parquet"
+    analytics = {"total": len(all_collectors), "engine": engine_name} | {name: 1 for name in all_collectors}
     end = Event()
     with ThreadPoolExecutor(max_workers=4) as executor:
         try:
             if args.show == "progress":
                 executor.submit(show_messages, core_messages, end)
             futures: List[Future[Any]] = []
             for collector in all_collectors.values():
-                futures.append(executor.submit(collect, collector, engine, feedback, args))
+                futures.append(executor.submit(collect, collector, engine, feedback, args, raw_config))
             for future in concurrent.futures.as_completed(futures):
                 name, nodes, edges = future.result()
                 analytics[f"{name}_nodes"] = nodes
                 analytics[f"{name}_edges"] = edges
             sender.capture("collect", **analytics)
             # when all collectors are done, we can swap all temp tables
-            swap_tables = "Make latest snapshot available"
-            feedback.progress_done(swap_tables, 0, 1)
-            SqlUpdater.swap_temp_tables(engine)
-            feedback.progress_done(swap_tables, 1, 1)
+            if engine:
+                swap_tables = "Make latest snapshot available"
+                feedback.progress_done(swap_tables, 0, 1)
+                SqlUpdater.swap_temp_tables(engine)
+                feedback.progress_done(swap_tables, 1, 1)
         except Exception as e:
             # set end and wait for live to finish, otherwise the cursor is not reset
             end.set()
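
For orientation, ParquetWriter.insert_node receives rows in the two shapes produced above. A minimal sketch with hypothetical values; the node row additionally carries all fields that node_to_dict exports:

```
# node row, as built by prepare_node (values are hypothetical)
node_row = {
    "type": "node",
    "ancestors": {
        "cloud": {"reported": {"id": "aws"}},
        "account": {"reported": {"id": "my-account"}},
        "region": {"reported": {"id": "us-east-1"}},
        "zone": {"reported": {"id": "us-east-1a"}},
    },
    # ... plus the fields returned by node_to_dict(node)
}

# edge row, as built in collect_parquet (checksums are hypothetical)
edge_row = {"from": "abc123", "to": "def456", "type": "edge"}
```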

cloud2sql/parquet.py

Lines changed: 159 additions & 0 deletions

@@ -0,0 +1,159 @@
+from resotoclient.models import Kind, Model, JsObject
+from typing import Dict, List, Any, NamedTuple, Optional, Tuple
+import pyarrow as pa
+from cloud2sql.schema_utils import (
+    base_kinds,
+    get_table_name,
+    get_link_table_name,
+    kind_properties,
+    insert_node,
+)
+import pyarrow.parquet as pq
+from pathlib import Path
+from dataclasses import dataclass
+
+
+class ParquetModel:
+    def __init__(self, model: Model):
+        self.model = model
+        self.table_kinds = [
+            kind
+            for kind in model.kinds.values()
+            if kind.aggregate_root and kind.runtime_kind is None and kind.fqn not in base_kinds
+        ]
+        self.schemas: Dict[str, pa.Schema] = {}
+
+    def _parquet_type(self, kind: str) -> pa.lib.DataType:
+        if kind.startswith("dict") or "[]" in kind:
+            return pa.string()  # dicts and lists are converted to json strings
+        elif kind == "int32":
+            return pa.int32()
+        elif kind == "int64":
+            return pa.int64()
+        elif kind == "float":
+            return pa.float32()
+        elif kind == "double":
+            return pa.float64()
+        elif kind in {"string", "datetime", "date", "duration"}:
+            return pa.string()
+        elif kind == "boolean":
+            return pa.bool_()
+        else:
+            return pa.string()
+
+    def create_schema(self, edges: List[Tuple[str, str]]) -> None:
+        def table_schema(kind: Kind) -> None:
+            table_name = get_table_name(kind.fqn, with_tmp_prefix=False)
+            if table_name not in self.schemas:
+                properties, _ = kind_properties(kind, self.model)
+                schema = pa.schema(
+                    [
+                        pa.field("_id", pa.string()),
+                        *[pa.field(p.name, self._parquet_type(p.kind)) for p in properties],
+                    ]
+                )
+                self.schemas[table_name] = schema
+
+        def link_table_schema(from_kind: str, to_kind: str) -> None:
+            from_table = get_table_name(from_kind, with_tmp_prefix=False)
+            to_table = get_table_name(to_kind, with_tmp_prefix=False)
+            link_table = get_link_table_name(from_kind, to_kind, with_tmp_prefix=False)
+            if link_table not in self.schemas and from_table in self.schemas and to_table in self.schemas:
+                schema = pa.schema(
+                    [
+                        pa.field("from_id", pa.string()),
+                        pa.field("to_id", pa.string()),
+                    ]
+                )
+                self.schemas[link_table] = schema
+
+        def link_table_schema_from_successors(kind: Kind) -> None:
+            _, successors = kind_properties(kind, self.model)
+            # create link table for all linked entities
+            for successor in successors:
+                link_table_schema(kind.fqn, successor)
+
+        # step 1: create tables for all kinds
+        for kind in self.table_kinds:
+            table_schema(kind)
+        # step 2: create link tables for all kinds
+        for kind in self.table_kinds:
+            link_table_schema_from_successors(kind)
+        # step 3: create link tables for all seen edges
+        for from_kind, to_kind in edges:
+            link_table_schema(from_kind, to_kind)
+
+        return None
+
+
+class WriteResult(NamedTuple):
+    table_name: str
+
+
+@dataclass
+class ParquetBatch:
+    rows: List[Dict[str, Any]]
+    schema: pa.Schema
+    writer: pq.ParquetWriter
+
+
+class ParquetWriter:
+    def __init__(
+        self,
+        model: ParquetModel,
+        result_directory: Path,
+        rows_per_batch: int,
+    ):
+        self.model = model
+        self.kind_by_id: Dict[str, str] = {}
+        self.batches: Dict[str, ParquetBatch] = {}
+        self.rows_per_batch = rows_per_batch
+        self.result_directory = result_directory
+
+    def insert_value(self, table_name: str, values: Any) -> Optional[WriteResult]:
+        if self.model.schemas.get(table_name):
+
+            def ensure_path(path: Path) -> Path:
+                path.mkdir(parents=True, exist_ok=True)
+                return path
+
+            batch = self.batches.get(
+                table_name,
+                ParquetBatch(
+                    [],
+                    self.model.schemas[table_name],
+                    pq.ParquetWriter(
+                        Path(ensure_path(self.result_directory), f"{table_name}.parquet"),
+                        self.model.schemas[table_name],
+                    ),
+                ),
+            )
+
+            batch.rows.append(values)
+            self.batches[table_name] = batch
+            return WriteResult(table_name)
+        return None
+
+    def write_batch_bundle(self, batch: ParquetBatch) -> None:
+        rows = batch.rows
+        batch.rows = []
+        pa_table = pa.Table.from_pylist(rows, batch.schema)
+        batch.writer.write_table(pa_table)
+
+    def insert_node(self, node: JsObject) -> None:
+        result = insert_node(
+            node,
+            self.kind_by_id,
+            self.insert_value,
+            with_tmp_prefix=False,
+            flatten=True,
+        )
+        should_write_batch = result and len(self.batches[result.table_name].rows) > self.rows_per_batch
+        if result and should_write_batch:
+            batch = self.batches[result.table_name]
+            self.write_batch_bundle(batch)
+
+    def close(self) -> None:
+        for batch in self.batches.values():
+            self.write_batch_bundle(batch)
+            batch.writer.close()
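
Each table ends up as one <table_name>.parquet file in the configured directory, so the output can be inspected with any Parquet reader. A minimal sketch, assuming a hypothetical aws_ec2_instance table was written:

```
import pyarrow.parquet as pq

# path and table name are hypothetical; ParquetWriter writes one file per table
table = pq.read_table("/path/to/parquet/directory/aws_ec2_instance.parquet")
print(table.num_rows, table.schema)
```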
