
Commit f86b32c

Data engineering template (#45)
# Summary

This adds a data engineering template.

* ETL pipelines have their own folder structure as seen under https://github.com/databricks/bundle-examples/tree/data-engineering/contrib/templates/data-engineering/assets/etl-pipeline/template/assets/%7B%7B.pipeline_name%7D%7D
* uv is used to manage packages and scripts
* `uv run add-asset` can be used to add a pipeline to a project
* `uv run test` can be used to run all tests on serverless compute
* `uv run pytest` can be used to run all tests on any type of compute as configured in the current IDE or .databrickscfg settings

# How to try this

You can give the template a try using

```
databricks bundle init https://github.com/databricks/bundle-examples --template-dir contrib/templates/data-engineering
```

Note that each pipeline has a separate template. New pipelines can be added using the shorthand `uv run add-asset` or by manually instantiating the pipeline template with

```
databricks bundle init https://github.com/databricks/bundle-examples --template-dir contrib/templates/data-engineering/assets/etl-pipeline
```

---------

Co-authored-by: Pieter Noordhuis <[email protected]>
1 parent c9d4b1b commit f86b32c

26 files changed: +584 −0 lines changed
Lines changed: 13 additions & 0 deletions
````markdown
# data-engineering template

This template introduces a new structure for organizing data-engineering
assets in DABs.

Install it using

```
databricks bundle init https://github.com/databricks/bundle-examples --template-dir contrib/templates/data-engineering
```

Note that by default this template doesn't come with any assets such as jobs or pipelines.
Follow the instructions in the template setup and README to add them.
````
Lines changed: 46 additions & 0 deletions
```json
{
  "welcome_message": "\nWelcome to the data-engineering pipeline template!",
  "properties": {
    "pipeline_name": {
      "type": "string",
      "description": "\nPlease provide the name of the pipeline to generate.\npipeline_name",
      "default": "etl_pipeline",
      "order": 1
    },
    "format": {
      "type": "string",
      "description": "\nPlease select the format to use to define this pipeline.\nformat",
      "order": 2,
      "enum": [
        "python files",
        "sql files",
        "notebooks"
      ],
      "default": "python files"
    },
    "only_python_files_supported": {
      "skip_prompt_if": {
        "properties": {
          "format": {
            "pattern": "python files"
          }
        }
      },
      "default": "ignored",
      "type": "string",
      "description": "{{fail \"Only Python files are supported in this template at this time.\"}}",
      "order": 3
    },
    "include_job": {
      "type": "string",
      "description": "\nWould you like to include a job that automatically triggers this pipeline?\nThis trigger will only be enabled for production deployments.\ninclude_job",
      "order": 4,
      "enum": [
        "yes",
        "no"
      ],
      "default": "yes"
    }
  },
  "success_message": "\n\n🪠 New pipeline definition generated under 'assets/{{.pipeline_name}}'!"
}
```
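The prompts above can also be answered non-interactively: `databricks bundle init` accepts a `--config-file` of pre-filled values. A minimal sketch that writes such a file for this schema (the file name and chosen values are just examples):

```python
# Sketch: write an answers file for non-interactive template init, e.g.
#   databricks bundle init <repo> --template-dir contrib/templates/data-engineering/assets/etl-pipeline --config-file answers.json
# The keys mirror the "properties" in the schema above.
import json

answers = {
    "pipeline_name": "etl_pipeline",
    "format": "python files",  # anything else trips the {{fail ...}} guard
    "include_job": "yes",
}

with open("answers.json", "w") as f:
    json.dump(answers, f, indent=2)
```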
Lines changed: 5 additions & 0 deletions
```python
# This is the entry point for the {{.pipeline_name}} pipeline.
# It makes sure all transformations in the transformations directory are included.
import transformations

__all__ = ["transformations"]
```
Lines changed: 4 additions & 0 deletions
```markdown
# explorations

This folder is reserved for personal, exploratory notebooks.
By default these are not committed to Git, as 'explorations' is listed in .gitignore.
```
Lines changed: 52 additions & 0 deletions
```json
{
  "cells": [
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "cellMetadata": {
            "byteLimit": 2048000,
            "rowLimit": 10000
          },
          "inputWidgets": {},
          "nuid": "6bca260b-13d1-448f-8082-30b60a85c9ae",
          "showTitle": false,
          "title": ""
        }
      },
      "outputs": [],
      "source": [
        "import sys\n",
        "sys.path.append('..')\n",
        "\n",
        "from pyspark.sql import SparkSession\n",
        "from {{.pipeline_name}}.transformations import taxi_stats\n",
        "\n",
        "spark = SparkSession.builder.getOrCreate()\n",
        "spark.sql('SELECT * FROM taxi_stats').show()"
      ]
    }
  ],
  "metadata": {
    "application/vnd.databricks.v1+notebook": {
      "dashboards": [],
      "language": "python",
      "notebookMetadata": {
        "pythonIndentUnit": 2
      },
      "notebookName": "ipynb-notebook",
      "widgets": {}
    },
    "kernelspec": {
      "display_name": "Python 3",
      "language": "python",
      "name": "python3"
    },
    "language_info": {
      "name": "python",
      "version": "3.11.4"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}
```
Lines changed: 8 additions & 0 deletions
```python
import dlt
from pyspark.sql import DataFrame
from databricks.sdk.runtime import spark


@dlt.view(comment="Small set of taxis for development (uses LIMIT 10)")
def taxis() -> DataFrame:
    return spark.sql("SELECT * FROM samples.nyctaxi.trips LIMIT 10")
```
Lines changed: 8 additions & 0 deletions
```python
import dlt
from pyspark.sql import DataFrame
from databricks.sdk.runtime import spark


# Reads the full samples.nyctaxi.trips dataset (the dev source above
# limits the same view to 10 rows for quick iteration).
@dlt.view
def taxis() -> DataFrame:
    return spark.sql("SELECT * FROM samples.nyctaxi.trips")
```
Lines changed: 7 additions & 0 deletions
```python
from ..sources.dev.taxis import taxis
from ..transformations import taxi_stats


def test_taxi_stats():
    result = taxi_stats.filter_taxis(taxis())
    assert len(result.collect()) > 5
```
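Both sources call spark.sql, so this test needs a live Spark session; per the commit message, `uv run test` runs the suite on serverless compute, while `uv run pytest` uses whatever compute the IDE or .databrickscfg is configured for. The session plumbing is not part of this diff; a hedged sketch of the usual Databricks Connect ingredient:

```python
# Sketch only (not from this diff): obtain a remote Spark session via
# Databricks Connect, authenticating from the environment or
# ~/.databrickscfg. Requires the databricks-connect package.
from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.getOrCreate()
print(spark.sql("SELECT 1 AS ok").collect())  # [Row(ok=1)]
```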
Lines changed: 9 additions & 0 deletions
```python
# __init__.py defines the 'transformations' Python package
import importlib
import pkgutil


# Import all modules in the package except those starting with '_', like '__init__.py'
for _, module_name, _ in pkgutil.iter_modules(__path__):
    if not module_name.startswith("_"):
        importlib.import_module(f"{__name__}.{module_name}")
```
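The eager imports matter because @dlt.table and @dlt.view register datasets as a side effect of each module import. The loop itself is a plain standard-library idiom; as an illustration, here it is run against the stdlib json package (a stand-in for any package directory):

```python
import json  # stand-in package; any package with submodules works
import pkgutil

# iter_modules yields (finder, module_name, is_pkg) for every module
# file sitting next to the package's __init__.py.
for _, module_name, _ in pkgutil.iter_modules(json.__path__):
    print(module_name)  # prints: decoder, encoder, scanner, tool
```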
Lines changed: 20 additions & 0 deletions
```python
import dlt
from pyspark.sql.functions import to_date, count
from pyspark.sql import DataFrame


@dlt.table(comment="Daily statistics of NYC Taxi trips")
def taxi_stats() -> DataFrame:
    """Read from the 'taxis' view from etl_pipeline/sources."""
    taxis = dlt.read("taxis")

    return filter_taxis(taxis)


def filter_taxis(taxis: DataFrame) -> DataFrame:
    """Group by date and calculate the number of trips."""
    return (
        taxis.withColumn("pickup_date", to_date("tpep_pickup_datetime"))
        .groupBy("pickup_date")
        .agg(count("*").alias("number_of_trips"))
    )
```
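Because filter_taxis is a pure DataFrame-to-DataFrame function, it is easy to exercise outside DLT as well. A minimal local sketch of the same grouping, assuming only a local PySpark installation (the column name mirrors samples.nyctaxi.trips):

```python
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import count, to_date

spark = SparkSession.builder.getOrCreate()
trips = spark.createDataFrame(
    [(datetime(2016, 2, 14, 16, 52),), (datetime(2016, 2, 14, 17, 5),)],
    ["tpep_pickup_datetime"],
)

# Same grouping as filter_taxis above: one row per pickup date.
(
    trips.withColumn("pickup_date", to_date("tpep_pickup_datetime"))
    .groupBy("pickup_date")
    .agg(count("*").alias("number_of_trips"))
    .show()
)
# +-----------+---------------+
# |pickup_date|number_of_trips|
# +-----------+---------------+
# | 2016-02-14|              2|
# +-----------+---------------+
```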
