Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions FLEXUS_BOT_REFERENCE.md
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,10 @@ Use `prompts_common.SCHED_TASK_SORT_10M` and `prompts_common.SCHED_TODO_5M` as d
| `@rcx.on_tool_call("name")` | `async def(toolcall, args) -> str` |
| `@rcx.on_erp_change("table")` | `async def(action, new_record, old_record)` |

**ERP action types**: `"INSERT"`, `"UPDATE"`, `"DELETE"`, `"ARCHIVE"`
- `ARCHIVE`: soft delete (archived_ts: 0 → >0)
- `DELETE`: hard delete (removed from db)

---

## Setup Schema
Expand Down
2 changes: 1 addition & 1 deletion flexus_client_kit/ckit_bot_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ async def subscribe_and_produce_callbacks(

elif upd.news_about.startswith("erp."):
table_name = upd.news_about[4:]
if upd.news_action in ["INSERT", "UPDATE", "DELETE"]:
if upd.news_action in ["INSERT", "UPDATE", "DELETE", "ARCHIVE"]:
handled = True
new_record = upd.news_payload_erp_record_new
old_record = upd.news_payload_erp_record_old
Expand Down
61 changes: 57 additions & 4 deletions flexus_client_kit/ckit_erp.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import json
import gql

from flexus_client_kit import ckit_client, gql_utils
from flexus_client_kit import ckit_client, gql_utils, erp_schema

T = TypeVar('T')

Expand All @@ -19,6 +19,26 @@ def dataclass_or_dict_to_dict(x: Any) -> dict:
raise ValueError(f"must be a dataclass or dict, got {type(x)}")


async def get_erp_table_meta(
    client: ckit_client.FlexusClient,
    table_name: str,
) -> erp_schema.ErpTableMeta:
    """Fetch column and relation metadata for one ERP table via GraphQL."""
    # Field list is derived from the dataclass so the query stays in sync
    # with erp_schema.ErpTableMeta.
    query_text = f"""query ErpTableMeta($schema_name: String!, $table_name: String!) {{
        erp_table_meta(schema_name: $schema_name, table_name: $table_name) {{
            {gql_utils.gql_fields(erp_schema.ErpTableMeta)}
        }}
    }}"""
    variables = {"schema_name": "erp", "table_name": table_name}
    http = await client.use_http()
    async with http as session:
        response = await session.execute(gql.gql(query_text), variable_values=variables)
    return gql_utils.dataclass_from_dict(response["erp_table_meta"], erp_schema.ErpTableMeta)


async def query_erp_table(
client: ckit_client.FlexusClient,
table_name: str,
Expand Down Expand Up @@ -141,6 +161,33 @@ async def delete_erp_record(
return r["erp_table_delete"]


async def batch_upsert_erp_records(
    client: ckit_client.FlexusClient,
    table_name: str,
    ws_id: str,
    upsert_key: str,
    records: List[Any],
) -> dict:
    """Create-or-update many ERP records in a single GraphQL mutation.

    Records (dataclasses or dicts) are serialized to one JSON payload; the
    server matches existing rows on `upsert_key`.  Returns the server's
    result as a dict (decoded when the mutation returns a JSON string).
    """
    payload = json.dumps([dataclass_or_dict_to_dict(record) for record in records])
    mutation = gql.gql("""
    mutation ErpTableBatchUpsert($schema_name: String!, $table_name: String!, $ws_id: String!, $upsert_key: String!, $records_json: String!) {
        erp_table_batch_upsert(schema_name: $schema_name, table_name: $table_name, ws_id: $ws_id, upsert_key: $upsert_key, records_json: $records_json)
    }""")
    http = await client.use_http()
    async with http as session:
        response = await session.execute(
            mutation,
            variable_values={
                "schema_name": "erp",
                "table_name": table_name,
                "ws_id": ws_id,
                "upsert_key": upsert_key,
                "records_json": payload,
            },
        )
    outcome = response["erp_table_batch_upsert"]
    return json.loads(outcome) if isinstance(outcome, str) else outcome


def check_record_matches_filters(record: dict, filters: List[Union[str, dict]], col_names: set = None) -> bool:
"""
Check if a record (dict) matches all filters.
Expand Down Expand Up @@ -296,18 +343,24 @@ def check_record_matches_filter(record: dict, f: str, col_names: set = None) ->


async def test():
from flexus_client_kit.erp_schema import ProductTemplate, ProductProduct
client = ckit_client.FlexusClient("ckit_erp_test")
ws_id = "solarsystem"

meta = await get_erp_table_meta(client, "product_product")
print(f"Table: {meta.table_name}")
print(f"Primary key: {meta.table_pk}")
print(f"Columns: {len(meta.table_columns)}")
print(f"Relations: {len(meta.table_outbound_rels)}")

products = await query_erp_table(
client,
"product_product",
ws_id,
ProductProduct,
erp_schema.ProductProduct,
limit=10,
include=["prodt"],
)
print(f"Found {len(products)} products:")
print(f"\nFound {len(products)} products:")
for p in products:
print(p)

Expand Down
25 changes: 23 additions & 2 deletions flexus_client_kit/erp_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,29 @@
from typing import Optional, Dict, Type, List


@dataclass
class ErpColumnMeta:
    """Metadata for one column of an ERP table, as returned by the erp_table_meta query."""
    column_name: str
    column_type: str                       # type name as reported by the server
    column_nullable: bool                  # True when NULL is allowed
    column_default: Optional[str] = None   # textual default expression, if any


@dataclass
class ErpRelationMeta:
    """An outbound foreign-key relation from one ERP table to another."""
    rel_column: str      # column on this table that holds the FK value
    rel_fk_table: str    # referenced table
    rel_fk_column: str   # referenced column on that table


@dataclass
class ErpTableMeta:
    """Schema metadata for an ERP table, fetched via the erp_table_meta GraphQL query."""
    table_name: str
    table_pk: str                                # primary-key column name
    table_columns: List[ErpColumnMeta]
    table_outbound_rels: List[ErpRelationMeta]   # outgoing FK relations


@dataclass
class CrmContact:
ws_id: str
Expand Down Expand Up @@ -128,7 +151,6 @@ class ProductM2mTemplateTag:
"product_m2m_template_tag": ProductM2mTemplateTag,
}


ERP_DEFAULT_VISIBLE_FIELDS: Dict[str, List[str]] = {
"crm_contact": [
"contact_first_name",
Expand Down Expand Up @@ -179,4 +201,3 @@ class ProductM2mTemplateTag:
"uom_active",
],
}

1 change: 1 addition & 0 deletions flexus_client_kit/integrations/fi_crm_automations.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@


CRM_AUTOMATION_TOOL = ckit_cloudtool.CloudTool(
strict=False,
name="crm_automation",
description="Manage CRM automations. Start with op='help' to see complete documentation. IMPORTANT: Never use flexus_my_setup to modify 'crm_automations' - only use this tool!",
parameters={
Expand Down
154 changes: 152 additions & 2 deletions flexus_client_kit/integrations/fi_erp.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import csv
import dataclasses
import io
import json
import logging
import time
import types
from typing import Dict, Any, Optional, List, Type, Union, get_origin, get_args

import gql.transport.exceptions
from pymongo.collection import Collection
Expand Down Expand Up @@ -85,6 +88,25 @@
)


# Tool schema for bulk CSV import into an ERP table.  The CSV must already be
# normalized (header names equal to ERP column names) and stored in MongoDB;
# rows are validated against the table's dataclass schema before upload.
ERP_CSV_IMPORT_TOOL = ckit_cloudtool.CloudTool(
    strict=False,
    name="erp_csv_import",
    description=(
        "Import a normalized CSV (columns must match ERP table fields) stored via mongo_store. "
        "Provide mongo_path of the CSV, target table_name, and an optional upsert_key column."
    ),
    parameters={
        "type": "object",
        "properties": {
            "table_name": {"type": "string", "description": "Target ERP table name", "order": 1},
            "mongo_path": {"type": "string", "description": "Path of the CSV stored via mongo_store or python_execute artifacts", "order": 2},
            "upsert_key": {"type": "string", "description": "Column used to detect existing records (e.g., contact_email). Leave blank to always create.", "order": 3},
        },
        "required": ["table_name", "mongo_path"],
    },
)


def _format_table_meta_text(table_name: str, schema_class: type) -> str:
result = f"Table: erp.{table_name}\n"
result += "\nColumns:\n"
Expand Down Expand Up @@ -137,6 +159,42 @@ def _rows_to_text(rows: list, table_name: str, safety_valve_chars: int = 5000) -
return "\n".join(result), full_json


def _resolve_field_type(field_type: Optional[Type[Any]]) -> Optional[Type[Any]]:
if not field_type:
return None
origin = get_origin(field_type)
if origin is Union:
if non_none := [arg for arg in get_args(field_type) if arg is not type(None)]:
return _resolve_field_type(non_none[0])
if origin in (list, dict):
return origin
return field_type


def _convert_csv_value(raw_value: str, field_type: Optional[Type[Any]]) -> Any:
    """Coerce a raw CSV cell to the python type declared on the schema field.

    Empty cells become None.  Booleans accept true/1/yes/y and false/0/no/n
    (case-insensitive); list/dict fields are parsed as JSON.  Raises
    ValueError on malformed values.
    """
    text = raw_value.strip()
    if not text:
        return None
    target = _resolve_field_type(field_type)
    if target is bool:
        folded = text.lower()
        if folded in ("true", "1", "yes", "y"):
            return True
        if folded in ("false", "0", "no", "n"):
            return False
        raise ValueError(f"Value {text!r} is not a valid boolean")
    if target is int:
        return int(text)
    if target is float:
        return float(text)
    if target in (list, dict):
        try:
            return json.loads(text)
        except json.JSONDecodeError as e:
            raise ValueError(f"Expected JSON for {target.__name__}: {e}")
    return text


class IntegrationErp:
def __init__(
self,
Expand Down Expand Up @@ -239,7 +297,7 @@ async def handle_erp_data(self, toolcall: ckit_cloudtool.FCloudtoolCall, args: D

display_text, full_json = _rows_to_text(rows_as_dicts, table_name, safety_valve_chars)

if full_json and self.mongo_collection:
if full_json and self.mongo_collection is not None:
mongo_path = f"erp_query_results/{table_name}_{int(time.time())}.json"
try:
await ckit_mongo.mongo_overwrite(
Expand Down Expand Up @@ -341,3 +399,95 @@ async def handle_erp_crud(self, toolcall: ckit_cloudtool.FCloudtoolCall, model_p

else:
return f"❌ Error: Unknown operation '{op}'. Use create, patch, or delete."


async def handle_csv_import(self, toolcall: ckit_cloudtool.FCloudtoolCall, args: Dict[str, Any]) -> str:
    """Import a normalized CSV stored in MongoDB into an ERP table.

    args:
        table_name: target ERP table (must exist in ERP_TABLE_TO_SCHEMA)
        mongo_path: path of the CSV previously stored via mongo_store
        upsert_key: optional column used to match existing records

    Rows are validated and type-converted against the table's dataclass
    schema, then uploaded in batches.  Returns a human-readable summary
    with created/updated/failed counts and up to 5 error details.
    """
    if self.mongo_collection is None:
        return "❌ Cannot read CSV because MongoDB storage is unavailable for this bot."

    if not (table_name := args.get("table_name", "").strip()) or not (mongo_path := args.get("mongo_path", "").strip()):
        return "❌ table_name and mongo_path are required"
    upsert_key = args.get("upsert_key", "").strip()

    if not (schema_class := erp_schema.ERP_TABLE_TO_SCHEMA.get(table_name)):
        return f"❌ Unknown table '{table_name}'. Run erp_table_meta for available tables."
    # Server-side meta tells us the primary key column, which must never be
    # populated from CSV data.
    meta = await ckit_erp.get_erp_table_meta(self.client, table_name)
    pk_field = meta.table_pk

    if not (document := await ckit_mongo.mongo_retrieve_file(self.mongo_collection, mongo_path)):
        return f"❌ File {mongo_path!r} not found in MongoDB."

    # Stored files carry either raw bytes ("data") or a JSON payload ("json").
    if not (file_bytes := document.get("data") or (json.dumps(document["json"]).encode("utf-8") if document.get("json") is not None else None)):
        return f"❌ File {mongo_path!r} is empty."

    try:
        csv_text = file_bytes.decode("utf-8-sig")  # -sig strips a leading BOM if present
    except UnicodeDecodeError:
        return "❌ CSV must be UTF-8 encoded."

    reader = csv.DictReader(io.StringIO(csv_text))
    if not reader.fieldnames:
        return "❌ CSV header row is missing."
    # Reassigning fieldnames makes DictReader key subsequent rows by the
    # trimmed header names.
    reader.fieldnames = trimmed_headers = [(name or "").strip() for name in reader.fieldnames]

    allowed_fields = set(schema_class.__annotations__.keys())
    details_field = next((f for f in allowed_fields if f.endswith("_details")), None)

    if unknown_headers := [h for h in trimmed_headers if h and h not in allowed_fields]:
        fix_hint = f"Fix: Remove them, add to '{details_field}' as JSON, or map to existing columns." if details_field else "Fix: Remove them or map to existing columns (use erp_table_meta to see valid columns)."
        return f"❌ Unknown columns: {', '.join(unknown_headers)}\n\n{fix_hint}"

    if upsert_key and upsert_key not in trimmed_headers:
        return f"❌ upsert_key '{upsert_key}' is not present in the CSV header."

    field_types = schema_class.__annotations__
    # Fields with no default must come from the CSV; the pk is assigned by
    # the server and ws_id is filled in below.
    required_fields = {
        name for name, field_info in schema_class.__dataclass_fields__.items()
        if field_info.default == dataclasses.MISSING
        and field_info.default_factory == dataclasses.MISSING
        and name != pk_field and name != "ws_id"
    }

    errors: List[str] = []
    records = []
    for row_idx, row in enumerate(reader, start=1):
        try:
            record = {}
            for column in trimmed_headers:
                if column and column != pk_field and (raw_value := str(row.get(column, "")).strip()):
                    record[column] = _convert_csv_value(raw_value, field_types.get(column))

            if "ws_id" in allowed_fields and not record.get("ws_id"):
                record["ws_id"] = self.ws_id

            if upsert_key and not str(row.get(upsert_key, "")).strip():
                raise ValueError(f"Missing value for upsert_key '{upsert_key}'")

            if missing := required_fields - record.keys():
                raise ValueError(f"Missing required fields: {', '.join(sorted(missing))}")

            records.append(record)
        except Exception as e:
            errors.append(f"Row {row_idx}: {e}")

    BATCH_SIZE = 1000
    row_error_count = len(errors)  # everything collected so far is a per-row parse failure
    total_created = total_updated = 0
    total_failed = row_error_count

    for i in range(0, len(records), BATCH_SIZE):
        batch = records[i:i + BATCH_SIZE]
        batch_no = i // BATCH_SIZE + 1
        try:
            result = await ckit_erp.batch_upsert_erp_records(self.client, table_name, self.ws_id, upsert_key or "", batch)
            total_created += result.get("created", 0)
            total_updated += result.get("updated", 0)
            total_failed += result.get("failed", 0)
            errors.extend(f"Batch {batch_no}: {err}" for err in result.get("errors", []))
        except Exception as e:
            # BUGFIX: the previous code called errors.append(..., exc_info=True);
            # list.append() takes exactly one argument (exc_info is a logging
            # kwarg), so any batch failure raised TypeError instead of being
            # recorded.
            total_failed += len(batch)
            errors.append(f"Batch {batch_no} failed: {e}")

    lines = [
        f"Processed {len(records) + row_error_count} row(s) from {mongo_path}.",
        f"Created: {total_created}, Updated: {total_updated}, Failed: {total_failed}.",
    ]
    if errors:
        lines.append("Errors:")
        lines.extend(f"  • {err}" for err in errors[:5])
        if len(errors) > 5:
            lines.append(f"  …and {len(errors) - 5} more errors.")

    return "\n".join(lines)
1 change: 1 addition & 0 deletions flexus_client_kit/integrations/fi_gmail.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
logger = logging.getLogger("gmail")

GMAIL_TOOL = ckit_cloudtool.CloudTool(
strict=False,
name="gmail",
description="Interact with Gmail, call with op=\"help\" to print usage",
parameters={
Expand Down
5 changes: 5 additions & 0 deletions flexus_simple_bots/rick/rick_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
fi_erp.ERP_TABLE_META_TOOL,
fi_erp.ERP_TABLE_DATA_TOOL,
fi_erp.ERP_TABLE_CRUD_TOOL,
fi_erp.ERP_CSV_IMPORT_TOOL,
fi_mongo_store.MONGO_STORE_TOOL,
fi_crm_automations.CRM_AUTOMATION_TOOL,
]
Expand Down Expand Up @@ -91,6 +92,10 @@ async def toolcall_erp_data(toolcall: ckit_cloudtool.FCloudtoolCall, model_produ
async def toolcall_erp_crud(toolcall: ckit_cloudtool.FCloudtoolCall, model_produced_args: Dict[str, Any]) -> str:
return await erp_integration.handle_erp_crud(toolcall, model_produced_args)

@rcx.on_tool_call(fi_erp.ERP_CSV_IMPORT_TOOL.name)
async def toolcall_erp_csv_import(toolcall: ckit_cloudtool.FCloudtoolCall, model_produced_args: Dict[str, Any]) -> str:
return await erp_integration.handle_csv_import(toolcall, model_produced_args)

@rcx.on_tool_call(fi_mongo_store.MONGO_STORE_TOOL.name)
async def toolcall_mongo_store(toolcall: ckit_cloudtool.FCloudtoolCall, model_produced_args: Dict[str, Any]) -> str:
return await fi_mongo_store.handle_mongo_store(rcx.workdir, mongo_collection, toolcall, model_produced_args)
Expand Down
Loading