Skip to content

Commit afaf537

Browse files
authored
Issue 33: Migrate stream clear to the data plane (#37)
* issue-33: Add data plane client to Decodable adapter * issue-33: Migrate stream clear to use the data plane * issue-33: Formatting * issue-33: Pyright
1 parent 21231c6 commit afaf537

File tree

2 files changed

+60
-23
lines changed

2 files changed

+60
-23
lines changed

dbt/adapters/decodable/impl.py

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@
1919

2020
from agate.table import Table as AgateTable
2121
from dbt.adapters.base import BaseAdapter, BaseRelation, Column
22+
from dbt.adapters.base.meta import available
23+
from dbt.adapters.protocol import AdapterConfig
2224
from dbt.contracts.connection import Connection
2325
from dbt.contracts.graph.manifest import Manifest
2426
from dbt.contracts.graph.parsed import ParsedNode
25-
from dbt.adapters.base.meta import available
2627
from dbt.contracts.relation import RelationType
2728
from dbt.events import AdapterLogger
2829
from dbt.exceptions import (
@@ -38,8 +39,11 @@
3839
)
3940
from dbt.adapters.decodable.handler import DecodableHandler
4041
from dbt.adapters.decodable.relation import DecodableRelation
41-
from dbt.adapters.protocol import AdapterConfig
42-
from decodable.client.client import DecodableControlPlaneApiClient, SchemaField
42+
from decodable.client.client import (
43+
DecodableControlPlaneApiClient,
44+
SchemaField,
45+
DecodableDataPlaneApiClient,
46+
)
4347
from decodable.client.types import (
4448
FieldType,
4549
PrimaryKey,
@@ -189,7 +193,7 @@ def drop_relation(self, relation: BaseRelation) -> None:
189193
if relation.identifier is None:
190194
return
191195

192-
client = self._client()
196+
client = self._control_plane_client()
193197

194198
self.logger.debug(f"Dropping pipeline '{relation}'...")
195199

@@ -243,23 +247,25 @@ def truncate_relation(self, relation: BaseRelation) -> None:
243247
if not relation.identifier:
244248
raise_compiler_error("Cannot truncate an unnamed relation")
245249

246-
client = self._client()
247-
stream_id = client.get_stream_id(relation.render())
250+
control_plane_client = self._control_plane_client()
251+
data_plane_client = self._data_plane_client()
252+
stream_id = control_plane_client.get_stream_id(relation.render())
248253

249254
if not stream_id:
250255
raise_database_error(
251256
f"Error clearing stream `{relation.render()}`: stream doesn't exist"
252257
)
253258

254-
client.clear_stream(stream_id)
259+
clear_token_response = control_plane_client.get_clear_stream_token(stream_id)
260+
data_plane_client.clear_stream(stream_id, clear_token_response.token)
255261

256262
@available.parse_none
257263
def rename_relation(self, from_relation: BaseRelation, to_relation: BaseRelation) -> None:
258264
"""Rename the relation from from_relation to to_relation.
259265
260266
Implementors must call self.cache.rename() to preserve cache state.
261267
"""
262-
client = self._client()
268+
client = self._control_plane_client()
263269
self.cache_renamed(from_relation, to_relation)
264270

265271
if not from_relation.identifier:
@@ -345,7 +351,7 @@ def expand_column_types(self, goal: BaseRelation, current: BaseRelation) -> None
345351
def list_relations_without_caching(self, schema_relation: BaseRelation) -> List[BaseRelation]:
346352
relations: List[BaseRelation] = []
347353

348-
stream_list: List[Dict[str, Any]] = self._client().list_streams().items
354+
stream_list: List[Dict[str, Any]] = self._control_plane_client().list_streams().items
349355
for stream in stream_list:
350356
relations.append(
351357
self.Relation.create(
@@ -367,7 +373,9 @@ def get_columns_in_relation(
367373
if not relation.identifier:
368374
return []
369375

370-
stream_info = self._client().get_stream_information(stream_id=relation.render())
376+
stream_info = self._control_plane_client().get_stream_information(
377+
stream_id=relation.render()
378+
)
371379

372380
for schema_column in stream_info["schema"]:
373381
columns.append(
@@ -384,7 +392,7 @@ def has_changed(
384392
watermark: Optional[str] = None,
385393
primary_key: Optional[str] = None,
386394
) -> bool:
387-
client = self._client()
395+
client = self._control_plane_client()
388396

389397
new_pipe_sql = self._wrap_as_pipeline(relation.render(), sql)
390398
fields: List[Dict[str, str]] = client.get_stream_from_sql(new_pipe_sql)["schema"]
@@ -452,7 +460,7 @@ def create_table(
452460
if not model:
453461
self.logger.debug(f"Model {relation.render()} not found in dbt graph")
454462

455-
client = self._client()
463+
client = self._control_plane_client()
456464

457465
fields: List[Dict[str, str]] = client.get_stream_from_sql(
458466
self._wrap_as_pipeline(relation.render(), sql)
@@ -537,7 +545,7 @@ def create_seed_table(
537545

538546
schema.append(SchemaField(name=col_name, type=field_type))
539547

540-
client = self._client()
548+
client = self._control_plane_client()
541549

542550
self.logger.debug(f"Creating connection and stream for seed `{table_name}`...")
543551
response = client.create_connection(name=table_name, schema=schema)
@@ -550,7 +558,7 @@ def create_seed_table(
550558
@available
551559
def send_seed_as_events(self, seed_name: str, data: AgateTable):
552560
self.logger.debug(f"Sending data to connection `{seed_name}`")
553-
client = self._client()
561+
client = self._control_plane_client()
554562

555563
conn_id = client.get_connection_id(seed_name)
556564
if not conn_id:
@@ -576,7 +584,7 @@ def send_seed_as_events(self, seed_name: str, data: AgateTable):
576584

577585
@available
578586
def reactivate_connection(self, connection: Relation):
579-
client = self._client()
587+
client = self._control_plane_client()
580588

581589
conn_id = client.get_connection_id(connection.render())
582590
if not conn_id:
@@ -586,7 +594,7 @@ def reactivate_connection(self, connection: Relation):
586594

587595
@available
588596
def stop_pipeline(self, pipe: Relation):
589-
client = self._client()
597+
client = self._control_plane_client()
590598

591599
pipe_id = client.get_pipeline_id(pipe.render())
592600
if not pipe_id:
@@ -596,7 +604,7 @@ def stop_pipeline(self, pipe: Relation):
596604

597605
@available
598606
def delete_pipeline(self, pipe: Relation):
599-
client = self._client()
607+
client = self._control_plane_client()
600608

601609
pipeline_id = client.get_pipeline_id(pipe.render())
602610
if pipeline_id:
@@ -607,7 +615,7 @@ def delete_pipeline(self, pipe: Relation):
607615

608616
@available
609617
def delete_stream(self, stream: Relation, skip_errors: bool = False):
610-
client = self._client()
618+
client = self._control_plane_client()
611619

612620
stream_id = client.get_stream_id(stream.render())
613621
if not stream_id:
@@ -623,7 +631,7 @@ def delete_stream(self, stream: Relation, skip_errors: bool = False):
623631

624632
@available
625633
def delete_connection(self, conn: Relation):
626-
client = self._client()
634+
client = self._control_plane_client()
627635

628636
conn_id = client.get_connection_id(conn.render())
629637
if not conn_id:
@@ -645,12 +653,18 @@ def should_materialize_tests(self) -> bool:
645653
return False
646654
return credentials.materialize_tests
647655

648-
def _client(self) -> DecodableControlPlaneApiClient:
656+
def _control_plane_client(self) -> DecodableControlPlaneApiClient:
649657
handle: DecodableHandler = (
650658
self.get_thread_connection().handle
651659
) # pyright: ignore [reportGeneralTypeIssues]
652660
return handle.control_plane_client
653661

662+
def _data_plane_client(self) -> DecodableDataPlaneApiClient:
663+
handle: DecodableHandler = (
664+
self.get_thread_connection().handle
665+
) # pyright: ignore [reportGeneralTypeIssues]
666+
return handle.data_plane_client
667+
654668
@classmethod
655669
def _get_model_schema_hints(cls, model: ParsedNode) -> Set[SchemaField]:
656670
schema: Set[SchemaField] = set()

decodable/client/client.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,22 @@ def from_dict(cls, response: Dict[str, Any]) -> PreviewTokensResponse:
9797
)
9898

9999

100+
@dataclass
101+
class DataPlaneTokenResponse:
102+
data_plane_request: Optional[str]
103+
token: str
104+
105+
@classmethod
106+
def from_dict(cls, response: Dict[str, Any]) -> DataPlaneTokenResponse:
107+
return cls(
108+
# Some requests don't include a body, so check whether it's present
109+
data_plane_request=response["data_plane_request"]
110+
if "data_plane_request" in response
111+
else None,
112+
token=response["token"],
113+
)
114+
115+
100116
class DecodableAPIException(Exception):
101117
@classmethod
102118
def category(cls) -> str:
@@ -169,6 +185,11 @@ def get_preview(self, auth_token: str, next_token: str) -> PreviewResponse:
169185
)
170186
return PreviewResponse.from_dict(response.json())
171187

188+
def clear_stream(self, stream_id: str, token: str) -> None:
189+
self._post_api_request(
190+
bearer_token=token, endpoint_url=f"{self.config.api_url}/streams/{stream_id}/clear"
191+
)
192+
172193
def _get_api_request(
173194
self,
174195
bearer_token: str,
@@ -291,10 +312,12 @@ def delete_stream(self, stream_id: str) -> None:
291312
endpoint_url=f"{self.config.decodable_api_url()}/streams/{stream_id}"
292313
)
293314

294-
def clear_stream(self, stream_id: str) -> None:
295-
self._post_api_request(
296-
payload={}, endpoint_url=f"{self.config.decodable_api_url()}/streams/{stream_id}/clear"
315+
def get_clear_stream_token(self, stream_id: str) -> DataPlaneTokenResponse:
316+
response = self._post_api_request(
317+
payload={},
318+
endpoint_url=f"{self.config.decodable_api_url()}/streams/{stream_id}/clear/token",
297319
)
320+
return DataPlaneTokenResponse.from_dict(response.json())
298321

299322
def list_pipelines(self) -> ApiResponse:
300323
response = self._get_api_request(

0 commit comments

Comments
 (0)