diff --git a/flexus_client_kit/ckit_ask_model.py b/flexus_client_kit/ckit_ask_model.py index da181d6..712ee4f 100644 --- a/flexus_client_kit/ckit_ask_model.py +++ b/flexus_client_kit/ckit_ask_model.py @@ -176,6 +176,7 @@ async def bot_activate( skill: str, first_question: str, first_calls: Any = None, + cd_instruction: str = "", title: str = "", sched_id: str = "", fexp_id: str = "", @@ -188,8 +189,8 @@ async def bot_activate( http = await client.use_http() async with http as h: r = await h.execute( - gql.gql(f"""mutation {camel_case_for_logs}BotActivate($who_is_asking: String!, $persona_id: String!, $skill: String!, $first_question: String!, $first_calls: String!, $title: String!, $sched_id: String!, $fexp_id: String!, $ft_btest_name: String!, $model: String!) {{ - bot_activate(who_is_asking: $who_is_asking, persona_id: $persona_id, skill: $skill, first_question: $first_question, first_calls: $first_calls, title: $title, sched_id: $sched_id, fexp_id: $fexp_id, ft_btest_name: $ft_btest_name, model: $model) {{ ft_id }} + gql.gql(f"""mutation {camel_case_for_logs}BotActivate($who_is_asking: String!, $persona_id: String!, $skill: String!, $first_question: String!, $first_calls: String!, $cd_instruction: String!, $title: String!, $sched_id: String!, $fexp_id: String!, $ft_btest_name: String!, $model: String!) {{ + bot_activate(who_is_asking: $who_is_asking, persona_id: $persona_id, skill: $skill, first_question: $first_question, first_calls: $first_calls, cd_instruction: $cd_instruction, title: $title, sched_id: $sched_id, fexp_id: $fexp_id, ft_btest_name: $ft_btest_name, model: $model) {{ ft_id }} }}"""), variable_values={ "who_is_asking": who_is_asking, @@ -197,6 +198,7 @@ async def bot_activate( "skill": skill, "first_question": first_question, "first_calls": json.dumps(first_calls), + "cd_instruction": cd_instruction, "title": title, "sched_id": sched_id, "fexp_id": fexp_id, diff --git a/flexus_client_kit/ckit_guest_chat.py b/flexus_client_kit/ckit_guest_chat.py new file mode 100644 index 0000000..d2acc1a --- /dev/null +++ b/flexus_client_kit/ckit_guest_chat.py @@ -0,0 +1,174 @@ +import json +import logging +import os +import time +from typing import Dict, Optional + +import gql + +from flexus_client_kit import ckit_ask_model +from flexus_client_kit import ckit_client + +logger = logging.getLogger("gchat") + + +async def create_guest_accessible_thread_with_crm_context( + fclient: ckit_client.FlexusClient, + persona_id: str, + ws_id: str, + located_fgroup_id: str, + contact_id: str, + platform: str = "web", + title: Optional[str] = None, + additional_context: Optional[Dict[str, str]] = None, + skill: str = "default", + guest_token_ttl: int = 7 * 24 * 3600, +) -> Dict[str, str]: + http = await fclient.use_http() + + # Step 1: Load CRM contact data + contact_data = None + try: + async with http as h: + crm_result = await h.execute( + gql.gql("""query GetContactForGuestThread( + $schema_name: String!, + $table_name: String!, + $ws_id: String!, + $skip: Int!, + $limit: Int!, + $sort_by: [String!]!, + $filters: [String!]!, + $include: [String!]! + ) { + erp_table_data( + schema_name: $schema_name, + table_name: $table_name, + ws_id: $ws_id, + skip: $skip, + limit: $limit, + sort_by: $sort_by, + filters: $filters, + include: $include + ) + }"""), + variable_values={ + "schema_name": "erp", + "table_name": "crm_contact", + "ws_id": ws_id, + "skip": 0, + "limit": 1, + "sort_by": [], + "filters": [f"contact_id:=:{contact_id}"], + "include": [], + }, + ) + contact_data = crm_result.get("erp_table_data", []) + if not contact_data: + logger.warning(f"No CRM contact found for {contact_id}") + except Exception as e: + logger.warning(f"Failed to fetch CRM contact {contact_id}: {e}") + + # Step 2: Build context message + parts = [f"{platform.upper()} GUEST CHAT CONTEXT:"] + if contact_data: + logger.info(f"Found CRM contact {contact_id}, data: %s" % json.dumps(contact_data[0], indent=2)) + c = contact_data[0] + parts.append(f"Contact: {c.get('contact_first_name', 'Unknown')} {c.get('contact_last_name', '(Last name missing)')}") + if c.get('contact_email'): + parts.append(f"Email: {c['contact_email']}") + if c.get('contact_company'): + parts.append(f"Company: {c['contact_company']}") + if c.get('contact_phone'): + parts.append(f"Phone: {c['contact_phone']}") + parts.append(f"CRM Contact ID: {contact_id}") + else: + parts.append(f"CRM Contact ID: {contact_id}") + + if additional_context: + for k, v in additional_context.items(): + parts.append(f"{k}: {v}") + + parts.append("") + parts.append("This is a guest user accessing via a web chat link. Greet them professionally and assist with their inquiry.") + msg = "\n".join(parts) + + if title is None: + title = f"{platform.capitalize()} Guest Chat" + + ft_id = await ckit_ask_model.bot_activate( + fclient, + who_is_asking=f"{platform}_guest_activation", + persona_id=persona_id, + skill=skill, + first_question="", + cd_instruction=json.dumps(msg), + title=title, + ) + logger.info(f"Created thread {ft_id} for guest chat, platform={platform}") + + # Step 5: Mark thread as guest-accessible + await ckit_ask_model.thread_app_capture_patch( + http, + ft_id, + ft_app_searchable="", + ft_app_specific=json.dumps({ + "guest_accessible": True, + "platform": platform, + "contact_id": contact_id, + "created_for": "guest_web_chat", + "created_ts": time.time(), + }), + ) + + async with http as h: + token_result = await h.execute( + gql.gql("""mutation CreateGuestToken($ft_id: String!, $contact_email: String!, $ttl: Int!) { + create_guest_token(ft_id: $ft_id, contact_email: $contact_email, ttl: $ttl) + }"""), + variable_values={ + "ft_id": ft_id, + "contact_email": contact_data[0].get("contact_email", "") if contact_data else "", + "ttl": guest_token_ttl, + }, + ) + guest_token = token_result["create_guest_token"] + + base_url = os.environ.get("FLEXUS_WEB_URL", "https://flexus.team") + guest_url = f"{base_url}/{located_fgroup_id}/{persona_id}/persona?ft_id={ft_id}&guest_token={guest_token}" + + logger.info(f"Created guest-accessible thread {ft_id} with token, url={guest_url[:80]}...") + + return { + "ft_id": ft_id, + "guest_token": guest_token, + "guest_url": guest_url, + } + + +async def generate_guest_chat_invitation_for_outreach( + fclient: ckit_client.FlexusClient, + persona_id: str, + ws_id: str, + located_fgroup_id: str, + contact_id: str, + invitation_context: str = "", + skill: str = "default", +) -> str: + additional_ctx = None + if invitation_context: + additional_ctx = {"Invitation context": invitation_context} + + result = await create_guest_accessible_thread_with_crm_context( + fclient, + persona_id, + ws_id, + located_fgroup_id, + contact_id, + platform="web", + title="Outreach - Guest Chat", + additional_context=additional_ctx, + skill=skill, + ) + + return result["guest_url"] diff --git a/flexus_client_kit/integrations/fi_messenger.py b/flexus_client_kit/integrations/fi_messenger.py new file mode 100644 index 0000000..c3ae5ae --- /dev/null +++ b/flexus_client_kit/integrations/fi_messenger.py @@ -0,0 +1,187 @@ +import re +import logging +import time +from typing import Optional, Dict + +import gql + +from flexus_client_kit import ckit_client +from flexus_client_kit import ckit_ask_model +from flexus_client_kit import ckit_bot_exec + +logger = logging.getLogger("messenger") + + +class IntegrationMessenger: + def __init__( + self, + fclient: ckit_client.FlexusClient, + rcx: ckit_bot_exec.RobotContext, + enable_summarization: bool = True, + message_limit_threshold: int = 80, + budget_limit_threshold: float = 0.8, + ): + self.fclient = fclient + self.rcx = rcx + self.enable_summarization = enable_summarization + self.message_limit_threshold = message_limit_threshold + self.budget_limit_threshold = budget_limit_threshold + + async def validate_messenger_handle( + self, + platform: str, + handle: str, + ) -> Optional[Dict]: + http_client = await self.fclient.use_http() + async with http_client as http: + result = await http.execute( + gql.gql("""query ValidateMessengerHandle($platform: String!, $handle: String!) { + validate_messenger_handle(platform: $platform, handle: $handle) { + valid + persona_id + contact_id + ft_id + platform + } + }"""), + variable_values={ + "platform": platform, + "handle": handle, + }, + ) + + handle_result = result["validate_messenger_handle"] + + if not handle_result["valid"]: + return None + + if handle_result.get("persona_id") != self.rcx.persona.persona_id: + logger.warning(f"Handle persona mismatch: {handle_result.get('persona_id')} != {self.rcx.persona.persona_id}") + return None + + return { + "persona_id": handle_result["persona_id"], + "contact_id": handle_result["contact_id"], + "ft_id": handle_result["ft_id"], + "platform": handle_result["platform"], + } + + async def should_restart_thread(self, thread) -> tuple[bool, str]: + if not self.enable_summarization: + return False, "" + + try: + msg_count = await self._count_thread_messages(thread.thread_fields.ft_id) + + if msg_count >= self.message_limit_threshold: + return True, f"message_limit_{msg_count}" + + if thread.thread_fields.ft_budget > 0: + budget_ratio = thread.thread_fields.ft_coins / thread.thread_fields.ft_budget + if budget_ratio >= self.budget_limit_threshold: + return True, f"budget_limit_{budget_ratio:.2f}" + + if thread.thread_fields.ft_error: + return True, "thread_error" + + return False, "" + + except Exception as e: + logger.error("Error checking thread restart conditions", exc_info=e) + return False, "" + + async def generate_thread_summary(self, thread) -> str: + try: + http = await self.fclient.use_http() + + messages_result = await http.execute_async( + gql.gql(""" + query GetThreadMessages($ft_id: String!) { + thread_messages(ft_id: $ft_id) { + ftm_role + ftm_content + ftm_created_ts + } + } + """), + variable_values={"ft_id": thread.thread_fields.ft_id} + ) + + messages = messages_result.get("thread_messages", []) + + user_messages = [m for m in messages if m["ftm_role"] == "user"] + assistant_messages = [m for m in messages if m["ftm_role"] == "assistant"] + + summary = f"""CONVERSATION SUMMARY (Previous Thread) + +Thread ID: {thread.thread_fields.ft_id} +Message count: {len(messages)} +Started: {time.strftime('%Y-%m-%d %H:%M', time.localtime(thread.thread_fields.ft_created_ts))} + +KEY TOPICS DISCUSSED: +{self._extract_key_topics(user_messages)} + +CURRENT STATUS: +{self._extract_current_status(messages)} + +ACTION ITEMS: +{self._extract_action_items(assistant_messages)} + +Continue the conversation naturally from this context. +""" + + return summary + + except Exception as e: + logger.error("Error generating thread summary", exc_info=e) + return f"Previous thread: {thread.thread_fields.ft_id}\nContinuing conversation..." + + def _extract_key_topics(self, user_msgs) -> str: + topics = [] + for msg in user_msgs[:5]: + content = str(msg.get("ftm_content", ""))[:200] + if content: + topics.append(f"- {content}") + return "\n".join(topics) if topics else "- No specific topics recorded" + + def _extract_current_status(self, messages) -> str: + recent = messages[-3:] if len(messages) >= 3 else messages + status_lines = [] + for msg in recent: + role = msg["ftm_role"] + content = str(msg.get("ftm_content", ""))[:150] + status_lines.append(f"{role}: {content}") + return "\n".join(status_lines) if status_lines else "- No recent context" + + def _extract_action_items(self, assistant_msgs) -> str: + actions = [] + action_keywords = ["will", "going to", "should", "need to", "follow up"] + + for msg in assistant_msgs[-5:]: + content = str(msg.get("ftm_content", "")) + for keyword in action_keywords: + if keyword in content.lower(): + sentences = content.split(".") + for sent in sentences: + if keyword in sent.lower(): + actions.append(f"- {sent.strip()}") + break + break + + return "\n".join(actions[:5]) if actions else "- No pending actions" + + async def _count_thread_messages(self, ft_id: str) -> int: + try: + http = await self.fclient.use_http() + result = await http.execute_async( + gql.gql(""" + query CountMessages($ft_id: String!) { + thread_message_count(ft_id: $ft_id) + } + """), + variable_values={"ft_id": ft_id} + ) + return result.get("thread_message_count", 0) + except Exception as e: + logger.error("Failed to count messages for thread %s", ft_id, exc_info=e) + return 0 diff --git a/flexus_client_kit/integrations/fi_telegram.py b/flexus_client_kit/integrations/fi_telegram.py new file mode 100644 index 0000000..7e20d2f --- /dev/null +++ b/flexus_client_kit/integrations/fi_telegram.py @@ -0,0 +1,407 @@ +import asyncio +import json +import logging +import time +import random +from typing import Optional + +import gql +from telegram import Bot as TelegramBot, Update as TelegramUpdate +from telegram.ext import Application, CommandHandler, MessageHandler, filters + +from flexus_client_kit import ckit_client +from flexus_client_kit import ckit_ask_model +from flexus_client_kit import ckit_bot_exec +from flexus_client_kit.integrations.fi_messenger import IntegrationMessenger + +logger = logging.getLogger("tlgrm") + + +class IntegrationTelegram(IntegrationMessenger): + def __init__( + self, + fclient: ckit_client.FlexusClient, + rcx: ckit_bot_exec.RobotContext, + telegram_bot_token: str, + bot_username: str, + enable_summarization: bool = True, + message_limit_threshold: int = 80, + budget_limit_threshold: float = 0.8, + ): + super().__init__(fclient, rcx, enable_summarization, message_limit_threshold, budget_limit_threshold) + self.telegram_bot_token = telegram_bot_token + self.bot_username = bot_username + self.application = None + self.problems = [] + self._conversation_state = {} # In-memory state: {chat_id: {ft_id, contact_id, updated_ts}} + + if not telegram_bot_token: + self.problems.append("No telegram_bot_token provided") + logger.warning("Telegram integration disabled (no token)") + return + + try: + self.application = Application.builder().token(telegram_bot_token).build() + self._setup_handlers() + logger.info("Telegram integration initialized for persona %s", self.rcx.persona.persona_id) + except Exception as e: + logger.error("Failed to initialize Telegram application", exc_info=e) + self.problems.append(f"{type(e).__name__}: {e}") + + def _setup_handlers(self): + if not self.application: + return + + self.application.add_handler(CommandHandler("start", self._handle_start)) + self.application.add_handler(CommandHandler("help", self._handle_help)) + self.application.add_handler(CommandHandler("exit", self._handle_exit)) + self.application.add_handler( + MessageHandler(filters.TEXT & ~filters.COMMAND, self._handle_message) + ) + + async def start_reactive(self): + if not self.application: + logger.warning("Telegram application not initialized, polling disabled") + return + + try: + await self.application.initialize() + await self.application.start() + await self.application.updater.start_polling(allowed_updates=["message"]) + logger.info("Telegram polling started for persona %s", self.rcx.persona.persona_id) + except Exception as e: + logger.error("Failed to start Telegram polling", exc_info=e) + self.problems.append(f"Polling error: {e}") + + async def close(self): + if not self.application: + return + + try: + await self.application.updater.stop() + await self.application.stop() + await self.application.shutdown() + logger.info("Telegram integration closed for persona %s", self.rcx.persona.persona_id) + except Exception as e: + logger.error("Error closing Telegram integration", exc_info=e) + + def _get_messenger_link(self, platform: str, handle: str) -> str: + if platform != "telegram": + raise ValueError(f"IntegrationTelegram only supports telegram platform, not {platform}") + return f"https://t.me/{self.bot_username}?start={handle}" + + async def _handle_start(self, update: TelegramUpdate, context): + if not update.effective_chat: + return + + chat_id = str(update.effective_chat.id) + thread = self._thread_capturing(chat_id) + + if thread: + await update.message.reply_text( + "You already have an active conversation! Just send your message." + ) + return + + await update.message.reply_text( + "Welcome! Please enter your access code to continue.\n\n" + "If you don't have one, please request it from our service." + ) + + async def _handle_help(self, update: TelegramUpdate, context): + await update.message.reply_text( + f"Bot Assistant (@{self.bot_username})\n\n" + "Commands:\n" + "/start - Begin conversation\n" + "/help - Show this help\n" + "/exit - End current conversation\n\n" + "Just send me messages and I'll respond!" + ) + + async def _handle_exit(self, update: TelegramUpdate, context): + if not update.effective_chat: + return + + chat_id = str(update.effective_chat.id) + + # Check both in-memory state and captured threads + thread = self._thread_capturing(chat_id) + conv_state = await self._get_conversation_state(chat_id) + + if not thread and (not conv_state or not conv_state.get("ft_id")): + await update.message.reply_text( + "You don't have an active conversation to exit.\n\n" + "Use /start to begin a new conversation." + ) + return + + ft_id = thread.thread_fields.ft_id if thread else conv_state.get("ft_id") + + try: + # Uncapture the thread + http = await self.fclient.use_http() + await ckit_ask_model.thread_app_capture_patch( + http, + ft_id, + ft_app_searchable="", # Empty string uncaptures + ft_app_specific=json.dumps({ + "platform": "telegram", + "disconnected_ts": time.time(), + }), + ) + + # Clear conversation state + if chat_id in self._conversation_state: + del self._conversation_state[chat_id] + + await update.message.reply_text( + "āœ… Conversation ended successfully.\n\n" + "To start a new conversation, enter a new access code or use /start." + ) + + logger.info("Telegram conversation exited for chat %s, thread %s", chat_id, ft_id) + + except Exception as e: + logger.error("Failed to exit conversation for chat %s", chat_id, exc_info=e) + await update.message.reply_text( + "āš ļø There was an error ending the conversation, but you can still enter a new access code to start fresh." + ) + + async def _handle_message(self, update: TelegramUpdate, context): + if not update.effective_chat or not update.message: + return + + chat_id = str(update.effective_chat.id) + text = update.message.text or "" + username = update.effective_user.username if update.effective_user else f"user_{update.effective_chat.id}" + + conv_state = await self._get_conversation_state(chat_id) + + if conv_state and conv_state.get("awaiting_handle"): + await self._validate_handle_and_create_thread(chat_id, text, update) + return + + thread = self._thread_capturing(chat_id) + + if not thread: + await self._validate_handle_and_create_thread(chat_id, text, update) + return + + if self.enable_summarization: + should_restart, restart_reason = await self.should_restart_thread(thread) + + if should_restart: + await self._restart_thread_with_summary(chat_id, thread, restart_reason) + thread = self._thread_capturing(chat_id) + + await self._post_message_to_thread(chat_id, text, username, thread) + + async def _validate_handle_and_create_thread(self, chat_id: str, handle: str, update: TelegramUpdate): + handle_data = await self.validate_messenger_handle("telegram", handle.strip()) + + if not handle_data: + await update.message.reply_text( + "Invalid or expired access code. Please check and try again.\n\n" + "Codes expire after 10 minutes. If you need a new one, please contact us." + ) + return + + contact_id = handle_data.get("contact_id", "") + web_ft_id = handle_data.get("ft_id", "") + + try: + ft_id = await ckit_ask_model.bot_activate( + self.fclient, + who_is_asking="telegram_activation", + persona_id=self.rcx.persona.persona_id, + skill="default", + first_question="Fulfill the user's request professionally. Don't mention any technical details about threads, tools, CRM or any other functionality.", + cd_instruction=json.dumps(f"TELEGRAM CONNECTION CONTEXT:\n\nCRM contact ID: {contact_id}\n\nFetch all required information about the contact first.\n\nGreet the contact and continue the conversation from where you left off on the web platform."), + title=f"Telegram: {chat_id}", + ) + + http = await self.fclient.use_http() + searchable = f"telegram/{self.rcx.persona.persona_id}/{chat_id}" + + await ckit_ask_model.thread_app_capture_patch( + http, + ft_id, + ft_app_searchable=searchable, + ft_app_specific=json.dumps({ + "last_posted_assistant_ts": time.time(), + "contact_id": contact_id, + "web_ft_id": web_ft_id, + "started_ts": time.time(), + "platform": "telegram", + }), + ) + + await self._update_conversation_state(chat_id, { + "ft_id": ft_id, + "contact_id": contact_id, + "updated_ts": time.time(), + }) + + logger.info("Telegram handle validated and thread created: %s for persona %s", ft_id, self.rcx.persona.persona_id) + + except Exception as e: + logger.error("Failed to create thread after handle validation", exc_info=e) + await update.message.reply_text( + "Sorry, there was an error setting up our conversation. Please try again or contact support." + ) + + async def _restart_thread_with_summary(self, chat_id: str, old_thread, reason: str): + try: + summary = await self.generate_thread_summary(old_thread) + + app_specific = old_thread.thread_fields.ft_app_specific + contact_id = app_specific.get("contact_id", "") if app_specific else "" + + ft_id = await ckit_ask_model.bot_activate( + self.fclient, + who_is_asking="telegram_thread_restart", + persona_id=self.rcx.persona.persona_id, + skill="default", + first_question="", + title=f"Telegram continued: {chat_id}", + ) + + http = await self.fclient.use_http() + + await ckit_ask_model.thread_add_user_message( + http, + ft_id, + summary, + "telegram_restart", + ftm_alt=100, + role="cd_instruction", + ) + + searchable = f"telegram/{self.rcx.persona.persona_id}/{chat_id}" + await ckit_ask_model.thread_app_capture_patch( + http, + ft_id, + ft_app_searchable=searchable, + ft_app_specific=json.dumps({ + "last_posted_assistant_ts": time.time(), + "contact_id": contact_id, + "previous_ft_id": old_thread.thread_fields.ft_id, + "restart_reason": reason, + "started_ts": time.time(), + "platform": "telegram", + }), + ) + + await ckit_ask_model.thread_app_capture_patch( + http, + old_thread.thread_fields.ft_id, + ft_app_searchable="", + ) + + await self._update_conversation_state(chat_id, { + "ft_id": ft_id, + "contact_id": contact_id, + "updated_ts": time.time(), + }) + + logger.info( + "Restarted thread %s -> %s for persona %s, reason: %s", + old_thread.thread_fields.ft_id, + ft_id, + self.rcx.persona.persona_id, + reason + ) + + except Exception as e: + logger.error("Failed to restart thread with summary", exc_info=e) + + async def _post_message_to_thread(self, chat_id: str, text: str, username: str, thread): + try: + http = await self.fclient.use_http() + + await ckit_ask_model.thread_add_user_message( + http, + thread.thread_fields.ft_id, + content=text, + ftm_alt=100, + who_is_asking=f"telegram_user_{username}", + ) + + logger.debug("Posted message to thread %s from Telegram %s", thread.thread_fields.ft_id, chat_id) + + except Exception as e: + logger.error("Failed to post message to thread", exc_info=e) + + if self.application: + await self.application.bot.send_message( + chat_id=int(chat_id), + text="Sorry, there was an error processing your message. Please try again." + ) + + async def look_assistant_might_have_posted_something(self, msg: ckit_ask_model.FThreadMessageOutput) -> bool: + if msg.ftm_role != "assistant": + return False + if not msg.ftm_content: + return False + + fthread = self.rcx.latest_threads.get(msg.ftm_belongs_to_ft_id, None) + if not fthread: + return False + + searchable = fthread.thread_fields.ft_app_searchable + if not searchable or not searchable.startswith(f"telegram/{self.rcx.persona.persona_id}/"): + return False + + parts = searchable.split("/") + if len(parts) < 3: + return False + + chat_id = parts[2] + + app_specific = fthread.thread_fields.ft_app_specific + if app_specific: + last_posted_ts = app_specific.get("last_posted_assistant_ts", 0) + if msg.ftm_created_ts <= last_posted_ts: + return False + + try: + if self.application: + await self.application.bot.send_message( + chat_id=int(chat_id), + text=msg.ftm_content, + ) + + http = await self.fclient.use_http() + await ckit_ask_model.thread_app_capture_patch( + http, + fthread.thread_fields.ft_id, + ft_app_specific=json.dumps({ + **(app_specific or {}), + "last_posted_assistant_ts": msg.ftm_created_ts, + }) + ) + + logger.debug("Sent assistant message to Telegram %s", chat_id) + return True + + except Exception as e: + logger.error("Failed to send message to Telegram %s", chat_id, exc_info=e) + return False + + return False + + def _thread_capturing(self, chat_id: str): + searchable = f"telegram/{self.rcx.persona.persona_id}/{chat_id}" + for t in self.rcx.latest_threads.values(): + if t.thread_fields.ft_app_searchable == searchable: + return t + return None + + async def _get_conversation_state(self, chat_id: str): + """Get conversation state from in-memory cache.""" + return self._conversation_state.get(chat_id) + + async def _update_conversation_state(self, chat_id: str, state: dict): + """Update conversation state in in-memory cache.""" + state["updated_ts"] = time.time() + self._conversation_state[chat_id] = state diff --git a/flexus_client_kit/integrations/fi_whatsapp.py b/flexus_client_kit/integrations/fi_whatsapp.py new file mode 100644 index 0000000..75e9040 --- /dev/null +++ b/flexus_client_kit/integrations/fi_whatsapp.py @@ -0,0 +1,21 @@ +import logging +from typing import Optional + +from pymongo.collection import Collection + +from flexus_client_kit import ckit_client +from flexus_client_kit import ckit_ask_model +from flexus_client_kit import ckit_bot_exec +from flexus_client_kit.integrations.fi_messenger import IntegrationMessenger + +logger = logging.getLogger("whatsapp") + + +class IntegrationWhatsApp(IntegrationMessenger): + def __init__( + self, + fclient: ckit_client.FlexusClient, + rcx: ckit_bot_exec.RobotContext, + ): + super().__init__(fclient, rcx) + logger.info("WhatsApp integration initialized") diff --git a/flexus_simple_bots/rick/rick_bot.py b/flexus_simple_bots/rick/rick_bot.py index 2ccbd4b..a66535c 100644 --- a/flexus_simple_bots/rick/rick_bot.py +++ b/flexus_simple_bots/rick/rick_bot.py @@ -1,5 +1,7 @@ import asyncio import logging +import os +import json from typing import Dict, Any from pymongo import AsyncMongoClient @@ -11,11 +13,13 @@ from flexus_client_kit import ckit_ask_model from flexus_client_kit import ckit_kanban from flexus_client_kit import ckit_mongo +from flexus_client_kit import ckit_guest_chat from flexus_client_kit.integrations import fi_gmail from flexus_client_kit.integrations import fi_pdoc from flexus_client_kit.integrations import fi_erp from flexus_client_kit.integrations import fi_mongo_store from flexus_client_kit.integrations import fi_crm_automations +from flexus_client_kit.integrations import fi_telegram from flexus_simple_bots.rick import rick_install from flexus_simple_bots.version_common import SIMPLE_BOTS_COMMON_VERSION @@ -30,6 +34,36 @@ ERP_TABLES = ["crm_task", "crm_contact"] +TELEGRAM_INVITE_TOOL = ckit_cloudtool.CloudTool( + name="generate_telegram_invite", + description="Generate a Telegram invitation link for the current contact to continue the conversation in Telegram. Call this when the user asks to switch to Telegram or requests a messenger link.", + parameters={ + "type": "object", + "properties": { + "contact_id": { + "type": "string", + "description": "The CRM contact ID for this user (from ERP contact record)" + }, + }, + "required": ["contact_id"], + } +) + +GUEST_CHAT_INVITE_TOOL = ckit_cloudtool.CloudTool( + name="generate_guest_url", + description="Generate a guest chat invitation link for the current contact to continue the conversation as a guest user. Call this when you want to provide a web chat link for the customer to join as a guest if you are asked to send an outreach email to the user.", + parameters={ + "type": "object", + "properties": { + "contact_id": { + "type": "string", + "description": "The CRM contact ID for this user (from ERP contact record)" + }, + }, + "required": ["contact_id"], + } +) + TOOLS = [ fi_gmail.GMAIL_TOOL, fi_pdoc.POLICY_DOCUMENT_TOOL, @@ -38,6 +72,8 @@ fi_erp.ERP_TABLE_CRUD_TOOL, fi_mongo_store.MONGO_STORE_TOOL, fi_crm_automations.CRM_AUTOMATION_TOOL, + TELEGRAM_INVITE_TOOL, + GUEST_CHAT_INVITE_TOOL, ] @@ -59,9 +95,29 @@ def get_setup(): fclient, rcx, get_setup, available_erp_tables=ERP_TABLES, ) + telegram_bot_token = os.getenv("RICK_TELEGRAM_BOT_TOKEN", None) + telegram_bot_username = os.getenv("RICK_TELEGRAM_BOT_USERNAME", "flexus_rick_bot") + telegram_integration = None + + if telegram_bot_token: + telegram_integration = fi_telegram.IntegrationTelegram( + fclient=fclient, + rcx=rcx, + telegram_bot_token=telegram_bot_token, + bot_username=telegram_bot_username, + enable_summarization=True, + message_limit_threshold=80, + budget_limit_threshold=0.8, + ) + await telegram_integration.start_reactive() + logger.info("Telegram integration enabled for Rick persona %s", rcx.persona.persona_id) + else: + logger.warning("Telegram integration disabled (no RICK_TELEGRAM_BOT_TOKEN)") + @rcx.on_updated_message async def updated_message_in_db(msg: ckit_ask_model.FThreadMessageOutput): - pass + if telegram_integration: + await telegram_integration.look_assistant_might_have_posted_something(msg) @rcx.on_updated_thread async def updated_thread_in_db(th: ckit_ask_model.FThreadOutput): @@ -95,6 +151,94 @@ async def toolcall_erp_crud(toolcall: ckit_cloudtool.FCloudtoolCall, model_produ async def toolcall_mongo_store(toolcall: ckit_cloudtool.FCloudtoolCall, model_produced_args: Dict[str, Any]) -> str: return await fi_mongo_store.handle_mongo_store(rcx.workdir, mongo_collection, toolcall, model_produced_args) + @rcx.on_tool_call(TELEGRAM_INVITE_TOOL.name) + async def toolcall_telegram_invite(toolcall: ckit_cloudtool.FCloudtoolCall, model_produced_args: Dict[str, Any]) -> str: + if not telegram_integration: + return "Telegram integration is not enabled for this bot. Please contact your administrator to set up RICK_TELEGRAM_BOT_TOKEN." + + # Try both direct access and nested "args" key + contact_id = ckit_cloudtool.try_best_to_find_argument( + model_produced_args.get("args", {}), + model_produced_args, + "contact_id", + "" + ) + + if not contact_id: + return f"Error: contact_id is required to generate a Telegram invite. Received: {json.dumps(model_produced_args)}" + + import gql + http_client = await fclient.use_http() + async with http_client as http: + result = await http.execute( + gql.gql("""mutation GenerateTelegramInvite( + $platform: String!, + $ft_id: String!, + $contact_id: String!, + $ttl: Int! + $bot_username: String! + ) { + generate_messenger_invite( + platform: $platform, + ft_id: $ft_id, + contact_id: $contact_id, + ttl: $ttl + bot_username: $bot_username + ) { + handle + link + expires_in + platform + } + }"""), + variable_values={ + "platform": "telegram", + "ft_id": toolcall.fcall_ft_id, + "contact_id": contact_id, + "ttl": 600, + "bot_username": telegram_bot_username, + }, + ) + + invite = result["generate_messenger_invite"] + + return f"""Great! Here's your Telegram invitation: + +šŸ”— Click this link: {invite['link']} +šŸ”‘ Or search for @{telegram_bot_username} and enter code: {invite['handle']} + +ā° This invitation expires in {invite['expires_in'] // 60} minutes. + +Once you're connected, our conversation will continue seamlessly on Telegram!""" + + @rcx.on_tool_call(GUEST_CHAT_INVITE_TOOL.name) + async def toolcall_guest_chat_invite(toolcall: ckit_cloudtool.FCloudtoolCall, model_produced_args: Dict[str, Any]) -> str: + contact_id = ckit_cloudtool.try_best_to_find_argument( + model_produced_args.get("args", {}), + model_produced_args, + "contact_id", + "" + ) + + if not contact_id: + return f"Error: contact_id is required to generate a guest chat invite. Received: {json.dumps(model_produced_args)}" + + result = await ckit_guest_chat.create_guest_accessible_thread_with_crm_context( + fclient, + rcx.persona.persona_id, + rcx.persona.ws_id, + rcx.persona.located_fgroup_id, + contact_id, + platform="web", + title="Guest Chat Invitation", + additional_context=None, + skill="default", + ) + + guest_url = result["guest_url"] + + return f"Great! Here's your guest URL:\n\nšŸ”— {guest_url}" + @rcx.on_tool_call(fi_crm_automations.CRM_AUTOMATION_TOOL.name) async def toolcall_crm_automation(toolcall: ckit_cloudtool.FCloudtoolCall, model_produced_args: Dict[str, Any]) -> str: return await automations_integration.handle_crm_automation(toolcall, model_produced_args) @@ -104,6 +248,8 @@ async def toolcall_crm_automation(toolcall: ckit_cloudtool.FCloudtoolCall, model await rcx.unpark_collected_events(sleep_if_no_work=10.0) finally: + if telegram_integration: + await telegram_integration.close() logger.info("%s exit" % (rcx.persona.persona_id,)) diff --git a/flexus_simple_bots/rick/rick_prompts.py b/flexus_simple_bots/rick/rick_prompts.py index 6574937..1a4a445 100644 --- a/flexus_simple_bots/rick/rick_prompts.py +++ b/flexus_simple_bots/rick/rick_prompts.py @@ -33,6 +33,15 @@ {fi_crm_automations.AUTOMATIONS_PROMPT} +Telegram Integration: +When a contact asks to continue the conversation on Telegram or requests a messenger link, use the generate_telegram_invite() tool with their contact_id. +This will create a time-limited invitation link that seamlessly transfers the conversation to Telegram. +Only offer this when Telegram integration is enabled in your setup. + +Outreach Emails: +Right before sending outreach emails, check the contact's details and generate a guest URL using generate_guest_url(). This URL provides a free-of-charge chat thread with you, so customer can ask for a follow-up and get support there. +Don't offer user the guest URL if already talking in messengers. + {prompts_common.PROMPT_KANBAN} {prompts_common.PROMPT_PRINT_WIDGET} {prompts_common.PROMPT_POLICY_DOCUMENTS}