Skip to content

Commit 829f552

Browse files
authored
feat: Enable destination plugins to do reads (for backend) (#223)
* fix: don't error when field has no metadata. * feat: Enable destination plugins to do reads (for backend use case). * Fix lint. * Support destination plugins. Improve read tests.
1 parent c05de30 commit 829f552

File tree

12 files changed

+273
-12
lines changed

12 files changed

+273
-12
lines changed

cloudquery/sdk/internal/memdb/memdb.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from cloudquery.sdk import schema
66
from typing import List, Generator, Dict
77
import pyarrow as pa
8+
from cloudquery.sdk.schema.table import Table
89
from cloudquery.sdk.types import JSONType
910
from dataclasses import dataclass, field
1011

@@ -109,5 +110,9 @@ def write(self, writer: Generator[message.WriteMessage, None, None]) -> None:
109110
else:
110111
raise NotImplementedError(f"Unknown message type {type(msg)}")
111112

113+
def read(self, table: Table) -> Generator[message.ReadMessage, None, None]:
    """Yield a ReadMessage for each stored record batch of *table*.

    Bug fix: the loop variable previously shadowed the ``table``
    parameter and the parameter was never used, so the records of
    *every* table in the in-memory store were yielded regardless of
    which table the caller asked for. Now only the requested table's
    records are returned.
    """
    # NOTE(review): assumes self._db is keyed by table name — confirm
    # against the insert path of this memdb implementation.
    for table_name, record in self._db.items():
        if table_name == table.name:
            yield message.ReadMessage(record)
112117
def close(self) -> None:
113118
self._db = {}

cloudquery/sdk/internal/servers/plugin_v3/plugin.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,14 @@ def Sync(self, request, context):
8181
# unknown sync message type
8282
raise NotImplementedError()
8383

84-
def Read(self, request, context):
85-
raise NotImplementedError()
84+
def Read(
    self, request: plugin_pb2.Read.Request, context
) -> Generator[plugin_pb2.Read.Response, None, None]:
    """Stream the plugin's records for the requested table back over gRPC.

    The request carries the table as a serialized Arrow schema; each
    message the plugin yields is serialized back to bytes for the wire.
    """
    requested_table = Table.from_arrow_schema(
        arrow.new_schema_from_bytes(request.table)
    )
    for read_message in self._plugin.read(requested_table):
        serialized = arrow.record_to_bytes(read_message.record)
        yield plugin_pb2.Read.Response(record=serialized)
8692

8793
def Write(
8894
self, request_iterator: Generator[plugin_pb2.Write.Request, None, None], context
@@ -93,7 +99,9 @@ def msg_iterator() -> Generator[WriteMessage, None, None]:
9399
if field == "migrate_table":
94100
sc = arrow.new_schema_from_bytes(msg.migrate_table.table)
95101
table = Table.from_arrow_schema(sc)
96-
yield WriteMigrateTableMessage(table=table)
102+
yield WriteMigrateTableMessage(
103+
table=table, migrate_force=msg.migrate_table.migrate_force
104+
)
97105
elif field == "insert":
98106
yield WriteInsertMessage(
99107
record=arrow.new_record_from_bytes(msg.insert.record)

cloudquery/sdk/message/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@
55
WriteMigrateTableMessage,
66
WriteDeleteStale,
77
)
8+
from .read import ReadMessage

cloudquery/sdk/message/read.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
import pyarrow as pa
2+
3+
4+
class ReadMessage:
    """A single record batch produced by a destination plugin's read()."""

    def __init__(self, record: "pa.RecordBatch"):
        # The Arrow record batch read back from the destination.
        self.record = record

    def __repr__(self) -> str:
        # Added for debuggability; the original class had no repr.
        return f"ReadMessage(record={self.record!r})"

cloudquery/sdk/message/write.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@ def __init__(self, record: pa.RecordBatch):
1212

1313

1414
class WriteMigrateTableMessage(WriteMessage):
    """Write-stream message asking the destination to migrate *table*.

    Bug fix: ``migrate_force`` was added as a *required* positional
    parameter, breaking every caller written against the old
    single-argument constructor. It now defaults to False, which keeps
    the old call sites working and matches the previous behavior.
    """

    def __init__(self, table: Table, migrate_force: bool = False):
        self.table = table
        # NOTE(review): presumably True permits destructive schema
        # changes during migration — confirm against the protocol docs.
        self.migrate_force = migrate_force
1718

1819

1920
class WriteDeleteStale(WriteMessage):

cloudquery/sdk/plugin/plugin.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,5 +93,8 @@ def sync(self, options: SyncOptions) -> Generator[message.SyncMessage, None, Non
9393
def write(self, writer: Generator[message.WriteMessage, None, None]) -> None:
9494
raise NotImplementedError()
9595

96+
def read(self, table: Table) -> Generator[message.ReadMessage, None, None]:
97+
raise NotImplementedError()
98+
9699
def close(self) -> None:
97100
raise NotImplementedError()

cloudquery/sdk/schema/__init__.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,16 @@
11
from .column import Column

# Bug fix: TableColumnChangeType was listed twice in this import tuple;
# the duplicate entry is removed.
from .table import (
    Table,
    tables_to_arrow_schemas,
    filter_dfs,
    TableColumnChange,
    TableColumnChangeType,
    get_table_changes,
    get_table_column,
    flatten_tables_recursive,
    flatten_tables,
)
from .resource import Resource
415

516
# from .table_resolver import TableReso

cloudquery/sdk/schema/column.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ def to_arrow_field(self):
5555
arrow.METADATA_TRUE if self.incremental_key else arrow.METADATA_FALSE
5656
),
5757
}
58-
return pa.field(self.name, self.type, metadata=metadata)
58+
return pa.field(
59+
self.name, self.type, metadata=metadata, nullable=not self.not_null
60+
)
5961

6062
@staticmethod
6163
def from_arrow_field(field: pa.Field) -> Column:

cloudquery/sdk/schema/table.py

Lines changed: 139 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
from __future__ import annotations
22

33
import copy
4+
from enum import IntEnum
45
import fnmatch
5-
from typing import List
6+
from typing import List, Optional
67

78
import pyarrow as pa
89

910
from cloudquery.sdk.schema import arrow
1011
from .column import Column
1112

1213

14+
CQ_SYNC_TIME_COLUMN = "cq_sync_time"
15+
CQ_SOURCE_NAME_COLUMN = "cq_source_name"
16+
17+
1318
class Client:
    """Placeholder client type; defined here only as a marker with no behavior."""

    pass
1520

@@ -192,9 +197,137 @@ def filter_dfs_child(r, matched, include, exclude, skip_dependent_tables):
192197
return None
193198

194199

195-
def flatten_tables(tables: List[Table]) -> List[Table]:
196-
flattened: List[Table] = []
200+
# Bug fix: TableColumnChangeType was previously defined TWICE in this
# module — first as a plain class (ADD=1, REMOVE=2,
# REMOVE_UNIQUE_CONSTRAINT=3) and then as an IntEnum with *conflicting*
# values (REMOVE=3, REMOVE_UNIQUE_CONSTRAINT=4). The first definition was
# dead code that silently disagreed with the one in effect; only the
# IntEnum survives.
class TableColumnChangeType(IntEnum):
    """Kind of difference between two versions of a table column."""

    UNKNOWN = 0
    ADD = 1
    UPDATE = 2
    REMOVE = 3
    REMOVE_UNIQUE_CONSTRAINT = 4
    MOVE_TO_CQ_ONLY = 5


class TableColumnChange:
    """A single column-level difference produced by get_table_changes().

    ``current``/``previous`` hold the new/old Column definition; either is
    None when the column exists only on one side. Bug fix: the trailing
    parameters now have defaults so table-level changes such as
    MOVE_TO_CQ_ONLY can be constructed with just a ``type`` (the original
    required all four arguments, and get_table_changes raised TypeError).
    """

    def __init__(
        self,
        type: TableColumnChangeType,
        column_name: str = "",
        current: Optional[Column] = None,
        previous: Optional[Column] = None,
    ):
        self.type = type
        self.column_name = column_name
        self.current = current
        self.previous = previous
227+
228+
229+
def get_table_changes(new: Table, old: Table) -> List[TableColumnChange]:
    """Diff two versions of a table definition.

    Returns one TableColumnChange per added, updated or removed column,
    plus entries for removed unique constraints and for a switch to a
    singular cq-id primary key.
    """
    changes: List[TableColumnChange] = []

    # Special case: moving from individual PKs to a singular PK on _cq_id.
    # NOTE(review): "CqIDColumn" looks like a stray Go identifier rather
    # than the actual column *name* ("_cq_id") — confirm against the Go SDK.
    new_pks = new.primary_keys
    if (
        len(new_pks) == 1
        and new_pks[0] == "CqIDColumn"
        and get_table_column(old, "CqIDColumn") is None
        and len(old.primary_keys) > 0
    ):
        # Bug fix: the original call passed only `type`, but
        # TableColumnChange requires column_name/current/previous, so this
        # branch raised TypeError at runtime.
        changes.append(
            TableColumnChange(
                type=TableColumnChangeType.MOVE_TO_CQ_ONLY,
                column_name=new_pks[0],
                current=get_table_column(new, new_pks[0]),
                previous=None,
            )
        )

    for c in new.columns:
        other_column = get_table_column(old, c.name)
        # A column was added to the table definition.
        if other_column is None:
            changes.append(
                TableColumnChange(
                    type=TableColumnChangeType.ADD,
                    column_name=c.name,
                    current=c,
                    previous=None,
                )
            )
            continue

        # Column type or options (e.g. PK, Not Null) changed in the new
        # table definition.
        if (
            c.type != other_column.type
            or c.not_null != other_column.not_null
            or c.primary_key != other_column.primary_key
        ):
            changes.append(
                TableColumnChange(
                    type=TableColumnChangeType.UPDATE,
                    column_name=c.name,
                    current=c,
                    previous=other_column,
                )
            )

        # Unique constraint was removed.
        if not c.unique and other_column.unique:
            changes.append(
                TableColumnChange(
                    type=TableColumnChangeType.REMOVE_UNIQUE_CONSTRAINT,
                    column_name=c.name,
                    current=c,
                    previous=other_column,
                )
            )

    # A column was removed from the table definition.
    for c in old.columns:
        if get_table_column(new, c.name) is None:
            changes.append(
                TableColumnChange(
                    type=TableColumnChangeType.REMOVE,
                    column_name=c.name,
                    current=None,
                    previous=c,
                )
            )

    return changes
299+
300+
301+
def get_table_column(table: Table, column_name: str) -> Optional[Column]:
    """Return the column of *table* named *column_name*, or None if absent."""
    return next(
        (col for col in table.columns if col.name == column_name),
        None,
    )
306+
307+
308+
def flatten_tables_recursive(original_tables: List[Table]) -> List[Table]:
    """Depth-first flatten of *original_tables* and all nested relations.

    Each entry is a shallow copy of the source table (columns/relations
    objects are shared); a parent always precedes its descendants.
    """
    flattened: List[Table] = []
    for tbl in original_tables:
        flattened.append(
            Table(
                name=tbl.name,
                columns=tbl.columns,
                relations=tbl.relations,
                title=tbl.title,
                description=tbl.description,
                is_incremental=tbl.is_incremental,
                parent=tbl.parent,
            )
        )
        flattened.extend(flatten_tables_recursive(tbl.relations))
    return flattened
323+
324+
325+
def flatten_tables(original_tables: List[Table]) -> List[Table]:
    """Flatten nested table relations into one list, deduplicated by name.

    The first occurrence of each table name wins and input order is kept
    (dicts preserve insertion order).
    """
    unique = {}
    for tbl in flatten_tables_recursive(original_tables):
        unique.setdefault(tbl.name, tbl)
    return list(unique.values())

cloudquery/sdk/types/json.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,6 @@ def __arrow_ext_deserialize__(self, storage_type, serialized):
2121
# return an instance of this subclass given the serialized
2222
# metadata.
2323
return JSONType()
24+
25+
26+
pa.register_extension_type(JSONType())

0 commit comments

Comments
 (0)