Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions knowledge_base/python_decorator/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.databricks
*/dist
__pycache__
78 changes: 78 additions & 0 deletions knowledge_base/python_decorator/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Python decorator

This example shows how to use **Python support for Databricks Asset Bundles** to
package a **Python decorator–based job**, so you can call regular Python
functions from a Databricks job with minimal boilerplate.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make it more clear this is about Python support for DABs?

E.g. include a doc link in this paragraph.


## How it works

| File | Purpose |
|----------------------------------------|----------------------------------------------------------------|
| `src/my_project/tasks.py` | Defines decorated task functions. |
| `src/my_project/jobs.py` | Wires those tasks into a `Job` definition. |
| `src/my_project/python_wheel_task.py` | Implements `@python_wheel_task` decorator. |
| `src/my_project/resources.py` | Defines `load_resources` function to load `Job` definitions. |

### Creating a task function

```python
# src/my_project/tasks.py
from my_project.python_wheel_task import python_wheel_task


@python_wheel_task
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this mini-framework, should this not just be called @task? In a more extensive version, we could always decide to use subtypes like @task.sql or move to top-level task types like @sql_file_task.

def get_message() -> None:
from databricks.sdk.runtime import dbutils

# Makes the value available to downstream tasks as
# '{{ tasks.<task_key>.values.message }}'
dbutils.jobs.taskValues.set("message", "Hello World")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you make this example do something Sparky, like spark.read.table("samples.nyctaxi.trips")?

```

### Referencing the task in a job

```python
# src/my_project/jobs.py
from my_project.tasks import get_message, print_message
from databricks.bundles.jobs import Job, Task

my_job = Job(
name="Python Decorator Example",
tasks=[
Task(
task_key="get_message",
python_wheel_task=get_message(),
...
),
Task(
task_key="print_message",
python_wheel_task=print_message(
message="{{ tasks.get_message.values.message }}"
),
...
),
...
],
)
```

## Deploying the example

1. Create a virtual environment and install `databricks-bundles` package into it:

```bash
python3 -m venv .venv
.venv/bin/pip3 install databricks-bundles
```

2. Update the `host` field under `workspace` in `databricks.yml` to the Databricks workspace you wish to deploy to.

3. Run `databricks bundle deploy` to upload the wheel and deploy the job.

4. Run `databricks bundle run` to run the job.

## Cleaning up
To remove all assets created by this example:

```bash
databricks bundle destroy
```
21 changes: 21 additions & 0 deletions knowledge_base/python_decorator/databricks.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
bundle:
name: my_project

experimental:
python:
venv_path: ".venv"
resources:
- "my_project.resources:load_resources"

artifacts:
default:
type: whl
build: pip3 wheel -w dist .

workspace:
host: https://myworkspace.databricks.com

targets:
dev:
default: true
mode: development
14 changes: 14 additions & 0 deletions knowledge_base/python_decorator/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"

[project]
name = "my_project"
requires-python = ">=3.10"
version = "0.0.1"

[project.entry-points.packages]
python_wheel_task = "my_project.python_wheel_task:main"

[tool.setuptools.packages.find]
where = ["src"]
Empty file.
42 changes: 42 additions & 0 deletions knowledge_base/python_decorator/src/my_project/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from databricks.bundles.jobs import (
Environment,
Job,
JobEnvironment,
Task,
TaskDependency,
)

from my_project.tasks import get_message, print_message

