diff --git a/.github/workflows/flights_liquibase.yml b/.github/workflows/flights_liquibase.yml new file mode 100644 index 0000000..c20f71a --- /dev/null +++ b/.github/workflows/flights_liquibase.yml @@ -0,0 +1,117 @@ +name: Deploy flights_project to TEST with liquibase + +concurrency: 1 + +on: + workflow_dispatch: + + pull_request: + types: + - opened + - synchronize + branches: + - dev + paths: + - "**/*.yml" + - "**/*.py" + + push: + branches: + - feat/liquibase + paths: + - "**/*.yml" + - "**/*.py" + - "**/*.xml" + +jobs: + deploy: + name: "Test and deploy bundle" + runs-on: ubuntu-latest + environment: test + env: + DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }} + DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN_TST }} + DATABRICKS_CLIENT_ID: ${{ secrets.DATABRICKS_CLIENT_ID }} + DATABRICKS_CLIENT_SECRET: ${{ secrets.DATABRICKS_CLIENT_SECRET }} + DATABRICKS_BUNDLE_ENV: tst + defaults: + run: + working-directory: . + + steps: + - uses: actions/checkout@v3 + + - name: Set up Python 3.12 + uses: actions/setup-python@v4 + with: + python-version: '3.12' + + #---------------------------------------------- + # Set up Java (Temurin JDK 8) + #---------------------------------------------- + - name: Set up Java 8 + uses: actions/setup-java@v4 + with: + java-version: '8' + distribution: 'temurin' + + #---------------------------------------------- + # Download Liquibase CLI and make executable + #---------------------------------------------- + - name: Download Liquibase CLI + run: | + LIQUIBASE_VERSION=4.33.0 + wget https://github.com/liquibase/liquibase/releases/download/v${LIQUIBASE_VERSION}/liquibase-${LIQUIBASE_VERSION}.tar.gz + mkdir liquibase + tar -xzf liquibase-${LIQUIBASE_VERSION}.tar.gz -C liquibase + + #---------------------------------------------- + # Download Databricks JDBC and extension JARs + #---------------------------------------------- + - name: Download Databricks JDBC Driver and Liquibase Extension + run: | + # Download Databricks JDBC driver + wget https://databricks-bi-artifacts.s3.us-east-2.amazonaws.com/simbaspark-drivers/jdbc/2.7.3/DatabricksJDBC42-2.7.3.1010.zip + unzip DatabricksJDBC42-2.7.3.1010.zip + mv DatabricksJDBC-2.7.3.1010/DatabricksJDBC42.jar liquibase/lib + + # Download Liquibase-Databricks extension + wget https://github.com/liquibase/liquibase-databricks/releases/download/v1.4.2/liquibase-databricks-1.4.2.jar + mv liquibase-databricks-1.4.2.jar liquibase/lib + + #---------------------------------------------- + # Run Liquibase connection test + #---------------------------------------------- + - name: Run Liquibase connection test + env: + LIQUIBASE_COMMAND_URL: ${{ secrets.LIQUIBASE_COMMAND_URL }} # alternative: store only the workspace url and compose the Liquibase URL + LIQUIBASE_COMMAND_USERNAME: "token" + LIQUIBASE_COMMAND_PASSWORD: ${{ secrets.DATABRICKS_TOKEN_TST }} + run: | + liquibase/liquibase update --changelog-file=flights/flights-liquibase/liquibase/root.changelog.xml --log-level INFO --defaultsFile=flights/flights-liquibase/liquibase.properties + + #---------------------------------------------- + # Install dependencies and package + #---------------------------------------------- + - run: python -m pip install --upgrade pip + - run: pip install -r flights/flights-liquibase/requirements.txt -r flights/flights-liquibase/dev-requirements.txt + - run: pip install -e flights/flights-liquibase/ + + #---------------------------------------------- + # run test suite + #---------------------------------------------- + - name: Run unit tests + env: + 
+          DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }}
+          DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN_TST }}
+          DATABRICKS_CLUSTER_ID: ${{ secrets.DATABRICKS_CLUSTER_ID }}
+        run: |
+          pytest flights/flights-liquibase/tests/
+
+      - uses: databricks/setup-cli@main
+
+#      - run: cd flights/flights-liquibase; databricks bundle deploy --target test_automated
+#        env:
+#          DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }}
+#          DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN_TST }}
+#          DATABRICKS_BUNDLE_ENV: tst
diff --git a/flights/flights-liquibase/databricks.yml b/flights/flights-liquibase/databricks.yml
new file mode 100644
index 0000000..870cea1
--- /dev/null
+++ b/flights/flights-liquibase/databricks.yml
@@ -0,0 +1,108 @@
+# This is a Databricks asset bundle definition for databricks-dab-examples.
+# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation.
+bundle:
+  name: flights_simple
+
+include:
+  - resources/*.yml
+  - resources/dlt/*.yml
+
+variables:
+  catalog:
+    default: "main"
+  database:
+    default: ${resources.schemas.project_schema.name}
+  flights_dlt_schema:
+    default: ${resources.schemas.project_schema.name}_dlt
+  flights_test_schema:
+    default: "${resources.schemas.project_schema.name}_validation"
+  spark_version:
+    default: "15.3.x-scala2.12"
+  node_type_id:
+    default: "m6gd.xlarge"
+  shared_cluster_config:
+    type: "complex"
+    default:
+      spark_version: ${var.spark_version}
+      node_type_id: ${var.node_type_id}
+      data_security_mode: USER_ISOLATION
+      autoscale:
+        min_workers: 1
+        max_workers: 2
+
+targets:
+  # The 'dev' target, used for development purposes.
+  # Whenever a developer deploys using 'dev', they get their own copy.
+  dev:
+    # We use 'mode: development' to make sure everything deployed to this target gets a prefix
+    # like '[dev my_user_name]'. Setting this mode also disables any schedules and
+    # automatic triggers for jobs and enables the 'development' mode for Delta Live Tables pipelines.
+    mode: development
+    default: true
+    workspace:
+      host: https://e2-demo-field-eng.cloud.databricks.com/
+      root_path: /Users/${workspace.current_user.userName}/.bundle/${bundle.name}/${bundle.target}
+    # resource override
+    ## Only create this schema in dev, for other targets manage with Terraform.
+
+    resources:
+      schemas:
+        project_schema:
+          name: flights
+          catalog_name: ${var.catalog}
+          comment: "Schema for flight data"
+
+  test:
+    # For test/staging deployments, we only have a single copy, so we should deploy as
+    # the same user (Service Principal) every time.
+    mode: production
+    workspace:
+      host: https://company-tst.cloud.databricks.com/
+      root_path: /Users/${workspace.current_user.userName}/.bundle/${bundle.name}/${bundle.target}
+    run_as:
+      user_name: ${workspace.current_user.userName}
+    variables:
+      database: flights_${bundle.target}
+      flights_dlt_schema: flights_dlt_${bundle.target}
+      flights_test_schema: flights_validation_${bundle.target}
+
+  test_automated:
+    # For test/staging deployments, we only have a single copy, so we should deploy as
+    # the same user (Service Principal) every time.
+    mode: production
+    workspace:
+      root_path: /Users/${workspace.current_user.userName}/.bundle/${bundle.name}/${bundle.target}
+    run_as:
+      user_name: ${workspace.current_user.userName}
+    variables:
+      database: flights_${bundle.target}
+      flights_dlt_schema: flights_dlt_${bundle.target}
+      flights_test_schema: flights_validation_${bundle.target}
+      node_type_id: Standard_DS3_v2
+
+  staging:
+    mode: production
+    workspace:
+      host: https://company-stg.cloud.databricks.com/
+      root_path: /Users/${workspace.current_user.userName}/.bundle/${bundle.name}/${bundle.target}
+    run_as:
+      user_name: ${workspace.current_user.userName}
+    variables:
+      database: flights_${bundle.target}
+      flights_dlt_schema: flights_dlt_${bundle.target}
+      flights_test_schema: flights_validation_${bundle.target}
+
+  # The 'prod' target, used for production deployment.
+  prod:
+    mode: production
+    workspace:
+      host: https://company-prd.cloud.databricks.com/
+      root_path: /Users/${workspace.current_user.userName}/.bundle/${bundle.name}/${bundle.target}
+    run_as:
+      # This can run as a specific user or service principal in production.
+      # To run as service principal use service_principal_name (see Databricks documentation).
+      user_name: ${workspace.current_user.userName}
+    variables:
+      database: flights_${bundle.target}
+      flights_dlt_schema: flights_dlt_${bundle.target}
+      flights_test_schema: flights_validation_${bundle.target}
diff --git a/flights/flights-liquibase/dev-requirements.txt b/flights/flights-liquibase/dev-requirements.txt
new file mode 100644
index 0000000..f39f467
--- /dev/null
+++ b/flights/flights-liquibase/dev-requirements.txt
@@ -0,0 +1,4 @@
+wheel
+pytest
+databricks-sdk[notebook]>=0.46.0
+pandas==2.2.3
diff --git a/flights/flights-liquibase/liquibase.properties b/flights/flights-liquibase/liquibase.properties
new file mode 100644
index 0000000..ab1cee4
--- /dev/null
+++ b/flights/flights-liquibase/liquibase.properties
@@ -0,0 +1,2 @@
+liquibase.databricks.catalog: liquibase
+liquibase.databricks.schema: lr_liquibase_test
diff --git a/flights/flights-liquibase/liquibase/changelogs/changelog.xml b/flights/flights-liquibase/liquibase/changelogs/changelog.xml
new file mode 100644
index 0000000..01b8cfa
--- /dev/null
+++ b/flights/flights-liquibase/liquibase/changelogs/changelog.xml
@@ -0,0 +1,65 @@
diff --git a/flights/flights-liquibase/liquibase/root.changelog.xml b/flights/flights-liquibase/liquibase/root.changelog.xml
new file mode 100644
index 0000000..7005b63
--- /dev/null
+++ b/flights/flights-liquibase/liquibase/root.changelog.xml
@@ -0,0 +1,24 @@
diff --git a/flights/flights-liquibase/pyproject.toml b/flights/flights-liquibase/pyproject.toml
new file mode 100644
index 0000000..9cc91c4
--- /dev/null
+++ b/flights/flights-liquibase/pyproject.toml
@@ -0,0 +1,12 @@
+[project]
+name = "flights"
+version = "0.0.1"
+authors = [{"name" = "Databricks Field Eng"}]
+description = "Flights project with Liquibase integration"
+
+[build-system]
+requires = ["setuptools", "wheel"]
+build-backend = "setuptools.build_meta"
+
+[tool.setuptools]
+package-dir = {"flights" = "./src/flights"}
diff --git a/flights/flights-liquibase/requirements.txt b/flights/flights-liquibase/requirements.txt
new file mode 100644
index 0000000..1451eeb
--- /dev/null
+++ b/flights/flights-liquibase/requirements.txt
@@ -0,0 +1 @@
+databricks-connect>=17.0.0
diff --git a/flights/flights-liquibase/resources/flights_notebook_job_serverless.yml b/flights/flights-liquibase/resources/flights_notebook_job_serverless.yml
new file mode 100644
index 0000000..19dc074
--- /dev/null
+++ b/flights/flights-liquibase/resources/flights_notebook_job_serverless.yml
@@ -0,0 +1,17 @@
+# The main job for flights_project
+resources:
+  jobs:
+    flights_notebook:
+      name: flights_notebook_job_${bundle.target}
+      max_concurrent_runs: 1
+
+      tasks:
+        - task_key: notebook_task
+          notebook_task:
+            notebook_path: ../src/flights/flights_main_notebook.py
+            base_parameters:
+              catalog: ${var.catalog}
+              database: ${var.database}
+              artifact_path: ${workspace.artifact_path}
+          max_retries: 0
+
diff --git a/flights/flights-liquibase/src/flights/__init__.py b/flights/flights-liquibase/src/flights/__init__.py
new file mode 100644
index 0000000..362d74a
--- /dev/null
+++ b/flights/flights-liquibase/src/flights/__init__.py
@@ -0,0 +1 @@
+__version__ = "0.0.1"
diff --git a/flights/flights-liquibase/src/flights/flights_main_notebook.py b/flights/flights-liquibase/src/flights/flights_main_notebook.py
new file mode 100644
index 0000000..86107df
--- /dev/null
+++ b/flights/flights-liquibase/src/flights/flights_main_notebook.py
@@ -0,0 +1,50 @@
+# Databricks notebook source
+dbutils.widgets.text("catalog", "main")
+dbutils.widgets.text("database", "flights_dev")
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC ## Read csv data (batch mode)
+
+# COMMAND ----------
+
+# DBTITLE 1,Setup vars and functions
+from flights.transforms import flight_transforms, shared_transforms
+from flights.utils import flight_utils
+
+catalog = dbutils.widgets.get("catalog")
+database = dbutils.widgets.get("database")
+
+path = "/databricks-datasets/airlines"
+raw_table_name = f"{catalog}.{database}.flights_raw"
+
+
+# COMMAND ----------
+
+# DBTITLE 1,Read raw
+df = flight_utils.read_batch(spark, path).limit(1000)
+display(df)
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC ## Transform data
+
+# COMMAND ----------
+df_transformed = (
+    df.transform(flight_transforms.delay_type_transform)
+    .transform(shared_transforms.add_metadata_columns)
+)
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC ## Write raw Delta Lake table (batch mode)
+
+# COMMAND ----------
+
+df_transformed.write.format("delta").mode("append").saveAsTable(raw_table_name)
+print(f"Successfully wrote data to {raw_table_name}")
\ No newline at end of file
diff --git a/flights/flights-liquibase/src/flights/transforms/flight_transforms.py b/flights/flights-liquibase/src/flights/transforms/flight_transforms.py
new file mode 100644
index 0000000..39cc216
--- /dev/null
+++ b/flights/flights-liquibase/src/flights/transforms/flight_transforms.py
@@ -0,0 +1,21 @@
+"""Python functions to test
+These represent Python functions that you would keep in a Python file and import to test.
+""" +from pyspark.sql.functions import expr + + +def delay_type_transform(df): + delay_expr = expr( + """case when WeatherDelay != 'NA' + then 'WeatherDelay' + when NASDelay != 'NA' + then 'NASDelay' + when SecurityDelay != 'NA' + then 'SecurityDelay' + when LateAircraftDelay != 'NA' + then 'LateAircraftDelay' + when IsArrDelayed == 'YES' OR IsDepDelayed == 'YES' + then 'UncategorizedDelay' + end + """) + return df.withColumn("delay_type", delay_expr) diff --git a/flights/flights-liquibase/src/flights/transforms/shared_transforms.py b/flights/flights-liquibase/src/flights/transforms/shared_transforms.py new file mode 100644 index 0000000..707f8ed --- /dev/null +++ b/flights/flights-liquibase/src/flights/transforms/shared_transforms.py @@ -0,0 +1,13 @@ +"""Python functions to test +These represent Python functions that you would keep in a Python file and import to test. +""" +from pyspark.sql.functions import current_timestamp, current_date, col + +def add_metadata_columns(df, include_time=True): + if include_time: + df = df.withColumn("last_updated_time", current_timestamp()) + else: + df = df.withColumn("last_updated_date", current_date()) + + df = df.withColumn("source_file", col("_metadata.file_path")) + return df diff --git a/flights/flights-liquibase/src/flights/utils/flight_utils.py b/flights/flights-liquibase/src/flights/utils/flight_utils.py new file mode 100644 index 0000000..30ab63c --- /dev/null +++ b/flights/flights-liquibase/src/flights/utils/flight_utils.py @@ -0,0 +1,62 @@ +from pyspark.sql.types import StructType, StructField, IntegerType, StringType + + +def get_flight_schema(): + schema = StructType([ + StructField("Year", IntegerType(), True), + StructField("Month", IntegerType(), True), + StructField("DayofMonth", IntegerType(), True), + StructField("DayOfWeek", IntegerType(), True), + StructField("DepTime", StringType(), True), + StructField("CRSDepTime", IntegerType(), True), + StructField("ArrTime", StringType(), True), + StructField("CRSArrTime", IntegerType(), True), + StructField("UniqueCarrier", StringType(), True), + StructField("FlightNum", IntegerType(), True), + StructField("TailNum", StringType(), True), + StructField("ActualElapsedTime", StringType(), True), + StructField("CRSElapsedTime", IntegerType(), True), + StructField("AirTime", StringType(), True), + StructField("ArrDelay", StringType(), True), + StructField("DepDelay", StringType(), True), + StructField("Origin", StringType(), True), + StructField("Dest", StringType(), True), + StructField("Distance", StringType(), True), + StructField("TaxiIn", StringType(), True), + StructField("TaxiOut", StringType(), True), + StructField("Cancelled", IntegerType(), True), + StructField("CancellationCode", StringType(), True), + StructField("Diverted", IntegerType(), True), + StructField("CarrierDelay", StringType(), True), + StructField("WeatherDelay", StringType(), True), + StructField("NASDelay", StringType(), True), + StructField("SecurityDelay", StringType(), True), + StructField("LateAircraftDelay", StringType(), True), + StructField("IsArrDelayed", StringType(), True), + StructField("IsDepDelayed", StringType(), True) + ]) + return schema + + +def read_batch(spark, path): + schema = get_flight_schema() + + batch_df = (spark.read.format("csv") + .option("header", "false") + .schema(schema) + .load(path) + ) + + return batch_df + + +def read_autoloader(spark, path): + schema = get_flight_schema() + + streaming_df = spark.readStream.format("cloudFiles") \ + .option("cloudFiles.format", "csv") \ + 
.option("cloudFiles.includeExistingFiles", "true") \ + .schema(schema) \ + .load(path) + + return streaming_df \ No newline at end of file diff --git a/flights/flights-liquibase/src/flights/utils/shared_utils.py b/flights/flights-liquibase/src/flights/utils/shared_utils.py new file mode 100644 index 0000000..857f18b --- /dev/null +++ b/flights/flights-liquibase/src/flights/utils/shared_utils.py @@ -0,0 +1,5 @@ +def append_to_delta(df, dest_table, streaming=False, checkpoint_location=None): + if not streaming: + df.write.format("delta").mode("append").saveAsTable(dest_table) + else: + df.writeStream.format("delta").outputMode("append").option("checkpointLocation", checkpoint_location).toTable(dest_table) diff --git a/flights/flights-liquibase/tests/unit_transforms/test_flight_transforms.py b/flights/flights-liquibase/tests/unit_transforms/test_flight_transforms.py new file mode 100644 index 0000000..3d3455d --- /dev/null +++ b/flights/flights-liquibase/tests/unit_transforms/test_flight_transforms.py @@ -0,0 +1,51 @@ +import pytest, os +from pyspark.testing.utils import assertDataFrameEqual, assertSchemaEqual + +from flights.transforms import flight_transforms + +@pytest.fixture(scope="module") +def spark_session(): + try: + from databricks.connect import DatabricksSession + # if os.environ.get("DBCONNECT_SERVERLESS", "false").lower() == "true": + # return DatabricksSession.builder.serverless(True).getOrCreate() + # else: + return DatabricksSession.builder.getOrCreate() + # except (ValueError, RuntimeError): + # from databricks.connect import DatabricksSession + # return DatabricksSession.builder.profile("unit_tests").getOrCreate() + except (ImportError, ValueError, RuntimeError): + # else: + print("No Databricks Connect, build and return local SparkSession") + from pyspark.sql import SparkSession + return SparkSession.builder.getOrCreate() + + +def test_delay_type_transform__valid(spark_session): + input_df = spark_session.createDataFrame([ + ["0","NA","NA","NA", "NO", "NO"], + ["NA","0","NA","NA", "NO", "NO"], + ["NA","NA","0","NA", "NO", "NO"], + ["NA","NA","NA", "0", "NO", "NO"], + ["NA","NA","NA","NA", "YES", "NO"], + ["NA","NA","NA","NA", "NO", "YES"], + ["0","0","0","0", "YES", "YES"], + ], ["WeatherDelay", "NASDelay", "SecurityDelay", "LateAircraftDelay", "IsArrDelayed", "IsDepDelayed"]) + + expected_data = [ + ['WeatherDelay'], + ['NASDelay'], + ['SecurityDelay'], + ['LateAircraftDelay'], + ['UncategorizedDelay'], + ['UncategorizedDelay'], + ['WeatherDelay'] + ] + + expected_df = spark_session.createDataFrame(expected_data,["delay_type"]) + + result_df = flight_transforms.delay_type_transform(input_df) + + assertDataFrameEqual(result_df.select("delay_type"), expected_df) + + diff --git a/flights/flights-liquibase/tests/unit_transforms/test_shared_transforms.py b/flights/flights-liquibase/tests/unit_transforms/test_shared_transforms.py new file mode 100644 index 0000000..e69de29 diff --git a/flights/flights-liquibase/tests/unit_utils/test_flight_utils.py b/flights/flights-liquibase/tests/unit_utils/test_flight_utils.py new file mode 100644 index 0000000..1fad5c2 --- /dev/null +++ b/flights/flights-liquibase/tests/unit_utils/test_flight_utils.py @@ -0,0 +1,9 @@ +import pytest +from pyspark.testing.utils import assertDataFrameEqual, assertSchemaEqual + +from flights.utils import flight_utils + +def test_get_flight_schema__valid(): + schema = flight_utils.get_flight_schema() + assert schema is not None + assert len(schema) == 31 \ No newline at end of file