From c6f464e6c7de2e52c7f63c7edb39dbc5f3627e2b Mon Sep 17 00:00:00 2001
From: Mingyang Ge
Date: Wed, 29 Jan 2025 12:10:44 -0800
Subject: [PATCH 1/9] Checkin example multi-region-serving

---
 multi_region_serving/.gitignore | 2 +
 multi_region_serving/README.md | 51 +++
 multi_region_serving/databricks.yml | 26 ++
 multi_region_serving/requirements-dev.txt | 29 ++
 .../resources/manage_serving.job.yml | 18 ++
 .../resources/manage_share.job.yml | 18 ++
 multi_region_serving/scratch/README.md | 4 +
 multi_region_serving/setup.py | 37 +++
 multi_region_serving/src/lib/rest_client.py | 22 ++
 .../src/manage_endpoint.ipynb | 297 ++++++++++++++++++
 multi_region_serving/src/manage_share.ipynb | 222 +++++++++++++
 11 files changed, 726 insertions(+)
 create mode 100644 multi_region_serving/.gitignore
 create mode 100644 multi_region_serving/README.md
 create mode 100644 multi_region_serving/databricks.yml
 create mode 100644 multi_region_serving/requirements-dev.txt
 create mode 100644 multi_region_serving/resources/manage_serving.job.yml
 create mode 100644 multi_region_serving/resources/manage_share.job.yml
 create mode 100644 multi_region_serving/scratch/README.md
 create mode 100644 multi_region_serving/setup.py
 create mode 100644 multi_region_serving/src/lib/rest_client.py
 create mode 100644 multi_region_serving/src/manage_endpoint.ipynb
 create mode 100644 multi_region_serving/src/manage_share.ipynb

diff --git a/multi_region_serving/.gitignore b/multi_region_serving/.gitignore
new file mode 100644
index 0000000..de811f1
--- /dev/null
+++ b/multi_region_serving/.gitignore
@@ -0,0 +1,2 @@
+
+.databricks
diff --git a/multi_region_serving/README.md b/multi_region_serving/README.md
new file mode 100644
index 0000000..7211743
--- /dev/null
+++ b/multi_region_serving/README.md
@@ -0,0 +1,51 @@
+# Multi-region Serving
+
+This Databricks Asset Bundle (DAB) is an example tool used to sync resources between main
+workspaces and remote workspaces to simplify the workflow for serving models or features
+across multiple regions.
+
+## How to use this example
+1. Download this example
+
+2. Make changes as needed. Some files to highlight:
+   * resources/*.job.yml - Job metadata, including parameters.
+   * src/manage_endpoint.ipynb - Notebook for creating / updating serving endpoints.
+   * src/manage_share.ipynb - Notebook for syncing dependencies of a shared model.
+   * databricks.yml - DAB bundle configuration including target name and workspace URL.
+
+## How to trigger the workflows
+
+1. Install the Databricks CLI from https://docs.databricks.com/dev-tools/cli/databricks-cli.html
+
+2. Authenticate to your Databricks workspaces, if you have not done so already:
+   ```
+   $ databricks configure
+   ```
+
+3. To deploy a copy to your main workspace:
+   ```
+   $ databricks bundle deploy --target main
+   ```
+   (Note that "main" is the target name defined in databricks.yml)
+
+   This deploys everything that's defined for this project.
+   For example, the default template would deploy a job called
+   `[dev yourname] manage_serving_job` to your workspace.
+   You can find that job by opening your workspace and clicking on **Workflows**.
+
+4. Similarly, to deploy to a remote workspace, type:
+   ```
+   $ databricks bundle -t remote1 -p <profile> deploy
+   ```
+
+   Use `-p` to specify the databricks profile used by this command. The profile needs to be
+   configured in `~/.databrickscfg`.
+
+5. To run the workflow to sync a share, use the "run" command:
+   ```
+   $ databricks bundle -t main -p <profile> run manage_share_job
+   ```
+
+6. 
For documentation on the Databricks asset bundles format used + for this project, and for CI/CD configuration, see + https://docs.databricks.com/dev-tools/bundles/index.html. diff --git a/multi_region_serving/databricks.yml b/multi_region_serving/databricks.yml new file mode 100644 index 0000000..d6e001e --- /dev/null +++ b/multi_region_serving/databricks.yml @@ -0,0 +1,26 @@ +# This is a Databricks asset bundle definition for manage_serving. +# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation. +bundle: + name: manage_serving + +include: + - resources/*.yml + +targets: + main: + # The default target uses 'mode: development' to create a development copy. + # - Deployed resources get prefixed with '[dev my_user_name]' + # - Any job schedules and triggers are paused by default. + # See also https://docs.databricks.com/dev-tools/bundles/deployment-modes.html. + mode: development + default: true + workspace: + host: https://e2-dogfood.staging.cloud.databricks.com + + remote1: + # The remote workspace that serves the model + mode: development + workspace: + host: https://e2-dogfood-feature-store.staging.cloud.databricks.com + + diff --git a/multi_region_serving/requirements-dev.txt b/multi_region_serving/requirements-dev.txt new file mode 100644 index 0000000..0ffbf6a --- /dev/null +++ b/multi_region_serving/requirements-dev.txt @@ -0,0 +1,29 @@ +## requirements-dev.txt: dependencies for local development. +## +## For defining dependencies used by jobs in Databricks Workflows, see +## https://docs.databricks.com/dev-tools/bundles/library-dependencies.html + +## Add code completion support for DLT +databricks-dlt + +## pytest is the default package used for testing +pytest + +## Dependencies for building wheel files +setuptools +wheel + +## databricks-connect can be used to run parts of this project locally. +## See https://docs.databricks.com/dev-tools/databricks-connect.html. +## +## databricks-connect is automatically installed if you're using Databricks +## extension for Visual Studio Code +## (https://docs.databricks.com/dev-tools/vscode-ext/dev-tasks/databricks-connect.html). +## +## To manually install databricks-connect, either follow the instructions +## at https://docs.databricks.com/dev-tools/databricks-connect.html +## to install the package system-wide. Or uncomment the line below to install a +## version of db-connect that corresponds to the Databricks Runtime version used +## for this project. 
+# +# databricks-connect>=15.4,<15.5 diff --git a/multi_region_serving/resources/manage_serving.job.yml b/multi_region_serving/resources/manage_serving.job.yml new file mode 100644 index 0000000..a79e89d --- /dev/null +++ b/multi_region_serving/resources/manage_serving.job.yml @@ -0,0 +1,18 @@ +resources: + jobs: + manage_serving_job: + name: manage_serving_job + email_notifications: + on_failure: + - mingyang.ge@databricks.com + tasks: + - task_key: notebook_task + notebook_task: + notebook_path: ../src/manage_endpoint.ipynb + parameters: + - name: endpoint_name + default: my-share-endpoint + - name: model_name + default: mingyang_share.sharing_bugbash_a7ce.fs_model + - name: model_version + default: "2" diff --git a/multi_region_serving/resources/manage_share.job.yml b/multi_region_serving/resources/manage_share.job.yml new file mode 100644 index 0000000..24edaeb --- /dev/null +++ b/multi_region_serving/resources/manage_share.job.yml @@ -0,0 +1,18 @@ +resources: + jobs: + manage_share_job: + name: manage_share_job + email_notifications: + on_failure: + - mingyang.ge@databricks.com + tasks: + - task_key: notebook_task + notebook_task: + notebook_path: ../src/manage_share.ipynb + parameters: + - name: model_name + default: 'feature_store.sharing_bugbash_a7ce.fs_model' + - name: max_number_of_versions_to_sync + default: '10' + - name: share_name + default: 'mingyang-test-share' \ No newline at end of file diff --git a/multi_region_serving/scratch/README.md b/multi_region_serving/scratch/README.md new file mode 100644 index 0000000..e6cfb81 --- /dev/null +++ b/multi_region_serving/scratch/README.md @@ -0,0 +1,4 @@ +# scratch + +This folder is reserved for personal, exploratory notebooks. +By default these are not committed to Git, as 'scratch' is listed in .gitignore. diff --git a/multi_region_serving/setup.py b/multi_region_serving/setup.py new file mode 100644 index 0000000..0d63c7e --- /dev/null +++ b/multi_region_serving/setup.py @@ -0,0 +1,37 @@ +""" +setup.py configuration script describing how to build and package this project. + +This file is primarily used by the setuptools library and typically should not +be executed directly. See README.md for how to deploy, test, and run +the manage_serving project. +""" +from setuptools import setup, find_packages + +import sys +sys.path.append('./src') + +import datetime +import lib.rest_client as rest_client + +setup( + name="manage_serving", + # We use timestamp as Local version identifier (https://peps.python.org/pep-0440/#local-version-identifiers.) + # to ensure that changes to wheel package are picked up when used on all-purpose clusters + version=rest_client.__version__ + "+" + datetime.datetime.utcnow().strftime("%Y%m%d.%H%M%S"), + url="https://databricks.com", + author="mingyang.ge@databricks.com", + description="wheel file based on manage_serving/src", + packages=find_packages(where='./src'), + package_dir={'': 'src'}, + entry_points={ + "packages": [ + "main=manage_serving.main:main" + ] + }, + install_requires=[ + # Dependencies in case the output wheel file is used as a library dependency. 
+ # For defining dependencies, when this package is used in Databricks, see: + # https://docs.databricks.com/dev-tools/bundles/library-dependencies.html + "setuptools" + ], +) diff --git a/multi_region_serving/src/lib/rest_client.py b/multi_region_serving/src/lib/rest_client.py new file mode 100644 index 0000000..639612d --- /dev/null +++ b/multi_region_serving/src/lib/rest_client.py @@ -0,0 +1,22 @@ +import urllib.request +import json +from databricks.sdk.runtime import spark + +class RestClient: + def __init__(self, context): + self.base_url = "https://" + spark.conf.get("spark.databricks.workspaceUrl") + self.token = context.apiToken().get() + + def get_share_info(self, share_name:str): + return self._get(f'api/2.1/unity-catalog/shares/{share_name}?include_shared_data=true') + + def _get(self, uri): + url = f'{self.base_url}/{uri}' + headers = { 'Authorization': f'Bearer {self.token}'} + req = urllib.request.Request(url, headers=headers) + try: + response = urllib.request.urlopen(req) + return json.load(response) + except urllib.error.HTTPError as e: + result = e.read().decode() + print((e.code, result)) \ No newline at end of file diff --git a/multi_region_serving/src/manage_endpoint.ipynb b/multi_region_serving/src/manage_endpoint.ipynb new file mode 100644 index 0000000..13dfa82 --- /dev/null +++ b/multi_region_serving/src/manage_endpoint.ipynb @@ -0,0 +1,297 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "ee353e42-ff58-4955-9608-12865bd0950e", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "# Update Model Serving Endpoint\n", + "\n", + "Update the deployed serving endpoints with a new model version." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%pip install databricks-sdk>=0.38.0\n", + "%restart_python" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "da99ca57-cba0-4d67-a5a9-75d6a7884311", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "dbutils.widgets.text(\"endpoint_name\", defaultValue=\"\")\n", + "dbutils.widgets.text(\"model_name\", defaultValue=\"\")\n", + "dbutils.widgets.text(\"model_version\", defaultValue=\"\")" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "5f2d79fc-86ab-4208-8970-b5547dc75820", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{'endpoint_name': 'test-my-dab-endpoint', 'model_name': 'feature_store.sharing_bugbash_a7ce.fs_model', 'model_version': '2'}\n" + ] + } + ], + "source": [ + "ARGS = dbutils.widgets.getAll()\n", + "\n", + "endpoint_name = ARGS[\"endpoint_name\"]\n", + "model_name = ARGS['model_name']\n", + "model_version = ARGS['model_version']" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "43d66a63-39ea-436b-a82b-dd27eb479e40", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "from databricks.sdk import WorkspaceClient\n", + "from databricks.sdk.service.serving import ServedEntityInput, EndpointCoreConfigInput\n", + "from databricks.sdk.errors import ResourceDoesNotExist\n", + "\n", + "workspace = WorkspaceClient()" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "3fe1e5b8-76ca-496b-890f-1cc584d29b38", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "A pending update for endpoint test-my-dab-endpoint is being processed.\n" + ] + } + ], + "source": [ + "try:\n", + " endpoint = workspace.serving_endpoints.get(name=endpoint_name)\n", + "except ResourceDoesNotExist as e:\n", + " endpoint = None\n", + "\n", + "if endpoint is None:\n", + " workspace.serving_endpoints.create(\n", + " name=endpoint_name,\n", + " config=EndpointCoreConfigInput(\n", + " served_entities=[\n", + " ServedEntityInput(\n", + " entity_name=model_name,\n", + " entity_version=model_version,\n", + " scale_to_zero_enabled=True,\n", + " workload_size=\"Small\"\n", + " )\n", + " ]\n", + " )\n", + " )\n", + " print(f\"Created endpoint {endpoint_name}\")\n", + "elif endpoint.pending_config is not None:\n", + " print(f\"A pending update for endpoint {endpoint_name} is being processed.\")\n", + "elif endpoint.config.served_entities[0].entity_name != model_name or endpoint.config.served_entities[0].entity_version != model_version:\n", + " # Update endpoint\n", + " workspace.serving_endpoints.update_config(\n", + " 
name=endpoint_name,\n", + " served_entities=[\n", + " ServedEntityInput(\n", + " entity_name=model_name,\n", + " entity_version=model_version,\n", + " scale_to_zero_enabled=True,\n", + " workload_size=\"Small\"\n", + " )\n", + " ]\n", + " )\n", + " print(f\"Updated endpoint {endpoint_name}\")\n", + "else:\n", + " print(\"Endpoint already up-to-date\")" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "b0520726-0ce8-4802-85b2-431ec9d02bc3", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [] + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "computePreferences": null, + "dashboards": [], + "environmentMetadata": null, + "language": "python", + "notebookMetadata": { + "pythonIndentUnit": 2 + }, + "notebookName": "manage_endpoint", + "widgets": { + "endpoint_name": { + "currentValue": "test-my-dab-endpoint", + "nuid": "c116073e-3f87-45c6-9fe0-42bc21d624f4", + "typedWidgetInfo": { + "autoCreated": false, + "defaultValue": "", + "label": null, + "name": "endpoint_name", + "options": { + "validationRegex": null, + "widgetDisplayType": "Text" + }, + "parameterDataType": "String" + }, + "widgetInfo": { + "defaultValue": "", + "label": null, + "name": "endpoint_name", + "options": { + "autoCreated": null, + "validationRegex": null, + "widgetType": "text" + }, + "widgetType": "text" + } + }, + "model_name": { + "currentValue": "feature_store.sharing_bugbash_a7ce.fs_model", + "nuid": "9817611f-00f2-45f0-a61f-9a88132afd1b", + "typedWidgetInfo": { + "autoCreated": false, + "defaultValue": "", + "label": null, + "name": "model_name", + "options": { + "validationRegex": null, + "widgetDisplayType": "Text" + }, + "parameterDataType": "String" + }, + "widgetInfo": { + "defaultValue": "", + "label": null, + "name": "model_name", + "options": { + "autoCreated": null, + "validationRegex": null, + "widgetType": "text" + }, + "widgetType": "text" + } + }, + "model_version": { + "currentValue": "2", + "nuid": "d437ed85-d52e-4251-8896-b64d9a39f576", + "typedWidgetInfo": { + "autoCreated": false, + "defaultValue": "", + "label": null, + "name": "model_version", + "options": { + "validationRegex": null, + "widgetDisplayType": "Text" + }, + "parameterDataType": "String" + }, + "widgetInfo": { + "defaultValue": "", + "label": null, + "name": "model_version", + "options": { + "autoCreated": null, + "validationRegex": null, + "widgetType": "text" + }, + "widgetType": "text" + } + } + } + }, + "kernelspec": { + "display_name": "dbconnect", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/multi_region_serving/src/manage_share.ipynb b/multi_region_serving/src/manage_share.ipynb new file mode 100644 index 0000000..fed9538 --- /dev/null +++ b/multi_region_serving/src/manage_share.ipynb @@ -0,0 +1,222 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Sync the share and add all required resources" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "vscode": { + "languageId": "plaintext" + } + }, + "outputs": [], + "source": [ + "%pip install databricks-sdk>=0.38.0\n", + "%restart_python" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "dbutils.widgets.text(\"model_name\", 
defaultValue=\"feature_store.sharing_bugbash_a7ce.fs_model\")\n", + "dbutils.widgets.text(\"share_name\", defaultValue=\"mingyang-test-share\")\n", + "dbutils.widgets.text(\"max_number_of_versions_to_sync\", defaultValue=\"10\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from databricks.sdk import WorkspaceClient\n", + "from databricks.sdk.service.serving import ServedEntityInput\n", + "from databricks.sdk.service.sharing import (\n", + " SharedDataObjectUpdate, \n", + " SharedDataObjectUpdateAction, \n", + " SharedDataObjectDataObjectType, \n", + " SharedDataObject, \n", + " SharedDataObjectHistoryDataSharingStatus,\n", + ")\n", + "\n", + "workspace = WorkspaceClient()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "model_name = dbutils.widgets.get(\"model_name\")\n", + "share_name = dbutils.widgets.get(\"share_name\")\n", + "max_number_of_versions_to_sync = int(dbutils.widgets.get(\"max_number_of_versions_to_sync\"))\n", + "\n", + "print(\"~~~ parameters ~~~\")\n", + "print(f\"Model name: {model_name}\")\n", + "print(f\"Share name: {share_name}\")\n", + "print(f\"Max number of versions to sync: {max_number_of_versions_to_sync}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def getLatestVersions(model_name: str, max_number_of_versions: int):\n", + " versions = workspace.model_versions.list(\n", + " full_name=model_name,\n", + " )\n", + " result = []\n", + " for version in versions:\n", + " result.append(workspace.model_versions.get(full_name=model_name, version=version.version))\n", + " return result\n", + "\n", + "def getDependencies(model_versions):\n", + " tables = set()\n", + " functions = set()\n", + " for version in model_versions:\n", + " for dependency in version.model_version_dependencies.dependencies:\n", + " if dependency.table is not None:\n", + " tables.add(dependency.table.table_full_name)\n", + " elif dependency.function is not None:\n", + " functions.add(dependency.function.function_full_name)\n", + " return tables, functions" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "versions = getLatestVersions(model_name, max_number_of_versions_to_sync)\n", + "tableDependencies, functionDependencies = getDependencies(versions)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from lib.rest_client import RestClient\n", + "\n", + "notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()\n", + "rc = RestClient(notebook_context)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "shareInfo = rc.get_share_info(share_name)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sharedTables = sharedFunctions = sharedSchemas = sharedModels = set()\n", + "model_is_shared = False\n", + "\n", + "if 'objects' in shareInfo:\n", + " sharedTables = set([obj['name'] for obj in filter(lambda obj: obj['data_object_type'] == 'TABLE', shareInfo['objects'])])\n", + " sharedFunctions = set([obj['name'] for obj in filter(lambda obj: obj['data_object_type'] == 'FUNCTION', shareInfo['objects'])])\n", + " sharedSchemas = set([obj['name'] for obj in filter(lambda obj: obj['data_object_type'] == 'SCHEMA', 
shareInfo['objects'])])\n", + " sharedModels = set([obj['name'] for obj in filter(lambda obj: obj['data_object_type'] == 'MODEL', shareInfo['objects'])])\n", + " model_is_shared = model_name in sharedModels" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def getSchema(full_name):\n", + " name_sections = full_name.split(\".\")\n", + " return f\"{name_sections[0]}.{name_sections[1]}\"\n", + "\n", + "def getObjectsToAdd(dependencies, sharedObjects, sharedSchemas):\n", + " newDependencies = dependencies - sharedObjects\n", + " return list(filter(lambda x: getSchema(x) not in sharedSchemas, newDependencies))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "tablesToAdd = getObjectsToAdd(tableDependencies, sharedTables, sharedSchemas)\n", + "functionsToAdd = getObjectsToAdd(functionDependencies, sharedFunctions, sharedSchemas)\n", + "\n", + "updates = []\n", + "\n", + "for table in tablesToAdd:\n", + " updates.append(SharedDataObjectUpdate(\n", + " action=SharedDataObjectUpdateAction.ADD,\n", + " data_object=SharedDataObject(\n", + " name=table,\n", + " data_object_type=SharedDataObjectDataObjectType.TABLE,\n", + " history_data_sharing_status=SharedDataObjectHistoryDataSharingStatus.ENABLED\n", + " )\n", + " ))\n", + "\n", + "\n", + "for function in functionsToAdd:\n", + " updates.append(SharedDataObjectUpdate(\n", + " action=SharedDataObjectUpdateAction.ADD,\n", + " data_object=SharedDataObject(\n", + " name=function,\n", + " data_object_type=SharedDataObjectDataObjectType.FUNCTION\n", + " )\n", + " ))\n", + "\n", + "if not model_is_shared:\n", + " updates.append(SharedDataObjectUpdate(\n", + " action=SharedDataObjectUpdateAction.ADD,\n", + " data_object=SharedDataObject(\n", + " name=model_name,\n", + " data_object_type=SharedDataObjectDataObjectType.MODEL\n", + " )\n", + " ))\n", + "\n", + "def print_update_summary(updates):\n", + " for update in updates:\n", + " print(f\"{update.action.value} {update.data_object.data_object_type} {update.data_object.name}\")\n", + "\n", + "if updates:\n", + " print_update_summary(updates)\n", + " workspace.shares.update(\n", + " name=share_name,\n", + " updates=updates\n", + " )\n", + "else:\n", + " print(\"The share is already up-to-date.\")" + ] + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} From 59ae338315c96e1be1e073c159f733146ceb115b Mon Sep 17 00:00:00 2001 From: Mingyang Ge Date: Wed, 29 Jan 2025 13:11:12 -0800 Subject: [PATCH 2/9] lint --- multi_region_serving/.gitignore | 2 +- .../resources/manage_share.job.yml | 2 +- multi_region_serving/setup.py | 37 ------------------- multi_region_serving/src/lib/rest_client.py | 17 +++++---- 4 files changed, 12 insertions(+), 46 deletions(-) delete mode 100644 multi_region_serving/setup.py diff --git a/multi_region_serving/.gitignore b/multi_region_serving/.gitignore index de811f1..9e119e7 100644 --- a/multi_region_serving/.gitignore +++ b/multi_region_serving/.gitignore @@ -1,2 +1,2 @@ - +.vscode .databricks diff --git a/multi_region_serving/resources/manage_share.job.yml b/multi_region_serving/resources/manage_share.job.yml index 24edaeb..3925559 100644 --- a/multi_region_serving/resources/manage_share.job.yml +++ b/multi_region_serving/resources/manage_share.job.yml @@ -15,4 +15,4 @@ resources: - name: max_number_of_versions_to_sync default: '10' - name: share_name - default: 'mingyang-test-share' \ No 
newline at end of file + default: 'mingyang-test-share' diff --git a/multi_region_serving/setup.py b/multi_region_serving/setup.py deleted file mode 100644 index 0d63c7e..0000000 --- a/multi_region_serving/setup.py +++ /dev/null @@ -1,37 +0,0 @@ -""" -setup.py configuration script describing how to build and package this project. - -This file is primarily used by the setuptools library and typically should not -be executed directly. See README.md for how to deploy, test, and run -the manage_serving project. -""" -from setuptools import setup, find_packages - -import sys -sys.path.append('./src') - -import datetime -import lib.rest_client as rest_client - -setup( - name="manage_serving", - # We use timestamp as Local version identifier (https://peps.python.org/pep-0440/#local-version-identifiers.) - # to ensure that changes to wheel package are picked up when used on all-purpose clusters - version=rest_client.__version__ + "+" + datetime.datetime.utcnow().strftime("%Y%m%d.%H%M%S"), - url="https://databricks.com", - author="mingyang.ge@databricks.com", - description="wheel file based on manage_serving/src", - packages=find_packages(where='./src'), - package_dir={'': 'src'}, - entry_points={ - "packages": [ - "main=manage_serving.main:main" - ] - }, - install_requires=[ - # Dependencies in case the output wheel file is used as a library dependency. - # For defining dependencies, when this package is used in Databricks, see: - # https://docs.databricks.com/dev-tools/bundles/library-dependencies.html - "setuptools" - ], -) diff --git a/multi_region_serving/src/lib/rest_client.py b/multi_region_serving/src/lib/rest_client.py index 639612d..44beac7 100644 --- a/multi_region_serving/src/lib/rest_client.py +++ b/multi_region_serving/src/lib/rest_client.py @@ -1,22 +1,25 @@ import urllib.request import json from databricks.sdk.runtime import spark - + + class RestClient: def __init__(self, context): self.base_url = "https://" + spark.conf.get("spark.databricks.workspaceUrl") self.token = context.apiToken().get() - def get_share_info(self, share_name:str): - return self._get(f'api/2.1/unity-catalog/shares/{share_name}?include_shared_data=true') - + def get_share_info(self, share_name: str): + return self._get( + f"api/2.1/unity-catalog/shares/{share_name}?include_shared_data=true" + ) + def _get(self, uri): - url = f'{self.base_url}/{uri}' - headers = { 'Authorization': f'Bearer {self.token}'} + url = f"{self.base_url}/{uri}" + headers = {"Authorization": f"Bearer {self.token}"} req = urllib.request.Request(url, headers=headers) try: response = urllib.request.urlopen(req) return json.load(response) except urllib.error.HTTPError as e: result = e.read().decode() - print((e.code, result)) \ No newline at end of file + print((e.code, result)) From b350599b19cdf0b760a7b72ddd708367cb14fb62 Mon Sep 17 00:00:00 2001 From: Mingyang Ge Date: Wed, 29 Jan 2025 13:19:24 -0800 Subject: [PATCH 3/9] x --- .../resources/manage_serving.job.yml | 8 ++++---- multi_region_serving/resources/manage_share.job.yml | 6 +++--- multi_region_serving/src/manage_endpoint.ipynb | 7 +++++-- multi_region_serving/src/manage_share.ipynb | 13 ++++++++++--- 4 files changed, 22 insertions(+), 12 deletions(-) diff --git a/multi_region_serving/resources/manage_serving.job.yml b/multi_region_serving/resources/manage_serving.job.yml index a79e89d..0f42ace 100644 --- a/multi_region_serving/resources/manage_serving.job.yml +++ b/multi_region_serving/resources/manage_serving.job.yml @@ -4,15 +4,15 @@ resources: name: manage_serving_job 
       email_notifications:
         on_failure:
-          - mingyang.ge@databricks.com
+          -
       tasks:
         - task_key: notebook_task
           notebook_task:
             notebook_path: ../src/manage_endpoint.ipynb
           parameters:
             - name: endpoint_name
-              default: my-share-endpoint
+              default:
             - name: model_name
-              default: mingyang_share.sharing_bugbash_a7ce.fs_model
+              default:
             - name: model_version
-              default: "2"
+              default: "1"
diff --git a/multi_region_serving/resources/manage_share.job.yml b/multi_region_serving/resources/manage_share.job.yml
index 3925559..2de75e4 100644
--- a/multi_region_serving/resources/manage_share.job.yml
+++ b/multi_region_serving/resources/manage_share.job.yml
@@ -4,15 +4,15 @@ resources:
       name: manage_share_job
       email_notifications:
         on_failure:
-          - mingyang.ge@databricks.com
+          -
       tasks:
         - task_key: notebook_task
           notebook_task:
             notebook_path: ../src/manage_share.ipynb
           parameters:
             - name: model_name
-              default: 'feature_store.sharing_bugbash_a7ce.fs_model'
+              default:
             - name: max_number_of_versions_to_sync
               default: '10'
             - name: share_name
-              default: 'mingyang-test-share'
+              default:
diff --git a/multi_region_serving/src/manage_endpoint.ipynb b/multi_region_serving/src/manage_endpoint.ipynb
index 13dfa82..7a6101b 100644
--- a/multi_region_serving/src/manage_endpoint.ipynb
+++ b/multi_region_serving/src/manage_endpoint.ipynb
@@ -13,9 +13,12 @@
     }
    },
    "source": [
-    "# Update Model Serving Endpoint\n",
+    "# Create or Update Model Serving Endpoint\n",
     "\n",
-    "Update the deployed serving endpoints with a new model version."
+    "Create or Update the deployed serving endpoints with a new model version.\n",
+    "\n",
+    "* Make sure you've created online tables for all the required feature tables.\n",
+    "* Run this job on the workspace where you want to serve the model. "
    ]
   },
   {
diff --git a/multi_region_serving/src/manage_share.ipynb b/multi_region_serving/src/manage_share.ipynb
index fed9538..a179464 100644
--- a/multi_region_serving/src/manage_share.ipynb
+++ b/multi_region_serving/src/manage_share.ipynb
@@ -4,7 +4,14 @@
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "# Sync the share and add all required resources"
+    "# Sync the share and add all required resources\n",
+    "\n",
+    "Add a model and all its dependencies to the given Share.\n",
+    "\n",
+    "Prerequisites:\n",
+    "\n",
+    "* Create a share in [delta-sharing](https://docs.databricks.com/en/delta-sharing/create-share.html).\n",
+    "* Configure the default parameters in resources/manage_share.job.yml "
    ]
   },
   {
@@ -27,8 +34,8 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "dbutils.widgets.text(\"model_name\", defaultValue=\"feature_store.sharing_bugbash_a7ce.fs_model\")\n",
-    "dbutils.widgets.text(\"share_name\", defaultValue=\"mingyang-test-share\")\n",
+    "dbutils.widgets.text(\"model_name\", defaultValue=\"\")\n",
+    "dbutils.widgets.text(\"share_name\", defaultValue=\"\")\n",
     "dbutils.widgets.text(\"max_number_of_versions_to_sync\", defaultValue=\"10\")"
    ]
   },

From 9e7b5eac6b4ebed780d48cec5ccf3d7f996a78c0 Mon Sep 17 00:00:00 2001
From: Mingyang Ge
Date: Wed, 29 Jan 2025 13:20:38 -0800
Subject: [PATCH 4/9] ignore

---
 multi_region_serving/.gitignore | 1 +
 1 file changed, 1 insertion(+)

diff --git a/multi_region_serving/.gitignore b/multi_region_serving/.gitignore
index 9e119e7..364994d 100644
--- a/multi_region_serving/.gitignore
+++ b/multi_region_serving/.gitignore
@@ -1,2 +1,3 @@
 .vscode
 .databricks
+scratch

From c2d8ace6179532144c2528a5154c46bedfc88d4e Mon Sep 17 00:00:00 2001
From: Mingyang Ge
Date: Wed, 29 Jan 2025 13:23:46 -0800
Subject: [PATCH 5/9] lint

---
.../src/manage_endpoint.ipynb | 69 ++++---- multi_region_serving/src/manage_share.ipynb | 153 +++++++++++------- 2 files changed, 132 insertions(+), 90 deletions(-) diff --git a/multi_region_serving/src/manage_endpoint.ipynb b/multi_region_serving/src/manage_endpoint.ipynb index 7a6101b..185e7a0 100644 --- a/multi_region_serving/src/manage_endpoint.ipynb +++ b/multi_region_serving/src/manage_endpoint.ipynb @@ -83,8 +83,8 @@ "ARGS = dbutils.widgets.getAll()\n", "\n", "endpoint_name = ARGS[\"endpoint_name\"]\n", - "model_name = ARGS['model_name']\n", - "model_version = ARGS['model_version']" + "model_name = ARGS[\"model_name\"]\n", + "model_version = ARGS[\"model_version\"]" ] }, { @@ -139,43 +139,46 @@ ], "source": [ "try:\n", - " endpoint = workspace.serving_endpoints.get(name=endpoint_name)\n", + " endpoint = workspace.serving_endpoints.get(name=endpoint_name)\n", "except ResourceDoesNotExist as e:\n", - " endpoint = None\n", + " endpoint = None\n", "\n", "if endpoint is None:\n", - " workspace.serving_endpoints.create(\n", - " name=endpoint_name,\n", - " config=EndpointCoreConfigInput(\n", - " served_entities=[\n", - " ServedEntityInput(\n", - " entity_name=model_name,\n", - " entity_version=model_version,\n", - " scale_to_zero_enabled=True,\n", - " workload_size=\"Small\"\n", - " )\n", - " ]\n", + " workspace.serving_endpoints.create(\n", + " name=endpoint_name,\n", + " config=EndpointCoreConfigInput(\n", + " served_entities=[\n", + " ServedEntityInput(\n", + " entity_name=model_name,\n", + " entity_version=model_version,\n", + " scale_to_zero_enabled=True,\n", + " workload_size=\"Small\",\n", + " )\n", + " ]\n", + " ),\n", " )\n", - " )\n", - " print(f\"Created endpoint {endpoint_name}\")\n", + " print(f\"Created endpoint {endpoint_name}\")\n", "elif endpoint.pending_config is not None:\n", - " print(f\"A pending update for endpoint {endpoint_name} is being processed.\")\n", - "elif endpoint.config.served_entities[0].entity_name != model_name or endpoint.config.served_entities[0].entity_version != model_version:\n", - " # Update endpoint\n", - " workspace.serving_endpoints.update_config(\n", - " name=endpoint_name,\n", - " served_entities=[\n", - " ServedEntityInput(\n", - " entity_name=model_name,\n", - " entity_version=model_version,\n", - " scale_to_zero_enabled=True,\n", - " workload_size=\"Small\"\n", - " )\n", - " ]\n", - " )\n", - " print(f\"Updated endpoint {endpoint_name}\")\n", + " print(f\"A pending update for endpoint {endpoint_name} is being processed.\")\n", + "elif (\n", + " endpoint.config.served_entities[0].entity_name != model_name\n", + " or endpoint.config.served_entities[0].entity_version != model_version\n", + "):\n", + " # Update endpoint\n", + " workspace.serving_endpoints.update_config(\n", + " name=endpoint_name,\n", + " served_entities=[\n", + " ServedEntityInput(\n", + " entity_name=model_name,\n", + " entity_version=model_version,\n", + " scale_to_zero_enabled=True,\n", + " workload_size=\"Small\",\n", + " )\n", + " ],\n", + " )\n", + " print(f\"Updated endpoint {endpoint_name}\")\n", "else:\n", - " print(\"Endpoint already up-to-date\")" + " print(\"Endpoint already up-to-date\")" ] }, { diff --git a/multi_region_serving/src/manage_share.ipynb b/multi_region_serving/src/manage_share.ipynb index a179464..ed23142 100644 --- a/multi_region_serving/src/manage_share.ipynb +++ b/multi_region_serving/src/manage_share.ipynb @@ -48,10 +48,10 @@ "from databricks.sdk import WorkspaceClient\n", "from databricks.sdk.service.serving import ServedEntityInput\n", "from 
databricks.sdk.service.sharing import (\n", - " SharedDataObjectUpdate, \n", - " SharedDataObjectUpdateAction, \n", - " SharedDataObjectDataObjectType, \n", - " SharedDataObject, \n", + " SharedDataObjectUpdate,\n", + " SharedDataObjectUpdateAction,\n", + " SharedDataObjectDataObjectType,\n", + " SharedDataObject,\n", " SharedDataObjectHistoryDataSharingStatus,\n", ")\n", "\n", @@ -66,7 +66,9 @@ "source": [ "model_name = dbutils.widgets.get(\"model_name\")\n", "share_name = dbutils.widgets.get(\"share_name\")\n", - "max_number_of_versions_to_sync = int(dbutils.widgets.get(\"max_number_of_versions_to_sync\"))\n", + "max_number_of_versions_to_sync = int(\n", + " dbutils.widgets.get(\"max_number_of_versions_to_sync\")\n", + ")\n", "\n", "print(\"~~~ parameters ~~~\")\n", "print(f\"Model name: {model_name}\")\n", @@ -81,24 +83,27 @@ "outputs": [], "source": [ "def getLatestVersions(model_name: str, max_number_of_versions: int):\n", - " versions = workspace.model_versions.list(\n", - " full_name=model_name,\n", - " )\n", - " result = []\n", - " for version in versions:\n", - " result.append(workspace.model_versions.get(full_name=model_name, version=version.version))\n", - " return result\n", + " versions = workspace.model_versions.list(\n", + " full_name=model_name,\n", + " )\n", + " result = []\n", + " for version in versions:\n", + " result.append(\n", + " workspace.model_versions.get(full_name=model_name, version=version.version)\n", + " )\n", + " return result\n", + "\n", "\n", "def getDependencies(model_versions):\n", - " tables = set()\n", - " functions = set()\n", - " for version in model_versions:\n", - " for dependency in version.model_version_dependencies.dependencies:\n", - " if dependency.table is not None:\n", - " tables.add(dependency.table.table_full_name)\n", - " elif dependency.function is not None:\n", - " functions.add(dependency.function.function_full_name)\n", - " return tables, functions" + " tables = set()\n", + " functions = set()\n", + " for version in model_versions:\n", + " for dependency in version.model_version_dependencies.dependencies:\n", + " if dependency.table is not None:\n", + " tables.add(dependency.table.table_full_name)\n", + " elif dependency.function is not None:\n", + " functions.add(dependency.function.function_full_name)\n", + " return tables, functions" ] }, { @@ -141,11 +146,39 @@ "sharedTables = sharedFunctions = sharedSchemas = sharedModels = set()\n", "model_is_shared = False\n", "\n", - "if 'objects' in shareInfo:\n", - " sharedTables = set([obj['name'] for obj in filter(lambda obj: obj['data_object_type'] == 'TABLE', shareInfo['objects'])])\n", - " sharedFunctions = set([obj['name'] for obj in filter(lambda obj: obj['data_object_type'] == 'FUNCTION', shareInfo['objects'])])\n", - " sharedSchemas = set([obj['name'] for obj in filter(lambda obj: obj['data_object_type'] == 'SCHEMA', shareInfo['objects'])])\n", - " sharedModels = set([obj['name'] for obj in filter(lambda obj: obj['data_object_type'] == 'MODEL', shareInfo['objects'])])\n", + "if \"objects\" in shareInfo:\n", + " sharedTables = set(\n", + " [\n", + " obj[\"name\"]\n", + " for obj in filter(\n", + " lambda obj: obj[\"data_object_type\"] == \"TABLE\", shareInfo[\"objects\"]\n", + " )\n", + " ]\n", + " )\n", + " sharedFunctions = set(\n", + " [\n", + " obj[\"name\"]\n", + " for obj in filter(\n", + " lambda obj: obj[\"data_object_type\"] == \"FUNCTION\", shareInfo[\"objects\"]\n", + " )\n", + " ]\n", + " )\n", + " sharedSchemas = set(\n", + " [\n", + " obj[\"name\"]\n", + " for obj in 
filter(\n", + " lambda obj: obj[\"data_object_type\"] == \"SCHEMA\", shareInfo[\"objects\"]\n", + " )\n", + " ]\n", + " )\n", + " sharedModels = set(\n", + " [\n", + " obj[\"name\"]\n", + " for obj in filter(\n", + " lambda obj: obj[\"data_object_type\"] == \"MODEL\", shareInfo[\"objects\"]\n", + " )\n", + " ]\n", + " )\n", " model_is_shared = model_name in sharedModels" ] }, @@ -156,12 +189,13 @@ "outputs": [], "source": [ "def getSchema(full_name):\n", - " name_sections = full_name.split(\".\")\n", - " return f\"{name_sections[0]}.{name_sections[1]}\"\n", + " name_sections = full_name.split(\".\")\n", + " return f\"{name_sections[0]}.{name_sections[1]}\"\n", + "\n", "\n", "def getObjectsToAdd(dependencies, sharedObjects, sharedSchemas):\n", - " newDependencies = dependencies - sharedObjects\n", - " return list(filter(lambda x: getSchema(x) not in sharedSchemas, newDependencies))" + " newDependencies = dependencies - sharedObjects\n", + " return list(filter(lambda x: getSchema(x) not in sharedSchemas, newDependencies))" ] }, { @@ -176,46 +210,51 @@ "updates = []\n", "\n", "for table in tablesToAdd:\n", - " updates.append(SharedDataObjectUpdate(\n", - " action=SharedDataObjectUpdateAction.ADD,\n", - " data_object=SharedDataObject(\n", - " name=table,\n", - " data_object_type=SharedDataObjectDataObjectType.TABLE,\n", - " history_data_sharing_status=SharedDataObjectHistoryDataSharingStatus.ENABLED\n", + " updates.append(\n", + " SharedDataObjectUpdate(\n", + " action=SharedDataObjectUpdateAction.ADD,\n", + " data_object=SharedDataObject(\n", + " name=table,\n", + " data_object_type=SharedDataObjectDataObjectType.TABLE,\n", + " history_data_sharing_status=SharedDataObjectHistoryDataSharingStatus.ENABLED,\n", + " ),\n", + " )\n", " )\n", - " ))\n", "\n", "\n", "for function in functionsToAdd:\n", - " updates.append(SharedDataObjectUpdate(\n", - " action=SharedDataObjectUpdateAction.ADD,\n", - " data_object=SharedDataObject(\n", - " name=function,\n", - " data_object_type=SharedDataObjectDataObjectType.FUNCTION\n", + " updates.append(\n", + " SharedDataObjectUpdate(\n", + " action=SharedDataObjectUpdateAction.ADD,\n", + " data_object=SharedDataObject(\n", + " name=function, data_object_type=SharedDataObjectDataObjectType.FUNCTION\n", + " ),\n", + " )\n", " )\n", - " ))\n", "\n", "if not model_is_shared:\n", - " updates.append(SharedDataObjectUpdate(\n", - " action=SharedDataObjectUpdateAction.ADD,\n", - " data_object=SharedDataObject(\n", - " name=model_name,\n", - " data_object_type=SharedDataObjectDataObjectType.MODEL\n", + " updates.append(\n", + " SharedDataObjectUpdate(\n", + " action=SharedDataObjectUpdateAction.ADD,\n", + " data_object=SharedDataObject(\n", + " name=model_name, data_object_type=SharedDataObjectDataObjectType.MODEL\n", + " ),\n", + " )\n", " )\n", - " ))\n", + "\n", "\n", "def print_update_summary(updates):\n", - " for update in updates:\n", - " print(f\"{update.action.value} {update.data_object.data_object_type} {update.data_object.name}\")\n", + " for update in updates:\n", + " print(\n", + " f\"{update.action.value} {update.data_object.data_object_type} {update.data_object.name}\"\n", + " )\n", + "\n", "\n", "if updates:\n", - " print_update_summary(updates)\n", - " workspace.shares.update(\n", - " name=share_name,\n", - " updates=updates\n", - " )\n", + " print_update_summary(updates)\n", + " workspace.shares.update(name=share_name, updates=updates)\n", "else:\n", - " print(\"The share is already up-to-date.\")" + " print(\"The share is already up-to-date.\")" ] } ], 
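
For reference, the dependency-diff step that patch 5 reformats above reduces to two
small set operations: drop dependencies that are already shared individually, then
drop those covered by an already-shared schema. A minimal standalone sketch of that
logic, using hypothetical catalog/schema/object names:

```python
# Sketch of manage_share's dependency diff. Inputs are fully qualified
# "catalog.schema.object" names; the names below are made-up examples.

def get_schema(full_name: str) -> str:
    # "catalog.schema.object" -> "catalog.schema"
    catalog, schema, _ = full_name.split(".", 2)
    return f"{catalog}.{schema}"

def objects_to_add(dependencies: set, shared_objects: set, shared_schemas: set) -> list:
    # Skip objects already shared one-by-one, then skip objects whose whole
    # schema is shared (sharing a schema implicitly covers its contents).
    new_dependencies = dependencies - shared_objects
    return [obj for obj in new_dependencies if get_schema(obj) not in shared_schemas]

deps = {"main.features.users", "main.features.orders", "main.logs.clicks"}
shared = {"main.features.users"}  # already in the share individually
schemas = {"main.logs"}           # whole schema already in the share
print(objects_to_add(deps, shared, schemas))  # -> ['main.features.orders']
```

The notebook applies the same check twice, once for table dependencies and once for
function dependencies, each against the corresponding set of shared objects.
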
From a41392518197848d1bab57c1e3366371f5afb868 Mon Sep 17 00:00:00 2001
From: Mingyang Ge
Date: Fri, 31 Jan 2025 14:17:29 -0800
Subject: [PATCH 6/9] address

---
 multi_region_serving/README.md | 27 ++++++++++++-------
 multi_region_serving/databricks.yml | 18 +++++++++++--
 .../resources/manage_serving.job.yml | 8 +++---
 .../resources/manage_share.job.yml | 6 ++---
 .../src/manage_endpoint.ipynb | 16 -----------
 5 files changed, 41 insertions(+), 34 deletions(-)

diff --git a/multi_region_serving/README.md b/multi_region_serving/README.md
index 7211743..f6a904e 100644
--- a/multi_region_serving/README.md
+++ b/multi_region_serving/README.md
@@ -8,10 +8,9 @@ across multiple regions.
 1. Download this example
 
 2. Make changes as needed. Some files to highlight:
-   * resources/*.job.yml - Job metadata, including parameters.
+   * databricks.yml - DAB bundle configuration including variable names and default values.
    * src/manage_endpoint.ipynb - Notebook for creating / updating serving endpoints.
    * src/manage_share.ipynb - Notebook for syncing dependencies of a shared model.
-   * databricks.yml - DAB bundle configuration including target name and workspace URL.
 
 ## How to trigger the workflows
 
@@ -22,9 +21,19 @@ across multiple regions.
    $ databricks configure
    ```
 
-3. To deploy a copy to your main workspace:
+3. Validate bundle variables
+
+   If you don't want to set a default value for any variables defined in `databricks.yml`, you
+   need to provide the variables when running any commands. You can validate that all variables are
+   provided:
+   ```
+   $ MY_BUNDLE_VARS="share_name=<share_name>,model_name=<model_name>,model_version=<model_version>,endpoint_name=<endpoint_name>,notification_email=<notification_email>"
+   $ databricks bundle validate --var=$MY_BUNDLE_VARS
+   ```
+
+4. To deploy a copy to your main workspace:
    ```
-   $ databricks bundle deploy --target main
+   $ databricks bundle deploy --target main --var=$MY_BUNDLE_VARS
   ```
    (Note that "main" is the target name defined in databricks.yml)
 
@@ -33,19 +42,19 @@ across multiple regions.
    `[dev yourname] manage_serving_job` to your workspace.
    You can find that job by opening your workspace and clicking on **Workflows**.
 
-4. Similarly, to deploy to a remote workspace, type:
+5. Similarly, to deploy to a remote workspace, type:
    ```
-   $ databricks bundle -t remote1 -p <profile> deploy
+   $ databricks bundle -p <profile> deploy --target remote1 --var=$MY_BUNDLE_VARS
    ```
 
    Use `-p` to specify the databricks profile used by this command. The profile needs to be
    configured in `~/.databrickscfg`.
 
-5. To run the workflow to sync a share, use the "run" command:
+6. To run the workflow to sync a share, use the "run" command:
    ```
-   $ databricks bundle -t main -p <profile> run manage_share_job
+   $ databricks bundle -t main -p <profile> run manage_share_job --var=$MY_BUNDLE_VARS
    ```
 
-6. For documentation on the Databricks asset bundles format used
+7. For documentation on the Databricks asset bundles format used
    for this project, and for CI/CD configuration, see
    https://docs.databricks.com/dev-tools/bundles/index.html.
diff --git a/multi_region_serving/databricks.yml b/multi_region_serving/databricks.yml
index d6e001e..0fb576e 100644
--- a/multi_region_serving/databricks.yml
+++ b/multi_region_serving/databricks.yml
@@ -3,6 +3,20 @@
 bundle:
   name: manage_serving
 
+variables:
+  notification_email:
+    description: Email address to notify when a job fails.
+  model_name:
+    description: Full name of the model to share from the main workspace.
+  remote_model_name:
+    description: The model name in the recipient workspace. This is typically the original model name with a new catalog name in the recipient workspace.
+  model_version:
+    description: Version of the model to deploy.
+  endpoint_name:
+    description: Name of the endpoint to deploy.
+  share_name:
+    description: Name of the share.
+
 include:
   - resources/*.yml
 
@@ -15,12 +29,12 @@ targets:
     mode: development
     default: true
     workspace:
-      host: https://e2-dogfood.staging.cloud.databricks.com
+      host: https://myworkspace.databricks.com
 
   remote1:
     # The remote workspace that serves the model
     mode: development
     workspace:
-      host: https://e2-dogfood-feature-store.staging.cloud.databricks.com
+      host: https://myworkspace-remote.databricks.com
 
 
diff --git a/multi_region_serving/resources/manage_serving.job.yml b/multi_region_serving/resources/manage_serving.job.yml
index 0f42ace..bd7e2d7 100644
--- a/multi_region_serving/resources/manage_serving.job.yml
+++ b/multi_region_serving/resources/manage_serving.job.yml
@@ -4,15 +4,15 @@ resources:
       name: manage_serving_job
       email_notifications:
         on_failure:
-          -
+          - ${var.notification_email}
       tasks:
         - task_key: notebook_task
           notebook_task:
             notebook_path: ../src/manage_endpoint.ipynb
           parameters:
             - name: endpoint_name
-              default:
+              default: ${var.endpoint_name}
             - name: model_name
-              default:
+              default: ${var.remote_model_name}
             - name: model_version
-              default: "1"
+              default: ${var.model_version}
diff --git a/multi_region_serving/resources/manage_share.job.yml b/multi_region_serving/resources/manage_share.job.yml
index 2de75e4..75c15af 100644
--- a/multi_region_serving/resources/manage_share.job.yml
+++ b/multi_region_serving/resources/manage_share.job.yml
@@ -4,15 +4,15 @@ resources:
       name: manage_share_job
       email_notifications:
         on_failure:
-          -
+          - ${var.notification_email}
       tasks:
         - task_key: notebook_task
           notebook_task:
             notebook_path: ../src/manage_share.ipynb
           parameters:
             - name: model_name
-              default:
+              default: ${var.model_name}
             - name: max_number_of_versions_to_sync
               default: '10'
             - name: share_name
-              default:
+              default: ${var.share_name}
diff --git a/multi_region_serving/src/manage_endpoint.ipynb b/multi_region_serving/src/manage_endpoint.ipynb
index 185e7a0..c63ed8d 100644
--- a/multi_region_serving/src/manage_endpoint.ipynb
+++ b/multi_region_serving/src/manage_endpoint.ipynb
@@ -180,22 +180,6 @@
     "else:\n",
     "    print(\"Endpoint already up-to-date\")"
    ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 0,
-   "metadata": {
-    "application/vnd.databricks.v1+cell": {
-     "cellMetadata": {},
-     "inputWidgets": {},
-     "nuid": "b0520726-0ce8-4802-85b2-431ec9d02bc3",
-     "showTitle": false,
-     "tableResultSettingsMap": {},
-     "title": ""
-    }
-   },
-   "outputs": [],
-   "source": []
   }
  ],
 "metadata": {

From acf9fc8e3bd258a2d233458a91147e104b3ea596 Mon Sep 17 00:00:00 2001
From: Mingyang Ge
Date: Mon, 3 Feb 2025 13:19:32 -0800
Subject: [PATCH 7/9] x

---
 .../src/manage_endpoint.ipynb | 287 ------------------
 multi_region_serving/src/manage_endpoint.py | 79 +++++
 multi_region_serving/src/manage_share.ipynb | 268 ----------------
 multi_region_serving/src/manage_share.py | 194 ++++++++++++
 4 files changed, 273 insertions(+), 555 deletions(-)
 delete mode 100644 multi_region_serving/src/manage_endpoint.ipynb
 create mode 100644 multi_region_serving/src/manage_endpoint.py
 delete mode 100644 multi_region_serving/src/manage_share.ipynb
 create mode 100644 multi_region_serving/src/manage_share.py

diff --git a/multi_region_serving/src/manage_endpoint.ipynb b/multi_region_serving/src/manage_endpoint.ipynb
deleted file mode 100644
index c63ed8d..0000000
--- a/multi_region_serving/src/manage_endpoint.ipynb
+++ /dev/null
@@ -1,287 +0,0 @@
-{
- "cells": [
-  {
-   
"cell_type": "markdown", - "metadata": { - "application/vnd.databricks.v1+cell": { - "cellMetadata": {}, - "inputWidgets": {}, - "nuid": "ee353e42-ff58-4955-9608-12865bd0950e", - "showTitle": false, - "tableResultSettingsMap": {}, - "title": "" - } - }, - "source": [ - "# Create or Update Model Serving Endpoint\n", - "\n", - "Create or Update the deployed serving endpoints with a new model version.\n", - "\n", - "* Make sure you've created online tables for all the required feature tables.\n", - "* Run this job on the workspace where you want to serve the model. " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "%pip install databricks-sdk>=0.38.0\n", - "%restart_python" - ] - }, - { - "cell_type": "code", - "execution_count": 0, - "metadata": { - "application/vnd.databricks.v1+cell": { - "cellMetadata": { - "byteLimit": 2048000, - "rowLimit": 10000 - }, - "inputWidgets": {}, - "nuid": "da99ca57-cba0-4d67-a5a9-75d6a7884311", - "showTitle": false, - "tableResultSettingsMap": {}, - "title": "" - } - }, - "outputs": [], - "source": [ - "dbutils.widgets.text(\"endpoint_name\", defaultValue=\"\")\n", - "dbutils.widgets.text(\"model_name\", defaultValue=\"\")\n", - "dbutils.widgets.text(\"model_version\", defaultValue=\"\")" - ] - }, - { - "cell_type": "code", - "execution_count": 0, - "metadata": { - "application/vnd.databricks.v1+cell": { - "cellMetadata": { - "byteLimit": 2048000, - "rowLimit": 10000 - }, - "inputWidgets": {}, - "nuid": "5f2d79fc-86ab-4208-8970-b5547dc75820", - "showTitle": false, - "tableResultSettingsMap": {}, - "title": "" - } - }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "{'endpoint_name': 'test-my-dab-endpoint', 'model_name': 'feature_store.sharing_bugbash_a7ce.fs_model', 'model_version': '2'}\n" - ] - } - ], - "source": [ - "ARGS = dbutils.widgets.getAll()\n", - "\n", - "endpoint_name = ARGS[\"endpoint_name\"]\n", - "model_name = ARGS[\"model_name\"]\n", - "model_version = ARGS[\"model_version\"]" - ] - }, - { - "cell_type": "code", - "execution_count": 0, - "metadata": { - "application/vnd.databricks.v1+cell": { - "cellMetadata": { - "byteLimit": 2048000, - "rowLimit": 10000 - }, - "inputWidgets": {}, - "nuid": "43d66a63-39ea-436b-a82b-dd27eb479e40", - "showTitle": false, - "tableResultSettingsMap": {}, - "title": "" - } - }, - "outputs": [], - "source": [ - "from databricks.sdk import WorkspaceClient\n", - "from databricks.sdk.service.serving import ServedEntityInput, EndpointCoreConfigInput\n", - "from databricks.sdk.errors import ResourceDoesNotExist\n", - "\n", - "workspace = WorkspaceClient()" - ] - }, - { - "cell_type": "code", - "execution_count": 0, - "metadata": { - "application/vnd.databricks.v1+cell": { - "cellMetadata": { - "byteLimit": 2048000, - "rowLimit": 10000 - }, - "inputWidgets": {}, - "nuid": "3fe1e5b8-76ca-496b-890f-1cc584d29b38", - "showTitle": false, - "tableResultSettingsMap": {}, - "title": "" - } - }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "A pending update for endpoint test-my-dab-endpoint is being processed.\n" - ] - } - ], - "source": [ - "try:\n", - " endpoint = workspace.serving_endpoints.get(name=endpoint_name)\n", - "except ResourceDoesNotExist as e:\n", - " endpoint = None\n", - "\n", - "if endpoint is None:\n", - " workspace.serving_endpoints.create(\n", - " name=endpoint_name,\n", - " config=EndpointCoreConfigInput(\n", - " served_entities=[\n", - " ServedEntityInput(\n", - " 
entity_name=model_name,\n", - " entity_version=model_version,\n", - " scale_to_zero_enabled=True,\n", - " workload_size=\"Small\",\n", - " )\n", - " ]\n", - " ),\n", - " )\n", - " print(f\"Created endpoint {endpoint_name}\")\n", - "elif endpoint.pending_config is not None:\n", - " print(f\"A pending update for endpoint {endpoint_name} is being processed.\")\n", - "elif (\n", - " endpoint.config.served_entities[0].entity_name != model_name\n", - " or endpoint.config.served_entities[0].entity_version != model_version\n", - "):\n", - " # Update endpoint\n", - " workspace.serving_endpoints.update_config(\n", - " name=endpoint_name,\n", - " served_entities=[\n", - " ServedEntityInput(\n", - " entity_name=model_name,\n", - " entity_version=model_version,\n", - " scale_to_zero_enabled=True,\n", - " workload_size=\"Small\",\n", - " )\n", - " ],\n", - " )\n", - " print(f\"Updated endpoint {endpoint_name}\")\n", - "else:\n", - " print(\"Endpoint already up-to-date\")" - ] - } - ], - "metadata": { - "application/vnd.databricks.v1+notebook": { - "computePreferences": null, - "dashboards": [], - "environmentMetadata": null, - "language": "python", - "notebookMetadata": { - "pythonIndentUnit": 2 - }, - "notebookName": "manage_endpoint", - "widgets": { - "endpoint_name": { - "currentValue": "test-my-dab-endpoint", - "nuid": "c116073e-3f87-45c6-9fe0-42bc21d624f4", - "typedWidgetInfo": { - "autoCreated": false, - "defaultValue": "", - "label": null, - "name": "endpoint_name", - "options": { - "validationRegex": null, - "widgetDisplayType": "Text" - }, - "parameterDataType": "String" - }, - "widgetInfo": { - "defaultValue": "", - "label": null, - "name": "endpoint_name", - "options": { - "autoCreated": null, - "validationRegex": null, - "widgetType": "text" - }, - "widgetType": "text" - } - }, - "model_name": { - "currentValue": "feature_store.sharing_bugbash_a7ce.fs_model", - "nuid": "9817611f-00f2-45f0-a61f-9a88132afd1b", - "typedWidgetInfo": { - "autoCreated": false, - "defaultValue": "", - "label": null, - "name": "model_name", - "options": { - "validationRegex": null, - "widgetDisplayType": "Text" - }, - "parameterDataType": "String" - }, - "widgetInfo": { - "defaultValue": "", - "label": null, - "name": "model_name", - "options": { - "autoCreated": null, - "validationRegex": null, - "widgetType": "text" - }, - "widgetType": "text" - } - }, - "model_version": { - "currentValue": "2", - "nuid": "d437ed85-d52e-4251-8896-b64d9a39f576", - "typedWidgetInfo": { - "autoCreated": false, - "defaultValue": "", - "label": null, - "name": "model_version", - "options": { - "validationRegex": null, - "widgetDisplayType": "Text" - }, - "parameterDataType": "String" - }, - "widgetInfo": { - "defaultValue": "", - "label": null, - "name": "model_version", - "options": { - "autoCreated": null, - "validationRegex": null, - "widgetType": "text" - }, - "widgetType": "text" - } - } - } - }, - "kernelspec": { - "display_name": "dbconnect", - "language": "python", - "name": "python3" - }, - "language_info": { - "name": "python" - } - }, - "nbformat": 4, - "nbformat_minor": 0 -} diff --git a/multi_region_serving/src/manage_endpoint.py b/multi_region_serving/src/manage_endpoint.py new file mode 100644 index 0000000..a8b778b --- /dev/null +++ b/multi_region_serving/src/manage_endpoint.py @@ -0,0 +1,79 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Create or Update Model Serving Endpoint +# MAGIC +# MAGIC Create or Update the deployed serving endpoints with a new model version. 
+# MAGIC
+# MAGIC * Make sure you've created online tables for all the required feature tables.
+# MAGIC * Run this job on the workspace where you want to serve the model.
+
+# COMMAND ----------
+
+# MAGIC %pip install "databricks-sdk>=0.38.0"
+# MAGIC %restart_python
+
+# COMMAND ----------
+
+dbutils.widgets.text("endpoint_name", defaultValue="")
+dbutils.widgets.text("model_name", defaultValue="")
+dbutils.widgets.text("model_version", defaultValue="")
+
+# COMMAND ----------
+
+ARGS = dbutils.widgets.getAll()
+
+endpoint_name = ARGS["endpoint_name"]
+model_name = ARGS["model_name"]
+model_version = ARGS["model_version"]
+
+# COMMAND ----------
+
+from databricks.sdk import WorkspaceClient
+from databricks.sdk.service.serving import ServedEntityInput, EndpointCoreConfigInput
+from databricks.sdk.errors import ResourceDoesNotExist
+
+workspace = WorkspaceClient()
+
+# COMMAND ----------
+
+# A missing endpoint means this run should create it rather than update it.
+try:
+    endpoint = workspace.serving_endpoints.get(name=endpoint_name)
+except ResourceDoesNotExist:
+    endpoint = None
+
+if endpoint is None:
+    workspace.serving_endpoints.create(
+        name=endpoint_name,
+        config=EndpointCoreConfigInput(
+            served_entities=[
+                ServedEntityInput(
+                    entity_name=model_name,
+                    entity_version=model_version,
+                    scale_to_zero_enabled=True,
+                    workload_size="Small",
+                )
+            ]
+        ),
+    )
+    print(f"Created endpoint {endpoint_name}")
+elif endpoint.pending_config is not None:
+    print(f"A pending update for endpoint {endpoint_name} is being processed.")
+elif (
+    endpoint.config.served_entities[0].entity_name != model_name
+    or endpoint.config.served_entities[0].entity_version != model_version
+):
+    # Update the endpoint to serve the requested model version.
+    workspace.serving_endpoints.update_config(
+        name=endpoint_name,
+        served_entities=[
+            ServedEntityInput(
+                entity_name=model_name,
+                entity_version=model_version,
+                scale_to_zero_enabled=True,
+                workload_size="Small",
+            )
+        ],
+    )
+    print(f"Updated endpoint {endpoint_name}")
+else:
+    print("Endpoint already up-to-date")
\ No newline at end of file
diff --git a/multi_region_serving/src/manage_share.ipynb b/multi_region_serving/src/manage_share.ipynb
deleted file mode 100644
index ed23142..0000000
--- a/multi_region_serving/src/manage_share.ipynb
+++ /dev/null
@@ -1,268 +0,0 @@
-{
- "cells": [
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "# Sync the share and add all required resources\n",
-    "\n",
-    "Add a model and all it's dependencies to the given Share.\n",
-    "\n",
-    "Prerequisit:\n",
-    "\n",
-    "* Create a share in [delta-sharing](https://docs.databricks.com/en/delta-sharing/create-share.html).\n",
-    "* Config the default parameters in resources/manage_share.job.yml "
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "vscode": {
-     "languageId": "plaintext"
-    }
-   },
-   "outputs": [],
-   "source": [
-    "%pip install databricks-sdk>=0.38.0\n",
-    "%restart_python"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "dbutils.widgets.text(\"model_name\", defaultValue=\"\")\n",
-    "dbutils.widgets.text(\"share_name\", defaultValue=\"\")\n",
-    "dbutils.widgets.text(\"max_number_of_versions_to_sync\", defaultValue=\"10\")"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "from databricks.sdk import WorkspaceClient\n",
-    "from databricks.sdk.service.serving import ServedEntityInput\n",
-    "from databricks.sdk.service.sharing import (\n",
-    "    SharedDataObjectUpdate,\n",
-    "    SharedDataObjectUpdateAction,\n",
-    
" SharedDataObjectDataObjectType,\n", - " SharedDataObject,\n", - " SharedDataObjectHistoryDataSharingStatus,\n", - ")\n", - "\n", - "workspace = WorkspaceClient()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "model_name = dbutils.widgets.get(\"model_name\")\n", - "share_name = dbutils.widgets.get(\"share_name\")\n", - "max_number_of_versions_to_sync = int(\n", - " dbutils.widgets.get(\"max_number_of_versions_to_sync\")\n", - ")\n", - "\n", - "print(\"~~~ parameters ~~~\")\n", - "print(f\"Model name: {model_name}\")\n", - "print(f\"Share name: {share_name}\")\n", - "print(f\"Max number of versions to sync: {max_number_of_versions_to_sync}\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "def getLatestVersions(model_name: str, max_number_of_versions: int):\n", - " versions = workspace.model_versions.list(\n", - " full_name=model_name,\n", - " )\n", - " result = []\n", - " for version in versions:\n", - " result.append(\n", - " workspace.model_versions.get(full_name=model_name, version=version.version)\n", - " )\n", - " return result\n", - "\n", - "\n", - "def getDependencies(model_versions):\n", - " tables = set()\n", - " functions = set()\n", - " for version in model_versions:\n", - " for dependency in version.model_version_dependencies.dependencies:\n", - " if dependency.table is not None:\n", - " tables.add(dependency.table.table_full_name)\n", - " elif dependency.function is not None:\n", - " functions.add(dependency.function.function_full_name)\n", - " return tables, functions" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "versions = getLatestVersions(model_name, max_number_of_versions_to_sync)\n", - "tableDependencies, functionDependencies = getDependencies(versions)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from lib.rest_client import RestClient\n", - "\n", - "notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()\n", - "rc = RestClient(notebook_context)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "shareInfo = rc.get_share_info(share_name)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "sharedTables = sharedFunctions = sharedSchemas = sharedModels = set()\n", - "model_is_shared = False\n", - "\n", - "if \"objects\" in shareInfo:\n", - " sharedTables = set(\n", - " [\n", - " obj[\"name\"]\n", - " for obj in filter(\n", - " lambda obj: obj[\"data_object_type\"] == \"TABLE\", shareInfo[\"objects\"]\n", - " )\n", - " ]\n", - " )\n", - " sharedFunctions = set(\n", - " [\n", - " obj[\"name\"]\n", - " for obj in filter(\n", - " lambda obj: obj[\"data_object_type\"] == \"FUNCTION\", shareInfo[\"objects\"]\n", - " )\n", - " ]\n", - " )\n", - " sharedSchemas = set(\n", - " [\n", - " obj[\"name\"]\n", - " for obj in filter(\n", - " lambda obj: obj[\"data_object_type\"] == \"SCHEMA\", shareInfo[\"objects\"]\n", - " )\n", - " ]\n", - " )\n", - " sharedModels = set(\n", - " [\n", - " obj[\"name\"]\n", - " for obj in filter(\n", - " lambda obj: obj[\"data_object_type\"] == \"MODEL\", shareInfo[\"objects\"]\n", - " )\n", - " ]\n", - " )\n", - " model_is_shared = model_name in sharedModels" - ] - }, - { - "cell_type": "code", - "execution_count": null, - 
"metadata": {}, - "outputs": [], - "source": [ - "def getSchema(full_name):\n", - " name_sections = full_name.split(\".\")\n", - " return f\"{name_sections[0]}.{name_sections[1]}\"\n", - "\n", - "\n", - "def getObjectsToAdd(dependencies, sharedObjects, sharedSchemas):\n", - " newDependencies = dependencies - sharedObjects\n", - " return list(filter(lambda x: getSchema(x) not in sharedSchemas, newDependencies))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "tablesToAdd = getObjectsToAdd(tableDependencies, sharedTables, sharedSchemas)\n", - "functionsToAdd = getObjectsToAdd(functionDependencies, sharedFunctions, sharedSchemas)\n", - "\n", - "updates = []\n", - "\n", - "for table in tablesToAdd:\n", - " updates.append(\n", - " SharedDataObjectUpdate(\n", - " action=SharedDataObjectUpdateAction.ADD,\n", - " data_object=SharedDataObject(\n", - " name=table,\n", - " data_object_type=SharedDataObjectDataObjectType.TABLE,\n", - " history_data_sharing_status=SharedDataObjectHistoryDataSharingStatus.ENABLED,\n", - " ),\n", - " )\n", - " )\n", - "\n", - "\n", - "for function in functionsToAdd:\n", - " updates.append(\n", - " SharedDataObjectUpdate(\n", - " action=SharedDataObjectUpdateAction.ADD,\n", - " data_object=SharedDataObject(\n", - " name=function, data_object_type=SharedDataObjectDataObjectType.FUNCTION\n", - " ),\n", - " )\n", - " )\n", - "\n", - "if not model_is_shared:\n", - " updates.append(\n", - " SharedDataObjectUpdate(\n", - " action=SharedDataObjectUpdateAction.ADD,\n", - " data_object=SharedDataObject(\n", - " name=model_name, data_object_type=SharedDataObjectDataObjectType.MODEL\n", - " ),\n", - " )\n", - " )\n", - "\n", - "\n", - "def print_update_summary(updates):\n", - " for update in updates:\n", - " print(\n", - " f\"{update.action.value} {update.data_object.data_object_type} {update.data_object.name}\"\n", - " )\n", - "\n", - "\n", - "if updates:\n", - " print_update_summary(updates)\n", - " workspace.shares.update(name=share_name, updates=updates)\n", - "else:\n", - " print(\"The share is already up-to-date.\")" - ] - } - ], - "metadata": { - "language_info": { - "name": "python" - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/multi_region_serving/src/manage_share.py b/multi_region_serving/src/manage_share.py new file mode 100644 index 0000000..af0e36c --- /dev/null +++ b/multi_region_serving/src/manage_share.py @@ -0,0 +1,194 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Sync the share and add all required resources +# MAGIC +# MAGIC Add a model and all it's dependencies to the given Share. +# MAGIC +# MAGIC Prerequisit: +# MAGIC +# MAGIC * Create a share in [delta-sharing](https://docs.databricks.com/en/delta-sharing/create-share.html). 
+
+# COMMAND ----------
+
+# MAGIC %pip install "databricks-sdk>=0.38.0"
+# MAGIC %restart_python
+
+# COMMAND ----------
+
+dbutils.widgets.text("model_name", defaultValue="")
+dbutils.widgets.text("share_name", defaultValue="")
+dbutils.widgets.text("max_number_of_versions_to_sync", defaultValue="10")
+
+# COMMAND ----------
+
+from databricks.sdk import WorkspaceClient
+from databricks.sdk.service.sharing import (
+    SharedDataObjectUpdate,
+    SharedDataObjectUpdateAction,
+    SharedDataObjectDataObjectType,
+    SharedDataObject,
+    SharedDataObjectHistoryDataSharingStatus,
+)
+
+workspace = WorkspaceClient()
+
+# COMMAND ----------
+
+model_name = dbutils.widgets.get("model_name")
+share_name = dbutils.widgets.get("share_name")
+max_number_of_versions_to_sync = int(
+    dbutils.widgets.get("max_number_of_versions_to_sync")
+)
+
+print("~~~ parameters ~~~")
+print(f"Model name: {model_name}")
+print(f"Share name: {share_name}")
+print(f"Max number of versions to sync: {max_number_of_versions_to_sync}")
+
+# COMMAND ----------
+
+def getLatestVersions(model_name: str, max_number_of_versions: int):
+    versions = workspace.model_versions.list(
+        full_name=model_name,
+    )
+    result = []
+    for version in versions:
+        # Honor the limit instead of fetching every registered version.
+        if len(result) >= max_number_of_versions:
+            break
+        result.append(
+            workspace.model_versions.get(full_name=model_name, version=version.version)
+        )
+    return result
+
+
+def getDependencies(model_versions):
+    tables = set()
+    functions = set()
+    for version in model_versions:
+        for dependency in version.model_version_dependencies.dependencies:
+            if dependency.table is not None:
+                tables.add(dependency.table.table_full_name)
+            elif dependency.function is not None:
+                functions.add(dependency.function.function_full_name)
+    return tables, functions
+
+# COMMAND ----------
+
+versions = getLatestVersions(model_name, max_number_of_versions_to_sync)
+tableDependencies, functionDependencies = getDependencies(versions)
+
+# COMMAND ----------
+
+from lib.rest_client import RestClient
+
+notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
+rc = RestClient(notebook_context)
+
+# COMMAND ----------
+
+shareInfo = rc.get_share_info(share_name)
+
+# COMMAND ----------
+
+# Independent empty sets; a chained assignment would alias them to one object.
+sharedTables, sharedFunctions, sharedSchemas, sharedModels = set(), set(), set(), set()
+model_is_shared = False
+
+if "objects" in shareInfo:
+    sharedTables = set(
+        [
+            obj["name"]
+            for obj in filter(
+                lambda obj: obj["data_object_type"] == "TABLE", shareInfo["objects"]
+            )
+        ]
+    )
+    sharedFunctions = set(
+        [
+            obj["name"]
+            for obj in filter(
+                lambda obj: obj["data_object_type"] == "FUNCTION", shareInfo["objects"]
+            )
+        ]
+    )
+    sharedSchemas = set(
+        [
+            obj["name"]
+            for obj in filter(
+                lambda obj: obj["data_object_type"] == "SCHEMA", shareInfo["objects"]
+            )
+        ]
+    )
+    sharedModels = set(
+        [
+            obj["name"]
+            for obj in filter(
+                lambda obj: obj["data_object_type"] == "MODEL", shareInfo["objects"]
+            )
+        ]
+    )
+    model_is_shared = model_name in sharedModels
+
+# COMMAND ----------
+
+def getSchema(full_name):
+    name_sections = full_name.split(".")
+    return f"{name_sections[0]}.{name_sections[1]}"
+
+
+def getObjectsToAdd(dependencies, sharedObjects, sharedSchemas):
+    newDependencies = dependencies - sharedObjects
+    return list(filter(lambda x: getSchema(x) not in sharedSchemas, newDependencies))
+
+# COMMAND ----------
+
+tablesToAdd = getObjectsToAdd(tableDependencies, sharedTables, sharedSchemas)
+functionsToAdd = getObjectsToAdd(functionDependencies, sharedFunctions, sharedSchemas)
+
+updates = []
+
+for table in tablesToAdd:
+    updates.append(
+        SharedDataObjectUpdate(
+            action=SharedDataObjectUpdateAction.ADD,
+            data_object=SharedDataObject(
+                name=table,
+                data_object_type=SharedDataObjectDataObjectType.TABLE,
+                history_data_sharing_status=SharedDataObjectHistoryDataSharingStatus.ENABLED,
+            ),
+        )
+    )
+
+
+for function in functionsToAdd:
+    updates.append(
+        SharedDataObjectUpdate(
+            action=SharedDataObjectUpdateAction.ADD,
+            data_object=SharedDataObject(
+                name=function, data_object_type=SharedDataObjectDataObjectType.FUNCTION
+            ),
+        )
+    )
+
+if not model_is_shared:
+    updates.append(
+        SharedDataObjectUpdate(
+            action=SharedDataObjectUpdateAction.ADD,
+            data_object=SharedDataObject(
+                name=model_name, data_object_type=SharedDataObjectDataObjectType.MODEL
+            ),
+        )
+    )
+
+
+def print_update_summary(updates):
+    for update in updates:
+        print(
+            f"{update.action.value} {update.data_object.data_object_type} {update.data_object.name}"
+        )
+
+
+if updates:
+    print_update_summary(updates)
+    workspace.shares.update(name=share_name, updates=updates)
+else:
+    print("The share is already up-to-date.")
\ No newline at end of file

From a5aa9ee63bdfd1f79ad0c9b0eb4949c4b7c8ed91 Mon Sep 17 00:00:00 2001
From: Mingyang Ge
Date: Mon, 3 Feb 2025 13:20:59 -0800
Subject: [PATCH 8/9] blank line

---
 multi_region_serving/src/manage_endpoint.py | 3 ++-
 multi_region_serving/src/manage_share.py    | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/multi_region_serving/src/manage_endpoint.py b/multi_region_serving/src/manage_endpoint.py
index a8b778b..8bdf4d0 100644
--- a/multi_region_serving/src/manage_endpoint.py
+++ b/multi_region_serving/src/manage_endpoint.py
@@ -76,4 +76,5 @@
     )
     print(f"Updated endpoint {endpoint_name}")
 else:
-    print("Endpoint already up-to-date")
\ No newline at end of file
+    print("Endpoint already up-to-date")
+
diff --git a/multi_region_serving/src/manage_share.py b/multi_region_serving/src/manage_share.py
index af0e36c..4a3adf6 100644
--- a/multi_region_serving/src/manage_share.py
+++ b/multi_region_serving/src/manage_share.py
@@ -191,4 +191,5 @@ def print_update_summary(updates):
     print_update_summary(updates)
     workspace.shares.update(name=share_name, updates=updates)
 else:
-    print("The share is already up-to-date.")
\ No newline at end of file
+    print("The share is already up-to-date.")
+
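
After manage_share_job runs, it can be worth confirming what actually landed in
the share. A minimal sketch with databricks-sdk, assuming a recent release that
exposes `include_shared_data` (the share name is a placeholder; older releases
can use the REST call wrapped by src/lib/rest_client.py instead):

```python
from databricks.sdk import WorkspaceClient

workspace = WorkspaceClient()

# Fetch the share together with the objects it contains, not just its metadata.
share = workspace.shares.get(name="my_model_share", include_shared_data=True)
for obj in share.objects or []:
    print(obj.data_object_type, obj.name)
```
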
From 0aba7201068d3b74756e70ad5cc1964cbca3898e Mon Sep 17 00:00:00 2001
From: Mingyang Ge
Date: Mon, 3 Feb 2025 13:23:24 -0800
Subject: [PATCH 9/9] lint

---
 multi_region_serving/src/manage_endpoint.py | 3 +--
 multi_region_serving/src/manage_share.py    | 7 +++++--
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/multi_region_serving/src/manage_endpoint.py b/multi_region_serving/src/manage_endpoint.py
index 8bdf4d0..98ab224 100644
--- a/multi_region_serving/src/manage_endpoint.py
+++ b/multi_region_serving/src/manage_endpoint.py
@@ -5,7 +5,7 @@
 # MAGIC Create or Update the deployed serving endpoints with a new model version.
 # MAGIC
 # MAGIC * Make sure you've created online tables for all the required feature tables.
-# MAGIC * Run this job on the workspace where you want to serve the model. 
+# MAGIC * Run this job on the workspace where you want to serve the model.
 
 # COMMAND ----------
 
@@ -77,4 +77,3 @@
     print(f"Updated endpoint {endpoint_name}")
 else:
     print("Endpoint already up-to-date")
-
diff --git a/multi_region_serving/src/manage_share.py b/multi_region_serving/src/manage_share.py
index 4a3adf6..c7fd5cf 100644
--- a/multi_region_serving/src/manage_share.py
+++ b/multi_region_serving/src/manage_share.py
@@ -7,7 +7,7 @@
 # MAGIC Prerequisites:
 # MAGIC
 # MAGIC * Create a share in [delta-sharing](https://docs.databricks.com/en/delta-sharing/create-share.html).
-# MAGIC * Configure the default parameters in resources/manage_share.job.yml 
+# MAGIC * Configure the default parameters in resources/manage_share.job.yml
@@ -49,6 +49,7 @@
 
+
 def getLatestVersions(model_name: str, max_number_of_versions: int):
     versions = workspace.model_versions.list(
         full_name=model_name,
@@ -72,6 +73,7 @@ def getDependencies(model_versions):
             functions.add(dependency.function.function_full_name)
     return tables, functions
 
+
 # COMMAND ----------
 
@@ -130,6 +132,7 @@ def getDependencies(model_versions):
 
 # COMMAND ----------
 
+
 def getSchema(full_name):
     name_sections = full_name.split(".")
     return f"{name_sections[0]}.{name_sections[1]}"
@@ -139,6 +142,7 @@ def getObjectsToAdd(dependencies, sharedObjects, sharedSchemas):
     newDependencies = dependencies - sharedObjects
     return list(filter(lambda x: getSchema(x) not in sharedSchemas, newDependencies))
 
+
 # COMMAND ----------
 
 tablesToAdd = getObjectsToAdd(tableDependencies, sharedTables, sharedSchemas)
@@ -192,4 +196,3 @@ def print_update_summary(updates):
     workspace.shares.update(name=share_name, updates=updates)
 else:
     print("The share is already up-to-date.")
-
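
With both jobs deployed, the endpoint on the remote workspace can be smoke-tested
end to end. A minimal sketch, assuming the endpoint name from the widget defaults
above; the input record is a placeholder and must match the served model's
signature:

```python
from databricks.sdk import WorkspaceClient

workspace = WorkspaceClient()

# Block until any pending config update has finished rolling out.
workspace.serving_endpoints.wait_get_serving_endpoint_not_updating(
    name="test-my-dab-endpoint"
)

# Placeholder input record; the keys must match the model's input schema.
response = workspace.serving_endpoints.query(
    name="test-my-dab-endpoint",
    dataframe_records=[{"feature_id": 1}],
)
print(response.predictions)
```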