
Comments

transformations - updates #2718

Merged
sh-rp merged 62 commits into devel from feat/transformations-more-decoupling
Jul 11, 2025

Conversation

sh-rp (Collaborator) commented Jun 5, 2025

Description

This PR includes the following changes:

  • Converts the readable dataset to handle sqlglot expressions where possible, without intermediary steps where SQL strings are generated and re-parsed.
  • Changes the transformation function to also expect generators for SQL queries and relations.
  • Adds a configuration option to force materialization (very useful for testing, but it may also come in handy for data quality checks that require extraction later).
  • Uses regular relations as the data item object and adds a new interface to mark an object that may compute its schema_hints.
  • Moves handling of hints for SQL transformations into the extractor.
  • Adds support for working with unbound ibis tables, as well as native support for unbound ibis expressions in the call method of the dataset and transformations.
  • Adds support for attaching resource hints to pyarrow items that can be consumed by the ArrowExtractor.
  • Removes the ibis relation and consolidates the dataset and relation implementations into one class each.
  • Updates the protocols for relation and dataset and makes them the publicly facing interface for these classes.
  • Updates docs.

Open questions:

  • The whole handling of hints needs to be reviewed and probably improved. I am unwrapping and wrapping data items in the transformations now, and this feels more like a workaround at this point. I am also not sure whether the place where the hints get merged (user-supplied hints vs. hints from the relation) is right.

netlify bot commented Jun 5, 2025

Deploy Preview for dlt-hub-docs canceled.
🔨 Latest commit: bd4cfd7
🔍 Latest deploy log: https://app.netlify.com/projects/dlt-hub-docs/deploys/6870ef4fb8a68a0008447f74

select_query = transformation_result.query()

# TODO: why? don't we prevent empty column schemas above?
all_columns = {**computed_columns, **(columns or {})}
sh-rp (Collaborator, Author):

I have added a test for column merging, and the columns yielded from here overwrite all incoming column definitions from the decorator, which I think generally makes sense in dlt, but not for the transformations. I would keep this topic open for now until we decide how exactly this should work.

Collaborator:

w8: you are overwriting computed_columns with columns which come from the decorator. so how are they not overwritten at the end?

sh-rp (Collaborator, Author):

I am not sure what you are asking. My goal is that if the user sets a column hint on the transformation decorator, it should overwrite whatever is discovered during lineage. At least I think it should, but I could also just remove this line, and users would have to deal with this some other way if they need to change precision or something like that.
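The behavior being debated is plain Python dict-unpacking precedence: in `{**computed_columns, **(columns or {})}` the later mapping wins key by key, and a winning entry replaces the computed one wholesale. A minimal sketch with hypothetical column hints (not dlt's actual schema objects):

```python
# Hypothetical column hints: lineage-computed vs. decorator-supplied.
computed_columns = {"price": {"data_type": "double"}, "id": {"data_type": "bigint"}}
columns = {"price": {"precision": 20, "scale": 2}}

# Later entries win, so decorator hints overwrite computed ones key by key.
all_columns = {**computed_columns, **(columns or {})}

# Note: "price" is replaced wholesale; the computed data_type is lost.
assert all_columns["price"] == {"precision": 20, "scale": 2}
assert all_columns["id"] == {"data_type": "bigint"}
```

The order of the two unpackings is what decides whether user hints or lineage hints take precedence.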

QUERY_KNOWN_TABLE_STAR_SELECT = "SELECT * FROM table_1"
QUERY_UNKNOWN_TABLE_STAR_SELECT = "SELECT * FROM table_unknown"
QUERY_ANONYMOUS_SELECT = "SELECT LEN(col_varchar) FROM table_1"
QUERY_GIBBERISH = "%&/ GIBBERISH"
sh-rp (Collaborator, Author):

This cannot happen anymore, as non-parseable SQL strings are already caught in the dataset.

def inventory_original(dataset: SupportsReadableDataset[Any]) -> Any:
return dataset["inventory"]

@dlt.transformation(columns={"price": {"precision": 20, "scale": 2}})
sh-rp (Collaborator, Author):

I think this is the correct behavior: we should allow setting the precision for an existing column here. Or would you rather do this in the function body? I am not sure.

Will be translated to

```sql
INSERT INTO
sh-rp (Collaborator, Author):

This is the insert statement that actually gets generated. I realized we were not quoting identifiers, so I added this. We are doing 2 subqueries here now, one stemming from the normalization in the extract step and one from @anuunchin's normalizer work. We can probably still optimize here?

assert (
normalized_query
== 'SELECT _dlt_subquery."a" AS "a", _dlt_subquery."b" AS "b", UUID() AS "_dlt_id" FROM'
sh-rp (Collaborator, Author):

The model item normalizer will now always use quoted identifiers on the outer queries, regardless of whether the inner select has quoted identifiers. Since all queries arriving here now go through query normalization in the dataset, where quotes are applied, this should actually be fine.

def _query(self) -> sge.Query:
from dlt.helpers.ibis import duckdb_compiler

select_query = duckdb_compiler.to_sqlglot(self._ibis_object)
sh-rp (Collaborator, Author):

For converting an ibis expression into sqlglot, you always need a compiler, which is destination dependent. I think the compiler will fail if you use functions that are known not to exist on a specific destination, and certain materializations will also occur in this step. If you have an expression that counts rows, the alias for the result will be created here, which will be CountStar(*) for duckdb, but which is not a valid identifier for bigquery, for example (see comment in the query normalizer).

if node.db != expanded_path[0]:
node.set("catalog", sqlglot.to_identifier(expanded_path[0], quoted=False))
if isinstance(node, sge.Alias):
node.set("alias", naming.normalize_identifier(node.alias))
sh-rp (Collaborator, Author):

Here I make sure aliases that are created by the user, or automatically by ibis in the compile step above, adhere to the naming convention of the current destination. The alternative would be to use a different compiler for every destination. Normalizing the alias in the ModelNormalizer step is too late, since this code is also used for plain data access. Without this change, test_row_counts will fail for bigquery because an alias with invalid symbols remains in the query. All other destinations seem to accept this.

sh-rp (Collaborator, Author):

I have reverted this change and am now using the bigquery compiler only for bigquery destinations. Maybe we should be using the correct compiler for each destination type, but it seems duckdb works for all. I am not sure about this one.

@sh-rp sh-rp force-pushed the feat/transformations-more-decoupling branch from 211cbfc to 1f5a787 Compare June 6, 2025 12:44
@sh-rp sh-rp marked this pull request as ready for review June 11, 2025 07:21
@sh-rp sh-rp requested a review from rudolfix June 11, 2025 07:21
@sh-rp sh-rp force-pushed the feat/transformations-more-decoupling branch from 848233e to 28b7361 Compare June 11, 2025 13:50
# NOTE: We can use the duckdb compiler for all dialects except for bigquery
# as bigquery is more strict about identifier naming and will not accept
# identifiers generated by the duckdb compiler for anonymous columns
destination_dialect = self._dataset.sql_client.capabilities.sqlglot_dialect
Collaborator:

I'll keep the old notes:
# NOTE: ibis is optimized for reading a real schema from the db and pushing back optimized sql
# - it quotes all identifiers, there's no option to get an unquoted query
# - it converts full lists of column names back into star
# - it optimizes the query inside, adds meaningless aliases to all tables etc.

For example: I bet the BigQuery problem is a quotation problem where " is not accepted

"Must be an SQL SELECT statement."
)

return query.sql(dialect=self._dataset.sql_client.capabilities.sqlglot_dialect)
Collaborator:

Why is provided_dialect not used here?

sqlglot_schema: SQLGlotSchema,
qualified_query: sge.Query,
sql_client: SqlClientBase[Any],
naming: NamingConvention,
Collaborator:

Not needed anymore, and I think it should not be needed here.

+ "Please run with strict lineage or provide data_type hints "
+ f"for following columns: {unknown_column_types}",
)
yield dlt.mark.with_hints(
Collaborator:

This is something I wrote in the old PR: could we place all_columns in the SqlModel? Or better: could we place a relation in it? Then:

  • the user is able to use dlt.mark on the model (i.e. their own custom hints or table name)
  • all_columns = {**computed_columns, **(columns or {})} is not needed; columns will be applied by compute_table_schema in DltResource
  • you can use relation/computed_columns in SqlModel to apply to the resource in the extractor. Look how we call this method:
root_table_schema = resource.compute_table_schema(items, meta)

and

def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTableSchema:
        """Computes the table schema based on hints and column definitions passed during resource creation.
        `item` parameter is used to resolve table hints based on data.
        `meta` parameter is taken from Pipe and may further specify table name if variant is to be used
        """

so SqlModel will be passed there and you can use it to extract computed hints.

Or you can implement _compute_schema on ModelExtractor for fully custom logic

sh-rp (Collaborator, Author) commented Jun 25, 2025:

I kind of did that now, but I also have to unwrap and rewrap the data item in the transform function, and I am not fully convinced that the way I did it is the right way. Also, I don't think it works for materialized queries.

@sh-rp sh-rp mentioned this pull request Jun 18, 2025
11 tasks
@sh-rp sh-rp force-pushed the feat/transformations-more-decoupling branch 4 times, most recently from 6ae764d to 3584402 Compare June 25, 2025 15:07
@sh-rp sh-rp changed the title transformations - more decoupling transformations - new versions Jun 26, 2025
@sh-rp sh-rp force-pushed the feat/transformations-more-decoupling branch from cc89745 to 941dae0 Compare June 26, 2025 12:42
from dlt.extract.pipe_iterator import DataItemWithMeta


class MaterializableSqlModel(SqlModel, WithComputableHints):
sh-rp (Collaborator, Author):

Note: find better name, or merge this with relation.

relation = unwrapped_item
# we see if the string is a valid sql query, if so we need a dataset
elif isinstance(unwrapped_item, str):
try:
sh-rp (Collaborator, Author):

This allows a malformed SQL query to be interpreted as a plain string, which is possibly not what we want here. Maybe single strings should always strictly be interpreted as SQL queries and fail the transformation otherwise; without that, the user will get very strange results.

for chunk in datasets[0](select_query).iter_arrow(chunk_size=config.buffer_max_items):
yield dlt.mark.with_hints(chunk, hints=make_hints(columns=all_columns))
for chunk in relation.iter_arrow(chunk_size=config.buffer_max_items):
yield dlt.mark.with_hints(chunk, hints=sql_model.compute_hints())
sh-rp (Collaborator, Author):

Here we discard the hints coming from the outside; for the sql model they are merged during extract, but since we have pure arrow tables here, we can't do that. We had this idea of putting our schema into the user data area of the arrow table. I have not looked at this yet, but should we do it?

@sh-rp sh-rp force-pushed the feat/transformations-more-decoupling branch from 36e2e96 to 93c97e2 Compare June 26, 2025 15:04
rudolfix (Collaborator) commented Jul 1, 2025:

I will review tests later

sh-rp (Collaborator, Author) commented Jul 2, 2025:

> I will review tests later

There are a bunch of relevant tests in tests/load/pipeline/test_model_item_format.py which can probably be moved to transformation tests.

sh-rp added 2 commits July 2, 2025 10:46
-> this allows having multiple relations with different schemas in one transformation, so this is allowed now too
sh-rp (Collaborator, Author) commented Jul 2, 2025:

OK! This is pretty minor stuff now. I have one question about execution order: we execute models randomly, right? So if I access data from the output dataset (i.e. dlt.current.pipeline.dataset()) I may get partially modified destination data.

Maybe we should have a mode where we always use the staging dataset? It seems we just need the append strategy.

I would add a note in the docs, I think it's quite unlikely that users would use tables from the output dataset as input in the same load.

rudolfix (Collaborator) left a review:

LGTM! just a few more questions

fruit_p.run(fruitshop_source())

@dlt.transformation()
def multiple_transformations(dataset: Dataset) -> Any:
Collaborator:

Would be cool to enumerate resources without a pipeline, and also to have mixed lazy and eager transformations yielded.

sh-rp (Collaborator, Author):

Resources are enumerated and schema access is tested in test_extract_without_source_name_or_pipeline and test_materializable_sql_model and test_enumerate_and_access_relation.

sh-rp (Collaborator, Author):

For the mixed-type function, a small fix was needed in the normalizer, which had prevented mixed loader_file_type tables in general.

["id", "customer_id", "inventory_id", "quantity", "name"]
]

model = list(materializable_sql_model(fruit_p.dataset()))[0]
Collaborator:

do we have a specific test for resource iteration?

sh-rp (Collaborator, Author):

I'm not sure what this means, this is tested in the line above, no?

@sh-rp sh-rp force-pushed the feat/transformations-more-decoupling branch from d04e962 to 089f5ec Compare July 5, 2025 10:40
if table_name in self._filtered_tables:
continue
# allow to cache dynamic table hints if only table name is dynamic
if table_name not in self._table_contracts or resource._table_has_other_dynamic_hints:
sh-rp (Collaborator, Author):

@rudolfix I had to revert this change that you made, because yielding multiple transformations with different schemas would not work without it. Relevant test is: def test_multiple_transformations_in_function. Maybe you should review this once again.

Collaborator:

Right! But this prevents caching of the computed dynamic table hints, which covers 99% of the cases we handle. My take:
convert _table_has_other_dynamic_hints into has_other_dynamic_hints, which for transformation resources is always True and for others falls back to the current behavior.
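The suggestion above can be sketched as a property override; these classes are hypothetical stand-ins, not dlt's actual resource implementation:

```python
class DltResource:
    """Hypothetical stand-in for dlt's resource class."""

    def __init__(self, table_name_is_dynamic: bool = False) -> None:
        self._table_has_other_dynamic_hints = table_name_is_dynamic

    @property
    def has_other_dynamic_hints(self) -> bool:
        # regular resources keep the current behavior, so computed
        # dynamic table hints can still be cached in the common case
        return self._table_has_other_dynamic_hints


class TransformationResource(DltResource):
    @property
    def has_other_dynamic_hints(self) -> bool:
        # transformations may yield multiple relations with different
        # schemas, so their computed hints must never be cached
        return True
```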

@sh-rp sh-rp force-pushed the feat/transformations-more-decoupling branch from 9e72cf7 to 19822bc Compare July 7, 2025 10:03
@sh-rp sh-rp force-pushed the feat/transformations-more-decoupling branch from 19822bc to c5f9b33 Compare July 7, 2025 10:29
Collaborator (on input_data, marked outdated):

I believe this shouldn't be here?

raise NotImplementedError("`fetchone()` method is not supported for this relation")


class Relation(DataAccess):
zilto (Collaborator) commented Jul 10, 2025:

AFAIU, DataAccess and Relation are implemented as abstract base classes (abc.ABC) rather than via duck typing (Protocol). The class Relation inheriting the DataAccess protocol should implement the methods directly (otherwise, make Relation the protocol).

For instance, the IDE's type inspection complains that the methods don't return anything while their signatures say they do.

What's the practical use case here? I thought IbisRelation was removed to have a single Dataset and Relation implementation, which has the benefit of shedding this abstract machinery.

sh-rp (Collaborator, Author):

We have interfaces (protocols or abstract classes) in common and implementations in the other parts. This enables us to use these interfaces as return types in places where the implementation is not known or cannot be imported due to circular dependencies.

sh-rp (Collaborator, Author) commented Jul 11, 2025:

Also, with regard to the Protocol vs. ABC question: ideally Relation would also be a Protocol. The problem is that I am checking variables for types in the transformations to see what the transformations are yielding, and you can't isinstance-check against protocols (or you can with the runtime_checkable decorator, but apparently that is very slow). I also don't see any problems in my IDE; maybe we are using different linters? The CI passes.
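The trade-off mentioned here can be shown with the stdlib: an ABC isinstance check is a nominal lookup, while a Protocol only supports isinstance when decorated with runtime_checkable, which inspects the object's attributes at runtime (hence the speed concern). A minimal sketch with hypothetical names:

```python
from abc import ABC, abstractmethod
from typing import Protocol, runtime_checkable

@runtime_checkable
class SupportsToSql(Protocol):
    def to_sql(self) -> str: ...

class RelationBase(ABC):
    @abstractmethod
    def to_sql(self) -> str: ...

class MyRelation(RelationBase):
    def to_sql(self) -> str:
        return "SELECT 1"

relation = MyRelation()
# nominal check: a fast MRO lookup
assert isinstance(relation, RelationBase)
# structural check: walks the protocol's members at runtime
assert isinstance(relation, SupportsToSql)
```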

Collaborator:

> This enables us to have these interfaces as return types in places where the implementation is not known or can not be imported due to circular dependencies

For this purpose, we should simply use

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
  # import from other modules

We should gradually introduce levels of abstraction. If we have a single dlt.Dataset and dlt.Relation, the additional layers make reading and editing the code harder IMO. For instance, I still struggle with the many layers of proxying and wrapping on Relation.df(), which could be written in a very straightforward manner


normalized_query = parsed_query.sql(dialect=dialect)
self._f.write("dialect: " + (dialect or "") + "\n" + normalized_query + "\n")
dialect = item.query_dialect() or (self._caps.sqlglot_dialect if self._caps else None)
Collaborator:

My question was: can item.query_dialect() NOT return a dialect in the current impl? If not, we can remove the expression after "or".

def __init__(self, *args: Any, **kwds: Any) -> None:
super().__init__(*args, **kwds)

def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTableSchema:
Collaborator:

Please add background on why we do the schema caching and when.

return self._columns_schema

@columns_schema.setter
def columns_schema(self, new_value: TTableSchemaColumns) -> None:
Collaborator:

ping! can we remove it?

@sh-rp sh-rp merged commit befe9ce into devel Jul 11, 2025
59 checks passed
zilto pushed a commit that referenced this pull request Jul 22, 2025
* rename flag for executing raw queries to "execute_raw_query"

* return sge queries from the internal _query method which removes a lot of unneeded transpiling
clean up make_transformation function
tests still pending

* adds some tests to readable dataset and a test for column hint merging

* allows any dialect when writing queries and fixes tests

* update docs and set correct quoting to queries in normalization and load stage

* fixes normalizer tests

* fix limit on mssql
normalize aliases in normalization step

* add missing quote to alias

* revert identifier normalization step in normalizer_query and use bigquery compiler for bigquery destinations

* post rebase fix

* smallish pr fixes

* add materializable sqlmodel and handle hints in extractor

* add and test always_materialize setting

* add test for sql transformation type

* convert transformation functions to need yield instead of return

* migrate tests and docs snippets to yield in transformations

* add simple test for materializable model

* use correct compiler for converting ibis into sqlglot for each dialect
fixes on transformation test

* add first simple version of using unbound ibis tables in transformations

* skip ibis test on python 3.9

* fix query building in new relation

* return a "real" relation from a transformation

* add ibis option when getting table from dataset
natively support unbound ibis tables in transformations and when getting relations from dataset

* update model item format tests to use relation

* * remove one unneeded test (same thing is already tested in transformations)
* fix wei conversion in lineage

* adds support for adding resource hints to pyarrow items

* switch most read access tests to default dataset

* update datasets and transformations docs pages

* separate ibis and default dbapi datasets and fix typing

* update transformation tests and small typing fixes for updated datasets

* fix default dataset type

* fix wei sqlglot conversion

* add sqlglot dialect type and some cleanup

* fix dataset snippets

* fix sqlglot schema test

* removes ibis relation and dataset
consolidates relation and dataset baseclasses with implementations
updates interfaces/protocols for relation and dataset and makes those the publicly available interface with "Relation" and "Dataset"
remove query method from relation interface

* fix one doc snippet

* rename dataset and relation interfaces

* fix relationship between cursor and relation, remove function wiring hack in favor of explicit forwarding for better typing

* clean up readablerelation (no actual code changes)

* fix str test to assume pretty sql (which it is now)
fix one transformation snippet

* small changes from review comments:
* query method on dataset
* typing update of table method

* rename query method to "to_sql" on relation

* clean up transform function a bit (could maybe be even better)
reject non-sql strings in transformation to not shadow errors

* add support for "non-generator" transformations

* move hints computation into resource class

* smallish PR fixes

* add support for dynamic hints in transformations
-> this allows having multiple relations with different schemas in one transformation, so this is allowed now too

* fixes dynamic table caching

* Enhances ReadableDBAPIRelation: min/max, filter with expression (#2833)

* Min max, filter with expr_or_string

* Fix in min max test

* Overload fix and docs

* Test read interfaces partially uses default relation max

* prevent sqglot schema from adding default hints info, only allow parametrized types and don't supply hints if none are present in dlt schema

* make multi schema transformations work again

* move model item format tests to transformations folder

* re-order interface tests and fix playground dataset access

* PR review test updated

* update dataset and transformation pages

* update transformations tests to new fruitshop

* Last PR fixes

* update columns_schema property

---------

Co-authored-by: Marcin Rudolf <rudolfix@rudolfix.org>
Co-authored-by: anuunchin <88698977+anuunchin@users.noreply.github.com>
@rudolfix rudolfix deleted the feat/transformations-more-decoupling branch January 20, 2026 10:04