# Two-task job built from the decorated task functions. Calling a decorated
# function (e.g. get_message()) yields a PythonWheelTask definition rather
# than running the function.
my_job = Job(
    name="Python Decorator Example",
    tasks=[
        # Producer: publishes a task value consumed by the next task.
        Task(
            task_key="get_message",
            python_wheel_task=get_message(),
            # use serverless, alternatively, you can use 'libraries' with 'job_cluster_key'
            environment_key="Default",
        ),
        # Consumer: receives the upstream task value via the jobs
        # '{{ tasks.<task_key>.values.<name> }}' reference syntax.
        Task(
            task_key="print_message",
            environment_key="Default",
            python_wheel_task=print_message(
                message="{{ tasks.get_message.values.message }}"
            ),
            depends_on=[TaskDependency(task_key="get_message")],
        ),
    ],
    # Shared serverless environment; installs the wheel built into dist/.
    environments=[
        JobEnvironment(
            environment_key="Default",
            spec=Environment(
                client="2",
                dependencies=["dist/*.whl"],
            ),
        )
    ],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import sys
import inspect
import importlib

from dataclasses import dataclass
from argparse import ArgumentParser
from typing import Callable, Generic, ParamSpec, TYPE_CHECKING

if TYPE_CHECKING:
from databricks.bundles.jobs import PythonWheelTask

_P = ParamSpec("_P")


@dataclass
class PythonWheelTaskFunction(Generic[_P]):
    """Wrapper produced by the @python_wheel_task decorator.

    Holds the metadata needed to turn a plain Python function into a
    Databricks `PythonWheelTask` definition when called.
    """

    # Wheel package name; must match 'project.name' in pyproject.toml.
    package_name: str
    # Entry point name declared under 'project.entry-points.packages'.
    entry_point: str
    # The decorated task function itself.
    func: Callable[_P, None]

    def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> "PythonWheelTask":
        """Build a `PythonWheelTask` that invokes `self.func` with `kwargs`.

        Positional arguments are rejected: parameters travel to the wheel
        entry point as '--name=value' command-line flags, so every value
        needs a name.
        """
        from databricks.bundles.jobs import PythonWheelTask

        if args:
            raise ValueError("Only keyword arguments are supported")

        target = f"{self.func.__module__}:{self.func.__name__}"
        # NOTE: values are stringified here and parsed back with argparse in
        # main(), so argument types must round-trip through str.
        flags = [f"--{name}={value}" for name, value in kwargs.items()]

        return PythonWheelTask(
            package_name=self.package_name,
            entry_point=self.entry_point,
            parameters=[target, *flags],
        )


def python_wheel_task(func: Callable[_P, None]) -> Callable[_P, "PythonWheelTask"]:
    """Decorator that turns a module-level function into a wheel task factory.

    The returned wrapper, when called, produces a `PythonWheelTask` whose
    parameters encode the function's location and keyword arguments.
    """
    # A dotted __qualname__ means the function is nested inside a class or
    # another function; main() resolves tasks with getattr on the module, so
    # only true top-level functions are usable.
    qualname = func.__qualname__
    if "." in qualname:
        raise ValueError(
            "Only function at module top level can be used as a task entry point"
        )

    return PythonWheelTaskFunction[_P](
        # must match 'project.name' in pyproject.toml
        package_name="my_project",
        # entry_point must be defined in 'project.entry-points.packages' in pyproject.toml
        entry_point="python_wheel_task",
        func=func,
    )


def main():
    """
    Entry point for the Python wheel task.

    This function is referenced through entry_point and package_name parameters
    in the PythonWheelTask, and the entry point is defined in
    'project.entry-points.packages' in pyproject.toml.

    Invocation shape: <prog> <module>:<function> [--name=value ...]
    sys.argv[1] locates the decorated task function; the remaining argv
    entries are parsed as keyword arguments according to the function's
    signature.
    """

    func_full_name = sys.argv[1]
    func_module_name, func_name = func_full_name.split(":")

    func_module = importlib.import_module(func_module_name)
    decorated_func: "PythonWheelTaskFunction" = getattr(func_module, func_name)
    func = decorated_func.func

    parser = ArgumentParser()
    for param in inspect.signature(func).parameters.values():
        # Fall back to str for unannotated parameters. argparse calls the
        # annotation on the raw string, so only types constructible from a
        # string (str, int, float, ...) work; bool in particular would
        # misparse ("False" -> True).
        arg_type = param.annotation
        if arg_type is inspect.Parameter.empty:
            arg_type = str

        has_default = param.default is not inspect.Parameter.empty
        parser.add_argument(
            f"--{param.name}",
            type=arg_type,
            required=not has_default,
            # Forward the function's own default so an omitted optional flag
            # keeps its declared value instead of becoming None.
            default=param.default if has_default else None,
            help=f"Argument {param.name} of type {arg_type}",
        )

    ns = parser.parse_args(sys.argv[2:])
    func(**vars(ns))
7 changes: 7 additions & 0 deletions knowledge_base/python_decorator/src/my_project/resources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from databricks.bundles.core import Resources, load_resources_from_package_module

import my_project.jobs


def load_resources() -> Resources:
    """Collect every bundle resource defined in the my_project.jobs module.

    Referenced from 'experimental.python.resources' in databricks.yml so the
    bundle can discover the Python-defined Job objects at load time.
    """
    jobs_module = my_project.jobs
    return load_resources_from_package_module(jobs_module)
15 changes: 15 additions & 0 deletions knowledge_base/python_decorator/src/my_project/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from my_project.python_wheel_task import python_wheel_task


@python_wheel_task
def get_message() -> None:
    """Publish a greeting as a job task value.

    Downstream tasks can read it as
    '{{ tasks.<task_key>.values.message }}'.
    """
    # Imported lazily: dbutils only exists on a Databricks runtime.
    from databricks.sdk.runtime import dbutils

    dbutils.jobs.taskValues.set("message", "Hello World")


@python_wheel_task
def print_message(message: str) -> None:
    """Print the message received from an upstream task."""
    text = f"Message: {message}"
    print(text)