Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions flexus_client_kit/ckit_ask_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ async def bot_activate(
skill: str,
first_question: str,
first_calls: Any = None,
cd_instruction: str = "",
title: str = "",
sched_id: str = "",
fexp_id: str = "",
Expand All @@ -188,15 +189,16 @@ async def bot_activate(
http = await client.use_http()
async with http as h:
r = await h.execute(
gql.gql(f"""mutation {camel_case_for_logs}BotActivate($who_is_asking: String!, $persona_id: String!, $skill: String!, $first_question: String!, $first_calls: String!, $title: String!, $sched_id: String!, $fexp_id: String!, $ft_btest_name: String!, $model: String!) {{
bot_activate(who_is_asking: $who_is_asking, persona_id: $persona_id, skill: $skill, first_question: $first_question, first_calls: $first_calls, title: $title, sched_id: $sched_id, fexp_id: $fexp_id, ft_btest_name: $ft_btest_name, model: $model) {{ ft_id }}
gql.gql(f"""mutation {camel_case_for_logs}BotActivate($who_is_asking: String!, $persona_id: String!, $skill: String!, $first_question: String!, $first_calls: String!, $cd_instruction: String!, $title: String!, $sched_id: String!, $fexp_id: String!, $ft_btest_name: String!, $model: String!) {{
bot_activate(who_is_asking: $who_is_asking, persona_id: $persona_id, skill: $skill, first_question: $first_question, first_calls: $first_calls, cd_instruction: $cd_instruction, title: $title, sched_id: $sched_id, fexp_id: $fexp_id, ft_btest_name: $ft_btest_name, model: $model) {{ ft_id }}
}}"""),
variable_values={
"who_is_asking": who_is_asking,
"persona_id": persona_id,
"skill": skill,
"first_question": first_question,
"first_calls": json.dumps(first_calls),
"cd_instruction": cd_instruction,
"title": title,
"sched_id": sched_id,
"fexp_id": fexp_id,
Expand Down
174 changes: 174 additions & 0 deletions flexus_client_kit/ckit_guest_chat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import json
import logging
import os
import time
from typing import Dict, Optional

import gql

from flexus_client_kit import ckit_ask_model
from flexus_client_kit import ckit_client

logger = logging.getLogger("gchat")


async def create_guest_accessible_thread_with_crm_context(
fclient: ckit_client.FlexusClient,
persona_id: str,
ws_id: str,
located_fgroup_id: str,
contact_id: str,
platform: str = "web",
title: Optional[str] = None,
additional_context: Optional[Dict[str, str]] = None,
skill: str = "default",
guest_token_ttl: int = 7 * 24 * 3600,
) -> Dict[str, str]:
http = await fclient.use_http()

# Step 1: Load CRM contact data
contact_data = None
try:
async with http as h:
crm_result = await h.execute(
gql.gql("""query GetContactForGuestThread(
$schema_name: String!,
$table_name: String!,
$ws_id: String!,
$skip: Int!,
$limit: Int!,
$sort_by: [String!]!,
$filters: [String!]!,
$include: [String!]!
) {
erp_table_data(
schema_name: $schema_name,
table_name: $table_name,
ws_id: $ws_id,
skip: $skip,
limit: $limit,
sort_by: $sort_by,
filters: $filters,
include: $include
)
}"""),
variable_values={
"schema_name": "erp",
"table_name": "crm_contact",
"ws_id": ws_id,
"skip": 0,
"limit": 1,
"sort_by": [],
"filters": [f"contact_id:=:{contact_id}"],
"include": [],
},
)
contact_data = crm_result.get("erp_table_data", [])
if not contact_data:
logger.warning(f"No CRM contact found for {contact_id}")
except Exception as e:
logger.warning(f"Failed to fetch CRM contact {contact_id}: {e}")

# Step 2: Build context message
parts = [f"{platform.upper()} GUEST CHAT CONTEXT:"]
if contact_data:
logger.info(f"Found CRM contact {contact_id}, data: %s" % json.dumps(contact_data[0], indent=2))
c = contact_data[0]
parts.append(f"Contact: {c.get('contact_first_name', 'Unknown')} {c.get('contact_last_name', '(Last name missing)')}")
if c.get('contact_email'):
parts.append(f"Email: {c['contact_email']}")
if c.get('contact_company'):
parts.append(f"Company: {c['contact_company']}")
if c.get('contact_phone'):
parts.append(f"Phone: {c['contact_phone']}")
parts.append(f"CRM Contact ID: {contact_id}")
else:
parts.append(f"CRM Contact ID: {contact_id}")

if additional_context:
for k, v in additional_context.items():
parts.append(f"{k}: {v}")

parts.append("")
parts.append("This is a guest user accessing via a web chat link. Greet them professionally and assist with their inquiry.")
msg = "\n".join(parts)

if title is None:
title = f"{platform.capitalize()} Guest Chat"

ft_id = await ckit_ask_model.bot_activate(
fclient,
who_is_asking=f"{platform}_guest_activation",
persona_id=persona_id,
skill=skill,
first_question="",
cd_instruction=json.dumps(msg),
title=title,
)
logger.info(f"Created thread {ft_id} for guest chat, platform={platform}")

# Step 5: Mark thread as guest-accessible
await ckit_ask_model.thread_app_capture_patch(
http,
ft_id,
ft_app_searchable="",
ft_app_specific=json.dumps({
"guest_accessible": True,
"platform": platform,
"contact_id": contact_id,
"created_for": "guest_web_chat",
"created_ts": time.time(),
}),
)

async with http as h:
token_result = await h.execute(
gql.gql("""mutation CreateGuestToken($ft_id: String!, $contact_email: String!, $ttl: Int!) {
create_guest_token(ft_id: $ft_id, contact_email: $contact_email, ttl: $ttl)
}"""),
variable_values={
"ft_id": ft_id,
"contact_email": contact_data[0].get("contact_email", "") if contact_data else "",
"ttl": guest_token_ttl,
},
)
guest_token = token_result["create_guest_token"]

base_url = os.environ.get("FLEXUS_WEB_URL", "https://flexus.team")
guest_url = f"{base_url}/{located_fgroup_id}/{persona_id}/persona?ft_id={ft_id}&guest_token={guest_token}"

logger.info(f"Created guest-accessible thread {ft_id} with token, url={guest_url[:80]}...")

return {
"ft_id": ft_id,
"guest_token": guest_token,
"guest_url": guest_url,
}


async def generate_guest_chat_invitation_for_outreach(
fclient: ckit_client.FlexusClient,
persona_id: str,
ws_id: str,
located_fgroup_id: str,
contact_id: str,
invitation_context: str = "",
skill: str = "default",
) -> str:
additional_ctx = None
if invitation_context:
additional_ctx = {"Invitation context": invitation_context}

result = await create_guest_accessible_thread_with_crm_context(
fclient,
persona_id,
ws_id,
located_fgroup_id,
contact_id,
platform="web",
title="Outreach - Guest Chat",
additional_context=additional_ctx,
skill=skill,
)

return result["guest_url"]
187 changes: 187 additions & 0 deletions flexus_client_kit/integrations/fi_messenger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
import re
import logging
import time
from typing import Optional, Dict

import gql

from flexus_client_kit import ckit_client
from flexus_client_kit import ckit_ask_model
from flexus_client_kit import ckit_bot_exec

logger = logging.getLogger("messenger")


class IntegrationMessenger:
def __init__(
self,
fclient: ckit_client.FlexusClient,
rcx: ckit_bot_exec.RobotContext,
enable_summarization: bool = True,
message_limit_threshold: int = 80,
budget_limit_threshold: float = 0.8,
):
self.fclient = fclient
self.rcx = rcx
self.enable_summarization = enable_summarization
self.message_limit_threshold = message_limit_threshold
self.budget_limit_threshold = budget_limit_threshold

async def validate_messenger_handle(
self,
platform: str,
handle: str,
) -> Optional[Dict]:
http_client = await self.fclient.use_http()
async with http_client as http:
result = await http.execute(
gql.gql("""query ValidateMessengerHandle($platform: String!, $handle: String!) {
validate_messenger_handle(platform: $platform, handle: $handle) {
valid
persona_id
contact_id
ft_id
platform
}
}"""),
variable_values={
"platform": platform,
"handle": handle,
},
)

handle_result = result["validate_messenger_handle"]

if not handle_result["valid"]:
return None

if handle_result.get("persona_id") != self.rcx.persona.persona_id:
logger.warning(f"Handle persona mismatch: {handle_result.get('persona_id')} != {self.rcx.persona.persona_id}")
return None

return {
"persona_id": handle_result["persona_id"],
"contact_id": handle_result["contact_id"],
"ft_id": handle_result["ft_id"],
"platform": handle_result["platform"],
}

async def should_restart_thread(self, thread) -> tuple[bool, str]:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if has passed more than 10 minutes since last message, also should be triggered

if not self.enable_summarization:
return False, ""

try:
msg_count = await self._count_thread_messages(thread.thread_fields.ft_id)

if msg_count >= self.message_limit_threshold:
return True, f"message_limit_{msg_count}"

if thread.thread_fields.ft_budget > 0:
budget_ratio = thread.thread_fields.ft_coins / thread.thread_fields.ft_budget
if budget_ratio >= self.budget_limit_threshold:
return True, f"budget_limit_{budget_ratio:.2f}"

if thread.thread_fields.ft_error:
return True, "thread_error"

return False, ""

except Exception as e:
logger.error("Error checking thread restart conditions", exc_info=e)
return False, ""

async def generate_thread_summary(self, thread) -> str:
try:
http = await self.fclient.use_http()

messages_result = await http.execute_async(
gql.gql("""
query GetThreadMessages($ft_id: String!) {
thread_messages(ft_id: $ft_id) {
ftm_role
ftm_content
ftm_created_ts
}
}
"""),
variable_values={"ft_id": thread.thread_fields.ft_id}
)

messages = messages_result.get("thread_messages", [])

user_messages = [m for m in messages if m["ftm_role"] == "user"]
assistant_messages = [m for m in messages if m["ftm_role"] == "assistant"]

summary = f"""CONVERSATION SUMMARY (Previous Thread)

Thread ID: {thread.thread_fields.ft_id}
Message count: {len(messages)}
Started: {time.strftime('%Y-%m-%d %H:%M', time.localtime(thread.thread_fields.ft_created_ts))}

KEY TOPICS DISCUSSED:
{self._extract_key_topics(user_messages)}

CURRENT STATUS:
{self._extract_current_status(messages)}

ACTION ITEMS:
{self._extract_action_items(assistant_messages)}

Continue the conversation naturally from this context.
"""

return summary

except Exception as e:
logger.error("Error generating thread summary", exc_info=e)
return f"Previous thread: {thread.thread_fields.ft_id}\nContinuing conversation..."

def _extract_key_topics(self, user_msgs) -> str:
topics = []
for msg in user_msgs[:5]:
content = str(msg.get("ftm_content", ""))[:200]
if content:
topics.append(f"- {content}")
return "\n".join(topics) if topics else "- No specific topics recorded"

def _extract_current_status(self, messages) -> str:
recent = messages[-3:] if len(messages) >= 3 else messages
status_lines = []
for msg in recent:
role = msg["ftm_role"]
content = str(msg.get("ftm_content", ""))[:150]
status_lines.append(f"{role}: {content}")
return "\n".join(status_lines) if status_lines else "- No recent context"

def _extract_action_items(self, assistant_msgs) -> str:
actions = []
action_keywords = ["will", "going to", "should", "need to", "follow up"]

for msg in assistant_msgs[-5:]:
content = str(msg.get("ftm_content", ""))
for keyword in action_keywords:
if keyword in content.lower():
sentences = content.split(".")
for sent in sentences:
if keyword in sent.lower():
actions.append(f"- {sent.strip()}")
break
break

return "\n".join(actions[:5]) if actions else "- No pending actions"

async def _count_thread_messages(self, ft_id: str) -> int:
try:
http = await self.fclient.use_http()
result = await http.execute_async(
gql.gql("""
query CountMessages($ft_id: String!) {
thread_message_count(ft_id: $ft_id)
}
"""),
variable_values={"ft_id": ft_id}
)
return result.get("thread_message_count", 0)
except Exception as e:
logger.error("Failed to count messages for thread %s", ft_id, exc_info=e)
return 0
Loading