Skip to content

Commit 7e70bdb

Browse files
committed
Working sync proof of concept
1 parent 465f427 commit 7e70bdb

File tree

8 files changed

+78
-18
lines changed

8 files changed

+78
-18
lines changed

cloudquery/sdk/internal/servers/plugin_v3/plugin.py

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import pyarrow as pa
22

33
from cloudquery.plugin_v3 import plugin_pb2, plugin_pb2_grpc
4-
from cloudquery.sdk.plugin.plugin import Plugin
4+
from cloudquery.sdk.message import SyncInsertMessage, SyncMigrateTableMessage
5+
from cloudquery.sdk.plugin.plugin import Plugin, SyncOptions
56
from cloudquery.sdk.schema import tables_to_arrow_schemas
67

78

@@ -33,8 +34,31 @@ def GetTables(self, request: plugin_pb2.GetTables.Request, context):
3334
return plugin_pb2.GetTables.Response(tables=tablesBytes)
3435

3536
def Sync(self, request, context):
36-
plugin_pb2.Sync.Response()
37-
return plugin_pb2.Sync.Response()
37+
options = SyncOptions(
38+
deterministic_cq_id=False, # TODO
39+
skip_dependent_tables=request.skip_dependent_tables,
40+
skip_tables=request.skip_tables,
41+
tables=request.tables,
42+
backend_options=None,
43+
)
44+
45+
for msg in self._plugin.sync(options):
46+
if isinstance(msg, SyncInsertMessage):
47+
sink = pa.BufferOutputStream()
48+
writer = pa.ipc.new_stream(sink, msg.record.schema)
49+
writer.write_batch(msg.record)
50+
writer.close()
51+
buf = sink.getvalue().to_pybytes()
52+
yield plugin_pb2.Sync.Response(insert=plugin_pb2.Sync.MessageInsert(
53+
record=buf
54+
))
55+
elif isinstance(msg, SyncMigrateTableMessage):
56+
yield plugin_pb2.Sync.Response(migrate_table=plugin_pb2.Sync.MessageMigrateTable(
57+
table=msg.table.to_arrow_schema().serialize().to_pybytes()
58+
))
59+
else:
60+
# unknown sync message type
61+
raise NotImplementedError()
3862

3963
def Read(self, request, context):
4064
raise NotImplementedError()

cloudquery/sdk/message/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
from sync import SyncMessage, SyncInsertMessage
1+
from .sync import SyncMessage, SyncInsertMessage, SyncMigrateTableMessage

cloudquery/sdk/message/sync.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
1-
21
import pyarrow as pa
32

3+
44
class SyncMessage:
55
pass
66

7+
78
class SyncInsertMessage:
89
def __init__(self, record: pa.RecordBatch):
9-
self._record = record
10+
self.record = record
1011

11-
class SyncMigrateMessage:
12-
def __init__(self, record: pa.RecordBatch):
13-
self._record = record
12+
13+
class SyncMigrateTableMessage:
14+
def __init__(self, table: pa.Schema):
15+
self.table = table

cloudquery/sdk/message/write.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
1-
21
import pyarrow as pa
32

3+
44
class WriteMessage:
55
pass
66

7+
78
class InsertMessage(WriteMessage):
89
def __init__(self, record: pa.RecordBatch):
9-
self._record = record
10+
self.record = record
11+
1012

1113
class MigrateMessage(WriteMessage):
12-
def __init__(self, record: pa.RecordBatch):
13-
self._record = record
14+
def __init__(self, table: pa.Schema):
15+
self.table = table

cloudquery/sdk/plugin/plugin.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,33 @@
1+
import queue
2+
from dataclasses import dataclass
13
from typing import List
24

35
from cloudquery.sdk.schema import Table
46

7+
MIGRATE_MODE_STRINGS = ["safe", "force"]
8+
9+
10+
@dataclass
11+
class TableOptions:
12+
tables: List[str] = None
13+
skip_tables: List[str] = None
14+
skip_dependent_tables: bool = False
15+
16+
17+
@dataclass
18+
class BackendOptions:
19+
connection: str = None
20+
table_name: str = None
21+
22+
23+
@dataclass
24+
class SyncOptions:
25+
deterministic_cq_id: bool = False
26+
skip_dependent_tables: bool = False
27+
skip_tables: List[str] = None
28+
tables: List[str] = None
29+
backend_options: BackendOptions = None
30+
531

632
class Plugin:
733
def __init__(self, name: str, version: str) -> None:
@@ -17,5 +43,11 @@ def name(self) -> str:
1743
def version(self) -> str:
1844
return self._version
1945

20-
def get_tables(self, tables: List[str], skip_tables: List[str]) -> List[Table]:
46+
def get_tables(self, options: TableOptions) -> List[Table]:
47+
raise NotImplementedError()
48+
49+
def sync(self, options: SyncOptions, results: queue.Queue) -> None:
2150
raise NotImplementedError()
51+
52+
def close(self) -> None:
53+
pass

cloudquery/sdk/schema/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,2 @@
1-
21
from .column import Column
32
from .table import Table, tables_to_arrow_schemas

cloudquery/sdk/schema/table.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def primary_keys(self):
2929
def incremental_keys(self):
3030
return [column.name for column in self.columns if column.incremental_key]
3131

32-
def to_arrow_schemas(self):
32+
def to_arrow_schema(self):
3333
fields = []
3434
md = {
3535
arrow.METADATA_TABLE_NAME: self.name,
@@ -48,5 +48,5 @@ def __lt__(self, other):
4848
def tables_to_arrow_schemas(tables: List[Table]):
4949
schemas = []
5050
for table in tables:
51-
schemas.append(table.to_arrow_schemas())
51+
schemas.append(table.to_arrow_schema())
5252
return schemas

tests/schema/test_table.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import pyarrow as pa
2+
23
from cloudquery.sdk.schema import Table, Column
34

45

56
def test_table():
67
table = Table("test_table", [Column("test_column", pa.int32())])
7-
table.to_arrow_schemas()
8+
table.to_arrow_schema()

0 commit comments

Comments
 (0)