Skip to content

Commit 670eeab

Browse files
committed
feat: Report table errors from plugins via gRPC
1 parent cc679a3 commit 670eeab

File tree

5 files changed

+150
-63
lines changed

5 files changed

+150
-63
lines changed

cloudquery/sdk/internal/memdb/memdb.py

Lines changed: 129 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
from cloudquery.sdk import plugin
44
from cloudquery.sdk import message
55
from cloudquery.sdk import schema
6-
from typing import List, Generator, Dict
6+
from cloudquery.sdk.scheduler import Scheduler, TableResolver
7+
from typing import List, Generator, Dict, Any
78
import pyarrow as pa
89
from cloudquery.sdk.schema.table import Table
910
from cloudquery.sdk.types import JSONType
@@ -12,91 +13,156 @@
1213
NAME = "memdb"
1314
VERSION = "development"
1415

16+
class Client:
    """Trivial client object handed to resolvers; memdb needs no real connection."""

    def __init__(self) -> None:
        pass

    def id(self):
        """Return the stable identifier for this client."""
        return "memdb"
22+
23+
class MemDBResolver(TableResolver):
    """Table resolver that replays a fixed, in-memory list of records."""

    def __init__(self, table: Table, records: List, child_resolvers: list[TableResolver] = None) -> None:
        super().__init__(table=table, child_resolvers=child_resolvers)
        # Records are yielded verbatim; no per-record transformation happens here.
        self._records = records

    def resolve(self, client: None, parent_resource) -> Generator[Any, None, None]:
        """Yield every stored record; client and parent_resource are ignored."""
        yield from self._records
31+
32+
class Table1Relation1(Table):
    """Child test table of table_1 carrying a JSON payload column."""

    def __init__(self) -> None:
        name_col = schema.Column(
            name="name",
            type=pa.string(),
            primary_key=True,
            not_null=True,
            unique=True,
        )
        data_col = schema.Column(name="data", type=JSONType())
        super().__init__(
            name="table_1_relation_1",
            columns=[name_col, data_col],
            title="Table 1 Relation 1",
            description="Test Table 1 Relation 1",
        )

    @property
    def resolver(self):
        """Resolver yielding three fixed rows with distinct JSON payloads."""
        rows = [
            {"name": "a", "data": {"a": 1}},
            {"name": "b", "data": {"b": 2}},
            {"name": "c", "data": {"c": 3}},
        ]
        return MemDBResolver(self, records=rows)
53+
54+
class Table1(Table):
    """Top-level incremental test table with one child relation."""

    def __init__(self) -> None:
        name_col = schema.Column(
            name="name",
            type=pa.string(),
            primary_key=True,
            not_null=True,
            unique=True,
        )
        id_col = schema.Column(
            name="id",
            type=pa.int64(),
            primary_key=True,
            not_null=True,
            unique=True,
            incremental_key=True,
        )
        super().__init__(
            name="table_1",
            columns=[name_col, id_col],
            title="Table 1",
            description="Test Table 1",
            is_incremental=True,
            relations=[Table1Relation1()],
        )

    @property
    def resolver(self):
        """Resolver yielding three fixed rows plus one child resolver per relation."""
        children = [rel.resolver for rel in self.relations]
        rows = [
            {"name": "a", "id": 1},
            {"name": "b", "id": 2},
            {"name": "c", "id": 3},
        ]
        return MemDBResolver(self, records=rows, child_resolvers=children)
88+
89+
class Table2(Table):
    """Flat test table with a string primary key and an integer id."""

    def __init__(self) -> None:
        name_col = schema.Column(
            name="name", type=pa.string(), primary_key=True, not_null=True, unique=True
        )
        id_col = schema.Column(name="id", type=pa.int64())
        super().__init__(
            name="table_2",
            columns=[name_col, id_col],
            title="Table 2",
            description="Test Table 2",
        )

    @property
    def resolver(self):
        """Resolver yielding three fixed name/id rows."""
        rows = [
            {"name": "a", "id": 1},
            {"name": "b", "id": 2},
            {"name": "c", "id": 3},
        ]
        return MemDBResolver(self, records=rows)
104+
15105

16106
@dataclass
class Spec:
    """Plugin spec parsed from JSON: scheduler tuning knobs."""

    # Plain defaults are equivalent to field(default=...) for immutable values.
    concurrency: int = 1000
    queue_size: int = 1000
19110

20111

21112
class MemDB(plugin.Plugin):
22113
def __init__(self) -> None:
    """Register the static test tables and start with an empty record store."""
    super().__init__(
        NAME, VERSION, opts=plugin.plugin.Options(team="cloudquery", kind="source")
    )
    t1 = Table1()
    t2 = Table2()
    # Keyed by table name so get_tables/read can look tables up directly.
    self._tables: Dict[str, schema.Table] = {t.name: t for t in (t1, t2)}
    # Written record batches, keyed by table name.
    self._db: Dict[str, pa.RecordBatch] = {}
    self._client = Client()
125+
126+
def set_logger(self, logger) -> None:
    """Store the logger; init() later hands it to the Scheduler."""
    self._logger = logger
84128

