Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions flexus_client_kit/ckit_bot_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ def __init__(self, fclient: ckit_client.FlexusClient, p: ckit_bot_query.FPersona
self._parked_messages: Dict[str, ckit_ask_model.FThreadMessageOutput] = {}
self._parked_threads: Dict[str, ckit_ask_model.FThreadOutput] = {}
self._parked_tasks: Dict[str, ckit_kanban.FPersonaKanbanTaskOutput] = {}
self._parked_toolcalls: List[ckit_cloudtool.FCloudtoolCall] = []
self._parked_erp_changes: List[tuple[str, str, Optional[Dict[str, Any]], Optional[Dict[str, Any]]]] = []
self._parked_toolcalls: Dict[str, ckit_cloudtool.FCloudtoolCall] = {}
self._processing_toolcalls: set[str] = set()
self._parked_erp_changes: Dict[tuple[str, str], tuple[str, Optional[Dict[str, Any]], Optional[Dict[str, Any]]]] = {}
self._parked_anything_new = asyncio.Event()
# These fields are designed for direct access:
self.fclient = fclient
Expand Down Expand Up @@ -153,9 +154,9 @@ async def unpark_collected_events(self, sleep_if_no_work: float, turn_tool_calls
except Exception as e:
logger.error("%s error in on_updated_task handler: %s\n%s", self.persona.persona_id, type(e).__name__, e, exc_info=e)

erp_changes = list(self._parked_erp_changes)
erp_changes = list(self._parked_erp_changes.items())
self._parked_erp_changes.clear()
for table_name, action, new_record_dict, old_record_dict in erp_changes:
for (table_name, record_id), (action, new_record_dict, old_record_dict) in erp_changes:
did_anything = True
handler = self._handler_per_erp_table_change.get(table_name)
if handler:
Expand All @@ -167,7 +168,10 @@ async def unpark_collected_events(self, sleep_if_no_work: float, turn_tool_calls
except Exception as e:
logger.error("%s error in on_erp_change(%r) handler: %s\n%s", self.persona.persona_id, table_name, type(e).__name__, e, exc_info=e)

mycalls = list(self._parked_toolcalls)
mycalls = list(self._parked_toolcalls.values())
for c in mycalls:
if c.fcall_name in self._handler_per_tool:
self._processing_toolcalls.add(c.fcall_id)
self._parked_toolcalls.clear()
for c in mycalls:
did_anything = True
Expand All @@ -179,8 +183,11 @@ async def unpark_collected_events(self, sleep_if_no_work: float, turn_tool_calls
await self._local_tool_call(self.fclient, c)
except Exception as e:
logger.error("%s error in on_tool_call() handler: %s\n%s", self.persona.persona_id, type(e).__name__, e, exc_info=e)
finally:
self._processing_toolcalls.discard(c.fcall_id)
else:
task = asyncio.create_task(self._local_tool_call(self.fclient, c))
task.add_done_callback(lambda _, fcall_id=c.fcall_id: self._processing_toolcalls.discard(fcall_id))
task.add_done_callback(lambda t: self.bg_call_tasks.discard(t))
self.bg_call_tasks.add(task)

Expand Down Expand Up @@ -473,9 +480,11 @@ async def subscribe_and_produce_callbacks(
toolcall = upd.news_payload_toolcall
persona_id = toolcall.connected_persona_id
if persona_id in bc.bots_running:
logger.info("%s parked tool call %s %s", persona_id, toolcall.fcall_id, toolcall.fcall_name)
bc.bots_running[persona_id].instance_rcx._parked_toolcalls.append(toolcall)
bc.bots_running[persona_id].instance_rcx._parked_anything_new.set()
rcx = bc.bots_running[persona_id].instance_rcx
if toolcall.fcall_id not in rcx._processing_toolcalls and toolcall.fcall_id not in rcx._parked_toolcalls:
logger.info("%s parked tool call %s %s", persona_id, toolcall.fcall_id, toolcall.fcall_name)
rcx._parked_toolcalls[toolcall.fcall_id] = toolcall
rcx._parked_anything_new.set()
else:
logger.info("%s is about persona=%s which is not running here." % (toolcall.fcall_id, persona_id))

Expand All @@ -502,8 +511,9 @@ async def subscribe_and_produce_callbacks(
handled = True
new_record = upd.news_payload_erp_record_new
old_record = upd.news_payload_erp_record_old
record_id = upd.news_payload_id
for bot in bc.bots_running.values():
bot.instance_rcx._parked_erp_changes.append((table_name, upd.news_action, new_record, old_record))
bot.instance_rcx._parked_erp_changes[(table_name, record_id)] = (upd.news_action, new_record, old_record)
bot.instance_rcx._parked_anything_new.set()

elif upd.news_action == "INITIAL_UPDATES_OVER":
Expand Down
8 changes: 5 additions & 3 deletions flexus_client_kit/ckit_cloudtool.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from flexus_client_kit import ckit_client
from flexus_client_kit import ckit_shutdown
from flexus_client_kit import ckit_utils
from flexus_client_kit import ckit_passwords
from flexus_client_kit import gql_utils

logger = logging.getLogger("ctool")
Expand Down Expand Up @@ -441,9 +442,10 @@ async def run_cloudtool_service(
except (websockets.exceptions.ConnectionClosedError, gql.transport.exceptions.TransportError, OSError):
if ckit_shutdown.shutdown_event.is_set():
break
logger.info("got disconnected, will connect again in 60s")
await ckit_shutdown.wait(60)
retry_sec = 5 if ckit_passwords.it_might_be_a_devbox else 60
logger.info("got disconnected, will connect again in %ds", retry_sec)
await ckit_shutdown.wait(retry_sec)

except Exception as e:
logger.error("caught exception %s: %s" % (type(e).__name__, e), exc_info=e)
await ckit_shutdown.wait(60)
await ckit_shutdown.wait(5 if ckit_passwords.it_might_be_a_devbox else 60)
2 changes: 1 addition & 1 deletion flexus_client_kit/integrations/fi_github.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(self, fclient: ckit_client.FlexusClient, rcx: ckit_bot_exec.RobotCo
def is_read_only_command(self, args: List[str]) -> bool:
if not args or args[0] in {"search", "status", "help", "--help", "-h", "version", "--version"}:
return True
READ_VERBS = {"view", "list", "status", "search", "browse", "show", "diff", "item-list", "field-list"}
READ_VERBS = {"view", "list", "status", "search", "browse", "show", "diff", "item-list", "field-list", "files"}
return len(args) >= 2 and args[1] in READ_VERBS

def _is_allowed_write_command(self, args: List[str]) -> bool:
Expand Down