Skip to content

Commit e69c778

Browse files
rob-apellajbreeden
andauthored
Use SchemaV2, allow composite pk + multiple watermarks (#40)
* use SchemaV2, allow composite pks + multiple watermarks * Update example_project/example/models/example/http_events_bytes_sent.sql --------- Co-authored-by: Jared Breeden <[email protected]>
1 parent afaf537 commit e69c778

File tree

12 files changed

+447
-256
lines changed

12 files changed

+447
-256
lines changed

.github/workflows/main.yml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,14 @@ jobs:
127127
fail-fast: false
128128
matrix:
129129
os: [ubuntu-latest, macos-latest, windows-latest]
130-
python-version: [3.7, 3.8, 3.9]
130+
python-version: [3.8, 3.9, "3.10"]
131+
include:
132+
- os: ubuntu-20.04
133+
python-version: 3.7
134+
- os: macos-13
135+
python-version: 3.7
136+
- os: windows-latest
137+
python-version: 3.7
131138

132139
steps:
133140
- name: Set up Python ${{ matrix.python-version }}

dbt/adapters/decodable/impl.py

Lines changed: 45 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
# limitations under the License.
1515
#
1616

17-
from dataclasses import dataclass
18-
from typing import Any, ContextManager, Dict, Hashable, List, Optional, Set, Type
17+
from dataclasses import dataclass, field as dataclass_field
18+
from typing import Any, ContextManager, Dict, Hashable, List, Optional, Set, Type, Sequence
1919

2020
from agate.table import Table as AgateTable
2121
from dbt.adapters.base import BaseAdapter, BaseRelation, Column
@@ -41,12 +41,10 @@
4141
from dbt.adapters.decodable.relation import DecodableRelation
4242
from decodable.client.client import (
4343
DecodableControlPlaneApiClient,
44-
SchemaField,
4544
DecodableDataPlaneApiClient,
4645
)
4746
from decodable.client.types import (
4847
FieldType,
49-
PrimaryKey,
5048
String,
5149
Boolean,
5250
TimestampLocal,
@@ -55,10 +53,13 @@
5553
Decimal,
5654
)
5755

56+
from decodable.client.schema import SchemaV2, SchemaField, PhysicalSchemaField, Constraints
57+
5858

5959
@dataclass
6060
class DecodableConfig(AdapterConfig):
61-
watermark: Optional[str] = None
61+
watermarks: List[Dict[str, str]] = dataclass_field(default_factory=list)
62+
primary_key: List[str] = dataclass_field(default_factory=list)
6263

6364

6465
class DecodableAdapter(BaseAdapter):
@@ -377,9 +378,9 @@ def get_columns_in_relation(
377378
stream_id=relation.render()
378379
)
379380

380-
for schema_column in stream_info["schema"]:
381+
for schema_column in stream_info["schema_v2"]["fields"]:
381382
columns.append(
382-
Column.create(name=schema_column["name"], label_or_dtype=schema_column["type"])
383+
Column.create(name=schema_column["name"], label_or_dtype=schema_column.get("type"))
383384
)
384385

385386
return columns
@@ -389,22 +390,19 @@ def has_changed(
389390
self,
390391
sql: str,
391392
relation: BaseRelation,
392-
watermark: Optional[str] = None,
393-
primary_key: Optional[str] = None,
393+
watermarks: List[Dict[str, str]],
394+
primary_key: List[str],
394395
) -> bool:
395396
client = self._control_plane_client()
396397

397398
new_pipe_sql = self._wrap_as_pipeline(relation.render(), sql)
398-
fields: List[Dict[str, str]] = client.get_stream_from_sql(new_pipe_sql)["schema"]
399+
schema_json: Dict[str, Any] = client.get_stream_from_sql(new_pipe_sql)["schema_v2"]
399400

400-
schema: List[SchemaField]
401+
new_schema: SchemaV2
401402
try:
402-
schema = self._schema_from_json(fields)
403-
# The API returns the current primary key field. For the to-be value in this comparison, we set it to
404-
# the field specified via config.
405-
self._remove_primary_key(schema)
406-
if primary_key:
407-
self._set_primary_key(primary_key, schema)
403+
new_schema = SchemaV2.from_json_components(
404+
schema_json["fields"], watermarks, primary_key
405+
)
408406
except Exception as err:
409407
raise_compiler_error(f"Error checking changes to the '{relation}' stream: {err}")
410408

@@ -421,20 +419,13 @@ def has_changed(
421419
if pipe_info["sql"] != new_pipe_sql:
422420
return True
423421

424-
if stream_info["watermark"] != watermark:
425-
return True
426-
427-
existing_schema: List[SchemaField]
428-
existing_schema_fields = stream_info["schema"]
422+
existing_schema: SchemaV2
429423
try:
430-
existing_schema = self._schema_from_json(existing_schema_fields)
424+
existing_schema = SchemaV2.from_json(stream_info["schema_v2"])
431425
except Exception as err:
432426
raise_compiler_error(f"Error checking changes to the '{relation}' stream: {err}")
433427

434-
if existing_schema != schema:
435-
return True
436-
437-
return False
428+
return existing_schema != new_schema
438429

439430
@available
440431
def create_table(
@@ -443,8 +434,8 @@ def create_table(
443434
temporary: bool,
444435
relation: BaseRelation,
445436
nodes: Dict[str, Any],
446-
watermark: Optional[str] = None,
447-
primary_key: Optional[str] = None,
437+
watermarks: List[Dict[str, str]],
438+
primary_key: List[str],
448439
) -> None:
449440
if not relation.identifier:
450441
raise_compiler_error("Cannot create an unnamed relation")
@@ -464,20 +455,24 @@ def create_table(
464455

465456
fields: List[Dict[str, str]] = client.get_stream_from_sql(
466457
self._wrap_as_pipeline(relation.render(), sql)
467-
)["schema"]
458+
)["schema_v2"]["fields"]
468459

469460
if not fields:
470461
raise_database_error(
471462
f"Error creating the {relation} stream: empty schema returned for sql:\n{sql}"
472463
)
473464

474-
schema: List[SchemaField]
465+
schema: SchemaV2
475466
try:
476-
schema = self._schema_from_json(fields)
467+
schema = SchemaV2.from_json_components(
468+
fields,
469+
watermarks,
470+
primary_key,
471+
)
477472
except Exception as err:
478473
raise_compiler_error(f"Error creating the {relation} stream: {err}")
479474

480-
schema_hints: Set[SchemaField]
475+
schema_hints: Set[PhysicalSchemaField]
481476
try:
482477
if model:
483478
schema_hints = self._get_model_schema_hints(model)
@@ -486,19 +481,14 @@ def create_table(
486481
except Exception as err:
487482
raise_parsing_error(f"Error creating the {relation} stream: {err}")
488483

489-
if not schema_hints.issubset(schema):
484+
if not schema_hints.issubset(schema.fields):
490485
self.logger.warning(
491-
f"Column hints for '{name}' don't match the resulting schema:\n{self._pretty_schema(list(schema_hints), 1, 'hints')}\n{self._pretty_schema(schema, 1, 'schema')}"
486+
f"Column hints for '{name}' don't match the resulting schema:\n{self._pretty_schema(list(schema_hints), 1, 'hints')}\n{self._pretty_schema(schema.fields, 1, 'schema')}"
492487
)
493488

494-
if primary_key:
495-
for field in schema:
496-
if field.name == primary_key:
497-
field.type = PrimaryKey(field.type)
498-
499489
stream_id = client.get_stream_id(relation.render())
500490
if not stream_id:
501-
client.create_stream(relation.render(), schema, watermark)
491+
client.create_stream(relation.render(), schema)
502492
self.logger.debug(f"Stream '{relation}' successfully created!")
503493
else:
504494
raise_database_error(f"Error creating the {relation} stream: stream already exists!")
@@ -516,7 +506,7 @@ def create_table(
516506
def create_seed_table(
517507
self, table_name: str, agate_table: AgateTable, column_override: Dict[str, str]
518508
):
519-
schema: List[SchemaField] = []
509+
schema_fields: List[PhysicalSchemaField] = []
520510

521511
column_names: tuple[str] = agate_table.column_names
522512
for ix, col_name in enumerate(column_names):
@@ -543,12 +533,14 @@ def create_seed_table(
543533
f"Inferred type `{type}` for column `{col_name}` doesn't match any of Decodable's known types"
544534
)
545535

546-
schema.append(SchemaField(name=col_name, type=field_type))
536+
schema_fields.append(PhysicalSchemaField(name=col_name, type=field_type))
547537

548538
client = self._control_plane_client()
549539

550540
self.logger.debug(f"Creating connection and stream for seed `{table_name}`...")
551-
response = client.create_connection(name=table_name, schema=schema)
541+
response = client.create_connection(
542+
name=table_name, schema=SchemaV2(schema_fields, [], Constraints(primary_key=[]))
543+
)
552544
self.logger.debug(f"Connection and stream `{table_name}` successfully created!")
553545

554546
self.logger.debug(f"Activating connection `{table_name}`...")
@@ -666,55 +658,21 @@ def _data_plane_client(self) -> DecodableDataPlaneApiClient:
666658
return handle.data_plane_client
667659

668660
@classmethod
669-
def _get_model_schema_hints(cls, model: ParsedNode) -> Set[SchemaField]:
670-
schema: Set[SchemaField] = set()
671-
for column in model.columns.values():
672-
name: str = column.name
673-
data_type: Optional[str] = column.data_type
674-
675-
if not data_type:
676-
continue
677-
678-
t = FieldType.from_str(data_type)
679-
if not t:
680-
raise_compiler_error(f"Type '{data_type}' not recognized")
681-
682-
schema.add(SchemaField(name=name, type=t))
683-
684-
return schema
685-
686-
@staticmethod
687-
def _set_primary_key(primary_key_field: str, schema: List[SchemaField]) -> None:
688-
"""
689-
Sets the primary key to the specified field in the provided schema. Does nothing if the schema does not contain
690-
the specified field.
691-
"""
692-
for field in schema:
693-
if isinstance(field.type, PrimaryKey):
694-
raise ValueError(
695-
f"Trying to set primary key to {primary_key_field}, but schema already has a primary "
696-
f"key assigned to {field.name}"
697-
)
698-
if field.name == primary_key_field:
699-
field.type = PrimaryKey(field.type)
700-
701-
@staticmethod
702-
def _remove_primary_key(schema: List[SchemaField]) -> None:
703-
"""
704-
Removes the primary key from the provided schema (if present).
705-
"""
706-
for field in schema:
707-
if isinstance(field.type, PrimaryKey):
708-
field.type = field.type.inner_type
661+
def _get_model_schema_hints(cls, model: ParsedNode) -> Set[PhysicalSchemaField]:
662+
return {
663+
PhysicalSchemaField.get(column.name, column.data_type)
664+
for column in model.columns.values()
665+
if column.data_type
666+
}
709667

710668
@staticmethod
711669
def _pretty_schema(
712-
schema: List[SchemaField], indent: int = 0, name: Optional[str] = None
670+
schema: Sequence[SchemaField], indent: int = 0, name: Optional[str] = None
713671
) -> str:
714672
fields = ""
715-
for field in sorted(schema, key=lambda sf: sf.name):
673+
for field_ in sorted(schema, key=lambda sf: sf.name):
716674
i = "\t" * (indent + 1)
717-
fields += f"{i}{field.name}: {field.type},\n"
675+
fields += f"{i}{field_},\n"
718676

719677
i = "\t" * indent
720678
prefix = f"{i}{{"
@@ -727,16 +685,6 @@ def _pretty_schema(
727685

728686
return f"{prefix}\n{fields}{suffix}"
729687

730-
@classmethod
731-
def _schema_from_json(cls, json: List[Dict[str, str]]) -> List[SchemaField]:
732-
schema: List[SchemaField] = []
733-
for field in json:
734-
t = FieldType.from_str(field["type"])
735-
if not t:
736-
raise_compiler_error(f"Type '{field['type']}' not recognized")
737-
schema.append(SchemaField(name=field["name"], type=t))
738-
return schema
739-
740688
@classmethod
741689
def _wrap_as_pipeline(cls, sink: str, sql: str) -> str:
742690
return f"INSERT INTO {sink} {sql}"

dbt/include/decodable/macros/materializations/table/create_table_as.sql

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616

1717
{% macro decodable__create_table_as(temporary, relation, sql) -%}
18-
{% set watermark = config.get('watermark') %}
19-
{% set primary_key = config.get('primary_key') %}
20-
{% do adapter.create_table(sql, temporary, relation, graph.nodes, watermark, primary_key) %}
18+
{% set watermarks = config.get('watermarks', []) %}
19+
{% set primary_key = config.get('primary_key', []) %}
20+
{% do adapter.create_table(sql, temporary, relation, graph.nodes, watermarks, primary_key) %}
2121
{%- endmacro %}

dbt/include/decodable/macros/materializations/table/table.sql

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828

2929
{% set should_create = {'value': false} %}
3030
{% if existing_relation is not none %}
31-
{% set watermark = config.get('watermark') %}
32-
{% set primary_key = config.get('primary_key') %}
33-
{% if adapter.has_changed(sql, target_relation, watermark, primary_key) or should_full_refresh() %}
31+
{% set watermarks = config.get('watermarks', []) %}
32+
{% set primary_key = config.get('primary_key', []) %}
33+
{% if adapter.has_changed(sql, target_relation, watermarks, primary_key) or should_full_refresh() %}
3434
{{ adapter.drop_relation(existing_relation) }}
3535
{% do should_create.update({'value': true}) %}
3636
{% endif %}

0 commit comments

Comments
 (0)