85129
def init(self, spec, no_connection: bool = False):
    """Parse the JSON spec and build the scheduler.

    Skips all work when no_connection is True (schema-only mode).
    NOTE(review): assumes set_logger() was called first — self._logger is
    read below; confirm the server guarantees that ordering.
    """
    if no_connection:
        return
    parsed = json.loads(spec)
    self._spec_json = parsed
    self._spec = Spec(**parsed)
    self._scheduler = Scheduler(
        logger=self._logger,
        concurrency=self._spec.concurrency,
        queue_size=self._spec.queue_size,
    )
90139

91140
def get_tables(self, options: plugin.TableOptions = None) -> List[plugin.Table]:
    """Return the schema tables filtered by the include/skip globs in options.

    NOTE(review): the None default is unusable — options.tables is dereferenced
    below; callers are expected to always pass TableOptions.
    """
    tables = list(self._tables.values())
    # Wire up child -> parent links before filtering so relations stay attached.
    for parent in tables:
        for child in parent.relations:
            child.parent = parent
    return schema.filter_dfs(tables, options.tables, options.skip_tables)
94149

95150
def sync(
    self, options: plugin.SyncOptions
) -> Generator[message.SyncMessage, None, None]:
    """Run a sync of the selected tables through the scheduler."""
    table_options = plugin.TableOptions(
        tables=options.tables,
        skip_tables=options.skip_tables,
        skip_dependent_tables=options.skip_dependent_tables,
    )
    # One resolver per selected top-level table; children hang off each resolver.
    resolvers: list[TableResolver] = [
        table.resolver for table in self.get_tables(table_options)
    ]
    return self._scheduler.sync(self._client, resolvers, options.deterministic_cq_id)
100166

101167
def write(self, writer: Generator[message.WriteMessage, None, None]) -> None:
102168
for msg in writer:
@@ -112,7 +178,9 @@ def write(self, writer: Generator[message.WriteMessage, None, None]) -> None:
112178

113179
def read(self, table: Table) -> Generator[message.ReadMessage, None, None]:
    """Yield the stored record batches that belong to the requested table.

    Each batch's table-name metadata is compared against ``table.name``.
    """
    # Bug fix: the original loop variable `table` shadowed the `table`
    # parameter, so `table.name` was evaluated on the dict's str key
    # (AttributeError) instead of the requested table.
    for _key, record in self._db.items():
        record_table_name = record.schema.metadata.get(schema.MetadataTableName)
        # NOTE(review): pyarrow schema metadata values are bytes — confirm
        # MetadataTableName lookup and this comparison handle str vs bytes.
        if record_table_name == table.name:
            yield message.ReadMessage(record)
116184

117185
def close(self) -> None:
    """Drop every stored record batch by rebinding the store to a fresh dict."""
    self._db = {}

cloudquery/sdk/internal/servers/plugin_v3/plugin.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from cloudquery.sdk.message import (
88
SyncInsertMessage,
99
SyncMigrateTableMessage,
10+
SyncErrorMessage,
1011
WriteInsertMessage,
1112
WriteMigrateTableMessage,
1213
WriteMessage,
@@ -77,6 +78,12 @@ def Sync(self, request, context):
7778
yield plugin_pb2.Sync.Response(
7879
migrate_table=plugin_pb2.Sync.MessageMigrateTable(table=buf)
7980
)
81+
elif isinstance(msg, SyncErrorMessage) and request.withErrorMessages:
82+
yield plugin_pb2.Sync.Response(
83+
error=plugin_pb2.Sync.MessageError(
84+
table_name=msg.table_name, error=msg.error
85+
)
86+
)
8087
else:
8188
# unknown sync message type
8289
raise NotImplementedError()

cloudquery/sdk/message/__init__.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
from .sync import SyncMessage, SyncInsertMessage, SyncMigrateTableMessage
1+
from .sync import (
2+
SyncMessage,
3+
SyncInsertMessage,
4+
SyncMigrateTableMessage,
5+
SyncErrorMessage,
6+
)
27
from .write import (
38
WriteMessage,
49
WriteInsertMessage,

cloudquery/sdk/message/sync.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,9 @@ def __init__(self, record: pa.RecordBatch):
1313
class SyncMigrateTableMessage(SyncMessage):
    """Sync message carrying a table schema for the destination to migrate."""

    def __init__(self, table: pa.Schema):
        # The pyarrow schema describing the table to migrate.
        self.table = table
16+
17+
18+
class SyncErrorMessage(SyncMessage):
    """Sync message reporting an error raised while resolving a table."""

    def __init__(self, table_name: str, error: str):
        # Name of the table whose resolution failed, plus the error text.
        self.table_name = table_name
        self.error = error

cloudquery/sdk/scheduler/scheduler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
SyncMessage,
99
SyncInsertMessage,
1010
SyncMigrateTableMessage,
11+
SyncErrorMessage,
1112
)
1213
from cloudquery.sdk.schema import Resource
1314
from cloudquery.sdk.stateclient.stateclient import StateClient
@@ -162,9 +163,9 @@ def resolve_table(
162163
depth=depth,
163164
exc_info=e,
164165
)
166+
res.put(SyncErrorMessage(resolver.table.name, str(e)))
165167
finally:
166168
res.put(TableResolverFinished())
167-
168169
def _sync(
169170
self,
170171
client,

0 commit comments

Comments
 (0)