diff --git a/knowledge_base/python_decorator/.gitignore b/knowledge_base/python_decorator/.gitignore
new file mode 100644
index 0000000..7a657d5
--- /dev/null
+++ b/knowledge_base/python_decorator/.gitignore
@@ -0,0 +1,3 @@
+.databricks
+*/dist
+__pycache__
diff --git a/knowledge_base/python_decorator/README.md b/knowledge_base/python_decorator/README.md
new file mode 100644
index 0000000..6fc682b
--- /dev/null
+++ b/knowledge_base/python_decorator/README.md
@@ -0,0 +1,78 @@
+# Python decorator
+
+This example shows how to package a **Python decorator–based job** so you can
+call regular Python functions from a Databricks job with minimal boilerplate.
+
+## How it works
+
+| File                                   | Purpose                                                        |
+|----------------------------------------|----------------------------------------------------------------|
+| `src/my_project/tasks.py`              | Defines decorated task functions.                              |
+| `src/my_project/jobs.py`               | Wires those tasks into a `Job` definition.                     |
+| `src/my_project/python_wheel_task.py`  | Implements `@python_wheel_task` decorator.                     |
+| `src/my_project/resources.py`          | Defines `load_resources` function to load `Job` definitions.   |
+
+### Creating a task function
+
+```python
+# src/my_project/tasks.py
+from my_project.python_wheel_task import python_wheel_task
+
+
+@python_wheel_task
+def get_message() -> None:
+    from databricks.sdk.runtime import dbutils
+
+    # Makes the value available to downstream tasks as
+    # '{{ tasks.get_message.values.message }}'
+    dbutils.jobs.taskValues.set("message", "Hello World")
+```
+
+### Referencing the task in a job
+
+```python
+# src/my_project/jobs.py
+from my_project.tasks import get_message, print_message
+from databricks.bundles.jobs import Job, Task
+
+my_job = Job(
+    name="Python Decorator Example",
+    tasks=[
+        Task(
+            task_key="get_message",
+            python_wheel_task=get_message(),
+            ...
+        ),
+        Task(
+            task_key="print_message",
+            python_wheel_task=print_message(
+                message="{{ tasks.get_message.values.message }}"
+            ),
+            ...
+        ),
+        ...
+    ],
+)
+```
+
+## Deploying the example
+
+1. Create a virtual environment and install the `databricks-bundles` package into it:
+
+```bash
+ python3 -m venv .venv
+ .venv/bin/pip3 install databricks-bundles
+```
+
+2. Update the `host` field under `workspace` in `databricks.yml` to the Databricks workspace you wish to deploy to.
+
+3. Run `databricks bundle deploy` to upload the wheel and deploy the job.
+
+4. Run `databricks bundle run` to run the job.
+
+## Cleaning up
+To remove all assets created by this example:
+
+```bash
+ databricks bundle destroy
+```
diff --git a/knowledge_base/python_decorator/databricks.yml b/knowledge_base/python_decorator/databricks.yml
new file mode 100644
index 0000000..36a2bfc
--- /dev/null
+++ b/knowledge_base/python_decorator/databricks.yml
@@ -0,0 +1,21 @@
+bundle:
+  name: my_project
+
+experimental:
+  python:
+    venv_path: ".venv"
+    resources:
+      - "my_project.resources:load_resources"
+
+artifacts:
+  default:
+    type: whl
+    build: pip3 wheel -w dist .
+
+workspace:
+  host: https://myworkspace.databricks.com
+
+targets:
+  dev:
+    default: true
+    mode: development
diff --git a/knowledge_base/python_decorator/pyproject.toml b/knowledge_base/python_decorator/pyproject.toml
new file mode 100644
index 0000000..8d8529c
--- /dev/null
+++ b/knowledge_base/python_decorator/pyproject.toml
@@ -0,0 +1,14 @@
+[build-system]
+requires = ["setuptools>=61.0"]
+build-backend = "setuptools.build_meta"
+
+[project]
+name = "my_project"
+requires-python = ">=3.10"
+version = "0.0.1"
+
+[project.entry-points.packages]
+python_wheel_task = "my_project.python_wheel_task:main"
+
+[tool.setuptools.packages.find]
+where = ["src"]
diff --git a/knowledge_base/python_decorator/src/my_project/__init__.py b/knowledge_base/python_decorator/src/my_project/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/knowledge_base/python_decorator/src/my_project/jobs.py b/knowledge_base/python_decorator/src/my_project/jobs.py
new file mode 100644
index 0000000..62bf9bf
--- /dev/null
+++ b/knowledge_base/python_decorator/src/my_project/jobs.py
@@ -0,0 +1,42 @@
+from databricks.bundles.jobs import (
+    Environment,
+    Job,
+    JobEnvironment,
+    Task,
+    TaskDependency,
+)
+
+from my_project.tasks import get_message, print_message
+
+my_job = Job(
+    name="Python Decorator Example",
+    tasks=[
+        Task(
+            task_key="get_message",
+            python_wheel_task=get_message(),
+            # use serverless, alternatively, you can use 'libraries' with 'job_cluster_key'
+            environment_key="Default",
+        ),
+        Task(
+            task_key="print_message",
+            environment_key="Default",
+            python_wheel_task=print_message(
+                message="{{ tasks.get_message.values.message }}"
+            ),
+            depends_on=[
+                TaskDependency(task_key="get_message"),
+            ],
+        ),
+    ],
+    environments=[
+        JobEnvironment(
+            environment_key="Default",
+            spec=Environment(
+                client="2",
+                dependencies=[
+                    "dist/*.whl",
+                ],
+            ),
+        )
+    ],
+)
diff --git a/knowledge_base/python_decorator/src/my_project/python_wheel_task.py b/knowledge_base/python_decorator/src/my_project/python_wheel_task.py
new file mode 100644
index 0000000..ddc99d4
--- /dev/null
+++ b/knowledge_base/python_decorator/src/my_project/python_wheel_task.py
@@ -0,0 +1,87 @@
+import sys
+import inspect
+import importlib
+
+from dataclasses import dataclass
+from argparse import ArgumentParser
+from typing import Callable, Generic, ParamSpec, TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from databricks.bundles.jobs import PythonWheelTask
+
+_P = ParamSpec("_P")
+
+
+@dataclass
+class PythonWheelTaskFunction(Generic[_P]):
+    package_name: str
+    entry_point: str
+    func: Callable[_P, None]
+
+    def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> "PythonWheelTask":
+        from databricks.bundles.jobs import PythonWheelTask
+
+        if args:
+            raise ValueError("Only keyword arguments are supported")
+
+        func_full_name = f"{self.func.__module__}:{self.func.__name__}"
+        parameters: list[str] = [func_full_name] + [f"--{k}={v}" for k, v in kwargs.items()]
+
+        return PythonWheelTask(
+            package_name=self.package_name,
+            entry_point=self.entry_point,
+            parameters=parameters,
+        )
+
+
+def python_wheel_task(func: Callable[_P, None]) -> Callable[_P, "PythonWheelTask"]:
+    if "." in func.__qualname__:
+        raise ValueError(
+            "Only function at module top level can be used as a task entry point"
+        )
+
+    return PythonWheelTaskFunction[_P](
+        # must match 'project.name' in pyproject.toml
+        package_name="my_project",
+        # entry_point must be defined in 'project.entry-points.packages' in pyproject.toml
+        entry_point="python_wheel_task",
+        func=func,
+    )
+
+
+def main():
+    """
+    Entry point for the Python wheel task.
+
+    This function is referenced through entry_point and package_name parameters in the PythonWheelTask,
+    and entry point is defined in 'project.entry-points.packages' in pyproject.toml.
+    """
+
+    func_full_name = sys.argv[1]
+    func_module_name, func_name = func_full_name.split(":")
+
+    func_module = importlib.import_module(func_module_name)
+    decorated_func: PythonWheelTaskFunction = getattr(func_module, func_name)
+    func = decorated_func.func
+
+    parser = ArgumentParser()
+    for param in inspect.signature(func).parameters.values():
+        if param.annotation is not inspect.Parameter.empty:
+            parser.add_argument(
+                f"--{param.name}",
+                type=param.annotation,
+                required=param.default is inspect.Parameter.empty,
+                help=f"Argument {param.name} of type {param.annotation}",
+            )
+        else:
+            parser.add_argument(
+                f"--{param.name}",
+                type=str,
+                required=param.default is inspect.Parameter.empty,
+                help=f"Argument {param.name} of type str",
+            )
+
+    ns = parser.parse_args(sys.argv[2:])
+    kwargs = vars(ns)
+
+    func(**kwargs)
diff --git a/knowledge_base/python_decorator/src/my_project/resources.py b/knowledge_base/python_decorator/src/my_project/resources.py
new file mode 100644
index 0000000..9a47494
--- /dev/null
+++ b/knowledge_base/python_decorator/src/my_project/resources.py
@@ -0,0 +1,7 @@
+from databricks.bundles.core import Resources, load_resources_from_package_module
+
+import my_project.jobs
+
+
+def load_resources() -> Resources:
+    return load_resources_from_package_module(my_project.jobs)
diff --git a/knowledge_base/python_decorator/src/my_project/tasks.py b/knowledge_base/python_decorator/src/my_project/tasks.py
new file mode 100644
index 0000000..ee22570
--- /dev/null
+++ b/knowledge_base/python_decorator/src/my_project/tasks.py
@@ -0,0 +1,15 @@
+from my_project.python_wheel_task import python_wheel_task
+
+
+@python_wheel_task
+def get_message() -> None:
+    from databricks.sdk.runtime import dbutils
+
+    # Makes the value available to downstream tasks as
+    # '{{ tasks.get_message.values.message }}'
+    dbutils.jobs.taskValues.set("message", "Hello World")
+
+
+@python_wheel_task
+def print_message(message: str) -> None:
+    print(f"Message: {message}")