diff --git a/2025-11-Multi-Agent-GenAI-System/00-init-requirements.ipynb b/2025-11-Multi-Agent-GenAI-System/00-init-requirements.ipynb new file mode 100644 index 0000000..8bda06f --- /dev/null +++ b/2025-11-Multi-Agent-GenAI-System/00-init-requirements.ipynb @@ -0,0 +1,83 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "e4045340-601b-4d4a-9797-5b61a9159ced", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "output_type": "stream", + "text": [ + "Catalog created smriti_sridhar\nSchema created smriti_sridhar.workday_demos\nVolume created /Volumes/smriti_sridhar/workday_demos/workday_unstructure_data\n" + ] + } + ], + "source": [ + "import pandas as pd\n", + "import numpy as np\n", + "from datetime import datetime, timedelta\n", + "import random\n", + "from databricks.sdk import WorkspaceClient \n", + "import time\n", + "\n", + "w = WorkspaceClient()\n", + "current_user = w.current_user.me()\n", + "user_name = current_user.user_name.replace('@databricks.com', '').replace('.', '_')\n", + "token = w.tokens.create(comment=f\"sdk-{time.time_ns()}\").token_value\n", + "\n", + "## Catalog \n", + "catalog_name = user_name\n", + "spark.sql(f\"CREATE CATALOG IF NOT EXISTS {catalog_name}\")\n", + "\n", + "## Schema\n", + "schema_name = 'workday_demos'\n", + "spark.sql(f\"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_name}\")\n", + "\n", + "## Volume \n", + "volume_path = f\"/Volumes/{catalog_name}/{schema_name}/workday_unstructure_data\"\n", + "spark.sql(f\"CREATE VOLUME IF NOT EXISTS {catalog_name}.{schema_name}.workday_unstructure_data\")\n", + "\n", + "\n", + "print(f\"Catalog created {catalog_name}\")\n", + "print(f\"Schema created {catalog_name}.workday_demos\")\n", + "print(f\"Volume created 
/Volumes/{catalog_name}/{schema_name}/workday_unstructure_data\")" + ] + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "computePreferences": null, + "dashboards": [], + "environmentMetadata": { + "base_environment": "", + "environment_version": "4" + }, + "inputWidgetPreferences": null, + "language": "python", + "notebookMetadata": { + "pythonIndentUnit": 2 + }, + "notebookName": "00-init-requirements", + "widgets": {} + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} \ No newline at end of file diff --git a/2025-11-Multi-Agent-GenAI-System/01-create-synthetic-sales-data.ipynb b/2025-11-Multi-Agent-GenAI-System/01-create-synthetic-sales-data.ipynb new file mode 100644 index 0000000..6b2e669 --- /dev/null +++ b/2025-11-Multi-Agent-GenAI-System/01-create-synthetic-sales-data.ipynb @@ -0,0 +1,1840 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "bcd94c29-ebd6-47d2-af52-0e9850518fc4", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "!pip install reportlab" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "d7e3c318-c1c5-4cae-9c78-3a1d25b51c20", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "%run ./00-init-requirements" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "97960680-82d4-4ea1-98fc-c065f5790c4d", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "# Sales CRM Data Generator - Code 
Summary\n", + "This script generates synthetic sales and CRM data. Key features:\n", + "- Creates comprehensive sales datasets with lead, opportunity, and customer data\n", + "- Generates realistic US-based sales profiles including:\n", + " 1. Lead generation and qualification data\n", + " 2. Sales opportunities with stages and probabilities\n", + " 3. Customer accounts with demographics and firmographics\n", + " 4. Sales performance metrics and KPIs\n", + " 5. Territory and quota management\n", + " 6. Revenue and pipeline tracking\n", + " 7. Sales rep performance data\n", + "\n", + "The data includes US-specific elements like states, zip codes, industries, and company sizes. Key KPIs tracked include conversion rates, average deal size, sales cycle length, quota attainment, and revenue metrics." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "7a1a6574-a37f-45a2-b48c-67a600165743", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "## Sales Representatives" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "0ee92dfa-092b-4ead-a05b-b3c3c441bcb8", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "def generate_sales_reps(num_reps=50, seed=42):\n", + " \"\"\"\n", + " Generate synthetic sales representative data for Workday\n", + " \n", + " Parameters:\n", + " -----------\n", + " num_reps : int\n", + " Number of sales rep records to generate\n", + " seed : int\n", + " Random seed for reproducibility\n", + " \n", + " Returns:\n", + " --------\n", + " pandas.DataFrame\n", + " DataFrame containing synthetic sales rep data\n", + " \"\"\"\n", + " # Set random seed for reproducibility\n", + " 
np.random.seed(seed)\n", + " random.seed(seed)\n", + " \n", + " # Generate Rep IDs\n", + " rep_ids = [f'REP{str(i).zfill(4)}' for i in range(1, num_reps + 1)]\n", + " \n", + " # First names by gender\n", + " male_names = ['James', 'Michael', 'Robert', 'David', 'William', 'Richard', 'Thomas', 'Mark', 'Daniel', 'Paul',\n", + " 'Steven', 'Andrew', 'Joshua', 'Kenneth', 'Kevin', 'Brian', 'George', 'Timothy', 'Ronald', 'Jason']\n", + " female_names = ['Mary', 'Patricia', 'Jennifer', 'Linda', 'Elizabeth', 'Barbara', 'Susan', 'Jessica', 'Sarah', 'Karen',\n", + " 'Lisa', 'Nancy', 'Betty', 'Helen', 'Sandra', 'Donna', 'Carol', 'Ruth', 'Sharon', 'Michelle']\n", + " \n", + " last_names = ['Smith', 'Johnson', 'Williams', 'Brown', 'Jones', 'Garcia', 'Miller', 'Davis', 'Rodriguez', 'Martinez',\n", + " 'Hernandez', 'Lopez', 'Gonzales', 'Wilson', 'Anderson', 'Thomas', 'Taylor', 'Moore', 'Jackson', 'Martin',\n", + " 'Lee', 'Perez', 'Thompson', 'White', 'Harris', 'Sanchez', 'Clark', 'Ramirez', 'Lewis', 'Robinson']\n", + " \n", + " # Generate names and demographics\n", + " genders = np.random.choice(['Male', 'Female'], num_reps, p=[0.52, 0.48])\n", + " first_names = []\n", + " for gender in genders:\n", + " if gender == 'Male':\n", + " first_names.append(random.choice(male_names))\n", + " else:\n", + " first_names.append(random.choice(female_names))\n", + " \n", + " last_names_list = [random.choice(last_names) for _ in range(num_reps)]\n", + " \n", + " # Generate ages and experience\n", + " ages = np.random.normal(35, 8, num_reps).astype(int)\n", + " ages = np.clip(ages, 23, 65)\n", + " \n", + " # Years of experience (correlated with age)\n", + " experience_years = [max(1, age - random.randint(22, 26)) for age in ages]\n", + " \n", + " # Sales territories (US regions and states)\n", + " territories = [\n", + " 'Northeast - NY/NJ/CT', 'Northeast - MA/RI/VT/NH/ME', 'Southeast - FL/GA/SC/NC',\n", + " 'Southeast - AL/MS/TN/KY', 'Midwest - IL/IN/OH/MI', 'Midwest - WI/MN/IA/ND/SD',\n", + " 
'Southwest - TX/OK/AR/LA', 'West - CA/NV/AZ', 'West - WA/OR/ID/MT',\n", + " 'Mountain - CO/UT/WY/NM', 'Mid-Atlantic - PA/MD/DE/DC/WV/VA'\n", + " ]\n", + " \n", + " assigned_territories = [random.choice(territories) for _ in range(num_reps)]\n", + " \n", + " # Sales roles and levels\n", + " roles = ['Sales Development Rep', 'Account Executive', 'Senior Account Executive', \n", + " 'Enterprise Account Executive', 'Sales Manager', 'Senior Sales Manager']\n", + " \n", + " role_weights = [0.15, 0.35, 0.25, 0.15, 0.08, 0.02]\n", + " assigned_roles = np.random.choice(roles, num_reps, p=role_weights)\n", + " \n", + " # Annual quotas based on role\n", + " quota_ranges = {\n", + " 'Sales Development Rep': (150000, 300000),\n", + " 'Account Executive': (500000, 1000000),\n", + " 'Senior Account Executive': (800000, 1500000),\n", + " 'Enterprise Account Executive': (1200000, 2500000),\n", + " 'Sales Manager': (2000000, 4000000),\n", + " 'Senior Sales Manager': (3000000, 6000000)\n", + " }\n", + " \n", + " annual_quotas = []\n", + " for role in assigned_roles:\n", + " min_quota, max_quota = quota_ranges[role]\n", + " quota = random.randint(min_quota, max_quota)\n", + " annual_quotas.append(quota)\n", + " \n", + " # Hire dates (within last 10 years, weighted towards more recent)\n", + " current_date = datetime.now()\n", + " hire_dates = []\n", + " for _ in range(num_reps):\n", + " days_ago = int(np.random.exponential(500)) # Exponential distribution favoring recent hires\n", + " days_ago = min(days_ago, 3650) # Cap at 10 years\n", + " hire_date = current_date - timedelta(days=days_ago)\n", + " hire_dates.append(hire_date.strftime('%Y-%m-%d'))\n", + " \n", + " # Manager hierarchy\n", + " managers = ['Sarah Johnson', 'Mike Chen', 'Lisa Rodriguez', 'David Kim', 'Jennifer Walsh',\n", + " 'Robert Martinez', 'Amanda Thompson', 'Kevin O\\'Connor', 'Maria Gonzalez', 'Tom Wilson']\n", + " \n", + " assigned_managers = [random.choice(managers) for _ in range(num_reps)]\n", + " \n", + " # 
Performance metrics\n", + " # Quota attainment (normal distribution around 85%)\n", + " quota_attainment = np.random.normal(85, 20, num_reps)\n", + " quota_attainment = np.clip(quota_attainment, 0, 200)\n", + " \n", + " # Create the DataFrame\n", + " data = {\n", + " 'RepID': rep_ids,\n", + " 'FirstName': first_names,\n", + " 'LastName': last_names_list,\n", + " 'Gender': genders,\n", + " 'Age': ages,\n", + " 'ExperienceYears': experience_years,\n", + " 'Role': assigned_roles,\n", + " 'Territory': assigned_territories,\n", + " 'Manager': assigned_managers,\n", + " 'HireDate': hire_dates,\n", + " 'AnnualQuota': annual_quotas,\n", + " 'QuotaAttainmentPercent': np.round(quota_attainment, 1),\n", + " 'IsActive': [1] * num_reps # All reps are currently active\n", + " }\n", + " \n", + " return pd.DataFrame(data)\n", + "\n", + "# Generate sales rep data\n", + "sales_reps = spark.createDataFrame(generate_sales_reps(num_reps=75))\n", + "sales_reps.write.format('delta').mode('overwrite').saveAsTable(f'{catalog_name}.{schema_name}.sales_reps')\n", + "print(f\"Created f'{catalog_name}.{schema_name}.sales_reps table\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "d0e85f33-15d8-492a-b756-8776b9435ff8", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "## Customer Accounts" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "57c180b0-2dd7-4f44-a6be-6ff6ef506a49", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "def generate_customer_accounts(num_accounts=500, seed=42):\n", + " \"\"\"\n", + " Generate synthetic customer account data for Workday sales\n", + " \n", + " Parameters:\n", + " -----------\n", + 
" num_accounts : int\n", + " Number of customer account records to generate\n", + " seed : int\n", + " Random seed for reproducibility\n", + " \n", + " Returns:\n", + " --------\n", + " pandas.DataFrame\n", + " DataFrame containing synthetic customer account data\n", + " \"\"\"\n", + " # Set random seed for reproducibility\n", + " np.random.seed(seed)\n", + " random.seed(seed)\n", + " \n", + " # Generate Account IDs\n", + " account_ids = [f'ACC{str(i).zfill(5)}' for i in range(1, num_accounts + 1)]\n", + " \n", + " # Company name components\n", + " company_prefixes = ['Global', 'Advanced', 'Premier', 'Elite', 'Strategic', 'Dynamic', 'Innovative', \n", + " 'Integrated', 'Digital', 'Smart', 'NextGen', 'Future', 'Pro', 'Alpha', 'Beta']\n", + " \n", + " company_roots = ['Tech', 'Systems', 'Solutions', 'Dynamics', 'Corp', 'Industries', 'Enterprises',\n", + " 'Services', 'Group', 'Holdings', 'Partners', 'Consulting', 'Analytics', 'Data']\n", + " \n", + " company_suffixes = ['Inc.', 'LLC', 'Corp.', 'Ltd.', 'Co.', 'Group', 'Solutions', 'Systems']\n", + " \n", + " # Generate company names\n", + " company_names = []\n", + " for _ in range(num_accounts):\n", + " if random.random() < 0.3: # 30% chance of having prefix\n", + " name = f\"{random.choice(company_prefixes)} {random.choice(company_roots)} {random.choice(company_suffixes)}\"\n", + " else:\n", + " name = f\"{random.choice(company_roots)} {random.choice(company_suffixes)}\"\n", + " company_names.append(name)\n", + " \n", + " # Industries\n", + " industries = [\n", + " 'Technology', 'Financial Services', 'Healthcare', 'Manufacturing', 'Retail', 'Education',\n", + " 'Government', 'Non-profit', 'Energy', 'Transportation', 'Real Estate', 'Media & Entertainment',\n", + " 'Telecommunications', 'Professional Services', 'Construction', 'Hospitality'\n", + " ]\n", + " \n", + " industry_weights = [0.18, 0.12, 0.10, 0.08, 0.08, 0.06, 0.06, 0.04, 0.05, 0.04, 0.04, 0.03, 0.03, 0.05, 0.03, 0.01]\n", + " assigned_industries = 
np.random.choice(industries, num_accounts, p=industry_weights)\n", + " \n", + " # Company sizes\n", + " company_sizes = ['Small (1-100)', 'Medium (101-1000)', 'Large (1001-5000)', 'Enterprise (5000+)']\n", + " size_weights = [0.3, 0.35, 0.25, 0.1]\n", + " assigned_sizes = np.random.choice(company_sizes, num_accounts, p=size_weights)\n", + " \n", + " # Employee counts based on company size\n", + " employee_counts = []\n", + " for size in assigned_sizes:\n", + " if size == 'Small (1-100)':\n", + " employee_counts.append(random.randint(1, 100))\n", + " elif size == 'Medium (101-1000)':\n", + " employee_counts.append(random.randint(101, 1000))\n", + " elif size == 'Large (1001-5000)':\n", + " employee_counts.append(random.randint(1001, 5000))\n", + " else: # Enterprise\n", + " employee_counts.append(random.randint(5000, 50000))\n", + " \n", + " # US states and cities\n", + " us_locations = [\n", + " ('New York', 'NY'), ('Los Angeles', 'CA'), ('Chicago', 'IL'), ('Houston', 'TX'),\n", + " ('Phoenix', 'AZ'), ('Philadelphia', 'PA'), ('San Antonio', 'TX'), ('San Diego', 'CA'),\n", + " ('Dallas', 'TX'), ('San Jose', 'CA'), ('Austin', 'TX'), ('Jacksonville', 'FL'),\n", + " ('San Francisco', 'CA'), ('Columbus', 'OH'), ('Charlotte', 'NC'), ('Fort Worth', 'TX'),\n", + " ('Indianapolis', 'IN'), ('Seattle', 'WA'), ('Denver', 'CO'), ('Boston', 'MA'),\n", + " ('El Paso', 'TX'), ('Nashville', 'TN'), ('Detroit', 'MI'), ('Oklahoma City', 'OK'),\n", + " ('Portland', 'OR'), ('Las Vegas', 'NV'), ('Memphis', 'TN'), ('Louisville', 'KY'),\n", + " ('Baltimore', 'MD'), ('Milwaukee', 'WI'), ('Albuquerque', 'NM'), ('Tucson', 'AZ'),\n", + " ('Fresno', 'CA'), ('Sacramento', 'CA'), ('Kansas City', 'MO'), ('Mesa', 'AZ'),\n", + " ('Atlanta', 'GA'), ('Colorado Springs', 'CO'), ('Raleigh', 'NC'), ('Omaha', 'NE')\n", + " ]\n", + " \n", + " selected_locations = [random.choice(us_locations) for _ in range(num_accounts)]\n", + " cities = [loc[0] for loc in selected_locations]\n", + " states = [loc[1] for 
loc in selected_locations]\n", + " \n", + " # Generate ZIP codes (simplified)\n", + " zip_codes = [f'{random.randint(10000, 99999)}' for _ in range(num_accounts)]\n", + " \n", + " # Annual revenue based on company size\n", + " annual_revenues = []\n", + " for size in assigned_sizes:\n", + " if size == 'Small (1-100)':\n", + " revenue = random.randint(100000, 5000000)\n", + " elif size == 'Medium (101-1000)':\n", + " revenue = random.randint(5000000, 50000000)\n", + " elif size == 'Large (1001-5000)':\n", + " revenue = random.randint(50000000, 500000000)\n", + " else: # Enterprise\n", + " revenue = random.randint(500000000, 10000000000)\n", + " annual_revenues.append(revenue)\n", + " \n", + " # Account status\n", + " account_statuses = ['Prospect', 'Customer', 'Former Customer', 'Partner']\n", + " status_weights = [0.4, 0.45, 0.1, 0.05]\n", + " assigned_statuses = np.random.choice(account_statuses, num_accounts, p=status_weights)\n", + " \n", + " # Create dates\n", + " current_date = datetime.now()\n", + " \n", + " # Account creation dates\n", + " create_dates = []\n", + " for _ in range(num_accounts):\n", + " days_ago = random.randint(30, 1825) # Between 1 month and 5 years ago\n", + " create_date = current_date - timedelta(days=days_ago)\n", + " create_dates.append(create_date.strftime('%Y-%m-%d'))\n", + " \n", + " # Last activity dates (more recent)\n", + " last_activity_dates = []\n", + " for _ in range(num_accounts):\n", + " days_ago = int(np.random.exponential(30)) # Exponential distribution favoring recent activity\n", + " days_ago = min(days_ago, 365) # Cap at 1 year\n", + " activity_date = current_date - timedelta(days=days_ago)\n", + " last_activity_dates.append(activity_date.strftime('%Y-%m-%d'))\n", + " \n", + " # Primary contact information\n", + " contact_titles = [\n", + " 'CEO', 'CTO', 'CFO', 'CHRO', 'VP of Engineering', 'VP of Sales', 'VP of Marketing',\n", + " 'Director of IT', 'Director of Operations', 'Head of HR', 'Chief Data Officer',\n", + " 
'VP of Product', 'General Manager', 'President', 'COO'\n", + " ]\n", + " \n", + " assigned_contact_titles = [random.choice(contact_titles) for _ in range(num_accounts)]\n", + " \n", + " # Create the DataFrame\n", + " data = {\n", + " 'AccountID': account_ids,\n", + " 'CompanyName': company_names,\n", + " 'Industry': assigned_industries,\n", + " 'CompanySize': assigned_sizes,\n", + " 'EmployeeCount': employee_counts,\n", + " 'AnnualRevenue': annual_revenues,\n", + " 'City': cities,\n", + " 'State': states,\n", + " 'ZipCode': zip_codes,\n", + " 'AccountStatus': assigned_statuses,\n", + " 'CreatedDate': create_dates,\n", + " 'LastActivityDate': last_activity_dates,\n", + " 'PrimaryContactTitle': assigned_contact_titles\n", + " }\n", + " \n", + " return pd.DataFrame(data)\n", + "\n", + "# Generate customer account data\n", + "customer_accounts = spark.createDataFrame(generate_customer_accounts(num_accounts=750))\n", + "display(customer_accounts)\n", + "customer_accounts.write.format('delta').mode('overwrite').saveAsTable(f'{catalog_name}.{schema_name}.customer_accounts')\n", + "print(f\"Created f'{catalog_name}.{schema_name}.customer_accounts table\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "c8e77e5c-f613-4dd4-8ac7-e142f1039cc5", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "## Sales Opportunities" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "183f2ac8-697e-4dea-9ee9-e2110793e04c", + "showTitle": false, + "tableResultSettingsMap": { + "0": { + "dataGridStateBlob": 
"{\"version\":1,\"tableState\":{\"columnPinning\":{\"left\":[\"#row_number#\"],\"right\":[]},\"columnSizing\":{\"ProductCategory\":107},\"columnVisibility\":{}},\"settings\":{\"columns\":{}},\"syncTimestamp\":1761232763755}", + "filterBlob": null, + "queryPlanFiltersBlob": null, + "tableResultIndex": 0 + } + }, + "title": "" + } + }, + "outputs": [], + "source": [ + "def generate_sales_opportunities(sales_reps_df, accounts_df, num_opportunities=1500, seed=42):\n", + " \"\"\"\n", + " Generate synthetic sales opportunities data\n", + " \n", + " Parameters:\n", + " -----------\n", + " sales_reps_df : pandas.DataFrame\n", + " DataFrame containing sales rep data\n", + " accounts_df : pandas.DataFrame\n", + " DataFrame containing account data\n", + " num_opportunities : int\n", + " Number of opportunities to generate\n", + " seed : int\n", + " Random seed for reproducibility\n", + " \n", + " Returns:\n", + " --------\n", + " pandas.DataFrame\n", + " DataFrame containing synthetic sales opportunities data\n", + " \"\"\"\n", + " # Set random seed for reproducibility\n", + " np.random.seed(seed)\n", + " random.seed(seed)\n", + " \n", + " # Convert to pandas for easier manipulation\n", + " reps_pd = sales_reps_df.toPandas()\n", + " accounts_pd = accounts_df.toPandas()\n", + " \n", + " # Generate Opportunity IDs\n", + " opp_ids = [f'OPP{str(i).zfill(6)}' for i in range(1, num_opportunities + 1)]\n", + " \n", + " # Workday product categories\n", + " product_categories = [\n", + " 'Human Capital Management (HCM)',\n", + " 'Financial Management',\n", + " 'Planning',\n", + " 'Analytics',\n", + " 'Student',\n", + " 'Adaptive Planning',\n", + " 'Peakon Employee Voice'\n", + " ]\n", + " \n", + " # Opportunity names based on products\n", + " opp_name_templates = [\n", + " '{company} - HCM Implementation',\n", + " '{company} - Financial Management Upgrade',\n", + " '{company} - Planning Solution',\n", + " '{company} - Analytics Platform',\n", + " '{company} - Student Information 
System',\n", + " '{company} - Adaptive Planning Deployment',\n", + " '{company} - Employee Voice Initiative',\n", + " '{company} - Complete Workday Suite',\n", + " '{company} - HCM + Financials Bundle'\n", + " ]\n", + " \n", + " # Sales stages\n", + " sales_stages = [\n", + " 'Prospecting',\n", + " 'Discovery',\n", + " 'Proposal',\n", + " 'Negotiation',\n", + " 'Closed Won',\n", + " 'Closed Lost'\n", + " ]\n", + " \n", + " # Stage probabilities\n", + " stage_probabilities = {\n", + " 'Prospecting': 10,\n", + " 'Discovery': 25,\n", + " 'Proposal': 50,\n", + " 'Negotiation': 75,\n", + " 'Closed Won': 100,\n", + " 'Closed Lost': 0\n", + " }\n", + " \n", + " # Generate opportunities\n", + " opportunities = []\n", + " \n", + " for i in range(num_opportunities):\n", + " # Select random account and rep\n", + " account = accounts_pd.sample(n=1).iloc[0]\n", + " rep = reps_pd.sample(n=1).iloc[0]\n", + " \n", + " # Generate opportunity name\n", + " opp_name_template = random.choice(opp_name_templates)\n", + " opp_name = opp_name_template.format(company=account['CompanyName'])\n", + " \n", + " # Select product category\n", + " product_category = random.choice(product_categories)\n", + " \n", + " # Select sales stage (weighted towards earlier stages for active opps)\n", + " stage_weights = [0.25, 0.20, 0.15, 0.10, 0.20, 0.10]\n", + " sales_stage = np.random.choice(sales_stages, p=stage_weights)\n", + " \n", + " # Generate opportunity value based on account size\n", + " company_size = account['CompanySize']\n", + " if company_size == 'Small (1-100)':\n", + " base_value = random.randint(50000, 300000)\n", + " elif company_size == 'Medium (101-1000)':\n", + " base_value = random.randint(200000, 800000)\n", + " elif company_size == 'Large (1001-5000)':\n", + " base_value = random.randint(500000, 2000000)\n", + " else: # Enterprise\n", + " base_value = random.randint(1000000, 5000000)\n", + " \n", + " # Adjust value based on product category\n", + " if product_category == 'Complete 
Workday Suite' or 'Bundle' in opp_name:\n", + " base_value = int(base_value * random.uniform(1.5, 2.5))\n", + " elif product_category in ['Human Capital Management (HCM)', 'Financial Management']:\n", + " base_value = int(base_value * random.uniform(1.2, 1.8))\n", + " \n", + " opportunity_value = base_value\n", + " \n", + " # Set probability based on stage\n", + " probability = stage_probabilities[sales_stage]\n", + " if sales_stage not in ['Closed Won', 'Closed Lost']:\n", + " # Add some randomness to probability\n", + " probability += random.randint(-10, 10)\n", + " probability = max(0, min(100, probability))\n", + " \n", + " # Generate dates\n", + " current_date = datetime.now()\n", + " \n", + " # Create date (opportunity creation)\n", + " days_ago_created = random.randint(30, 365)\n", + " create_date = current_date - timedelta(days=days_ago_created)\n", + " \n", + " # Expected close date\n", + " if sales_stage in ['Closed Won', 'Closed Lost']:\n", + " # Closed deals have close date in the past\n", + " close_date = create_date + timedelta(days=random.randint(30, 180))\n", + " else:\n", + " # Open deals have future close dates\n", + " days_to_close = random.randint(30, 270)\n", + " close_date = current_date + timedelta(days=days_to_close)\n", + " \n", + " # Last activity date\n", + " if sales_stage in ['Closed Won', 'Closed Lost']:\n", + " last_activity = close_date\n", + " else:\n", + " days_since_activity = int(np.random.exponential(7)) # Most activity is recent\n", + " days_since_activity = min(days_since_activity, 60)\n", + " last_activity = current_date - timedelta(days=days_since_activity)\n", + " \n", + " # Deal source\n", + " deal_sources = ['Inbound Marketing', 'Outbound Prospecting', 'Partner Referral', 'Event/Conference', \n", + " 'Customer Referral', 'Cold Call', 'LinkedIn', 'Website']\n", + " source_weights = [0.25, 0.20, 0.15, 0.10, 0.10, 0.08, 0.07, 0.05]\n", + " deal_source = np.random.choice(deal_sources, p=source_weights)\n", + " \n", + " # 
Competition (for competitive deals)\n", + " competitors = ['SAP SuccessFactors', 'Oracle HCM', 'ADP', 'BambooHR', 'Cornerstone OnDemand', \n", + " 'Ultimate Software', 'Paycom', 'None']\n", + " competitor_weights = [0.20, 0.15, 0.12, 0.08, 0.08, 0.07, 0.05, 0.25]\n", + " primary_competitor = np.random.choice(competitors, p=competitor_weights)\n", + " \n", + " opportunity = {\n", + " 'OpportunityID': opp_ids[i],\n", + " 'OpportunityName': opp_name,\n", + " 'AccountID': account['AccountID'],\n", + " 'RepID': rep['RepID'],\n", + " 'ProductCategory': product_category,\n", + " 'OpportunityValue': opportunity_value,\n", + " 'SalesStage': sales_stage,\n", + " 'Probability': probability,\n", + " 'ExpectedCloseDate': close_date.strftime('%Y-%m-%d'),\n", + " 'CreateDate': create_date.strftime('%Y-%m-%d'),\n", + " 'LastActivityDate': last_activity.strftime('%Y-%m-%d'),\n", + " 'DealSource': deal_source,\n", + " 'PrimaryCompetitor': primary_competitor,\n", + " 'IsActive': 1 if sales_stage not in ['Closed Won', 'Closed Lost'] else 0\n", + " }\n", + " \n", + " opportunities.append(opportunity)\n", + " \n", + " return pd.DataFrame(opportunities)\n", + "\n", + "# Generate sales opportunities\n", + "sales_opportunities = generate_sales_opportunities(sales_reps, customer_accounts, num_opportunities=2000)\n", + "sales_opportunities_spark = spark.createDataFrame(sales_opportunities)\n", + "display(sales_opportunities_spark)\n", + "sales_opportunities_spark.write.format('delta').mode('overwrite').saveAsTable(f'{catalog_name}.{schema_name}.sales_opportunities')\n", + "print(f\"Created f'{catalog_name}.{schema_name}.sales_opportunities table\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "bf8ba768-01c4-419d-bb24-a18ebb33caf8", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "## Sales Activities & Interactions" + ] + }, + { + 
"cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "ea9934a6-9d64-4116-99fc-a5265ba1cbc2", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "def generate_sales_activities(opportunities_df, num_activities=5000, seed=42):\n", + " \"\"\"\n", + " Generate synthetic sales activities data\n", + " \n", + " Parameters:\n", + " -----------\n", + " opportunities_df : pandas.DataFrame\n", + " DataFrame containing opportunities data\n", + " num_activities : int\n", + " Number of activities to generate\n", + " seed : int\n", + " Random seed for reproducibility\n", + " \n", + " Returns:\n", + " --------\n", + " pandas.DataFrame\n", + " DataFrame containing synthetic sales activities data\n", + " \"\"\"\n", + " # Set random seed for reproducibility\n", + " np.random.seed(seed)\n", + " random.seed(seed)\n", + " \n", + " # Convert to pandas\n", + " opps_pd = opportunities_df.toPandas()\n", + " \n", + " # Activity types\n", + " activity_types = [\n", + " 'Phone Call', 'Email', 'Meeting', 'Demo', 'Proposal Sent', \n", + " 'Contract Review', 'Discovery Call', 'Follow-up Call', 'Event'\n", + " ]\n", + " \n", + " # Activity outcomes\n", + " activity_outcomes = [\n", + " 'Positive', 'Neutral', 'Negative', 'No Response', 'Scheduled Follow-up'\n", + " ]\n", + " \n", + " # Generate activities\n", + " activities = []\n", + " current_date = datetime.now()\n", + " \n", + " for i in range(num_activities):\n", + " # Select random opportunity\n", + " opp = opps_pd.sample(n=1).iloc[0]\n", + " \n", + " # Activity ID\n", + " activity_id = f'ACT{str(i).zfill(6)}'\n", + " \n", + " # Activity type (weighted based on typical sales activities)\n", + " type_weights = [0.25, 0.20, 0.15, 0.10, 0.08, 0.05, 0.07, 0.08, 0.02]\n", + " activity_type = np.random.choice(activity_types, 
p=type_weights)\n", + " \n", + " # Activity date (should be between opportunity creation and last activity)\n", + " opp_create = datetime.strptime(opp['CreateDate'], '%Y-%m-%d')\n", + " opp_last_activity = datetime.strptime(opp['LastActivityDate'], '%Y-%m-%d')\n", + " \n", + " # Random date between creation and last activity\n", + " date_range = (opp_last_activity - opp_create).days\n", + " if date_range > 0:\n", + " random_days = random.randint(0, date_range)\n", + " activity_date = opp_create + timedelta(days=random_days)\n", + " else:\n", + " activity_date = opp_create\n", + " \n", + " # Activity outcome\n", + " outcome_weights = [0.4, 0.25, 0.15, 0.1, 0.1]\n", + " outcome = np.random.choice(activity_outcomes, p=outcome_weights)\n", + " \n", + " # Duration (in minutes)\n", + " if activity_type == 'Demo':\n", + " duration = random.randint(45, 120)\n", + " elif activity_type == 'Meeting':\n", + " duration = random.randint(30, 90)\n", + " elif activity_type in ['Phone Call', 'Discovery Call', 'Follow-up Call']:\n", + " duration = random.randint(15, 60)\n", + " elif activity_type == 'Event':\n", + " duration = random.randint(120, 480)\n", + " else: # Email, Proposal, etc.\n", + " duration = 0\n", + " \n", + " # Notes/Description\n", + " activity_descriptions = {\n", + " 'Phone Call': ['Initial contact call', 'Follow-up discussion', 'Budget confirmation call', 'Technical requirements call'],\n", + " 'Email': ['Sent proposal', 'Follow-up email', 'Meeting recap', 'Resource sharing'],\n", + " 'Meeting': ['Discovery meeting', 'Stakeholder meeting', 'Technical review', 'Executive briefing'],\n", + " 'Demo': ['Product demonstration', 'Technical demo', 'Pilot demonstration', 'POC presentation'],\n", + " 'Proposal Sent': ['Initial proposal', 'Revised proposal', 'Final proposal', 'Contract proposal'],\n", + " 'Contract Review': ['Legal review', 'Terms negotiation', 'Contract discussion', 'Final review'],\n", + " 'Discovery Call': ['Needs assessment', 'Requirements gathering', 
'Pain point discussion', 'Current state review'],\n", + " 'Follow-up Call': ['Status check', 'Decision timeline', 'Next steps discussion', 'Objection handling'],\n", + " 'Event': ['Trade show meeting', 'Conference presentation', 'User group event', 'Industry event']\n", + " }\n", + " \n", + " description = random.choice(activity_descriptions.get(activity_type, ['General activity']))\n", + " \n", + " activity = {\n", + " 'ActivityID': activity_id,\n", + " 'OpportunityID': opp['OpportunityID'],\n", + " 'ActivityType': activity_type,\n", + " 'ActivityDate': activity_date.strftime('%Y-%m-%d'),\n", + " 'Duration': duration,\n", + " 'Outcome': outcome,\n", + " 'Description': description\n", + " }\n", + " \n", + " activities.append(activity)\n", + " \n", + " return pd.DataFrame(activities)\n", + "\n", + "# Generate sales activities\n", + "sales_activities = generate_sales_activities(sales_opportunities_spark, num_activities=7500)\n", + "sales_activities_spark = spark.createDataFrame(sales_activities)\n", + "display(sales_activities_spark)\n", + "sales_activities_spark.write.format('delta').mode('overwrite').saveAsTable(f'{catalog_name}.{schema_name}.sales_activities')\n", + "print(f\"Created '{catalog_name}.{schema_name}.sales_activities table\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "278bf646-1059-4f6c-b6ad-bb1b00f1628b", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "## Unstructured Data: Proposal Documents & Contract Content" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "590f150b-f889-4009-b6c1-bcc7c97cd7c9", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "import io\n", + 
"import random\n", + "import numpy as np\n", + "import pandas as pd\n", + "from datetime import datetime, timedelta\n", + "from reportlab.pdfgen import canvas\n", + "from reportlab.lib.pagesizes import LETTER\n", + "from reportlab.lib.units import inch\n", + "from databricks.sdk import WorkspaceClient\n", + "\n", + "# ---------- FEEDBACK GENERATOR ----------\n", + "def generate_customer_feedback(accounts_df, opportunities_df, num_feedback=10, seed=42):\n", + " np.random.seed(seed)\n", + " random.seed(seed)\n", + " accounts_pd = accounts_df.toPandas()\n", + " opps_pd = opportunities_df.toPandas()\n", + "\n", + " feedback_templates = {\n", + " \"positive\": [\n", + " \"\"\"Excellent demo! Highlights included real-time reporting, a clean interface, and easy integrations.\"\"\",\n", + " \"\"\"Outstanding experience with the Workday sales team. Clear communication and excellent support.\"\"\",\n", + " ],\n", + " \"neutral\": [\n", + " \"\"\"Mixed impressions. The system appears capable, but implementation may be complex for our team.\"\"\",\n", + " \"\"\"Professional process, though timelines and pricing need further clarification.\"\"\",\n", + " ],\n", + " \"negative\": [\n", + " \"\"\"Disappointed with the demo. 
Performance concerns and unclear customization options.\"\"\",\n", + " \"\"\"Sales process felt rushed and didn’t align with our needs.\"\"\",\n", + " ],\n", + " }\n", + "\n", + " records, now = [], datetime.now()\n", + "\n", + " for i in range(num_feedback):\n", + " account = accounts_pd.sample(n=1).iloc[0]\n", + " related = opps_pd[opps_pd[\"AccountID\"] == account[\"AccountID\"]]\n", + " opp = related.sample(n=1).iloc[0] if len(related) > 0 else opps_pd.sample(n=1).iloc[0]\n", + " stage = opp[\"SalesStage\"]\n", + "\n", + " if stage == \"Closed Won\":\n", + " sentiment = random.choice([\"positive\", \"positive\", \"neutral\"])\n", + " elif stage == \"Closed Lost\":\n", + " sentiment = random.choice([\"negative\", \"neutral\"])\n", + " else:\n", + " sentiment = random.choice([\"positive\", \"neutral\", \"negative\"])\n", + "\n", + " content = random.choice(feedback_templates[sentiment])\n", + "\n", + " days_ago = min(int(np.random.exponential(45)), 200)\n", + " dt = now - timedelta(days=days_ago)\n", + " score = (\n", + " random.uniform(0.6, 0.9)\n", + " if sentiment == \"positive\"\n", + " else random.uniform(0.3, 0.6)\n", + " if sentiment == \"neutral\"\n", + " else random.uniform(0.1, 0.4)\n", + " )\n", + "\n", + " records.append(\n", + " {\n", + " \"FeedbackID\": f\"FB{str(i + 1).zfill(5)}\",\n", + " \"AccountID\": account[\"AccountID\"],\n", + " \"OpportunityID\": opp[\"OpportunityID\"],\n", + " \"FeedbackDate\": dt.strftime(\"%Y-%m-%d\"),\n", + " \"Sentiment\": sentiment,\n", + " \"Score\": round(score, 3),\n", + " \"Content\": content,\n", + " \"CustomerRole\": random.choice(\n", + " [\"IT Director\", \"CFO\", \"CTO\", \"CHRO\", \"Operations Manager\"]\n", + " ),\n", + " \"Source\": random.choice([\"Email\", \"Survey\", \"Phone Interview\"]),\n", + " }\n", + " )\n", + "\n", + " return pd.DataFrame(records)\n", + "\n", + "\n", + "# ---------- FETCH DATA FROM EXISTING SPARK TABLES ----------\n", + "customer_feedback = generate_customer_feedback(\n", + " 
customer_accounts, sales_opportunities_spark, num_feedback=25\n", + ")\n", + "\n", + "# ---------- GENERATE & UPLOAD PDFs ----------\n", + "for _, row in customer_feedback.iterrows():\n", + " buffer = io.BytesIO()\n", + " c = canvas.Canvas(buffer, pagesize=LETTER)\n", + " c.setFont(\"Helvetica\", 10)\n", + " width, height = LETTER\n", + "\n", + " # Header\n", + " c.setFont(\"Helvetica-Bold\", 14)\n", + " c.drawCentredString(width / 2, height - 0.8 * inch, \"Customer Feedback Report\")\n", + "\n", + " # Metadata\n", + " c.setFont(\"Helvetica\", 10)\n", + " y = height - 1.3 * inch\n", + " meta_lines = [\n", + " f\"Feedback ID: {row['FeedbackID']}\",\n", + " f\"Account ID: {row['AccountID']}\",\n", + " f\"Opportunity ID: {row['OpportunityID']}\",\n", + " f\"Feedback Date: {row['FeedbackDate']}\",\n", + " f\"Sentiment: {row['Sentiment'].capitalize()}\",\n", + " f\"Sentiment Score: {row['Score']}\",\n", + " f\"Customer Role: {row['CustomerRole']}\",\n", + " f\"Source: {row['Source']}\",\n", + " \"\",\n", + " \"Feedback Content:\",\n", + " ]\n", + "\n", + " for line in meta_lines:\n", + " c.drawString(72, y, line)\n", + " y -= 15\n", + "\n", + " # Content (word-wrapped)\n", + " text_lines = row[\"Content\"].split(\"\\n\")\n", + " for paragraph in text_lines:\n", + " for subline in paragraph.strip().split(\". 
\"):\n", + " wrapped = []\n", + " words = subline.split()\n", + " while words:\n", + " chunk, words = words[:12], words[12:]\n", + " wrapped.append(\" \".join(chunk))\n", + " for line in wrapped:\n", + " if y < 72:\n", + " c.showPage()\n", + " c.setFont(\"Helvetica\", 10)\n", + " y = height - 72\n", + " c.drawString(72, y, line)\n", + " y -= 14\n", + " y -= 10\n", + "\n", + " c.save()\n", + " buffer.seek(0)\n", + "\n", + " # Upload directly to UC Volume\n", + " filename = f\"{row['FeedbackID']}.pdf\"\n", + " target_path = f\"{volume_path}/customer_feedback/{filename}\"\n", + " w.files.upload(target_path, buffer, overwrite=True)\n", + "\n", + " # print(f\"Uploaded: {target_path}\")\n", + "\n", + "print(f\"✅ Successfully uploaded {len(customer_feedback)} PDFs to {volume_path}\")" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "ddfaec60-b8f3-466e-b4e7-a6dce031d595", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "## Unstructured Data: Meeting Notes & Call Summaries\n" ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "5371018a-4ada-4e22-b0c7-5e178bcd4cf7", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [], "source": [ "import io\n", "import random\n", "import numpy as np\n", "import pandas as pd\n", "from datetime import datetime, timedelta\n", "\n", "from reportlab.pdfgen import canvas\n", "from reportlab.lib.pagesizes import LETTER\n", "from reportlab.lib.units import inch\n", "from databricks.sdk import WorkspaceClient\n", "\n", "# ---------- GENERATOR ----------\n", "def generate_meeting_notes(\n", " activities_df,\n", " opportunities_df,\n", " accounts_df,\n", 
sales_reps_df,\n", + " seed=42\n", + "):\n", + " np.random.seed(seed)\n", + " random.seed(seed)\n", + "\n", + " activities_pd = activities_df.toPandas()\n", + " opps_pd = opportunities_df.toPandas()\n", + " accounts_pd = accounts_df.toPandas()\n", + " reps_pd = sales_reps_df.toPandas()\n", + "\n", + " meeting_activities = activities_pd[\n", + " activities_pd['ActivityType'].isin(\n", + " ['Meeting', 'Demo', 'Phone Call', 'Discovery Call']\n", + " )\n", + " ]\n", + "\n", + " # One comprehensive template that supports every placeholder used below\n", + " base_template = \"\"\"\\\n", + "Company: {company_name}\n", + "Meeting Type: {meeting_type}\n", + "Date: {meeting_date} | Duration: {duration} min | Location: {meeting_location}\n", + "Databricks Rep: {rep_name} (SE: {se_name})\n", + "Attendees: {attendees}\n", + "Client Attendees: {client_attendees}\n", + "\n", + "Business Area: {business_area} ({department})\n", + "Focus Area: {focus_area}\n", + "Current System: {current_system} | Team Size: {team_size} | Locations: {locations}\n", + "\n", + "Pain Points: {pain_points}\n", + "Main Challenge: {main_challenge}\n", + "Delay Area: {delay_area} | Error Area: {error_area}\n", + "Compliance Area: {compliance_area} | Resource Area: {resource_area}\n", + "Data Sources: {data_sources} | Reporting Frequency: {reporting_freq} | Users: {user_count}\n", + "\n", + "Requirements:\n", + " 1) {requirement_1}\n", + " 2) {requirement_2}\n", + " 3) {requirement_3}\n", + "Integration Focus: {integration_focus} | Integration System: {integration_system}\n", + "Product Category: {product_category}\n", + "\n", + "Fit Assessment:\n", + " - {fit_point_1}\n", + " - {fit_point_2}\n", + " - {fit_point_3}\n", + "\n", + "Concerns:\n", + " - {concern_1}\n", + " - {concern_2}\n", + "\n", + "Feature Reactions:\n", + " * {feature_1}: {feature_1_reaction}\n", + " * {feature_2}: {feature_2_reaction}\n", + " * {feature_3}: {feature_3_reaction}\n", + "Workflow Reaction: {workflow_reaction}\n", + "\n", + 
"Q&A:\n", + " Q: {question_1}\n", + " A: {answer_1}\n", + "\n", + " Q: {question_2}\n", + " A: {answer_2}\n", + "\n", + " Q: {question_3}\n", + " A: {answer_3}\n", + "\n", + "Implementation:\n", + " Timeline: {impl_timeline}\n", + " Phases: {impl_phases}\n", + " Resource Estimate: {resource_estimate}\n", + " Training Needs: {training_needs}\n", + "\n", + "Commercials:\n", + " Budget Range: {budget_range}\n", + " Timeline Target: {timeline}\n", + " Decision Makers: {decision_makers}\n", + " Evaluation Criteria: {evaluation_criteria}\n", + " Proposal Date: {proposal_date}\n", + "\n", + "Discussion Highlights:\n", + " - {discussion_topic_1}: {discussion_detail_1}\n", + " - {discussion_topic_2}: {discussion_detail_2}\n", + " - {discussion_topic_3}: {discussion_detail_3}\n", + "\n", + "Decisions:\n", + " 1) {decision_1}\n", + " 2) {decision_2}\n", + " 3) {decision_3}\n", + "\n", + "Issues & Owners:\n", + " - {issue_1} (Owner: {issue_owner_1})\n", + " - {issue_2} (Owner: {issue_owner_2})\n", + "Risk Assessment: {risk_assessment}\n", + "\n", + "Next Steps:\n", + " Actions:\n", + " - {action_1}\n", + " - {action_2}\n", + " - {action_3}\n", + " Action Items:\n", + "{action_items}\n", + " Next Meeting: {next_meeting_date} | Demo Date: {demo_date}\n", + "\n", + "Attendees List:\n", + "{attendees_list}\n", + "\n", + "Additional Notes: {additional_notes}\n", + "\n", + "Call Summary:\n", + " Purpose: {call_purpose}\n", + " Sentiment: {sentiment} (Score: {sentiment_score}/10)\n", + " Opportunity Score: {opp_score}/10\n", + " Outcome: {call_outcome}\n", + "\n", + "Customer Update: {customer_update}\n", + "Our Response: {our_response}\n", + "Key Points:\n", + " - {point_1}\n", + " - {point_2}\n", + " - {point_3}\n", + " - {point_4}\n", + "\"\"\"\n", + "\n", + " meeting_templates = {\n", + " 'Meeting': [base_template],\n", + " 'Demo': [base_template],\n", + " 'Phone Call': [base_template],\n", + " 'Discovery Call': [base_template],\n", + " }\n", + "\n", + " meeting_notes = []\n", + 
"\n", + " for _, activity in meeting_activities.head(25).iterrows():\n", + " opp = opps_pd[opps_pd['OpportunityID'] == activity['OpportunityID']].iloc[0]\n", + " account = accounts_pd[accounts_pd['AccountID'] == opp['AccountID']].iloc[0]\n", + " rep = reps_pd[reps_pd['RepID'] == opp['RepID']].iloc[0]\n", + "\n", + " activity_type = activity['ActivityType']\n", + " template = random.choice(\n", + " meeting_templates.get(activity_type, meeting_templates['Phone Call'])\n", + " )\n", + "\n", + " rep_name = f\"{rep['FirstName']} {rep['LastName']}\"\n", + " company_name = account['CompanyName']\n", + " industry = account.get('Industry', 'Unknown')\n", + "\n", + " attendees = random.choice([\n", + " 'Sarah Johnson (IT Director), Mike Chen (HR Director)',\n", + " 'David Kim (CFO), Lisa Rodriguez (Operations Manager)',\n", + " 'Jennifer Walsh (CTO), Robert Martinez (VP Finance)',\n", + " \"Amanda Thompson (CHRO), Kevin O'Connor (IT Manager)\"\n", + " ])\n", + "\n", + " pain_points = random.choice([\n", + " 'manual data entry, inconsistent reporting, compliance gaps',\n", + " 'siloed systems, delayed reporting, audit trail issues',\n", + " 'spreadsheet-based processes, version control problems',\n", + " 'lack of real-time visibility, integration challenges'\n", + " ])\n", + "\n", + " note_content = template.format(\n", + " company_name=company_name,\n", + " meeting_type=activity_type,\n", + " meeting_date=activity['ActivityDate'],\n", + " meeting_location=random.choice(['Client HQ', 'Virtual (Zoom)', 'Databricks Office']),\n", + " rep_name=rep_name,\n", + " se_name=random.choice(['Alex Rodriguez', 'Priya Shah', 'Tom Nguyen']),\n", + " attendees=attendees,\n", + " client_attendees='C-suite and department heads',\n", + " business_area=random.choice(['HR processes', 'financial planning', 'reporting', 'compliance']),\n", + " department=random.choice(['HR', 'Finance', 'Operations', 'IT']),\n", + " focus_area=industry.lower(),\n", + " current_system=random.choice(['Excel', 'Legacy 
HRIS', 'Multiple systems', 'Paper-based process']),\n", + " team_size=random.randint(5, 50),\n", + " locations=random.randint(1, 5),\n", + " pain_points=pain_points,\n", + " main_challenge=random.choice(['scaling operations', 'regulatory compliance', 'process efficiency']),\n", + " delay_area='month-end close',\n", + " error_area='data consolidation',\n", + " compliance_area=f'{industry} regulations',\n", + " resource_area='finance team',\n", + " data_sources='3 different systems',\n", + " reporting_freq='Monthly',\n", + " user_count=random.randint(10, 100),\n", + " requirement_1=random.choice(['Real-time reporting', 'Mobile access', 'API integration']),\n", + " requirement_2=random.choice(['Automated workflows', 'Compliance tracking', 'Self-service']),\n", + " requirement_3=random.choice(['Custom dashboards', 'Role-based access', 'Audit trails']),\n", + " integration_focus=random.choice(['ERP', 'CRM', 'HRIS']),\n", + " integration_system=random.choice(['ERP', 'CRM', 'HRIS']),\n", + " product_category=opp['ProductCategory'],\n", + " fit_point_1='Strong alignment with current needs',\n", + " fit_point_2='Proven track record in ' + industry.lower(),\n", + " fit_point_3='Scalable architecture',\n", + " concern_1='Implementation timeline',\n", + " concern_2='Change management',\n", + " feature_1='Automated reporting', feature_1_reaction='Impressed with real-time capabilities',\n", + " feature_2='Mobile interface', feature_2_reaction='Liked the user experience',\n", + " feature_3='Analytics dashboard',feature_3_reaction='Requested custom metrics',\n", + " workflow_reaction='Perfect fit for their processes',\n", + " question_1='How long is implementation?', answer_1='Typically 6-9 months for your size',\n", + " question_2='What about data migration?', answer_2='We handle that with a dedicated team',\n", + " question_3='Training requirements?', answer_3='2-week program with ongoing support',\n", + " impl_timeline='6-8 months',\n", + " impl_phases='3 phases: Foundation, 
Rollout, Optimization',\n", + " resource_estimate='2-3 dedicated resources',\n", + " training_needs='Train-the-trainer approach',\n", + " budget_range=f\"{random.randint(100, 500)}K - {random.randint(500, 1000)}K\",\n", + " timeline=random.choice(['Q2 2025', 'end of year', 'next fiscal year']),\n", + " decision_makers='IT Director, CFO',\n", + " evaluation_criteria='ROI, implementation time, user adoption',\n", + " proposal_date='End of month',\n", + " discussion_topic_1='Technical Integration',\n", + " discussion_detail_1='Reviewed API capabilities and data flow requirements',\n", + " discussion_topic_2='Change Management',\n", + " discussion_detail_2='Discussed training approach and user adoption strategy',\n", + " discussion_topic_3='Timeline & Budget',\n", + " discussion_detail_3='Aligned on project phases and investment levels',\n", + " decision_1='Proceed with technical evaluation',\n", + " decision_2='Include IT security team in next review',\n", + " decision_3='Request detailed implementation plan',\n", + " issue_1='Security review pending', issue_owner_1='Client IT team',\n", + " issue_2='Budget approval needed', issue_owner_2='Client Finance',\n", + " risk_assessment='Low risk - strong stakeholder buy-in',\n", + " next_meeting_date='Next Friday',\n", + " action_1='Send detailed proposal',\n", + " action_2='Schedule technical review',\n", + " action_3='Provide reference contacts',\n", + " action_items=' • Workday: Security documentation\\n • Client: Finalize budget',\n", + " demo_date='Next Tuesday',\n", + " attendees_list='• Sarah J. (IT Director)\\n• Mike C. 
(Finance Manager)',\n", + " additional_notes='Client seems engaged and ready to move forward.',\n", + " call_purpose=random.choice(['Status update', 'Address concerns', 'Next steps discussion']),\n", + " sentiment=random.choice(['Positive', 'Neutral', 'Cautious', 'Enthusiastic']),\n", + " sentiment_score=random.randint(6, 9),\n", + " opp_score=random.randint(5, 9),\n", + " call_outcome='Positive - moving to next stage',\n", + " customer_update='Evaluation committee formed',\n", + " our_response='Provided requested documentation',\n", + " point_1='Discussed current pain points',\n", + " point_2='Reviewed Workday capabilities',\n", + " point_3='Addressed pricing questions',\n", + " point_4='Next steps alignment',\n", + " contact_name=random.choice(['John', 'Sarah', 'Mike', 'Jennifer']),\n", + " urgency_level=random.choice(['High', 'Medium', 'Low']),\n", + " duration=random.randint(30, 120) if activity['Duration'] == 0 else activity['Duration'],\n", + " )\n", + "\n", + " note = {\n", + " 'NoteID': f'NOTE{activity[\"ActivityID\"][3:]}',\n", + " 'ActivityID': activity['ActivityID'],\n", + " 'OpportunityID': activity['OpportunityID'],\n", + " 'RepID': opp['RepID'],\n", + " 'AccountID': opp['AccountID'],\n", + " 'NoteType': activity_type,\n", + " 'CreatedDate': activity['ActivityDate'],\n", + " 'NoteContent': note_content,\n", + " 'WordCount': len(note_content.split()),\n", + " 'SentimentScore': random.uniform(0.3, 0.9),\n", + " 'KeyTopics': random.choice([\n", + " 'budget,timeline,requirements',\n", + " 'integration,security,compliance',\n", + " 'roi,implementation,training',\n", + " 'competitive,pricing,features'\n", + " ])\n", + " }\n", + "\n", + " meeting_notes.append(note)\n", + "\n", + " return pd.DataFrame(meeting_notes)\n", + "\n", + "\n", + "# ---------- Generate meeting notes from your Spark DFs ----------\n", + "meeting_notes = generate_meeting_notes(\n", + " sales_activities_spark,\n", + " sales_opportunities_spark,\n", + " customer_accounts,\n", + " 
sales_reps\n", + ")\n", + "\n", + "# ---------- PDF Uploads (WorkspaceClient -> UC Volume) ----------\n", + "def wrap_and_draw(c, text, x, y, width, font_name=\"Helvetica\", font_size=10, line_height=14):\n", + " \"\"\"Simple word-wrap printer. Returns new y.\"\"\"\n", + " c.setFont(font_name, font_size)\n", + " # Split into paragraphs on newlines\n", + " paragraphs = [p.rstrip() for p in (text or \"\").split(\"\\n\")]\n", + " for para in paragraphs:\n", + " if not para:\n", + " y -= line_height\n", + " continue\n", + " words, line = para.split(), []\n", + " for w in words:\n", + " trial = (\" \".join(line + [w])).strip()\n", + " if c.stringWidth(trial, font_name, font_size) <= width:\n", + " line.append(w)\n", + " else:\n", + " c.drawString(x, y, \" \".join(line))\n", + " y -= line_height\n", + " line = [w]\n", + " if y < 0.9 * inch:\n", + " c.showPage()\n", + " c.setFont(font_name, font_size)\n", + " y = LETTER[1] - 1.0 * inch\n", + " if line:\n", + " c.drawString(x, y, \" \".join(line))\n", + " y -= line_height\n", + " y -= 0.4 * line_height\n", + " if y < 0.9 * inch:\n", + " c.showPage()\n", + " c.setFont(font_name, font_size)\n", + " y = LETTER[1] - 1.0 * inch\n", + " return y\n", + "\n", + "for _, row in meeting_notes.iterrows():\n", + " buffer = io.BytesIO()\n", + " c = canvas.Canvas(buffer, pagesize=LETTER)\n", + "\n", + " width, height = LETTER\n", + " lm, rm = 1.0 * inch, 1.0 * inch\n", + " usable_w = width - lm - rm\n", + " y = height - 0.9 * inch\n", + "\n", + " # Header\n", + " c.setFont(\"Helvetica-Bold\", 14)\n", + " c.drawCentredString(width / 2, y, \"Meeting Notes\")\n", + " y -= 0.4 * inch\n", + "\n", + " # Metadata block\n", + " c.setFont(\"Helvetica\", 10)\n", + " meta = [\n", + " f\"NoteID: {row['NoteID']}\",\n", + " f\"Type: {row['NoteType']}\",\n", + " f\"Created: {row['CreatedDate']}\",\n", + " f\"AccountID: {row['AccountID']} OpportunityID: {row['OpportunityID']} RepID: {row['RepID']}\",\n", + " f\"SentimentScore: 
{round(row['SentimentScore'],3)} KeyTopics: {row['KeyTopics']}\",\n", + " \"\"\n", + " ]\n", + " for line in meta:\n", + " c.drawString(lm, y, line)\n", + " y -= 14\n", + " if y < 0.9 * inch:\n", + " c.showPage(); c.setFont(\"Helvetica\", 10)\n", + " y = height - 1.0 * inch\n", + "\n", + " # Body\n", + " c.setFont(\"Helvetica-Bold\", 11)\n", + " c.drawString(lm, y, \"Notes:\")\n", + " y -= 16\n", + "\n", + " y = wrap_and_draw(\n", + " c=c,\n", + " text=row['NoteContent'],\n", + " x=lm,\n", + " y=y,\n", + " width=usable_w,\n", + " font_name=\"Helvetica\",\n", + " font_size=10,\n", + " line_height=14\n", + " )\n", + "\n", + " c.save()\n", + " buffer.seek(0)\n", + "\n", + " # Upload to Volume\n", + " filename = f\"{row['NoteID']}.pdf\"\n", + " target_path = f\"{volume_path}/meeting_notes/{filename}\"\n", + " w.files.upload(target_path, buffer, overwrite=True)\n", + " # print(f\"Uploaded: {target_path}\")\n", + "\n", + "print(f\"✅ Successfully uploaded {len(meeting_notes)} meeting note PDFs to {volume_path}\")" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "a5e3abbc-85dd-4ea3-a990-a078ed82cf3b", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "## Unstructured Data: Email communications" ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "9c43a2dd-d005-476f-a100-57655e84f31f", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [], "source": [ "import io\n", "import random\n", "import numpy as np\n", "import pandas as pd\n", "from datetime import datetime, timedelta\n", "\n", "from reportlab.pdfgen import canvas\n", "from reportlab.lib.pagesizes import LETTER\n", "from reportlab.lib.units import 
inch\n", + "from databricks.sdk import WorkspaceClient\n", + "\n", + "# ---------- YOUR GENERATOR (unchanged) ----------\n", + "def generate_email_communications(opportunities_df, sales_reps_df, accounts_df, num_emails=800, seed=42):\n", + " \"\"\"\n", + " Generate synthetic email communications between sales reps and customers\n", + " \"\"\"\n", + " # Set random seed for reproducibility\n", + " np.random.seed(seed)\n", + " random.seed(seed)\n", + " \n", + " # Convert to pandas\n", + " opps_pd = opportunities_df.toPandas()\n", + " reps_pd = sales_reps_df.toPandas()\n", + " accounts_pd = accounts_df.toPandas()\n", + " \n", + " # Email templates by type\n", + " email_templates = {\n", + " 'initial_outreach': [\n", + " \"\"\"Subject: Workday Solutions for {company_name} - Let's Connect\n", + "\n", + "Hi {contact_name},\n", + "\n", + "I hope this email finds you well. I'm {rep_name}, {rep_role} at Workday, and I've been following {company_name}'s growth in the {industry} space.\n", + "\n", + "I'd love to share how companies like yours are leveraging Workday's {product_category} solutions to streamline operations and drive efficiency. Would you be open to a brief 15-minute conversation next week?\n", + "\n", + "Best regards,\n", + "{rep_name}\n", + "{rep_email}\n", + "Workday\"\"\",\n", + " \n", + " \"\"\"Subject: {company_name} + Workday: Driving Digital Transformation\n", + "\n", + "Hello {contact_name},\n", + "\n", + "I noticed {company_name} has been expanding rapidly - congratulations! 
As you scale, managing {business_challenge} becomes increasingly complex.\n", + "\n", + "At Workday, we've helped similar {industry} companies like {example_company} achieve:\n", + "- 40% reduction in administrative overhead\n", + "- 25% faster reporting cycles\n", + "- Improved employee satisfaction scores\n", + "\n", + "Would you be interested in a quick call to discuss how we could help {company_name}?\n", + "\n", + "Looking forward to connecting,\n", + "{rep_name}\"\"\"\n", + " ],\n", + " \n", + " 'follow_up': [\n", + " \"\"\"Subject: Re: Following up on our conversation\n", + "\n", + "Hi {contact_name},\n", + "\n", + "Thank you for taking the time to speak with me yesterday about {company_name}'s {business_challenge} initiatives. I really enjoyed learning about your current processes and future goals.\n", + "\n", + "As promised, I'm attaching:\n", + "- ROI calculator specific to your use case\n", + "- Case study from {example_company} (similar size/industry)\n", + "- Implementation timeline overview\n", + "\n", + "Based on our discussion, I believe our {product_category} solution could deliver significant value. Would you like to schedule a demo for your team next week?\n", + "\n", + "Best,\n", + "{rep_name}\"\"\",\n", + " \n", + " \"\"\"Subject: Quick follow-up from {rep_name}\n", + "\n", + "{contact_name},\n", + "\n", + "Hope you're having a great week! I wanted to circle back on the {product_category} discussion we had last month.\n", + "\n", + "I know timing wasn't quite right then, but I thought you'd be interested to know that we just helped {example_company} achieve a 30% improvement in {business_metric} within 6 months of implementation.\n", + "\n", + "Given {company_name}'s similar challenges, this could be relevant. 
Would you be open to a brief catch-up call?\n", + "\n", + "Thanks,\n", + "{rep_name}\"\"\"\n", + " ],\n", + " \n", + " 'demo_invite': [\n", + " \"\"\"Subject: Workday Demo - {company_name} Custom Walkthrough\n", + "\n", + "Dear {contact_name},\n", + "\n", + "Thank you for expressing interest in Workday's {product_category} capabilities. I'm excited to show you how our platform can address {company_name}'s specific needs.\n", + "\n", + "Demo Details:\n", + "- Date: [Next Tuesday]\n", + "- Duration: 45 minutes\n", + "- Focus: {business_challenge} solutions\n", + "- Attendees: Please invite key stakeholders from IT, HR, and Finance\n", + "\n", + "I'll tailor the demo to show:\n", + "1. Integration with your existing systems\n", + "2. Automated reporting for your {industry} requirements\n", + "3. ROI projections based on your current processes\n", + "\n", + "Looking forward to it!\n", + "\n", + "{rep_name}\n", + "Workday Solutions\"\"\"\n", + " ],\n", + " \n", + " 'proposal_follow_up': [\n", + " \"\"\"Subject: Workday Proposal for {company_name} - Next Steps\n", + "\n", + "{contact_name},\n", + "\n", + "I hope you and the team had a chance to review the proposal I sent last week. The solution we've designed specifically addresses the key challenges you mentioned:\n", + "\n", + "✓ Streamlined {business_process}\n", + "✓ Real-time reporting and analytics \n", + "✓ Compliance with {industry} regulations\n", + "✓ Seamless integration with your current tech stack\n", + "\n", + "I'm confident this solution will deliver the ROI we discussed. Do you have any questions about the proposal? 
I'd be happy to walk through any sections in detail.\n", + "\n", + "What would be the best way to move forward?\n", + "\n", + "Best regards,\n", + "{rep_name}\"\"\"\n", + " ],\n", + " \n", + " 'objection_handling': [\n", + " \"\"\"Subject: Re: Budget concerns for Workday implementation\n", + "\n", + "{contact_name},\n", + "\n", + "I completely understand your budget considerations - this is an investment decision that needs to make financial sense.\n", + "\n", + "Let me share a few points that might help:\n", + "\n", + "1. ROI Timeline: Most clients see payback within 18 months\n", + "2. Flexible Payment: We offer phased implementation to spread costs\n", + "3. Hidden Costs: Consider current manual process costs (estimated $200K annually for companies your size)\n", + "\n", + "Would it help to have our finance team create a detailed cost-benefit analysis specific to {company_name}? We can also explore a pilot program to prove value before full commitment.\n", + "\n", + "Happy to discuss further,\n", + "{rep_name}\"\"\"\n", + " ]\n", + " }\n", + " \n", + " # Sample company names for examples\n", + " example_companies = [\n", + " 'TechFlow Industries', 'DataCorp Solutions', 'InnovateCorp', 'GlobalTech Systems',\n", + " 'NextGen Enterprises', 'SmartOps Inc.', 'VelocityPro', 'OptimalSystems'\n", + " ]\n", + " \n", + " # Business challenges by industry\n", + " business_challenges = {\n", + " 'Technology': ['talent management', 'rapid scaling', 'project tracking', 'performance management'],\n", + " 'Healthcare': ['compliance reporting', 'staff scheduling', 'cost management', 'regulatory compliance'],\n", + " 'Financial Services': ['risk management', 'regulatory reporting', 'audit trails', 'compliance monitoring'],\n", + " 'Manufacturing': ['workforce planning', 'safety compliance', 'operational efficiency', 'cost control'],\n", + " 'Retail': ['seasonal staffing', 'inventory planning', 'customer analytics', 'supply chain management']\n", + " }\n", + " \n", + " emails = 
[]\n", + " current_date = datetime.now()\n", + " \n", + " for i in range(num_emails):\n", + " # Select random opportunity and related data\n", + " opp = opps_pd.sample(n=1).iloc[0]\n", + " rep = reps_pd[reps_pd['RepID'] == opp['RepID']].iloc[0]\n", + " account = accounts_pd[accounts_pd['AccountID'] == opp['AccountID']].iloc[0]\n", + " \n", + " # Email type based on opportunity stage\n", + " stage = opp['SalesStage']\n", + " if stage == 'Prospecting':\n", + " email_type = random.choice(['initial_outreach', 'follow_up'])\n", + " elif stage == 'Discovery':\n", + " email_type = random.choice(['demo_invite', 'follow_up'])\n", + " elif stage == 'Proposal':\n", + " email_type = 'proposal_follow_up'\n", + " elif stage == 'Negotiation':\n", + " email_type = 'objection_handling'\n", + " else:\n", + " email_type = random.choice(['initial_outreach', 'follow_up'])\n", + " \n", + " # Select template\n", + " template = random.choice(email_templates[email_type])\n", + " \n", + " # Generate email content\n", + " rep_name = f\"{rep['FirstName']} {rep['LastName']}\"\n", + " rep_email = f\"{rep['FirstName'].lower()}.{rep['LastName'].lower()}@workday.com\"\n", + " contact_name = random.choice(['John', 'Sarah', 'Mike', 'Jennifer', 'David', 'Lisa', 'Robert', 'Michelle'])\n", + " company_name = account['CompanyName']\n", + " industry = account['Industry']\n", + " product_category = opp['ProductCategory']\n", + " \n", + " # Business challenge based on industry\n", + " challenges = business_challenges.get(industry, ['operational efficiency', 'cost management'])\n", + " business_challenge = random.choice(challenges)\n", + " \n", + " # Fill in template\n", + " email_content = template.format(\n", + " company_name=company_name,\n", + " contact_name=contact_name,\n", + " rep_name=rep_name,\n", + " rep_role=rep['Role'],\n", + " rep_email=rep_email,\n", + " industry=industry.lower(),\n", + " product_category=product_category,\n", + " business_challenge=business_challenge,\n", + " 
example_company=random.choice(example_companies),\n", + " business_metric=random.choice(['efficiency', 'accuracy', 'compliance', 'productivity']),\n", + " business_process=random.choice(['payroll processing', 'reporting', 'onboarding', 'planning'])\n", + " )\n", + " \n", + " # Email metadata\n", + " days_ago = int(np.random.exponential(30)) # Most emails recent\n", + " days_ago = min(days_ago, 365)\n", + " sent_date = current_date - timedelta(days=days_ago)\n", + " \n", + " email_direction = random.choice(['Outbound', 'Inbound']) if random.random() < 0.2 else 'Outbound'\n", + " \n", + " email = {\n", + " 'EmailID': f'EMAIL{str(i+1).zfill(6)}',\n", + " 'OpportunityID': opp['OpportunityID'],\n", + " 'RepID': opp['RepID'],\n", + " 'AccountID': opp['AccountID'],\n", + " 'EmailType': email_type,\n", + " 'EmailDirection': email_direction,\n", + " 'SentDate': sent_date.strftime('%Y-%m-%d %H:%M:%S'),\n", + " 'EmailContent': email_content,\n", + " 'WordCount': len(email_content.split()),\n", + " 'HasAttachment': 1 if 'attach' in email_content.lower() else 0,\n", + " 'ResponseReceived': random.choice([1, 0]) if email_direction == 'Outbound' else 0\n", + " }\n", + " \n", + " emails.append(email)\n", + " \n", + " return pd.DataFrame(emails)\n", + "\n", + "# ---------- Build DataFrame from your Spark DFs ----------\n", + "email_communications = generate_email_communications(\n", + " sales_opportunities_spark, sales_reps, customer_accounts, num_emails=1200\n", + ")\n", + "\n", + "# ---------- PDF helper ----------\n", + "def wrap_and_draw(c, text, x, y, width, font_name=\"Helvetica\", font_size=10, line_height=14):\n", + " \"\"\"Simple word-wrap printer. 
Returns new y.\"\"\"\n", + " c.setFont(font_name, font_size)\n", + " paragraphs = [p.rstrip() for p in (text or \"\").split(\"\\n\")]\n", + " for para in paragraphs:\n", + " if not para:\n", + " y -= line_height\n", + " continue\n", + " words, line = para.split(), []\n", + " for w in words:\n", + " trial = (\" \".join(line + [w])).strip()\n", + " if c.stringWidth(trial, font_name, font_size) <= width:\n", + " line.append(w)\n", + " else:\n", + " c.drawString(x, y, \" \".join(line))\n", + " y -= line_height\n", + " line = [w]\n", + " if y < 0.9 * inch:\n", + " c.showPage()\n", + " c.setFont(font_name, font_size)\n", + " y = LETTER[1] - 1.0 * inch\n", + " if line:\n", + " c.drawString(x, y, \" \".join(line))\n", + " y -= line_height\n", + " y -= 0.4 * line_height\n", + " if y < 0.9 * inch:\n", + " c.showPage()\n", + " c.setFont(font_name, font_size)\n", + " y = LETTER[1] - 1.0 * inch\n", + " return y\n", + "\n", + "# ---------- Generate & Upload PDFs ----------\n", + "upload_count = 0\n", + "max_pdfs = 25\n", + "for _, row in email_communications.iterrows():\n", + " if max_pdfs is not None and upload_count >= max_pdfs:\n", + " break\n", + "\n", + " buffer = io.BytesIO()\n", + " c = canvas.Canvas(buffer, pagesize=LETTER)\n", + " width, height = LETTER\n", + "\n", + " lm, rm = 1.0 * inch, 1.0 * inch\n", + " usable_w = width - lm - rm\n", + " y = height - 0.9 * inch\n", + "\n", + " # Header\n", + " c.setFont(\"Helvetica-Bold\", 14)\n", + " c.drawCentredString(width / 2, y, \"Email Communication\")\n", + " y -= 0.35 * inch\n", + "\n", + " # Subject line (first line up to newline)\n", + " subject_line = row[\"EmailContent\"].split(\"\\n\", 1)[0].strip()\n", + " c.setFont(\"Helvetica-Bold\", 11)\n", + " wrap_and_draw(c, subject_line, lm, y, usable_w, font_name=\"Helvetica-Bold\", font_size=11, line_height=14)\n", + " y -= 4\n", + "\n", + " # Meta\n", + " c.setFont(\"Helvetica\", 10)\n", + " meta = [\n", + " f\"EmailID: {row['EmailID']} Type: {row['EmailType']} Direction: 
{row['EmailDirection']}\",\n", + " f\"Sent: {row['SentDate']} AccountID: {row['AccountID']} OpportunityID: {row['OpportunityID']} RepID: {row['RepID']}\",\n", + " \"\"\n", + " ]\n", + " for line in meta:\n", + " c.drawString(lm, y, line)\n", + " y -= 14\n", + " if y < 0.9 * inch:\n", + " c.showPage(); c.setFont(\"Helvetica\", 10)\n", + " y = height - 1.0 * inch\n", + "\n", + " # Body (skip the first line which is the subject)\n", + " body = row[\"EmailContent\"].split(\"\\n\", 1)[1] if \"\\n\" in row[\"EmailContent\"] else \"\"\n", + " c.setFont(\"Helvetica-Bold\", 11)\n", + " c.drawString(lm, y, \"Body:\")\n", + " y -= 16\n", + " y = wrap_and_draw(\n", + " c=c,\n", + " text=body,\n", + " x=lm,\n", + " y=y,\n", + " width=usable_w,\n", + " font_name=\"Helvetica\",\n", + " font_size=10,\n", + " line_height=14\n", + " )\n", + "\n", + " c.save()\n", + " buffer.seek(0)\n", + "\n", + " # Upload\n", + " filename = f\"{row['EmailID']}.pdf\"\n", + " target_path = f\"{volume_path}/email_communications/{filename}\"\n", + " w.files.upload(target_path, buffer, overwrite=True)\n", + " upload_count += 1\n", + " # print(f\"Uploaded: {target_path}\")\n", + "\n", + "print(f\"✅ Successfully uploaded {upload_count} email PDFs to {volume_path}\")" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "4ab64a97-2d5d-4c43-973b-2553d13a7d62", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [] + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "computePreferences": { + "hardware": { + "accelerator": null, + "gpuPoolId": null, + "memory": null + } + }, + "dashboards": [], + "environmentMetadata": null, + "inputWidgetPreferences": null, + "language": "python", + "notebookMetadata": { + "pythonIndentUnit": 4 + }, + "notebookName": "01-create-synthetic-sales-data", + "widgets": {} + }, + 
"kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/2025-11-Multi-Agent-GenAI-System/02-create-vector-index.ipynb b/2025-11-Multi-Agent-GenAI-System/02-create-vector-index.ipynb new file mode 100644 index 0000000..220d1aa --- /dev/null +++ b/2025-11-Multi-Agent-GenAI-System/02-create-vector-index.ipynb @@ -0,0 +1,459 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "fbb369ec-3a45-449e-a6ed-368cc0c5898c", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "%pip install -U -qqqq mlflow langgraph==0.3.4 databricks-langchain databricks-agents uv databricks-vectorsearch --upgrade langgraph\n", + "dbutils.library.restartPython()" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "f2cfd55d-1115-44d7-9c2a-83e47644293e", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "%run ./00-init-requirements" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "baefdc5b-5794-44d0-a866-f56c2f0127d0", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "# Create Vector Search Indexes\n", + "\n", + "Create the indexes via python SDK. There are two steps involved:\n", + "\n", + "1. Create vector search endpoint (one endpoint can serve multiple vector search indexes)\n", + "2. 
Create vector search indexes for different data types:\n", + " - Email communications\n", + " - Meeting notes\n", + " - Customer feedback\n", + " - Employee records\n", + " - Job requisitions" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "768d26ac-0707-44cd-90e1-09c09456d508", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "from pyspark.sql.functions import expr, col, explode, monotonically_increasing_id\n", + "\n", + "# Parse PDF documents from volumes using ai_parse_document\n", + "customer_feedback_parsed = (\n", + " spark.read.format(\"binaryFile\")\n", + " .load(f\"/Volumes/{catalog_name}/{schema_name}/workday_unstructure_data/customer_feedback/\")\n", + " .withColumn(\"parsed\", expr(\"ai_parse_document(content, map('version', '2.0'))\"))\n", + " .withColumn(\"content\", expr(\"array_join(transform(parsed:document.elements::ARRAY>, x -> x.content), '\\n')\"))\n", + " .select(\n", + " \"content\",\n", + " expr(\"parsed:document\").alias(\"document\"),\n", + " expr(\"parsed:document:pages\").alias(\"pages\"),\n", + " expr(\"parsed:error_status\").alias(\"error_status\"),\n", + " col(\"path\").alias(\"doc_uri\")\n", + " )\n", + ")\n", + "\n", + "meeting_notes_parsed = (\n", + " spark.read.format(\"binaryFile\")\n", + " .load(f\"/Volumes/{catalog_name}/{schema_name}/workday_unstructure_data/meeting_notes/\")\n", + " .withColumn(\"parsed\", expr(\"ai_parse_document(content, map('version', '2.0'))\"))\n", + " .withColumn(\"content\", expr(\"array_join(transform(parsed:document.elements::ARRAY>, x -> x.content), '\\n')\"))\n", + " .select(\n", + " \"content\",\n", + " expr(\"parsed:document\").alias(\"document\"),\n", + " expr(\"parsed:document:pages\").alias(\"pages\"),\n", + " 
expr(\"parsed:error_status\").alias(\"error_status\"),\n", + " col(\"path\").alias(\"doc_uri\")\n", + " )\n", + ")\n", + "\n", + "email_communications_parsed = (\n", + " spark.read.format(\"binaryFile\")\n", + " .load(f\"/Volumes/{catalog_name}/{schema_name}/workday_unstructure_data/email_communications/\")\n", + " .withColumn(\"parsed\", expr(\"ai_parse_document(content, map('version', '2.0'))\"))\n", + " .withColumn(\"content\", expr(\"array_join(transform(parsed:document.elements::ARRAY>, x -> x.content), '\\n')\"))\n", + " .select(\n", + " \"content\",\n", + " expr(\"parsed:document\").alias(\"document\"),\n", + " expr(\"parsed:document:pages\").alias(\"pages\"),\n", + " expr(\"parsed:error_status\").alias(\"error_status\"),\n", + " col(\"path\").alias(\"doc_uri\")\n", + " )\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "c10a37ae-05d2-41f2-a37e-09e601e8684b", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "customer_feedback_parsed.createOrReplaceTempView(\"vf_customer_feedback\")\n", + "meeting_notes_parsed.createOrReplaceTempView(\"vf_meeting_notes\")\n", + "email_communications_parsed.createOrReplaceTempView(\"vf_email_communications\")\n", + "\n", + "def create_kb_table_from_parsed(view_name, kb_table_fqn):\n", + " \"\"\"Create knowledge base table from ai_parse_document output using SQL\"\"\"\n", + " # Drop and recreate table\n", + " spark.sql(f\"DROP TABLE IF EXISTS {kb_table_fqn}\")\n", + " \n", + " spark.sql(f\"\"\"\n", + " CREATE TABLE {kb_table_fqn} (\n", + " id BIGINT GENERATED ALWAYS AS IDENTITY,\n", + " content STRING,\n", + " doc_uri STRING\n", + " ) TBLPROPERTIES (delta.enableChangeDataFeed = true)\n", + " \"\"\")\n", + " \n", + " # Insert data directly from temp view\n", + " spark.sql(f\"\"\"\n", + " 
INSERT INTO {kb_table_fqn} (content, doc_uri)\n", + " SELECT\n", + " content,\n", + " doc_uri\n", + " FROM {view_name}\n", + " WHERE content IS NOT NULL\n", + " \"\"\")\n", + " \n", + " record_count = spark.table(kb_table_fqn).count()\n", + " print(f\"✅ {kb_table_fqn} created with {record_count} records\")\n", + "\n", + "# Create knowledge base tables from parsed documents\n", + "create_kb_table_from_parsed(\n", + " \"vf_customer_feedback\",\n", + " f\"{catalog_name}.{schema_name}.customer_feedback_knowledge_base\"\n", + ")\n", + "\n", + "create_kb_table_from_parsed(\n", + " \"vf_meeting_notes\",\n", + " f\"{catalog_name}.{schema_name}.meeting_notes_knowledge_base\"\n", + ")\n", + "\n", + "create_kb_table_from_parsed(\n", + " \"vf_email_communications\",\n", + " f\"{catalog_name}.{schema_name}.email_communications_knowledge_base\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "999632ab-cc0f-4e7d-82f7-15db7f9c62cb", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "vs_endpoint_name = f\"sales-endpoint-{user_name}\"\n", + "\n", + "# Email communications index\n", + "email_vs_index_name = f\"{catalog_name}.{schema_name}.email_communications_index\"\n", + "email_vs_input_table = f\"{catalog_name}.{schema_name}.email_communications_knowledge_base\"\n", + "\n", + "# Meeting notes index\n", + "notes_vs_index_name = f\"{catalog_name}.{schema_name}.meeting_notes_index\"\n", + "notes_vs_input_table = f\"{catalog_name}.{schema_name}.meeting_notes_knowledge_base\"\n", + "\n", + "# Customer feedback index\n", + "feedback_vs_index_name = f\"{catalog_name}.{schema_name}.customer_feedback_index\"\n", + "feedback_vs_input_table = f\"{catalog_name}.{schema_name}.customer_feedback_knowledge_base\"" + ] + }, + { + "cell_type": "code", + 
"execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "f01cc107-34c7-4424-8ff3-8debf1c0787f", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "from databricks.vector_search.client import VectorSearchClient\n", + "\n", + "client = VectorSearchClient(disable_notice=True)\n", + "\n", + "try:\n", + " client.delete_endpoint(name=vs_endpoint_name)\n", + " print(f\"✅ Vector search endpoint '{vs_endpoint_name}' deleted successfully\")\n", + "except Exception as e:\n", + " print(f\"⚠️ Could not delete vector search endpoint '{vs_endpoint_name}': {str(e)}\")\n", + "\n", + "for index_name in [\n", + " email_vs_index_name,\n", + " notes_vs_index_name,\n", + " feedback_vs_index_name\n", + "]:\n", + " try:\n", + " client.delete_index(index_name=index_name)\n", + " print(f\"✅ Vector search index '{index_name}' deleted successfully\")\n", + " except Exception as e:\n", + " print(f\"⚠️ Could not delete vector search index '{index_name}': {str(e)}\")" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "4fb375fc-13ed-4822-891e-e23e2b8cd332", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "from databricks.vector_search.client import VectorSearchClient\n", + "\n", + "# Create vector search endpoint\n", + "client = VectorSearchClient(disable_notice=True)\n", + "\n", + "try:\n", + " client.create_endpoint(\n", + " name=vs_endpoint_name,\n", + " endpoint_type=\"STANDARD\"\n", + " )\n", + " print(f\"✅ Vector search endpoint '{vs_endpoint_name}' created successfully\")\n", + 
"except Exception as e:\n", + " print(f\"ℹ️ Vector search endpoint '{vs_endpoint_name}' already exists\")" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "a18777e7-373e-4095-b319-61dd8d57a634", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "import time\n", + "\n", + "def create_vs_index(endpoint_name, source_table, index_name):\n", + " \"\"\"Create a vector search index with error handling\"\"\"\n", + " try:\n", + " index = client.create_delta_sync_index(\n", + " endpoint_name=endpoint_name,\n", + " source_table_name=source_table,\n", + " index_name=index_name,\n", + " pipeline_type=\"TRIGGERED\",\n", + " primary_key=\"id\",\n", + " embedding_source_column=\"content\",\n", + " embedding_model_endpoint_name=\"databricks-bge-large-en\"\n", + " )\n", + " print(f\"✅ {index_name} created successfully\")\n", + " return index\n", + " except Exception as e:\n", + " if \"already exists\" in str(e).lower():\n", + " print(f\"ℹ️ {index_name} already exists\")\n", + " # return client.get_index(endpoint_name=endpoint_name, index_name=index_name)\n", + " else:\n", + " print(f\"❌ Error creating {index_name}: {str(e)}\")\n", + " return None\n", + "\n", + "# Create all three indexes\n", + "email_index = create_vs_index(\n", + " vs_endpoint_name, \n", + " email_vs_input_table, \n", + " email_vs_index_name,\n", + ")\n", + "\n", + "notes_index = create_vs_index(\n", + " vs_endpoint_name, \n", + " notes_vs_input_table, \n", + " notes_vs_index_name,\n", + ")\n", + "\n", + "feedback_index = create_vs_index(\n", + " vs_endpoint_name, \n", + " feedback_vs_input_table, \n", + " feedback_vs_index_name,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + 
"byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "d1c29ab6-ac44-4da0-9113-c7f60ca2bf6a", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "print(\"Syncing vector search indexes...\")\n", + "\n", + "for index_name in [\n", + " (email_vs_index_name),\n", + " (notes_vs_index_name),\n", + " (feedback_vs_index_name)\n", + "]:\n", + " try:\n", + " client.get_index(endpoint_name=vs_endpoint_name, index_name=index_name).sync()\n", + " print(f\"✅ {index_name} index synced\")\n", + " except Exception as e:\n", + " print(f\"⚠️ Could not sync {index_name} index: {str(e)}\")\n", + "\n", + "print(\"\\n🎉 Vector search setup complete!\")" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "fb693608-3f0f-48d5-9c76-ac9f9685a356", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [] + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "computePreferences": { + "hardware": { + "accelerator": null, + "gpuPoolId": null, + "memory": null + } + }, + "dashboards": [], + "environmentMetadata": { + "base_environment": "dbe_0c235d96-4bc7-4fb5-b118-17fd1dad0124", + "environment_version": "4" + }, + "inputWidgetPreferences": null, + "language": "python", + "notebookMetadata": { + "mostRecentlyExecutedCommandWithImplicitDF": { + "commandId": 6588635690552682, + "dataframes": [ + "_sqldf" + ] + }, + "pythonIndentUnit": 2 + }, + "notebookName": "02-create-vector-index", + "widgets": {} + }, + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/2025-11-Multi-Agent-GenAI-System/03-create-multi-agent-with-genie.ipynb 
b/2025-11-Multi-Agent-GenAI-System/03-create-multi-agent-with-genie.ipynb new file mode 100644 index 0000000..e73c89a --- /dev/null +++ b/2025-11-Multi-Agent-GenAI-System/03-create-multi-agent-with-genie.ipynb @@ -0,0 +1,876 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "bdb29bed-d9f7-4cf8-ae11-6fcc2b989eea", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "%pip install -U -qqq mlflow[databricks]==3.1.1 langgraph==0.5.4 databricks-langchain==0.6.0 databricks-agents==1.1.0 pydantic<2.12.0 uv\n", + "dbutils.library.restartPython()" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "db2ef15b-42d8-4e4d-9b2f-5f507ba509bd", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "## Sales Support Multi-Agent Framework\n", + "\n", + " This notebook creates a multi-agent system for sales support with the following components:\n", + " - **Structured Data Agent**: Queries structured sales data (opportunities, accounts, activities)\n", + " - **Vector search Agent**: Retrieves information from unstructured documents (emails, meeting notes, feedback)\n", + " - **Supervisor**: Routes queries to appropriate agents and orchestrates responses\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "7e1b5b7d-4a97-4626-98d0-6b1cbc627637", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "%%writefile agent.py\n", + "\n", + "import warnings\n", + "import os\n", + 
"os.environ[\"DATABRICKS_DISABLE_NOTICE\"] = \"true\"\n", + "warnings.filterwarnings(\"ignore\", message=\".*notebook authentication token.*\")\n", + "\n", + "import functools\n", + "from typing import Any, Generator, Literal, Optional\n", + "\n", + "import mlflow\n", + "from databricks.sdk import WorkspaceClient\n", + "from databricks.vector_search.client import VectorSearchClient\n", + "from databricks_langchain import ChatDatabricks, VectorSearchRetrieverTool\n", + "from databricks_langchain.genie import GenieAgent\n", + "from langchain_core.runnables import RunnableLambda\n", + "from langgraph.graph import END, StateGraph\n", + "from langgraph.graph.state import CompiledStateGraph\n", + "from langgraph.prebuilt import create_react_agent\n", + "from mlflow.entities import SpanType\n", + "from mlflow.langchain.chat_agent_langgraph import ChatAgentState\n", + "from mlflow.pyfunc import ChatAgent\n", + "from mlflow.types.agent import (\n", + " ChatAgentChunk,\n", + " ChatAgentMessage,\n", + " ChatAgentResponse,\n", + " ChatContext,\n", + ")\n", + "from pydantic import BaseModel\n", + "import time\n", + "\n", + "w = WorkspaceClient()\n", + "token = w.tokens.create(comment=f\"sdk-{time.time_ns()}\").token_value\n", + "\n", + "# Get catalog and schema from environment or config\n", + "CATALOG_NAME = 'andrea_tardif'\n", + "SCHEMA_NAME = 'workday_demos'\n", + "GENIE_SPACE_ID = '01f0c0291e6f1feabc4a8a46085cebd1'\n", + "MLFLOW_EXPERIMENT_NAME = f'multiagent_genie_{CATALOG_NAME}'\n", + "host='https://dbc-d079f94e-4181.cloud.databricks.com/'\n", + "\n", + "###################################################\n", + "## Configure LLM\n", + "###################################################\n", + "llm = ChatDatabricks(endpoint=\"databricks-claude-3-7-sonnet\")\n", + "\n", + "###################################################\n", + "## Create RAG Agent with Vector Search Tools\n", + "###################################################\n", + "\n", + "# Create retriever tools 
for each document type with disable_notice\n", + "email_retriever = VectorSearchRetrieverTool(\n", + " index_name=f\"{CATALOG_NAME}.{SCHEMA_NAME}.email_communications_index\",\n", + " columns=[\"content\", \"doc_uri\"],\n", + " name=\"email_search\",\n", + " description=(\n", + " \"Searches through email communications between sales reps and customers. \"\n", + " \"Use this to find information about: pricing discussions, objections, \"\n", + " \"follow-ups, proposal details, and customer email correspondence.\"\n", + " ),\n", + " disable_notice=True,\n", + ")\n", + "\n", + "meeting_notes_retriever = VectorSearchRetrieverTool(\n", + " index_name=f\"{CATALOG_NAME}.{SCHEMA_NAME}.meeting_notes_index\",\n", + " columns=[\"content\", \"doc_uri\"],\n", + " name=\"meeting_notes_search\",\n", + " description=(\n", + " \"Searches through meeting notes and call summaries. \"\n", + " \"Use this to find information about: customer meetings, demos, \"\n", + " \"discovery calls, requirements discussions, and decision-maker feedback.\"\n", + " ),\n", + " disable_notice=True,\n", + ")\n", + "\n", + "feedback_retriever = VectorSearchRetrieverTool(\n", + " index_name=f\"{CATALOG_NAME}.{SCHEMA_NAME}.customer_feedback_index\",\n", + " columns=[\"content\", \"doc_uri\"],\n", + " name=\"feedback_search\",\n", + " description=(\n", + " \"Searches through customer feedback and reviews. \"\n", + " \"Use this to find information about: customer satisfaction, \"\n", + " \"product impressions, concerns raised, and post-demo feedback.\"\n", + " ),\n", + " disable_notice=True,\n", + ")\n", + "\n", + "# Combine all RAG tools\n", + "rag_tools = [email_retriever, meeting_notes_retriever, feedback_retriever]\n", + "\n", + "rag_agent_description = (\n", + " \"Specializes in retrieving information from unstructured sales documents including \"\n", + " \"emails, meeting notes, and customer feedback. 
Use this agent for questions about: \"\n", + " \"customer communications, meeting discussions, feedback and concerns, proposal details, \"\n", + " \"and qualitative sales information.\"\n", + ")\n", + "\n", + "rag_agent = create_react_agent(llm, tools=rag_tools)\n", + "\n", + "###################################################\n", + "## Create SQL Agent for Structured Data\n", + "###################################################\n", + "\n", + "sql_agent_system_prompt = f\"\"\"You are a sales data analyst with access to Workday sales CRM data.\n", + "\n", + "Available tables in {CATALOG_NAME}.{SCHEMA_NAME}:\n", + "- sales_reps: Information about sales representatives\n", + "- customer_accounts: Customer account details and company information\n", + "- sales_opportunities: Sales opportunities and deal pipeline\n", + "- sales_activities: Sales activities and interactions\n", + "\n", + "When asked about structured data like:\n", + "- Sales metrics and KPIs\n", + "- Rep performance and quotas\n", + "- Customer demographics and firmographics\n", + "- Opportunity stages and values\n", + "- Activity tracking and history\n", + "\n", + "Provide analytical insights based on the available structured data tables.\n", + "\"\"\"\n", + "\n", + "genie_description = (\n", + " \"Specializes in analyzing structured sales data from CRM tables. 
\"\n", + " \"Use this agent for questions about: sales metrics, rep performance, \"\n", + " \"customer demographics, opportunity pipelines, deal values, quota attainment, \"\n", + " \"and quantitative sales analytics.\"\n", + ")\n", + "\n", + "genie_agent = GenieAgent(\n", + " genie_space_id=GENIE_SPACE_ID,\n", + " genie_agent_name=\"Genie\",\n", + " description=genie_description,\n", + " client=WorkspaceClient(\n", + " host=host,\n", + " token=token,\n", + " ),\n", + ")\n", + "\n", + "#############################\n", + "# Define the supervisor agent\n", + "#############################\n", + "\n", + "MAX_ITERATIONS = 4\n", + "\n", + "worker_descriptions = {\n", + " \"GenieAgent\": genie_description,\n", + " \"RAGAgent\": rag_agent_description,\n", + "}\n", + "\n", + "formatted_descriptions = \"\\n\".join(\n", + " f\"- {name}: {desc}\" for name, desc in worker_descriptions.items()\n", + ")\n", + "\n", + "system_prompt = f\"\"\"You are a strategic supervisor coordinating between specialized sales support agents.\n", + "\n", + "Your role is to:\n", + "1. Analyze the user's question to determine which agent(s) can best answer it\n", + "2. Route to the appropriate agent based on the question type\n", + "3. Ensure complete answers without redundant work\n", + "4. 
Synthesize information from multiple agents if needed\n", + "\n", + "Available agents:\n", + "{formatted_descriptions}\n", + "\n", + "Routing Guidelines:\n", + "- Use GenieAgent for: metrics, numbers, quotas, pipeline values, rep performance, account counts, etc.\n", + "- Use RAGAgent for: customer communications, meeting context, feedback, concerns, proposals, etc.\n", + "- You can route to multiple agents if the question requires both types of information\n", + "\n", + "Only respond with FINISH when:\n", + "- The user's question has been fully answered\n", + "- All necessary information has been gathered and processed\n", + "\n", + "Avoid routing to the same agent multiple times for the same information.\n", + "\n", + "Important:\n", + "- Do not choose FINISH until at least one specialized agent has been invoked.\n", + "- Prefer GenieAgent for numeric/metric queries; RAGAgent for unstructured text queries.\n", + "\"\"\"\n", + "\n", + "options = [\"FINISH\"] + list(worker_descriptions.keys())\n", + "FINISH = {\"next_node\": \"FINISH\"}\n", + "\n", + "@mlflow.trace(span_type=SpanType.AGENT, name=\"supervisor_agent\")\n", + "def supervisor_agent(state):\n", + " count = state.get(\"iteration_count\", 0) + 1\n", + " \n", + " if count > MAX_ITERATIONS:\n", + " return FINISH\n", + " \n", + " class NextNode(BaseModel):\n", + " next_node: Literal[tuple(options)]\n", + "\n", + " preprocessor = RunnableLambda(\n", + " lambda state: [{\"role\": \"system\", \"content\": system_prompt}] + state[\"messages\"]\n", + " )\n", + " supervisor_chain = preprocessor | llm.with_structured_output(NextNode)\n", + " result = supervisor_chain.invoke(state)\n", + " next_node = result.next_node\n", + " \n", + " # Prevent routing to the same node consecutively\n", + " if state.get(\"next_node\") == next_node:\n", + " return FINISH\n", + " \n", + " return {\n", + " \"iteration_count\": count,\n", + " \"next_node\": next_node\n", + " }\n", + "\n", + "#######################################\n", + 
"# Define multiagent graph structure\n", + "#######################################\n", + "\n", + "def agent_node(state, agent, name):\n", + " \"\"\"Execute agent and return results\"\"\"\n", + " result = agent.invoke({\"messages\": state[\"messages\"]})\n", + " return {\n", + " \"messages\": [\n", + " {\n", + " \"role\": \"assistant\",\n", + " \"content\": result[\"messages\"][-1].content,\n", + " \"name\": name,\n", + " }\n", + " ]\n", + " }\n", + "\n", + "def final_answer(state):\n", + " \"\"\"Generate final synthesized answer\"\"\"\n", + " prompt = (\n", + " \"Based on the information gathered by the specialized agents, \"\n", + " \"provide a comprehensive answer to the user's question. \"\n", + " \"Synthesize insights from all agents and present a clear, helpful response.\"\n", + " )\n", + " preprocessor = RunnableLambda(\n", + " lambda state: state[\"messages\"] + [{\"role\": \"user\", \"content\": prompt}]\n", + " )\n", + " final_answer_chain = preprocessor | llm\n", + " return {\"messages\": [final_answer_chain.invoke(state)]}\n", + "\n", + "class AgentState(ChatAgentState):\n", + " next_node: str\n", + " iteration_count: int\n", + "\n", + "# Create agent nodes\n", + "rag_node = functools.partial(agent_node, agent=rag_agent, name=\"RAGAgent\")\n", + "genie_node = functools.partial(agent_node, agent=genie_agent, name=\"GenieAgent\")\n", + "\n", + "# Build the workflow graph\n", + "workflow = StateGraph(AgentState)\n", + "workflow.add_node(\"GenieAgent\", genie_node)\n", + "workflow.add_node(\"RAGAgent\", rag_node)\n", + "workflow.add_node(\"supervisor\", supervisor_agent)\n", + "workflow.add_node(\"final_answer\", final_answer)\n", + "\n", + "workflow.set_entry_point(\"supervisor\")\n", + "\n", + "# Workers report back to supervisor\n", + "for worker in worker_descriptions.keys():\n", + " workflow.add_edge(worker, \"supervisor\")\n", + "\n", + "# Supervisor decides next node\n", + "workflow.add_conditional_edges(\n", + " \"supervisor\",\n", + " lambda x: 
x[\"next_node\"],\n", + " {**{k: k for k in worker_descriptions.keys()}, \"FINISH\": \"final_answer\"},\n", + ")\n", + "\n", + "workflow.add_edge(\"final_answer\", END)\n", + "multi_agent = workflow.compile()\n", + "\n", + "###################################\n", + "# Wrap in Databricks ChatAgent\n", + "###################################\n", + "\n", + "class LangGraphChatAgent(ChatAgent):\n", + " def __init__(self, agent: CompiledStateGraph):\n", + " self.agent = agent\n", + "\n", + " def predict(\n", + " self,\n", + " messages: list[ChatAgentMessage],\n", + " context: Optional[ChatContext] = None,\n", + " custom_inputs: Optional[dict[str, Any]] = None,\n", + " ) -> ChatAgentResponse:\n", + " request = {\n", + " \"messages\": [m.model_dump_compat(exclude_none=True) for m in messages]\n", + " }\n", + "\n", + " messages = []\n", + " for event in self.agent.stream(request, stream_mode=\"updates\"):\n", + " for node_data in event.values():\n", + " messages.extend(\n", + " ChatAgentMessage(**msg) for msg in node_data.get(\"messages\", [])\n", + " )\n", + " return ChatAgentResponse(messages=messages)\n", + "\n", + " def predict_stream(\n", + " self,\n", + " messages: list[ChatAgentMessage],\n", + " context: Optional[ChatContext] = None,\n", + " custom_inputs: Optional[dict[str, Any]] = None,\n", + " ) -> Generator[ChatAgentChunk, None, None]:\n", + " request = {\n", + " \"messages\": [m.model_dump_compat(exclude_none=True) for m in messages]\n", + " }\n", + " for event in self.agent.stream(request, stream_mode=\"updates\"):\n", + " for node_data in event.values():\n", + " yield from (\n", + " ChatAgentChunk(**{\"delta\": msg})\n", + " for msg in node_data.get(\"messages\", [])\n", + " )\n", + "\n", + "# Create the agent\n", + "mlflow.langchain.autolog()\n", + "AGENT = LangGraphChatAgent(multi_agent)\n", + "mlflow.models.set_model(AGENT)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + 
"cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "1eb839e2-725b-4ab0-80a2-f4a7c8fb4080", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "import mlflow\n", + "from agent import *\n", + "from databricks.sdk import WorkspaceClient \n", + "\n", + "w = WorkspaceClient()\n", + "current_user = w.current_user.me()\n", + "\n", + "experiment_fqdn = (\n", + " f\"/Workspace/Users/{current_user.user_name}/{MLFLOW_EXPERIMENT_NAME}\"\n", + ")\n", + "\n", + "# Check if the experiment exists\n", + "experiment = mlflow.get_experiment_by_name(experiment_fqdn)\n", + "\n", + "if experiment:\n", + " experiment_id = experiment.experiment_id\n", + "else:\n", + " # Create the experiment if it does not exist\n", + " experiment_id = mlflow.create_experiment(experiment_fqdn)\n", + "\n", + "mlflow.set_experiment(experiment_fqdn)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "5643ba67-edb0-4d98-af82-e0190bc97b5b", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "from IPython.display import Image, display\n", + "\n", + "display(Image(AGENT.agent.get_graph().draw_mermaid_png()))" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "e0ff325d-72b3-4b69-8b40-e6edf7ed7b98", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "%md\n", + "\n", + "### Test the LangGraph Agent" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": 
"2e121c60-ca5c-4f8e-b898-3569ca1cb4c4", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "sample_questions = [\n", + " \"How many outbound emails were sent by each sales rep last quarter?\",\n", + " \"Summarize the main concerns customers raised about implementation delays.\",\n", + " \"Extract any customer feedback that praises response time or support quality.\"\n", + "]\n", + "\n", + "input_example = {\n", + " \"messages\": [\n", + " {\n", + " \"role\": \"user\",\n", + " \"content\": sample_questions[0],\n", + " }\n", + " ]\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "e6f88001-3798-4965-9474-cdbe85292ade", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "response = AGENT.predict(input_example)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "1b476203-0f02-49a8-bd7b-20bc5bfabe2e", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "input_example = {\n", + " \"messages\": [\n", + " {\n", + " \"role\": \"user\",\n", + " \"content\": sample_questions[1],\n", + " }\n", + " ]\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "991d46c9-db33-437b-b4ba-a9c06887eef6", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "response = AGENT.predict(input_example)" + ] + }, + { + "cell_type": "code", + 
"execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "5b86bd96-a575-4aa9-b52b-7a8539cb8187", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "input_example = {\n", + " \"messages\": [\n", + " {\n", + " \"role\": \"user\",\n", + " \"content\": sample_questions[2],\n", + " }\n", + " ]\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "c4b474b2-96a7-430c-bd08-8b18367b8df1", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "response = AGENT.predict(input_example)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "3a4c5be2-8644-4f55-afcc-61a2c7bdabb5", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "%md\n", + "\n", + "### Log into MLFlow" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "9268b3c7-f1a1-4246-980e-66f8c39c75a3", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "# Determine Databricks resources to specify for automatic auth passthrough at deployment time\n", + "import mlflow\n", + "from pkg_resources import get_distribution\n", + "from databricks_langchain import UnityCatalogTool, VectorSearchRetrieverTool\n", + "from mlflow.models.resources import (\n", + " DatabricksFunction,\n", + " DatabricksGenieSpace,\n", + " DatabricksServingEndpoint,\n", + " 
DatabricksVectorSearchIndex,\n", + " DatabricksSQLWarehouse,\n", + " DatabricksTable,\n", + ")\n", + "\n", + "resources = [\n", + " DatabricksServingEndpoint(endpoint_name=\"databricks-claude-3-7-sonnet\"),\n", + " DatabricksGenieSpace(genie_space_id=GENIE_SPACE_ID),\n", + " DatabricksSQLWarehouse(warehouse_id='4b9b953939869799'),\n", + "]\n", + "\n", + "# for tool in tools:\n", + "# if isinstance(tool, VectorSearchRetrieverTool):\n", + "# resources.extend(tool.resources)\n", + "# elif isinstance(tool, UnityCatalogTool):\n", + "# resources.append(DatabricksFunction(function_name=tool.uc_function_name))\n", + "\n", + "with mlflow.start_run():\n", + " logged_agent_info = mlflow.pyfunc.log_model(\n", + " name=f\"multi_agent_at\",\n", + " python_model=\"agent.py\",\n", + " # model_config=os.path.join(os.getcwd(), \"00-init-requirements\"),\n", + " input_example=input_example,\n", + " resources=resources,\n", + " pip_requirements=[\n", + " f\"databricks-connect=={get_distribution('databricks-connect').version}\",\n", + " f\"mlflow=={get_distribution('mlflow').version}\",\n", + " f\"databricks-langchain=={get_distribution('databricks-langchain').version}\",\n", + " f\"langgraph=={get_distribution('langgraph').version}\",\n", + " ]\n", + " )" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "038afed1-cfe8-4cda-bde7-4842d39b06fe", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "\n", + "### Local Validation" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "a7dff315-894b-47e2-bf33-c13dd479b66c", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "input_example = {\n", + " 'messages': [\n", + " {'role': 
'user',\n", + " 'content': 'provide a summary of what do our policies say about customers eligible for promotions but receiving poor service?'\n", + " }\n", + " ]\n", + " }\n", + " \n", + "response = mlflow.models.predict(\n", + " model_uri=f\"runs:/{logged_agent_info.run_id}/multi_agent_at\",\n", + " input_data=input_example,\n", + " env_manager=\"uv\",\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "b59c2cd8-e94e-4b76-9166-92b4ade3fdd8", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "\n", + "### Register Model" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "479f654b-4dbb-40c8-9a13-5e5ca87f8bd4", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "mlflow.set_registry_uri(\"databricks-uc\")\n", + "\n", + "# Define the catalog, schema, and model name for your UC model\n", + "catalog = catalog_name\n", + "schema = schema_name\n", + "model_name = f\"multi_agent_demo_{catalog_name}\"\n", + "UC_MODEL_NAME = f\"{catalog}.{schema}.{model_name}\"\n", + "\n", + "# register the model to UC\n", + "uc_registered_model_info = mlflow.register_model(\n", + " model_uri=logged_agent_info.model_uri, name=UC_MODEL_NAME\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "20b3882e-1d8e-4b46-b0e6-cd0edd94863d", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "\n", + "### Create Endpoint" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, 
+ "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "d88590e5-c50d-43f6-9cb3-405447f8cf9b", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "from databricks import agents\n", + "\n", + "agents.deploy(\n", + " UC_MODEL_NAME,\n", + " uc_registered_model_info.version,\n", + " tags={\"endpointSource\": \"playground\"},\n", + " endpoint_name = f\"multi_agent_{catalog_name}\",\n", + " environment_vars={\n", + " \"DATABRICKS_GENIE_PAT\": token\n", + " },\n", + ")" + ] + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "computePreferences": { + "hardware": { + "accelerator": null, + "gpuPoolId": null, + "memory": null + } + }, + "dashboards": [], + "environmentMetadata": { + "base_environment": "", + "environment_version": "2" + }, + "inputWidgetPreferences": null, + "language": "python", + "notebookMetadata": { + "mostRecentlyExecutedCommandWithImplicitDF": { + "commandId": -1, + "dataframes": [ + "_sqldf" + ] + }, + "pythonIndentUnit": 4 + }, + "notebookName": "03-create-multi-agent-with-genie", + "widgets": {} + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/2025-11-Multi-Agent-GenAI-System/agent.py b/2025-11-Multi-Agent-GenAI-System/agent.py new file mode 100644 index 0000000..2eb22c3 --- /dev/null +++ b/2025-11-Multi-Agent-GenAI-System/agent.py @@ -0,0 +1,315 @@ + +import warnings +import os +os.environ["DATABRICKS_DISABLE_NOTICE"] = "true" +warnings.filterwarnings("ignore", message=".*notebook authentication token.*") + +import functools +from typing import Any, Generator, Literal, Optional + +import mlflow +from databricks.sdk import WorkspaceClient +from databricks.vector_search.client import VectorSearchClient +from databricks_langchain import ChatDatabricks, VectorSearchRetrieverTool +from databricks_langchain.genie import GenieAgent +from langchain_core.runnables import RunnableLambda +from langgraph.graph import 
END, StateGraph +from langgraph.graph.state import CompiledStateGraph +from langgraph.prebuilt import create_react_agent +from mlflow.entities import SpanType +from mlflow.langchain.chat_agent_langgraph import ChatAgentState +from mlflow.pyfunc import ChatAgent +from mlflow.types.agent import ( + ChatAgentChunk, + ChatAgentMessage, + ChatAgentResponse, + ChatContext, +) +from pydantic import BaseModel +import time + +w = WorkspaceClient() +token = w.tokens.create(comment=f"sdk-{time.time_ns()}").token_value + +# Get catalog and schema from environment or config +CATALOG_NAME = 'andrea_tardif' +SCHEMA_NAME = 'workday_demos' +GENIE_SPACE_ID = '01f0c0291e6f1feabc4a8a46085cebd1' +MLFLOW_EXPERIMENT_NAME = f'multiagent_genie_{CATALOG_NAME}' +host='https://dbc-d079f94e-4181.cloud.databricks.com/' + +################################################### +## Configure LLM +################################################### +llm = ChatDatabricks(endpoint="databricks-claude-3-7-sonnet") + +################################################### +## Create RAG Agent with Vector Search Tools +################################################### + +# Create retriever tools for each document type with disable_notice +email_retriever = VectorSearchRetrieverTool( + index_name=f"{CATALOG_NAME}.{SCHEMA_NAME}.email_communications_index", + columns=["content", "doc_uri"], + name="email_search", + description=( + "Searches through email communications between sales reps and customers. " + "Use this to find information about: pricing discussions, objections, " + "follow-ups, proposal details, and customer email correspondence." + ), + disable_notice=True, +) + +meeting_notes_retriever = VectorSearchRetrieverTool( + index_name=f"{CATALOG_NAME}.{SCHEMA_NAME}.meeting_notes_index", + columns=["content", "doc_uri"], + name="meeting_notes_search", + description=( + "Searches through meeting notes and call summaries. 
" + "Use this to find information about: customer meetings, demos, " + "discovery calls, requirements discussions, and decision-maker feedback." + ), + disable_notice=True, +) + +feedback_retriever = VectorSearchRetrieverTool( + index_name=f"{CATALOG_NAME}.{SCHEMA_NAME}.customer_feedback_index", + columns=["content", "doc_uri"], + name="feedback_search", + description=( + "Searches through customer feedback and reviews. " + "Use this to find information about: customer satisfaction, " + "product impressions, concerns raised, and post-demo feedback." + ), + disable_notice=True, +) + +# Combine all RAG tools +rag_tools = [email_retriever, meeting_notes_retriever, feedback_retriever] + +rag_agent_description = ( + "Specializes in retrieving information from unstructured sales documents including " + "emails, meeting notes, and customer feedback. Use this agent for questions about: " + "customer communications, meeting discussions, feedback and concerns, proposal details, " + "and qualitative sales information." +) + +rag_agent = create_react_agent(llm, tools=rag_tools) + +################################################### +## Create SQL Agent for Structured Data +################################################### + +sql_agent_system_prompt = f"""You are a sales data analyst with access to Workday sales CRM data. + +Available tables in {CATALOG_NAME}.{SCHEMA_NAME}: +- sales_reps: Information about sales representatives +- customer_accounts: Customer account details and company information +- sales_opportunities: Sales opportunities and deal pipeline +- sales_activities: Sales activities and interactions + +When asked about structured data like: +- Sales metrics and KPIs +- Rep performance and quotas +- Customer demographics and firmographics +- Opportunity stages and values +- Activity tracking and history + +Provide analytical insights based on the available structured data tables. 
+""" + +genie_description = ( + "Specializes in analyzing structured sales data from CRM tables. " + "Use this agent for questions about: sales metrics, rep performance, " + "customer demographics, opportunity pipelines, deal values, quota attainment, " + "and quantitative sales analytics." +) + +genie_agent = GenieAgent( + genie_space_id=GENIE_SPACE_ID, + genie_agent_name="Genie", + description=genie_description, + client=WorkspaceClient( + host=host, + token=token, + ), +) + +############################# +# Define the supervisor agent +############################# + +MAX_ITERATIONS = 4 + +worker_descriptions = { + "GenieAgent": genie_description, + "RAGAgent": rag_agent_description, +} + +formatted_descriptions = "\n".join( + f"- {name}: {desc}" for name, desc in worker_descriptions.items() +) + +system_prompt = f"""You are a strategic supervisor coordinating between specialized sales support agents. + +Your role is to: +1. Analyze the user's question to determine which agent(s) can best answer it +2. Route to the appropriate agent based on the question type +3. Ensure complete answers without redundant work +4. Synthesize information from multiple agents if needed + +Available agents: +{formatted_descriptions} + +Routing Guidelines: +- Use GenieAgent for: metrics, numbers, quotas, pipeline values, rep performance, account counts, etc. +- Use RAGAgent for: customer communications, meeting context, feedback, concerns, proposals, etc. +- You can route to multiple agents if the question requires both types of information + +Only respond with FINISH when: +- The user's question has been fully answered +- All necessary information has been gathered and processed + +Avoid routing to the same agent multiple times for the same information. + +Important: +- Do not choose FINISH until at least one specialized agent has been invoked. +- Prefer GenieAgent for numeric/metric queries; RAGAgent for unstructured text queries. 
+""" + +options = ["FINISH"] + list(worker_descriptions.keys()) +FINISH = {"next_node": "FINISH"} + +@mlflow.trace(span_type=SpanType.AGENT, name="supervisor_agent") +def supervisor_agent(state): + count = state.get("iteration_count", 0) + 1 + + if count > MAX_ITERATIONS: + return FINISH + + class NextNode(BaseModel): + next_node: Literal[tuple(options)] + + preprocessor = RunnableLambda( + lambda state: [{"role": "system", "content": system_prompt}] + state["messages"] + ) + supervisor_chain = preprocessor | llm.with_structured_output(NextNode) + result = supervisor_chain.invoke(state) + next_node = result.next_node + + # Prevent routing to the same node consecutively + if state.get("next_node") == next_node: + return FINISH + + return { + "iteration_count": count, + "next_node": next_node + } + +####################################### +# Define multiagent graph structure +####################################### + +def agent_node(state, agent, name): + """Execute agent and return results""" + result = agent.invoke({"messages": state["messages"]}) + return { + "messages": [ + { + "role": "assistant", + "content": result["messages"][-1].content, + "name": name, + } + ] + } + +def final_answer(state): + """Generate final synthesized answer""" + prompt = ( + "Based on the information gathered by the specialized agents, " + "provide a comprehensive answer to the user's question. " + "Synthesize insights from all agents and present a clear, helpful response." 
+ ) + preprocessor = RunnableLambda( + lambda state: state["messages"] + [{"role": "user", "content": prompt}] + ) + final_answer_chain = preprocessor | llm + return {"messages": [final_answer_chain.invoke(state)]} + +class AgentState(ChatAgentState): + next_node: str + iteration_count: int + +# Create agent nodes +rag_node = functools.partial(agent_node, agent=rag_agent, name="RAGAgent") +genie_node = functools.partial(agent_node, agent=genie_agent, name="GenieAgent") + +# Build the workflow graph +workflow = StateGraph(AgentState) +workflow.add_node("GenieAgent", genie_node) +workflow.add_node("RAGAgent", rag_node) +workflow.add_node("supervisor", supervisor_agent) +workflow.add_node("final_answer", final_answer) + +workflow.set_entry_point("supervisor") + +# Workers report back to supervisor +for worker in worker_descriptions.keys(): + workflow.add_edge(worker, "supervisor") + +# Supervisor decides next node +workflow.add_conditional_edges( + "supervisor", + lambda x: x["next_node"], + {**{k: k for k in worker_descriptions.keys()}, "FINISH": "final_answer"}, +) + +workflow.add_edge("final_answer", END) +multi_agent = workflow.compile() + +################################### +# Wrap in Databricks ChatAgent +################################### + +class LangGraphChatAgent(ChatAgent): + def __init__(self, agent: CompiledStateGraph): + self.agent = agent + + def predict( + self, + messages: list[ChatAgentMessage], + context: Optional[ChatContext] = None, + custom_inputs: Optional[dict[str, Any]] = None, + ) -> ChatAgentResponse: + request = { + "messages": [m.model_dump_compat(exclude_none=True) for m in messages] + } + + messages = [] + for event in self.agent.stream(request, stream_mode="updates"): + for node_data in event.values(): + messages.extend( + ChatAgentMessage(**msg) for msg in node_data.get("messages", []) + ) + return ChatAgentResponse(messages=messages) + + def predict_stream( + self, + messages: list[ChatAgentMessage], + context: Optional[ChatContext] 
= None, + custom_inputs: Optional[dict[str, Any]] = None, + ) -> Generator[ChatAgentChunk, None, None]: + request = { + "messages": [m.model_dump_compat(exclude_none=True) for m in messages] + } + for event in self.agent.stream(request, stream_mode="updates"): + for node_data in event.values(): + yield from ( + ChatAgentChunk(**{"delta": msg}) + for msg in node_data.get("messages", []) + ) + +# Create the agent +mlflow.langchain.autolog() +AGENT = LangGraphChatAgent(multi_agent) +mlflow.models.set_model(AGENT) diff --git a/2025-11-Multi-Agent-GenAI-System/manifest.mf b/2025-11-Multi-Agent-GenAI-System/manifest.mf new file mode 100644 index 0000000..87a1c0c --- /dev/null +++ b/2025-11-Multi-Agent-GenAI-System/manifest.mf @@ -0,0 +1 @@ +{"version":"Manifest","guid":"17da002d-cbe9-4201-a8d5-f2c41d82f25d","origId":-1,"name":"manifest.mf"} \ No newline at end of file