diff --git a/FLEXUS_BOT_REFERENCE.md b/FLEXUS_BOT_REFERENCE.md index 847621f..b5376cf 100644 --- a/FLEXUS_BOT_REFERENCE.md +++ b/FLEXUS_BOT_REFERENCE.md @@ -397,6 +397,10 @@ Use `prompts_common.SCHED_TASK_SORT_10M` and `prompts_common.SCHED_TODO_5M` as d | `@rcx.on_tool_call("name")` | `async def(toolcall, args) -> str` | | `@rcx.on_erp_change("table")` | `async def(action, new_record, old_record)` | +**ERP action types**: `"INSERT"`, `"UPDATE"`, `"DELETE"`, `"ARCHIVE"` +- `ARCHIVE`: soft delete (archived_ts: 0 → >0) +- `DELETE`: hard delete (removed from db) + --- ## Setup Schema diff --git a/flexus_client_kit/ckit_bot_exec.py b/flexus_client_kit/ckit_bot_exec.py index 95efba3..462a228 100644 --- a/flexus_client_kit/ckit_bot_exec.py +++ b/flexus_client_kit/ckit_bot_exec.py @@ -494,7 +494,7 @@ async def subscribe_and_produce_callbacks( elif upd.news_about.startswith("erp."): table_name = upd.news_about[4:] - if upd.news_action in ["INSERT", "UPDATE", "DELETE"]: + if upd.news_action in ["INSERT", "UPDATE", "DELETE", "ARCHIVE"]: handled = True new_record = upd.news_payload_erp_record_new old_record = upd.news_payload_erp_record_old diff --git a/flexus_client_kit/ckit_erp.py b/flexus_client_kit/ckit_erp.py index ed28b11..eb52e9a 100644 --- a/flexus_client_kit/ckit_erp.py +++ b/flexus_client_kit/ckit_erp.py @@ -4,7 +4,7 @@ import json import gql -from flexus_client_kit import ckit_client, gql_utils +from flexus_client_kit import ckit_client, gql_utils, erp_schema T = TypeVar('T') @@ -19,6 +19,26 @@ def dataclass_or_dict_to_dict(x: Any) -> dict: raise ValueError(f"must be a dataclass or dict, got {type(x)}") +async def get_erp_table_meta( + client: ckit_client.FlexusClient, + table_name: str, +) -> erp_schema.ErpTableMeta: + http = await client.use_http() + async with http as h: + r = await h.execute( + gql.gql(f"""query ErpTableMeta($schema_name: String!, $table_name: String!) {{ + erp_table_meta(schema_name: $schema_name, table_name: $table_name) {{ + {gql_utils.gql_fields(erp_schema.ErpTableMeta)} + }} + }}"""), + variable_values={ + "schema_name": "erp", + "table_name": table_name, + }, + ) + return gql_utils.dataclass_from_dict(r["erp_table_meta"], erp_schema.ErpTableMeta) + + async def query_erp_table( client: ckit_client.FlexusClient, table_name: str, @@ -141,6 +161,33 @@ async def delete_erp_record( return r["erp_table_delete"] +async def batch_upsert_erp_records( + client: ckit_client.FlexusClient, + table_name: str, + ws_id: str, + upsert_key: str, + records: List[Any], +) -> dict: + http = await client.use_http() + async with http as h: + r = await h.execute(gql.gql(""" + mutation ErpTableBatchUpsert($schema_name: String!, $table_name: String!, $ws_id: String!, $upsert_key: String!, $records_json: String!) { + erp_table_batch_upsert(schema_name: $schema_name, table_name: $table_name, ws_id: $ws_id, upsert_key: $upsert_key, records_json: $records_json) + }"""), + variable_values={ + "schema_name": "erp", + "table_name": table_name, + "ws_id": ws_id, + "upsert_key": upsert_key, + "records_json": json.dumps([dataclass_or_dict_to_dict(r) for r in records]), + }, + ) + result = r["erp_table_batch_upsert"] + if isinstance(result, str): + return json.loads(result) + return result + + def check_record_matches_filters(record: dict, filters: List[Union[str, dict]], col_names: set = None) -> bool: """ Check if a record (dict) matches all filters. 
@@ -296,18 +343,24 @@ def check_record_matches_filter(record: dict, f: str, col_names: set = None) -> async def test(): - from flexus_client_kit.erp_schema import ProductTemplate, ProductProduct client = ckit_client.FlexusClient("ckit_erp_test") ws_id = "solarsystem" + + meta = await get_erp_table_meta(client, "product_product") + print(f"Table: {meta.table_name}") + print(f"Primary key: {meta.table_pk}") + print(f"Columns: {len(meta.table_columns)}") + print(f"Relations: {len(meta.table_outbound_rels)}") + products = await query_erp_table( client, "product_product", ws_id, - ProductProduct, + erp_schema.ProductProduct, limit=10, include=["prodt"], ) - print(f"Found {len(products)} products:") + print(f"\nFound {len(products)} products:") for p in products: print(p) diff --git a/flexus_client_kit/erp_schema.py b/flexus_client_kit/erp_schema.py index 3f76b00..561dec3 100644 --- a/flexus_client_kit/erp_schema.py +++ b/flexus_client_kit/erp_schema.py @@ -3,6 +3,29 @@ from typing import Optional, Dict, Type, List +@dataclass +class ErpColumnMeta: + column_name: str + column_type: str + column_nullable: bool + column_default: Optional[str] = None + + +@dataclass +class ErpRelationMeta: + rel_column: str + rel_fk_table: str + rel_fk_column: str + + +@dataclass +class ErpTableMeta: + table_name: str + table_pk: str + table_columns: List[ErpColumnMeta] + table_outbound_rels: List[ErpRelationMeta] + + @dataclass class CrmContact: ws_id: str @@ -128,7 +151,6 @@ class ProductM2mTemplateTag: "product_m2m_template_tag": ProductM2mTemplateTag, } - ERP_DEFAULT_VISIBLE_FIELDS: Dict[str, List[str]] = { "crm_contact": [ "contact_first_name", @@ -179,4 +201,3 @@ class ProductM2mTemplateTag: "uom_active", ], } - diff --git a/flexus_client_kit/integrations/fi_crm_automations.py b/flexus_client_kit/integrations/fi_crm_automations.py index f3d8ec9..0205917 100644 --- a/flexus_client_kit/integrations/fi_crm_automations.py +++ b/flexus_client_kit/integrations/fi_crm_automations.py @@ -60,6 +60,7 @@ CRM_AUTOMATION_TOOL = ckit_cloudtool.CloudTool( + strict=False, name="crm_automation", description="Manage CRM automations. Start with op='help' to see complete documentation. IMPORTANT: Never use flexus_my_setup to modify 'crm_automations' - only use this tool!", parameters={ diff --git a/flexus_client_kit/integrations/fi_erp.py b/flexus_client_kit/integrations/fi_erp.py index cbf985d..3aab230 100644 --- a/flexus_client_kit/integrations/fi_erp.py +++ b/flexus_client_kit/integrations/fi_erp.py @@ -1,7 +1,10 @@ +import csv +import dataclasses +import io import json import time import logging -from typing import Dict, Any, Optional, List +from typing import Dict, Any, Optional, List, Type, Union, get_origin, get_args from pymongo.collection import Collection import gql.transport.exceptions @@ -85,6 +88,25 @@ ) +ERP_CSV_IMPORT_TOOL = ckit_cloudtool.CloudTool( + strict=False, + name="erp_csv_import", + description=( + "Import a normalized CSV (columns must match ERP table fields) stored via mongo_store. " + "Provide mongo_path of the CSV, target table_name, and an optional upsert_key column." + ), + parameters={ + "type": "object", + "properties": { + "table_name": {"type": "string", "description": "Target ERP table name", "order": 1}, + "mongo_path": {"type": "string", "description": "Path of the CSV stored via mongo_store or python_execute artifacts", "order": 2}, + "upsert_key": {"type": "string", "description": "Column used to detect existing records (e.g., contact_email). 
Leave blank to always create.", "order": 3}, + }, + "required": ["table_name", "mongo_path"], + }, +) + + def _format_table_meta_text(table_name: str, schema_class: type) -> str: result = f"Table: erp.{table_name}\n" result += "\nColumns:\n" @@ -137,6 +159,42 @@ def _rows_to_text(rows: list, table_name: str, safety_valve_chars: int = 5000) - return "\n".join(result), full_json +def _resolve_field_type(field_type: Optional[Type[Any]]) -> Optional[Type[Any]]: + if not field_type: + return None + origin = get_origin(field_type) + if origin is Union: + if non_none := [arg for arg in get_args(field_type) if arg is not type(None)]: + return _resolve_field_type(non_none[0]) + if origin in (list, dict): + return origin + return field_type + + +def _convert_csv_value(raw_value: str, field_type: Optional[Type[Any]]) -> Any: + value = raw_value.strip() + if value == "": + return None + normalized_type = _resolve_field_type(field_type) + if normalized_type is bool: + lowered = value.lower() + if lowered in ("true", "1", "yes", "y"): + return True + if lowered in ("false", "0", "no", "n"): + return False + raise ValueError(f"Value {value!r} is not a valid boolean") + if normalized_type is int: + return int(value) + if normalized_type is float: + return float(value) + if normalized_type in (list, dict): + try: + return json.loads(value) + except json.JSONDecodeError as e: + raise ValueError(f"Expected JSON for {normalized_type.__name__}: {e}") + return value + + class IntegrationErp: def __init__( self, @@ -239,7 +297,7 @@ async def handle_erp_data(self, toolcall: ckit_cloudtool.FCloudtoolCall, args: D display_text, full_json = _rows_to_text(rows_as_dicts, table_name, safety_valve_chars) - if full_json and self.mongo_collection: + if full_json and self.mongo_collection is not None: mongo_path = f"erp_query_results/{table_name}_{int(time.time())}.json" try: await ckit_mongo.mongo_overwrite( @@ -341,3 +399,95 @@ async def handle_erp_crud(self, toolcall: ckit_cloudtool.FCloudtoolCall, model_p else: return f"❌ Error: Unknown operation '{op}'. Use create, patch, or delete." + + + async def handle_csv_import(self, toolcall: ckit_cloudtool.FCloudtoolCall, args: Dict[str, Any]) -> str: + if self.mongo_collection is None: + return "❌ Cannot read CSV because MongoDB storage is unavailable for this bot." + + if not (table_name := args.get("table_name", "").strip()) or not (mongo_path := args.get("mongo_path", "").strip()): + return "❌ table_name and mongo_path are required" + upsert_key = args.get("upsert_key", "").strip() + + if not (schema_class := erp_schema.ERP_TABLE_TO_SCHEMA.get(table_name)): + return f"❌ Unknown table '{table_name}'. Run erp_table_meta for available tables." + meta = await ckit_erp.get_erp_table_meta(self.client, table_name) + pk_field = meta.table_pk + + if not (document := await ckit_mongo.mongo_retrieve_file(self.mongo_collection, mongo_path)): + return f"❌ File {mongo_path!r} not found in MongoDB." + + if not (file_bytes := document.get("data") or (json.dumps(document["json"]).encode("utf-8") if document.get("json") is not None else None)): + return f"❌ File {mongo_path!r} is empty." + + try: + csv_text = file_bytes.decode("utf-8-sig") + except UnicodeDecodeError: + return "❌ CSV must be UTF-8 encoded." + + reader = csv.DictReader(io.StringIO(csv_text)) + if not reader.fieldnames: + return "❌ CSV header row is missing." 
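+        # Headers are trimmed in place below: DictReader uses fieldnames as row keys,
+        # so the per-row lookups and the schema checks that follow must see the same names.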
+        reader.fieldnames = trimmed_headers = [(name or "").strip() for name in reader.fieldnames]
+
+        allowed_fields = set(schema_class.__annotations__.keys())
+        details_field = next((f for f in allowed_fields if f.endswith("_details")), None)
+
+        if unknown_headers := [h for h in trimmed_headers if h and h not in allowed_fields]:
+            fix_hint = f"Fix: Remove them, add to '{details_field}' as JSON, or map to existing columns." if details_field else "Fix: Remove them or map to existing columns (use erp_table_meta to see valid columns)."
+            return f"❌ Unknown columns: {', '.join(unknown_headers)}\n\n{fix_hint}"
+
+        if upsert_key and upsert_key not in trimmed_headers:
+            return f"❌ upsert_key '{upsert_key}' is not present in the CSV header."
+
+        field_types = schema_class.__annotations__
+        required_fields = {
+            name for name, field_info in schema_class.__dataclass_fields__.items()
+            if field_info.default is dataclasses.MISSING
+            and field_info.default_factory is dataclasses.MISSING
+            and name not in (pk_field, "ws_id")
+        }
+
+        errors: List[str] = []
+        records = []
+        for row_idx, row in enumerate(reader, start=1):
+            try:
+                record = {}
+                for column in trimmed_headers:
+                    if column and column != pk_field and (raw_value := str(row.get(column, "")).strip()):
+                        record[column] = _convert_csv_value(raw_value, field_types.get(column))
+
+                if "ws_id" in allowed_fields and not record.get("ws_id"):
+                    record["ws_id"] = self.ws_id
+
+                if upsert_key and not str(row.get(upsert_key, "")).strip():
+                    raise ValueError(f"Missing value for upsert_key '{upsert_key}'")
+
+                if missing := required_fields - record.keys():
+                    raise ValueError(f"Missing required fields: {', '.join(sorted(missing))}")
+
+                records.append(record)
+            except Exception as e:
+                errors.append(f"Row {row_idx}: {e}")
+
+        row_error_count = len(errors)
+        BATCH_SIZE = 1000
+        total_created = total_updated = 0
+        total_failed = row_error_count
+
+        for i in range(0, len(records), BATCH_SIZE):
+            batch = records[i:i + BATCH_SIZE]
+            try:
+                result = await ckit_erp.batch_upsert_erp_records(self.client, table_name, self.ws_id, upsert_key or "", batch)
+                total_created += result.get("created", 0)
+                total_updated += result.get("updated", 0)
+                total_failed += result.get("failed", 0)
+                errors.extend(f"Batch {i//BATCH_SIZE + 1}: {err}" for err in result.get("errors", []))
+            except Exception as e:
+                total_failed += len(batch)
+                errors.append(f"Batch {i//BATCH_SIZE + 1} failed: {e}")
+
+        lines = [
+            f"Processed {len(records) + row_error_count} row(s) from {mongo_path}.",
+            f"Created: {total_created}, Updated: {total_updated}, Failed: {total_failed}.",
+        ]
+        if errors:
+            lines.append("Errors:")
+            lines.extend(f"  • {err}" for err in errors[:5])
+            if len(errors) > 5:
+                lines.append(f"  …and {len(errors) - 5} more errors.")
+
+        return "\n".join(lines)
diff --git a/flexus_client_kit/integrations/fi_gmail.py b/flexus_client_kit/integrations/fi_gmail.py
index e9564af..1646df6 100644
--- a/flexus_client_kit/integrations/fi_gmail.py
+++ b/flexus_client_kit/integrations/fi_gmail.py
@@ -23,6 +23,7 @@ logger = logging.getLogger("gmail")
 GMAIL_TOOL = ckit_cloudtool.CloudTool(
+    strict=False,
     name="gmail",
     description="Interact with Gmail, call with op=\"help\" to print usage",
     parameters={
diff --git a/flexus_simple_bots/rick/rick_bot.py b/flexus_simple_bots/rick/rick_bot.py
index 2ccbd4b..a1937da 100644
--- a/flexus_simple_bots/rick/rick_bot.py
+++ b/flexus_simple_bots/rick/rick_bot.py
@@ -36,6 +36,7 @@ fi_erp.ERP_TABLE_META_TOOL,
fi_erp.ERP_TABLE_DATA_TOOL, fi_erp.ERP_TABLE_CRUD_TOOL, + fi_erp.ERP_CSV_IMPORT_TOOL, fi_mongo_store.MONGO_STORE_TOOL, fi_crm_automations.CRM_AUTOMATION_TOOL, ] @@ -91,6 +92,10 @@ async def toolcall_erp_data(toolcall: ckit_cloudtool.FCloudtoolCall, model_produ async def toolcall_erp_crud(toolcall: ckit_cloudtool.FCloudtoolCall, model_produced_args: Dict[str, Any]) -> str: return await erp_integration.handle_erp_crud(toolcall, model_produced_args) + @rcx.on_tool_call(fi_erp.ERP_CSV_IMPORT_TOOL.name) + async def toolcall_erp_csv_import(toolcall: ckit_cloudtool.FCloudtoolCall, model_produced_args: Dict[str, Any]) -> str: + return await erp_integration.handle_csv_import(toolcall, model_produced_args) + @rcx.on_tool_call(fi_mongo_store.MONGO_STORE_TOOL.name) async def toolcall_mongo_store(toolcall: ckit_cloudtool.FCloudtoolCall, model_produced_args: Dict[str, Any]) -> str: return await fi_mongo_store.handle_mongo_store(rcx.workdir, mongo_collection, toolcall, model_produced_args) diff --git a/flexus_simple_bots/rick/rick_prompts.py b/flexus_simple_bots/rick/rick_prompts.py index 3f12be6..245c967 100644 --- a/flexus_simple_bots/rick/rick_prompts.py +++ b/flexus_simple_bots/rick/rick_prompts.py @@ -1,20 +1,12 @@ from flexus_simple_bots import prompts_common from flexus_client_kit.integrations import fi_crm_automations -crm_prompt = f""" -Use erp_table_*() tools to interact with the CRM. -CRM tables always start with the prefix "crm_", such as crm_contact or crm_task. - -Contacts will be ingested very often from forms in landing pages or main websites, or imported from other systems. -Tasks are a short actionable item linked to a contact that some bot or human needs to do, like an email, follow-up or call. - -Extra fields that are not defined in the database schema will be in details, e.x. in contact_details, or task_details. - +crm_import_landing_pages_prompt = """ ## Importing Contacts from Landing Pages When users ask about importing contacts from landing pages or website forms, explain they need their form to POST to: -https://flexus.team/api/erp-ingest/crm-contact/{{{{ws_id}}}} +https://flexus.team/api/erp-ingest/crm-contact/{{ws_id}} Required fields: - contact_email @@ -37,6 +29,95 @@ ``` """ +crm_import_csv_prompt = """ +## Bulk Importing Records from CSV + +When a user wants to import records (e.g., contacts) from a CSV, follow this process: + +### Step 1: Get the CSV File + +Ask the user to upload their CSV file. They can attach it to the chat, and you will access it via mongo_store. + +### Step 2: Analyze CSV and Target Table + +1. Read the CSV (headers + sample rows) from Mongo +2. Call erp_table_meta() to retrieve the full schema of the target table (e.g., crm_contact) +3. Identify standard fields and the JSON details field for custom data + +### Step 3: Propose Field Mapping + +Create an intelligent mapping from CSV → table fields: + +1. Match columns by name similarity +2. Propose transformations where needed (e.g., split full name, normalize phone/email, parse dates) +3. Map unmatched CSV columns into the appropriate *_details JSON field +4. 
Suggest an upsert key for deduplication (e.g., contact_email) if possible
+
+Present the mapping to the user in a clear format:
+```
+CSV Column → Target Field (Transformation)
+-----------------------------------------
+Email → contact_email (lowercase, trim)
+Full Name → contact_first_name + contact_last_name (split on first space)
+Phone → contact_phone (format: remove non-digits)
+Company → contact_details.company (custom field)
+Source → contact_details.source (custom field)
+
+Upsert key: contact_email (will update existing contacts with same email)
+```
+
+### Step 4: Validate and Adjust
+
+Ask the user to confirm or adjust the field mappings, transformations, upsert behavior, and validation rules.
+
+### Step 5: Generate a Python Script to Normalize the CSV
+
+Use python_execute() only to transform the uploaded file into a clean CSV whose columns exactly match the ERP table. Read from the Mongo attachment and write a new CSV:
+
+```python
+import pandas as pd
+
+SOURCE_FILE = "attachments/solar_root/leads_rows.csv"
+TARGET_TABLE = "crm_contact"
+OUTPUT_FILE = f"{TARGET_TABLE}_import.csv"
+
+df = pd.read_csv(SOURCE_FILE)
+records = []
+for _, row in df.iterrows():
+    full_name = str(row.get("Full Name", "")).strip()
+    parts = full_name.split(" ", 1)
+    first_name = parts[0] if parts else ""
+    last_name = parts[1] if len(parts) > 1 else ""
+    record = {
+        "contact_first_name": first_name,
+        "contact_last_name": last_name,
+        "contact_email": str(row.get("Email", "")).strip().lower(),
+        "contact_phone": str(row.get("Phone", "")).strip(),
+        "contact_details": {
+            "company": str(row.get("Company", "")).strip(),
+            "source": "csv_import"
+        }
+    }
+    records.append(record)
+
+normalized = pd.DataFrame(records)
+normalized.to_csv(OUTPUT_FILE, index=False)
+print(f"Saved {OUTPUT_FILE} with {len(normalized)} rows")
+```
+
+python_execute automatically uploads generated files back to Mongo under their filenames (e.g., `crm_contact_import.csv`), so you can reference them with mongo_store or erp_csv_import.
+
+### Step 6: Review the Normalized File
+
+1. Use `mongo_store(op="cat", args={"path": "crm_contact_import.csv"})` to show the first rows
+2. Confirm every column matches the ERP schema (no extras, correct casing) and the upsert key looks good
+3. Share stats (row count, notable transforms) with the user
+
+### Step 7: Import with `erp_csv_import`
+
+Call erp_csv_import() with the normalized file's mongo_path, the target table_name, and the agreed upsert_key.
+
+After import, offer to create follow-up tasks or automations for the new contacts.
+"""
+
 rick_prompt_default = f"""
 You are Rick, the Deal King. A confident, results-oriented sales assistant who helps close deals and manage customer relationships.
@@ -52,7 +133,15 @@
 Relevant strategies and templates are in policy docs under `/sales-pipeline/`, set them up and use them when asked to.
 
-{crm_prompt}
+## CRM Usage
+
+Use erp_table_*() tools to interact with the CRM.
+CRM tables always start with the prefix "crm_", such as crm_contact or crm_task.
+
+Contacts will often be ingested from forms on landing pages or main websites, or imported from other systems.
+Tasks are short actionable items linked to a contact that some bot or human needs to handle, such as an email, follow-up, or call.
+
+Extra fields that are not defined in the database schema go in the details JSON field, e.g. contact_details or task_details.
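+
+For example, a contact imported from a CSV might carry contact_details = {{"company": "Acme", "source": "csv_import"}} for columns that have no dedicated schema field.
+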
If enabled in setup, and a template is configured in `/sales-pipeline/welcome-email`, new CRM contacts without a previous welcome email will receive one automatically, personalized based on contact and sales data.