diff --git a/flexus_client_kit/ckit_bot_exec.py b/flexus_client_kit/ckit_bot_exec.py index e9789fd..ca0f26a 100644 --- a/flexus_client_kit/ckit_bot_exec.py +++ b/flexus_client_kit/ckit_bot_exec.py @@ -77,8 +77,9 @@ def __init__(self, fclient: ckit_client.FlexusClient, p: ckit_bot_query.FPersona self._parked_messages: Dict[str, ckit_ask_model.FThreadMessageOutput] = {} self._parked_threads: Dict[str, ckit_ask_model.FThreadOutput] = {} self._parked_tasks: Dict[str, ckit_kanban.FPersonaKanbanTaskOutput] = {} - self._parked_toolcalls: List[ckit_cloudtool.FCloudtoolCall] = [] - self._parked_erp_changes: List[tuple[str, str, Optional[Dict[str, Any]], Optional[Dict[str, Any]]]] = [] + self._parked_toolcalls: Dict[str, ckit_cloudtool.FCloudtoolCall] = {} + self._processing_toolcalls: set[str] = set() + self._parked_erp_changes: Dict[tuple[str, str], tuple[str, Optional[Dict[str, Any]], Optional[Dict[str, Any]]]] = {} self._parked_anything_new = asyncio.Event() # These fields are designed for direct access: self.fclient = fclient @@ -153,9 +154,9 @@ async def unpark_collected_events(self, sleep_if_no_work: float, turn_tool_calls except Exception as e: logger.error("%s error in on_updated_task handler: %s\n%s", self.persona.persona_id, type(e).__name__, e, exc_info=e) - erp_changes = list(self._parked_erp_changes) + erp_changes = list(self._parked_erp_changes.items()) self._parked_erp_changes.clear() - for table_name, action, new_record_dict, old_record_dict in erp_changes: + for (table_name, record_id), (action, new_record_dict, old_record_dict) in erp_changes: did_anything = True handler = self._handler_per_erp_table_change.get(table_name) if handler: @@ -167,7 +168,10 @@ async def unpark_collected_events(self, sleep_if_no_work: float, turn_tool_calls except Exception as e: logger.error("%s error in on_erp_change(%r) handler: %s\n%s", self.persona.persona_id, table_name, type(e).__name__, e, exc_info=e) - mycalls = list(self._parked_toolcalls) + mycalls = list(self._parked_toolcalls.values()) + for c in mycalls: + if c.fcall_name in self._handler_per_tool: + self._processing_toolcalls.add(c.fcall_id) self._parked_toolcalls.clear() for c in mycalls: did_anything = True @@ -179,8 +183,11 @@ async def unpark_collected_events(self, sleep_if_no_work: float, turn_tool_calls await self._local_tool_call(self.fclient, c) except Exception as e: logger.error("%s error in on_tool_call() handler: %s\n%s", self.persona.persona_id, type(e).__name__, e, exc_info=e) + finally: + self._processing_toolcalls.discard(c.fcall_id) else: task = asyncio.create_task(self._local_tool_call(self.fclient, c)) + task.add_done_callback(lambda _, fcall_id=c.fcall_id: self._processing_toolcalls.discard(fcall_id)) task.add_done_callback(lambda t: self.bg_call_tasks.discard(t)) self.bg_call_tasks.add(task) @@ -473,9 +480,11 @@ async def subscribe_and_produce_callbacks( toolcall = upd.news_payload_toolcall persona_id = toolcall.connected_persona_id if persona_id in bc.bots_running: - logger.info("%s parked tool call %s %s", persona_id, toolcall.fcall_id, toolcall.fcall_name) - bc.bots_running[persona_id].instance_rcx._parked_toolcalls.append(toolcall) - bc.bots_running[persona_id].instance_rcx._parked_anything_new.set() + rcx = bc.bots_running[persona_id].instance_rcx + if toolcall.fcall_id not in rcx._processing_toolcalls and toolcall.fcall_id not in rcx._parked_toolcalls: + logger.info("%s parked tool call %s %s", persona_id, toolcall.fcall_id, toolcall.fcall_name) + rcx._parked_toolcalls[toolcall.fcall_id] = toolcall + rcx._parked_anything_new.set() else: logger.info("%s is about persona=%s which is not running here." % (toolcall.fcall_id, persona_id)) @@ -502,8 +511,9 @@ async def subscribe_and_produce_callbacks( handled = True new_record = upd.news_payload_erp_record_new old_record = upd.news_payload_erp_record_old + record_id = upd.news_payload_id for bot in bc.bots_running.values(): - bot.instance_rcx._parked_erp_changes.append((table_name, upd.news_action, new_record, old_record)) + bot.instance_rcx._parked_erp_changes[(table_name, record_id)] = (upd.news_action, new_record, old_record) bot.instance_rcx._parked_anything_new.set() elif upd.news_action == "INITIAL_UPDATES_OVER": diff --git a/flexus_client_kit/ckit_cloudtool.py b/flexus_client_kit/ckit_cloudtool.py index efd45e4..7f16704 100644 --- a/flexus_client_kit/ckit_cloudtool.py +++ b/flexus_client_kit/ckit_cloudtool.py @@ -13,6 +13,7 @@ from flexus_client_kit import ckit_client from flexus_client_kit import ckit_shutdown from flexus_client_kit import ckit_utils +from flexus_client_kit import ckit_passwords from flexus_client_kit import gql_utils logger = logging.getLogger("ctool") @@ -441,9 +442,10 @@ async def run_cloudtool_service( except (websockets.exceptions.ConnectionClosedError, gql.transport.exceptions.TransportError, OSError): if ckit_shutdown.shutdown_event.is_set(): break - logger.info("got disconnected, will connect again in 60s") - await ckit_shutdown.wait(60) + retry_sec = 5 if ckit_passwords.it_might_be_a_devbox else 60 + logger.info("got disconnected, will connect again in %ds", retry_sec) + await ckit_shutdown.wait(retry_sec) except Exception as e: logger.error("caught exception %s: %s" % (type(e).__name__, e), exc_info=e) - await ckit_shutdown.wait(60) + await ckit_shutdown.wait(5 if ckit_passwords.it_might_be_a_devbox else 60) diff --git a/flexus_client_kit/integrations/fi_github.py b/flexus_client_kit/integrations/fi_github.py index 4172519..34825c9 100644 --- a/flexus_client_kit/integrations/fi_github.py +++ b/flexus_client_kit/integrations/fi_github.py @@ -39,7 +39,7 @@ def __init__(self, fclient: ckit_client.FlexusClient, rcx: ckit_bot_exec.RobotCo def is_read_only_command(self, args: List[str]) -> bool: if not args or args[0] in {"search", "status", "help", "--help", "-h", "version", "--version"}: return True - READ_VERBS = {"view", "list", "status", "search", "browse", "show", "diff", "item-list", "field-list"} + READ_VERBS = {"view", "list", "status", "search", "browse", "show", "diff", "item-list", "field-list", "files"} return len(args) >= 2 and args[1] in READ_VERBS def _is_allowed_write_command(self, args: List[str]) -> bool: