Skip to content

Commit d30be5c

Browse files
committed
feat: Report table errors from plugins via gRPC
1 parent cc679a3 commit d30be5c

File tree

5 files changed

+185
-62
lines changed

5 files changed

+185
-62
lines changed

cloudquery/sdk/internal/memdb/memdb.py

Lines changed: 164 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
from cloudquery.sdk import plugin
44
from cloudquery.sdk import message
55
from cloudquery.sdk import schema
6-
from typing import List, Generator, Dict
6+
from cloudquery.sdk.scheduler import Scheduler, TableResolver
7+
from typing import List, Generator, Dict, Any
78
import pyarrow as pa
89
from cloudquery.sdk.schema.table import Table
910
from cloudquery.sdk.types import JSONType
@@ -13,90 +14,190 @@
1314
VERSION = "development"
1415

1516

17+
class Client:
    """No-op client handle for the in-memory database plugin.

    The memdb backend keeps all state on the plugin itself, so this
    client carries no connection state of its own.
    """

    def __init__(self) -> None:
        # Nothing to set up for an in-memory backend.
        pass

    def id(self):
        """Return the plugin identifier string."""
        return "memdb"
23+
24+
25+
class MemDBResolver(TableResolver):
    """Table resolver that replays a fixed, pre-seeded list of records."""

    def __init__(
        self, table: Table, records: List, child_resolvers: list[TableResolver] = None
    ) -> None:
        super().__init__(table=table, child_resolvers=child_resolvers)
        # Records are emitted verbatim by resolve(); nothing is computed.
        self._records = records

    def resolve(self, client: None, parent_resource) -> Generator[Any, None, None]:
        """Emit every seeded record. *client* and *parent_resource* are unused."""
        yield from self._records
35+
36+
37+
class Table1Relation1(Table):
    """Child test table of ``table_1`` holding a name plus a JSON payload."""

    def __init__(self) -> None:
        columns = [
            schema.Column(
                name="name",
                type=pa.string(),
                primary_key=True,
                not_null=True,
                unique=True,
            ),
            schema.Column(name="data", type=JSONType()),
        ]
        super().__init__(
            name="table_1_relation_1",
            columns=columns,
            title="Table 1 Relation 1",
            description="Test Table 1 Relation 1",
        )

    @property
    def resolver(self):
        """Resolver yielding three fixed sample rows."""
        rows = [
            {"name": "a", "data": {"a": 1}},
            {"name": "b", "data": {"b": 2}},
            {"name": "c", "data": {"c": 3}},
        ]
        return MemDBResolver(self, records=rows)
65+
66+
67+
class Table1(Table):
    """Incremental test table with a string ``name`` and int64 ``id`` key."""

    def __init__(self) -> None:
        super().__init__(
            name="table_1",
            columns=[
                schema.Column(
                    name="name",
                    type=pa.string(),
                    primary_key=True,
                    not_null=True,
                    unique=True,
                ),
                schema.Column(
                    name="id",
                    type=pa.int64(),
                    primary_key=True,
                    not_null=True,
                    unique=True,
                    incremental_key=True,
                ),
            ],
            title="Table 1",
            description="Test Table 1",
            is_incremental=True,
            relations=[Table1Relation1()],
        )

    @property
    def resolver(self):
        """Resolver for the seeded rows, chaining each relation's resolver."""
        children: list[TableResolver] = [rel.resolver for rel in self.relations]
        return MemDBResolver(
            self,
            records=[
                {"name": "a", "id": 1},
                {"name": "b", "id": 2},
                {"name": "c", "id": 3},
            ],
            child_resolvers=children,
        )
109+
110+
111+
class Table2(Table):
    """Flat test table keyed by ``name`` with a plain int64 ``id`` column."""

    def __init__(self) -> None:
        columns = [
            schema.Column(
                name="name",
                type=pa.string(),
                primary_key=True,
                not_null=True,
                unique=True,
            ),
            schema.Column(name="id", type=pa.int64()),
        ]
        super().__init__(
            name="table_2",
            columns=columns,
            title="Table 2",
            description="Test Table 2",
        )

    @property
    def resolver(self):
        """Resolver yielding three fixed sample rows."""
        rows = [
            {"name": "a", "id": 1},
            {"name": "b", "id": 2},
            {"name": "c", "id": 3},
        ]
        return MemDBResolver(self, records=rows)
139+
140+
16141
@dataclass
class Spec:
    """Plugin spec parsed from the JSON configuration passed to init().

    Both values feed directly into the Scheduler constructor.
    """

    # Maximum number of concurrently resolved tables/resources.
    concurrency: int = 1000
    # Bound on the scheduler's internal work queue.
    queue_size: int = 1000
19145

20146

21147
class MemDB(plugin.Plugin):
    """In-memory plugin that serves seeded test tables and stores written batches."""

    def __init__(self) -> None:
        super().__init__(
            NAME, VERSION, opts=plugin.plugin.Options(team="cloudquery", kind="source")
        )
        # Seed the two test tables and index them by name.
        seeded = (Table1(), Table2())
        self._tables: Dict[str, schema.Table] = {t.name: t for t in seeded}
        # Written record batches, keyed by table name.
        self._db: Dict[str, pa.RecordBatch] = {}
        self._client = Client()
160+
161+
def set_logger(self, logger) -> None:
    """Attach the logger that init() later hands to the Scheduler."""
    self._logger = logger
84163

85164
def init(self, spec, no_connection: bool = False):
    """Parse the JSON *spec* and build the sync scheduler.

    When *no_connection* is true, skip all setup (schema-only mode).
    NOTE(review): reads self._logger — assumes set_logger() was called
    first; confirm the gRPC server guarantees that ordering.
    """
    if no_connection:
        return
    self._spec_json = json.loads(spec)
    self._spec = Spec(**self._spec_json)
    self._scheduler = Scheduler(
        concurrency=self._spec.concurrency,
        queue_size=self._spec.queue_size,
        logger=self._logger,
    )
90174

91175
def get_tables(self, options: plugin.TableOptions = None) -> List[plugin.Table]:
    """Return the seeded tables filtered per *options*.

    NOTE(review): *options* defaults to None yet is dereferenced
    unconditionally below — confirm all callers pass an options object.
    """
    tables = list(self._tables.values())

    # Wire up parent pointers so each relation knows its owning table.
    # (Only direct relations are linked here — presumably sufficient for
    # the two-level test schema; verify if deeper nesting is ever added.)
    for parent in tables:
        for child in parent.relations:
            child.parent = parent

    return schema.filter_dfs(tables, options.tables, options.skip_tables)
94184

95185
def sync(
    self, options: plugin.SyncOptions
) -> Generator[message.SyncMessage, None, None]:
    """Run a scheduled sync over every table selected by *options*.

    Returns the scheduler's message generator directly; the scheduler
    drives each table's resolver against the shared client.
    """
    selected = self.get_tables(
        plugin.TableOptions(
            tables=options.tables,
            skip_tables=options.skip_tables,
            skip_dependent_tables=options.skip_dependent_tables,
        )
    )
    resolvers: list[TableResolver] = [table.resolver for table in selected]
    return self._scheduler.sync(self._client, resolvers, options.deterministic_cq_id)
100201

101202
def write(self, writer: Generator[message.WriteMessage, None, None]) -> None:
102203
for msg in writer:
@@ -112,7 +213,9 @@ def write(self, writer: Generator[message.WriteMessage, None, None]) -> None:
112213

113214
def read(self, table: Table) -> Generator[message.ReadMessage, None, None]:
    """Yield only the stored record batches that belong to *table*.

    Bug fix: the original loop (`for table, record in self._db.items():`)
    shadowed the *table* parameter with the dict key — a plain string —
    so `table.name` raised AttributeError and the filter could never
    match the requested table. Iterate over values and compare against
    the parameter's name instead.
    """
    for record in self._db.values():
        # NOTE(review): pyarrow stores schema metadata keys/values as
        # bytes — confirm schema.MetadataTableName and table.name use
        # matching types for this lookup and comparison.
        record_table_name = record.schema.metadata.get(schema.MetadataTableName)
        if record_table_name == table.name:
            yield message.ReadMessage(record)
116219

117220
def close(self) -> None:
    """Drop all stored record batches; the plugin may be re-initialized later."""
    self._db = {}

cloudquery/sdk/internal/servers/plugin_v3/plugin.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from cloudquery.sdk.message import (
88
SyncInsertMessage,
99
SyncMigrateTableMessage,
10+
SyncErrorMessage,
1011
WriteInsertMessage,
1112
WriteMigrateTableMessage,
1213
WriteMessage,
@@ -77,6 +78,12 @@ def Sync(self, request, context):
7778
yield plugin_pb2.Sync.Response(
7879
migrate_table=plugin_pb2.Sync.MessageMigrateTable(table=buf)
7980
)
81+
elif isinstance(msg, SyncErrorMessage) and request.withErrorMessages:
82+
yield plugin_pb2.Sync.Response(
83+
error=plugin_pb2.Sync.MessageError(
84+
table_name=msg.table_name, error=msg.error
85+
)
86+
)
8087
else:
8188
# unknown sync message type
8289
raise NotImplementedError()

cloudquery/sdk/message/__init__.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
from .sync import SyncMessage, SyncInsertMessage, SyncMigrateTableMessage
1+
from .sync import (
2+
SyncMessage,
3+
SyncInsertMessage,
4+
SyncMigrateTableMessage,
5+
SyncErrorMessage,
6+
)
27
from .write import (
38
WriteMessage,
49
WriteInsertMessage,

cloudquery/sdk/message/sync.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,9 @@ def __init__(self, record: pa.RecordBatch):
1313
class SyncMigrateTableMessage(SyncMessage):
    """Sync message carrying a table schema to be migrated at the destination."""

    def __init__(self, table: pa.Schema):
        # Arrow schema describing the table to migrate.
        self.table = table
16+
17+
18+
class SyncErrorMessage(SyncMessage):
    """Sync message reporting a per-table resolution error back over gRPC."""

    def __init__(self, table_name: str, error: str):
        # Name of the table whose resolution failed.
        self.table_name = table_name
        # Human-readable error description (stringified exception).
        self.error = error

cloudquery/sdk/scheduler/scheduler.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
SyncMessage,
99
SyncInsertMessage,
1010
SyncMigrateTableMessage,
11+
SyncErrorMessage,
1112
)
1213
from cloudquery.sdk.schema import Resource
1314
from cloudquery.sdk.stateclient.stateclient import StateClient
@@ -162,6 +163,7 @@ def resolve_table(
162163
depth=depth,
163164
exc_info=e,
164165
)
166+
res.put(SyncErrorMessage(resolver.table.name, str(e)))
165167
finally:
166168
res.put(TableResolverFinished())
167169

0 commit comments

Comments
 (0)