Skip to content

Commit 941dae0

Browse files
committed
migrate tests and docs snippets to yield in transformations
1 parent 3ca690c commit 941dae0

File tree

7 files changed

+28
-30
lines changed

7 files changed

+28
-30
lines changed

docs/website/docs/general-usage/transformations/index.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ Most of the following examples will be using the ibis expressions of the `dlt.Da
9090

9191
* **Decorator arguments** mirror those accepted by `@dlt.resource`.
9292
* The transformation function signature must contain at least one `dlt.Dataset` which is used inside the function to create the transformation SQL statements and calculate the resulting schema update.
93-
* Return a `TReadableRelation` created with ibis expressions or a select query which will be materialized into the destination table. _Do **not** yield Python dictionaries._
93+
* Yields a `TReadableRelation` created with ibis expressions or a select query which will be materialized into the destination table. If the first item yielded is a valid SQL query or relation object, the data will be interpreted as a transformation. In all other cases, the transformation decorator will work like any other resource.
9494

9595
## Loading to other datasets
9696

docs/website/docs/general-usage/transformations/transformation-snippets.py

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def basic_transformation_snippet(fruitshop_pipeline: dlt.Pipeline) -> None:
3636
def copied_customers(dataset: dlt.Dataset) -> Any:
3737
# Ibis expression: sort by name and keep first 5 rows
3838
customers_table = dataset["customers"]
39-
return customers_table.order_by("name").limit(5)
39+
yield customers_table.order_by("name").limit(5)
4040

4141
# Same pipeline & same dataset
4242
fruitshop_pipeline.run(copied_customers(fruitshop_pipeline.dataset()))
@@ -55,7 +55,7 @@ def orders_per_user_snippet(fruitshop_pipeline: dlt.Pipeline) -> None:
5555
@dlt.transformation(name="orders_per_user", write_disposition="merge")
5656
def orders_per_user(dataset: dlt.Dataset) -> Any:
5757
purchases = dataset["purchases"]
58-
return purchases.group_by(purchases.customer_id).aggregate(order_count=purchases.id.count())
58+
yield purchases.group_by(purchases.customer_id).aggregate(order_count=purchases.id.count())
5959

6060
# @@@DLT_SNIPPET_END orders_per_user
6161
fruitshop_pipeline.run(orders_per_user(fruitshop_pipeline.dataset()))
@@ -69,7 +69,7 @@ def loading_to_other_datasets_snippet(fruitshop_pipeline: dlt.Pipeline) -> None:
6969
@dlt.transformation()
7070
def copied_customers(dataset: dlt.Dataset) -> Any:
7171
customers_table = dataset["customers"]
72-
return customers_table.order_by(customers_table.name).limit(5)
72+
yield customers_table.order_by(customers_table.name).limit(5)
7373

7474
# Same duckdb instance, different dataset
7575
dest_p = dlt.pipeline(
@@ -98,12 +98,12 @@ def my_transformations(dataset: dlt.Dataset) -> Any:
9898
def enriched_purchases(dataset: dlt.Dataset) -> Any:
9999
purchases = dataset["purchases"]
100100
customers = dataset["customers"]
101-
return purchases.join(customers, purchases.customer_id == customers.id)
101+
yield purchases.join(customers, purchases.customer_id == customers.id)
102102

103103
@dlt.transformation(write_disposition="replace")
104104
def total_items_sold(dataset: dlt.Dataset) -> Any:
105105
purchases = dataset["purchases"]
106-
return purchases.aggregate(total_qty=purchases.quantity.sum())
106+
yield purchases.aggregate(total_qty=purchases.quantity.sum())
107107

108108
return enriched_purchases(dataset), total_items_sold(dataset)
109109

@@ -129,7 +129,7 @@ def sql_queries_snippet(fruitshop_pipeline: dlt.Pipeline) -> None:
129129
@dlt.transformation()
130130
def copied_customers(dataset: dlt.Dataset) -> Any:
131131
customers_table = dataset("SELECT * FROM customers LIMIT 5 ORDER BY name")
132-
return customers_table
132+
yield customers_table
133133

134134
# @@@DLT_SNIPPET_END sql_queries_short
135135

@@ -140,7 +140,7 @@ def enriched_purchases(dataset: dlt.Dataset) -> Any:
140140
"SELECT customers.name, purchases.quantity FROM purchases JOIN customers ON"
141141
" purchases.customer_id = customers.id"
142142
)
143-
return enriched_purchases
143+
yield enriched_purchases
144144

145145
# You can even use a different dialect than the one used by the destination by supplying the dialect parameter
146146
# dlt will compile the query to the right destination dialect
@@ -151,7 +151,7 @@ def enriched_purchases_postgres(dataset: dlt.Dataset) -> Any:
151151
" purchases.customer_id = customers.id",
152152
query_dialect="duckdb",
153153
)
154-
return enriched_purchases
154+
yield enriched_purchases
155155

156156
# @@@DLT_SNIPPET_END sql_queries
157157

@@ -227,7 +227,7 @@ def enriched_purchases(dataset: dlt.Dataset) -> Any:
227227
"SELECT customers.name, purchases.quantity FROM purchases JOIN customers ON"
228228
" purchases.customer_id = customers.id"
229229
)
230-
return enriched_purchases
230+
yield enriched_purchases
231231

232232
# Let's run the transformation and see that the name column in the NEW table is also marked as PII
233233
fruitshop_pipeline.run(enriched_purchases(fruitshop_pipeline.dataset()))
@@ -272,7 +272,7 @@ def in_transit_transformations_snippet() -> None:
272272
def orders_per_store(dataset: dlt.Dataset) -> Any:
273273
orders = dataset["orders"]
274274
stores = dataset["stores"]
275-
return (
275+
yield (
276276
orders.join(stores, orders.store_id == stores.id)
277277
.group_by(stores.name)
278278
.aggregate(order_count=orders.id.count())
@@ -312,9 +312,7 @@ def cleaned_customers(dataset: dlt.Dataset) -> Any:
312312
customers_table = dataset.customers
313313

314314
# filter only new customers and exclude the name column in the result
315-
return customers_table.filter(customers_table.id > max_pimary_key).drop(
316-
customers_table.name
317-
)
315+
yield customers_table.filter(customers_table.id > max_pimary_key).drop(customers_table.name)
318316

319317
# create a warehouse dataset, would ordinarily be snowflake or some other warehousing destination
320318
warehouse_pipeline = dlt.pipeline(

tests/load/test_configuration.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
def test_transformation_defaults() -> None:
99
@dlt.transformation()
1010
def my_tf(dataset: SupportsReadableDataset[Any]) -> Any:
11-
return dataset["example_table"].limit(5)
11+
yield dataset["example_table"].limit(5)
1212

1313
assert my_tf.write_disposition == "append"
1414
# assert my_tf(dataset).materialization == "table"

tests/load/transformations/test_basic_transformations.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def test_simple_query_transformations(destination_config: DestinationTestConfigu
3737

3838
@dlt.transformation()
3939
def copied_customers(dataset: dlt.Dataset) -> Any:
40-
return dataset["customers"].limit(5)
40+
yield dataset["customers"].limit(5)
4141

4242
# transform into transformed dataset
4343
dest_p.run(copied_customers(fruit_p.dataset()))
@@ -68,11 +68,11 @@ def test_grouped_transformations(destination_config: DestinationTestConfiguratio
6868

6969
@dlt.transformation()
7070
def copied_customers(dataset: dlt.Dataset) -> Any:
71-
return dataset["customers"].limit(5)
71+
yield dataset["customers"].limit(5)
7272

7373
@dlt.transformation()
7474
def copied_customers2(dataset: dlt.Dataset) -> Any:
75-
return dataset["customers"].limit(7)
75+
yield dataset["customers"].limit(7)
7676

7777
@dlt.source()
7878
def transformations(dataset: dlt.Dataset) -> List[Any]:
@@ -113,7 +113,7 @@ def test_replace_sql_transformations(destination_config: DestinationTestConfigur
113113

114114
@dlt.transformation(write_disposition="replace")
115115
def copied_customers(dataset: dlt.Dataset) -> Any:
116-
return dataset["customers"].limit(5)
116+
yield dataset["customers"].limit(5)
117117

118118
# transform into same dataset
119119
dest_p.run(copied_customers(fruit_p.dataset()))
@@ -128,7 +128,7 @@ def copied_customers(dataset: dlt.Dataset) -> Any:
128128
table_name="copied_customers",
129129
)
130130
def copied_customers_updated(dataset: dlt.Dataset) -> Any:
131-
return dataset["customers"].limit(3)
131+
yield dataset["customers"].limit(3)
132132

133133
# transform into same dataset
134134
dest_p.run(copied_customers_updated(fruit_p.dataset()))
@@ -151,7 +151,7 @@ def test_append_sql_transformations(destination_config: DestinationTestConfigura
151151

152152
@dlt.transformation(write_disposition="append")
153153
def copied_customers(dataset: dlt.Dataset) -> Any:
154-
return dataset["customers"].limit(5)
154+
yield dataset["customers"].limit(5)
155155

156156
# transform into same dataset
157157
dest_p.run(copied_customers(fruit_p.dataset()))
@@ -162,7 +162,7 @@ def copied_customers(dataset: dlt.Dataset) -> Any:
162162

163163
@dlt.transformation(write_disposition="append", table_name="copied_customers")
164164
def copied_table_updated(dataset: dlt.Dataset) -> Any:
165-
return dataset["customers"].limit(7)
165+
yield dataset["customers"].limit(7)
166166

167167
# transform into same dataset
168168
dest_p.run(copied_table_updated(fruit_p.dataset()))
@@ -194,15 +194,15 @@ def test_sql_transformation_with_unknown_column_types(
194194

195195
@dlt.transformation()
196196
def mutated_purchases(dataset: dlt.Dataset) -> Any:
197-
return dataset["customers"].mutate(new_col=5).limit(5)
197+
yield dataset["customers"].mutate(new_col=5).limit(5)
198198

199199
# problem should already be detected at extraction time
200200
with pytest.raises(PipelineStepFailed):
201201
dest_p.extract(mutated_purchases(fruit_p.dataset()))
202202

203203
@dlt.transformation()
204204
def mutated_purchases_with_hints(dataset: dlt.Dataset) -> Any:
205-
return dataset["customers"].mutate(new_col=5).limit(5)
205+
yield dataset["customers"].mutate(new_col=5).limit(5)
206206

207207
dest_p.run(mutated_purchases_with_hints(fruit_p.dataset()))
208208
assert load_table_counts(dest_p, "mutated_purchases_with_hints") == {

tests/load/transformations/test_incremental_transforms.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def transformed_items(dataset: dlt.Dataset, last_loaded_load_id: str) -> Any:
9797
dlt.current.resource_state()[LAST_PROCESSED_LOAD_ID] = max_load_id
9898

9999
# return filtered transformation
100-
return items_table.filter(
100+
yield items_table.filter(
101101
items_table._dlt_load_id > last_processed_load_id,
102102
items_table._dlt_load_id <= last_loaded_load_id,
103103
).mutate(double_items=items_table.id * 2)
@@ -149,7 +149,7 @@ def transformed_items(dataset: dlt.Dataset) -> Any:
149149

150150
# return filtered transformation
151151
items_table = dataset.items
152-
return items_table.filter(items_table.id > max_pimary_key).mutate(
152+
yield items_table.filter(items_table.id > max_pimary_key).mutate(
153153
double_items=items_table.id * 2
154154
)
155155

@@ -203,7 +203,7 @@ def transformed_items(dataset: dlt.Dataset) -> Any:
203203
items_table._dlt_load_id <= last_loaded_load_id,
204204
).mutate(double_items=items_table.id * 2)
205205

206-
return transformed_items(dataset)
206+
yield transformed_items(dataset)
207207

208208
# first round
209209
inc_p.run(first_load())
@@ -242,7 +242,7 @@ def test_merge_based_incremental_transform(
242242
def transformed_items(dataset: dlt.Dataset) -> Any:
243243
# return filtered transformation
244244
items_table = dataset.items
245-
return items_table.mutate(double_items=items_table.id * 2)
245+
yield items_table.mutate(double_items=items_table.id * 2)
246246

247247
# first round
248248
inc_p.run(first_load())

tests/load/transformations/test_multidataset_transformations.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
def test_combine_two_datasets(fruit_p: dlt.Pipeline, private_fruit_p: dlt.Pipeline) -> None:
1010
@dlt.transformation()
1111
def customers_with_ages(dataset: dlt.Dataset, dataset2: dlt.Dataset) -> Any:
12-
return dataset["customers"].join(
12+
yield dataset["customers"].join(
1313
dataset2["customers_ages"], dataset["customers"].id == dataset2["customers_ages"].id
1414
)
1515

tests/load/transformations/test_transformation_lineage.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def test_simple_lineage(
3636
def enriched_purchases(dataset: dlt.Dataset) -> Any:
3737
purchases = dataset["purchases"]
3838
customers = dataset["customers"]
39-
return purchases.join(customers, purchases.customer_id == customers.id)
39+
yield purchases.join(customers, purchases.customer_id == customers.id)
4040

4141
dest_p.run(enriched_purchases(fruit_p.dataset()))
4242

0 commit comments

Comments
 (0)