diff --git a/api/apps/canvas_app.py b/api/apps/canvas_app.py index 340fddc1032..b42549b8ed3 100644 --- a/api/apps/canvas_app.py +++ b/api/apps/canvas_app.py @@ -30,6 +30,8 @@ from api.db.services.file_service import FileService from api.db.services.user_service import TenantService from api.db.services.user_canvas_version import UserCanvasVersionService +from api.db.services.schedule_agent_service import ScheduleAgentService + from api.settings import RetCode from api.utils import get_uuid from api.utils.api_utils import get_json_result, server_error_response, validate_request, get_data_error_result @@ -471,6 +473,277 @@ def sessions(canvas_id): return server_error_response(e) +@manager.route('/schedule/create', methods=['POST']) # type: ignore # noqa: F821 +@validate_request("canvas_id", "name", "frequency_type") +@login_required +def create_schedule(): + req = request.json + req["tenant_id"] = current_user.id # Using user_id as tenant_id for simplicity + req["created_by"] = current_user.id + req["id"] = get_uuid() + + try: + # Validate schedule data + ScheduleAgentService.validate_schedule_data(**req) + + # Check if canvas exists and user has permission + e, canvas = UserCanvasService.get_by_id(req["canvas_id"]) + if not e: + return get_data_error_result(message="Canvas not found") + + if not UserCanvasService.query(user_id=current_user.id, id=req["canvas_id"]): + return get_json_result( + data=False, message='Only owner of canvas authorized for this operation.', + code=RetCode.OPERATING_ERROR) + + # Handle execute_date conversion - only date part + if req.get('execute_date') and isinstance(req['execute_date'], str): + from datetime import datetime + # Parse date string (YYYY-MM-DD format) and convert to datetime with midnight time + date_str = req['execute_date'].split('T')[0] # Remove time part if present + req['execute_date'] = datetime.strptime(date_str, '%Y-%m-%d').date() + + schedule = ScheduleAgentService.create_schedule(**req) + return get_json_result(data=schedule.to_dict() if hasattr(schedule, 'to_dict') else schedule) + + except ValueError as e: + return get_data_error_result(message=str(e)) + except Exception as e: + return server_error_response(e) + +@manager.route('/schedule/update', methods=['POST']) # type: ignore # noqa: F821 +@validate_request("frequency_type", "id") +@login_required +def update_schedule(): + req = request.json + schedule_id = req.get("id") + if not schedule_id: + return get_data_error_result(message="Schedule ID is required for update") + try: + ScheduleAgentService.validate_schedule_data(**req) + + e, schedule = ScheduleAgentService.get_by_id(schedule_id) + if not e: + return get_data_error_result(message="Schedule not found") + + if schedule.created_by != current_user.id: + return get_json_result( + data=False, message='Only owner of schedule authorized for this operation.', + code=RetCode.OPERATING_ERROR) + ScheduleAgentService.validate_schedule_data(**req) + result = ScheduleAgentService.update_by_id(schedule_id, req) + + if result: + logging.info(f"[CANVAS_APP] Successfully updated schedule {schedule_id}") + else: + logging.error(f"[CANVAS_APP] Failed to update schedule {schedule_id}") + + return get_json_result(data=True) + + except ValueError as e: + return get_data_error_result(message=str(e)) + except Exception as e: + return server_error_response(e) + +@manager.route('/schedule/frequency-options', methods=['GET']) # type: ignore # noqa: F821 +@login_required +def get_frequency_options(): + """Get available frequency options and their configurations""" + options = { + "frequency_types": [ + { + "value": "once", + "label": "One Time", + "description": "Execute once at a specific date and time", + "required_fields": ["execute_date", "execute_time"] + }, + { + "value": "daily", + "label": "Daily", + "description": "Execute every day at a specific time", + "required_fields": ["execute_time"] + }, + { + "value": "weekly", + "label": "Weekly", + "description": "Execute on specific days of the week", + "required_fields": ["days_of_week", "execute_time"] + }, + { + "value": "monthly", + "label": "Monthly", + "description": "Execute on a specific day of each month", + "required_fields": ["day_of_month", "execute_time"] + } + ], + "days_of_week": [ + {"value": 1, "label": "Monday"}, + {"value": 2, "label": "Tuesday"}, + {"value": 3, "label": "Wednesday"}, + {"value": 4, "label": "Thursday"}, + {"value": 5, "label": "Friday"}, + {"value": 6, "label": "Saturday"}, + {"value": 7, "label": "Sunday"} + ], + "time_format": "HH:MM:SS", + "date_format": "ISO 8601 (YYYY-MM-DDTHH:MM:SS.sssZ)" + } + + return get_json_result(data=options) + +@manager.route('/schedule/list', methods=['GET']) # type: ignore # noqa: F821 +@login_required +def list_schedules(): + """Get schedules list with pagination and search""" + try: + page = int(request.args.get('page', 1)) + page_size = int(request.args.get('page_size', 20)) + keywords = request.args.get('keywords', '') + canvas_id = request.args.get('canvas_id', '') + + # Get schedules created by current user + schedules, total = ScheduleAgentService.get_schedules_paginated( + created_by=current_user.id, + canvas_id=canvas_id, + keywords=keywords, + page=page, + page_size=page_size + ) + + # Convert to dict and add canvas info + schedule_list = [] + for schedule in schedules: + schedule_dict = schedule.to_dict() if hasattr(schedule, 'to_dict') else schedule.__dict__['__data__'] + + # Get canvas title for display + try: + e, canvas = UserCanvasService.get_by_id(schedule_dict['canvas_id']) + if e: + schedule_dict['canvas_title'] = canvas.title + else: + schedule_dict['canvas_title'] = 'Unknown Canvas' + except Exception: + schedule_dict['canvas_title'] = 'Unknown Canvas' + + schedule_list.append(schedule_dict) + + return get_json_result(data={ + "schedules": schedule_list, + "total": total, + "page": page, + "page_size": page_size + }) + + except Exception as e: + return server_error_response(e) + +@manager.route('/schedule/toggle/', methods=['POST']) # type: ignore # noqa: F821 +@login_required +def toggle_schedule(schedule_id): + """Toggle schedule enabled status""" + try: + e, schedule = ScheduleAgentService.get_by_id(schedule_id) + if not e: + return get_data_error_result(message="Schedule not found") + + if schedule.created_by != current_user.id: + return get_json_result( + data=False, message='Only owner of schedule authorized for this operation.', + code=RetCode.OPERATING_ERROR) + + # Toggle enabled status + new_enabled = not schedule.enabled + ScheduleAgentService.update_by_id(schedule_id, {'enabled': new_enabled}) + + return get_json_result(data={'enabled': new_enabled}) + + except Exception as e: + return server_error_response(e) + +@manager.route('/schedule/delete/', methods=['DELETE']) # type: ignore # noqa: F821 +@login_required +def delete_schedule(schedule_id): + """Delete a schedule""" + try: + e, schedule = ScheduleAgentService.get_by_id(schedule_id) + if not e: + return get_data_error_result(message="Schedule not found") + + if schedule.created_by != current_user.id: + return get_json_result( + data=False, message='Only owner of schedule authorized for this operation.', + code=RetCode.OPERATING_ERROR) + + # Delete the schedule + ScheduleAgentService.delete_by_id(schedule_id) + + return get_json_result(data=True) + + except Exception as e: + return server_error_response(e) + +@manager.route('/schedule/history/', methods=['GET']) # type: ignore # noqa: F821 +@login_required +def get_schedule_history(schedule_id): + """Get execution history for a schedule""" + try: + e, schedule = ScheduleAgentService.get_by_id(schedule_id) + if not e: + return get_data_error_result(message="Schedule not found") + + if schedule.created_by != current_user.id: + return get_json_result( + data=False, message='Only owner of schedule authorized for this operation.', + code=RetCode.OPERATING_ERROR) + + limit = int(request.args.get('limit', 20)) + history = ScheduleAgentService.get_schedule_execution_history(schedule_id, limit) + + # Convert to dict + history_list = [] + for run in history: + run_dict = run.to_dict() if hasattr(run, 'to_dict') else run.__dict__['__data__'] + + + + history_list.append(run_dict) + + return get_json_result(data=history_list) + + except Exception as e: + return server_error_response(e) + +@manager.route('/schedule/stats/', methods=['GET']) # type: ignore # noqa: F821 +@login_required +def get_schedule_stats(schedule_id): + """Get execution statistics for a schedule""" + try: + e, schedule = ScheduleAgentService.get_by_id(schedule_id) + if not e: + return get_data_error_result(message="Schedule not found") + + if schedule.created_by != current_user.id: + return get_json_result( + data=False, message='Only owner of schedule authorized for this operation.', + code=RetCode.OPERATING_ERROR) + + stats = { + 'total_runs': ScheduleAgentService._get_run_count(schedule_id, success_only=False), + 'successful_runs': ScheduleAgentService._get_run_count(schedule_id, success_only=True), + 'failed_runs': ScheduleAgentService._get_run_count(schedule_id, success_only=False) - ScheduleAgentService._get_run_count(schedule_id, success_only=True), + 'last_successful_run': None, + 'is_currently_running': ScheduleAgentService._is_currently_running(schedule_id) + } + + last_run = ScheduleAgentService._get_last_successful_run(schedule_id) + if last_run: + stats['last_successful_run'] = last_run.to_dict() if hasattr(last_run, 'to_dict') else last_run.__dict__['__data__'] + + return get_json_result(data=stats) + + except Exception as e: + return server_error_response(e) + @manager.route('/prompts', methods=['GET']) # noqa: F821 @login_required def prompts(): diff --git a/api/db/db_models.py b/api/db/db_models.py index 1ff6dfc9637..3d7fa244122 100644 --- a/api/db/db_models.py +++ b/api/db/db_models.py @@ -883,6 +883,46 @@ class Meta: db_table = "user_canvas_version" +class ScheduleAgent(DataBaseModel): + id = CharField(max_length=32, primary_key=True) + tenant_id = CharField(max_length=32, null=False, index=True) + canvas_id = CharField(max_length=32, null=False, help_text="canvas id to execute", index=True) + name = CharField(max_length=255, null=False, help_text="schedule name", index=True) + description = TextField(null=True, help_text="schedule description") + + # Frequency options + frequency_type = CharField(max_length=20, null=False, help_text="once|daily|weekly|monthly", default="once", index=True) + + # Time settings + execute_time = CharField(max_length=8, null=True, help_text="HH:MM:SS format", index=True) + execute_date = DateTimeField(null=True, help_text="specific date for one-time execution", index=True) + + # Weekly settings + days_of_week = JSONField(null=False, default=[], help_text="[1,2,3,4,5,6,7] where 1=Monday") + + # Monthly settings + day_of_month = IntegerField(null=True, help_text="day of month (1-31)", index=True) + + enabled = BooleanField(default=True, help_text="whether the schedule is enabled", index=True) + input_params = JSONField(null=False, default={}, help_text="input parameters for agent") + created_by = CharField(max_length=32, null=False, help_text="who created it", index=True) + status = CharField(max_length=1, null=True, help_text="is it validate(0: wasted, 1: validate)", default="1", index=True) + + class Meta: + db_table = "schedule_agent" + + +class ScheduleAgentRun(DataBaseModel): + id = CharField(max_length=32, primary_key=True) + schedule_id = CharField(max_length=32, null=False, help_text="schedule agent id", index=True) + started_at = DateTimeField(null=False, help_text="execution start datetime", index=True) + finished_at = DateTimeField(null=True, help_text="execution finish datetime", index=True) + success = BooleanField(null=True, help_text="execution result", index=True) + error_message = TextField(null=True, help_text="error message if failed") + conversation_id = CharField(max_length=32, null=True, help_text="conversation id from execution", index=True) + + class Meta: + db_table = "schedule_agent_run" class MCPServer(DataBaseModel): id = CharField(max_length=32, primary_key=True) name = CharField(max_length=255, null=False, help_text="MCP Server name") @@ -942,6 +982,9 @@ class Meta: db_table = "search" + + + def migrate_db(): logging.disable(logging.ERROR) migrator = DatabaseMigrator[settings.DATABASE_TYPE.upper()].value(DB) diff --git a/api/db/services/schedule_agent_service.py b/api/db/services/schedule_agent_service.py new file mode 100644 index 00000000000..3f588dd8048 --- /dev/null +++ b/api/db/services/schedule_agent_service.py @@ -0,0 +1,349 @@ +# +# Copyright 2024 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +from typing import List +from datetime import datetime, timedelta +from api.db.db_models import ScheduleAgent, ScheduleAgentRun +from api.db.services.common_service import CommonService +from api.utils import datetime_format, get_uuid + + +class ScheduleAgentService(CommonService): + model = ScheduleAgent + + @classmethod + def create_schedule(cls, **kwargs): + """Create a new schedule""" + logging.info(f"Creating schedule: {kwargs.get('name', 'Unknown')}") + + try: + cls.validate_schedule_data(**kwargs) + result = cls.save(**kwargs) + + if not result: + raise Exception("Failed to save schedule") + + schedule_id = kwargs.get("id") + if not schedule_id: + raise Exception("No schedule ID provided") + + e, schedule_obj = cls.get_by_id(schedule_id) + if not (e and schedule_obj): + raise Exception(f"Failed to retrieve created schedule: {schedule_id}") + + logging.info(f"Successfully created schedule: {schedule_obj.id}") + return schedule_obj + + except Exception as e: + logging.error(f"Error creating schedule: {e}") + raise + + @classmethod + def get_schedules_paginated(cls, created_by=None, canvas_id=None, keywords="", page=1, page_size=20): + """Get schedules with pagination and filtering""" + try: + conditions = [cls.model.status == "1"] + + if created_by: + conditions.append(cls.model.created_by == created_by) + if canvas_id: + conditions.append(cls.model.canvas_id == canvas_id) + if keywords: + conditions.append((cls.model.name.contains(keywords)) | (cls.model.description.contains(keywords))) + + query = cls.model.select().where(*conditions) + total = query.count() + + schedules = query.order_by(cls.model.create_time.desc()).paginate(page, page_size) + return list(schedules), total + + except Exception as e: + logging.error(f"Error fetching schedules: {e}") + raise + + @classmethod + def get_pending_schedules(cls) -> List[ScheduleAgent]: + """Get schedules ready to run""" + try: + current_datetime = datetime.now() + all_schedules = cls.query(enabled=True, status="1") + + if not all_schedules: + return [] + + valid_schedules = [] + disabled_count = 0 + + for schedule in all_schedules: + try: + if cls._is_currently_running(schedule.id): + continue + + should_run = cls._should_schedule_run(schedule, current_datetime) + + if should_run: + valid_schedules.append(schedule) + elif schedule.frequency_type == "once" and cls._has_successful_run(schedule.id): + cls.update_by_id(schedule.id, {"enabled": False}) + disabled_count += 1 + + except Exception as e: + logging.error(f"Error processing schedule {schedule.id}: {e}") + continue + + if disabled_count > 0: + logging.info(f"Disabled {disabled_count} completed one-time schedules") + + logging.info(f"Found {len(valid_schedules)} pending schedules") + return valid_schedules + + except Exception as e: + logging.error(f"Error getting pending schedules: {e}") + return [] + + @classmethod + def _should_schedule_run(cls, schedule, current_datetime): + """Check if schedule should run based on type""" + schedule_checks = {"once": cls._should_run_once, "daily": lambda s, dt: cls._should_run_recurring(s, dt, cls._has_run_today), "weekly": cls._should_run_weekly, "monthly": cls._should_run_monthly} + + try: + check_func = schedule_checks.get(schedule.frequency_type) + if check_func: + return check_func(schedule, current_datetime) + return False + except Exception as e: + logging.error(f"Error checking schedule {schedule.id}: {e}") + return False + + @classmethod + def _should_run_once(cls, schedule: ScheduleAgent, current_datetime): + """Check if one-time schedule should run""" + if not schedule.execute_date or not schedule.execute_time: + return False + + execute_datetime = cls._get_execute_datetime(schedule.execute_date, schedule.execute_time) + return current_datetime >= execute_datetime and not cls._has_any_run(schedule.id) + + @classmethod + def _should_run_recurring(cls, schedule: ScheduleAgent, current_datetime, check_already_run_func): + """Generic check for recurring schedules""" + if not schedule.execute_time: + return False + + today_execute_time = cls._get_today_execute_time(current_datetime, schedule.execute_time) + return current_datetime >= today_execute_time and not check_already_run_func(schedule.id) + + @classmethod + def _should_run_weekly(cls, schedule: ScheduleAgent, current_datetime): + """Check if weekly schedule should run""" + if not schedule.execute_time or not schedule.days_of_week: + return False + + current_weekday = current_datetime.weekday() + 1 + if current_weekday not in schedule.days_of_week: + return False + + return cls._should_run_recurring(schedule, current_datetime, cls._has_run_today) + + @classmethod + def _should_run_monthly(cls, schedule: ScheduleAgent, current_datetime): + """Check if monthly schedule should run""" + if not schedule.execute_time or not schedule.day_of_month: + return False + + if current_datetime.day != schedule.day_of_month: + return False + + return cls._should_run_recurring(schedule, current_datetime, cls._has_run_this_month) + + @classmethod + def start_execution(cls, schedule_id): + """Start execution tracking""" + run_id = get_uuid() + ScheduleAgentRun.create(id=run_id, schedule_id=schedule_id, started_at=datetime_format(datetime.now())) + return run_id + + @classmethod + def finish_execution(cls, run_id, success=True, error_message=None, conversation_id=None): + """Finish execution tracking""" + try: + run = ScheduleAgentRun.get_by_id(run_id) + finish_time = datetime_format(datetime.now()) + + ScheduleAgentRun.update(finished_at=finish_time, success=success, error_message=error_message, conversation_id=conversation_id).where(ScheduleAgentRun.id == run_id).execute() + + # Disable one-time schedules after successful execution + if success: + schedule = ScheduleAgent.get_by_id(run.schedule_id) + if schedule.frequency_type == "once": + cls.update_by_id(run.schedule_id, {"enabled": False}) + + return True + except Exception as e: + logging.error(f"Error finishing execution {run_id}: {e}") + return False + + @classmethod + def _run_exists(cls, schedule_id, success_filter=None, time_range=None): + """Generic method to check if runs exist with optional filters""" + try: + query = ScheduleAgentRun.select().where(ScheduleAgentRun.schedule_id == schedule_id) + + if success_filter is not None: + query = query.where(ScheduleAgentRun.success == success_filter) + + if time_range: + start_dt, end_dt = time_range + query = query.where((ScheduleAgentRun.started_at >= start_dt) & (ScheduleAgentRun.started_at <= end_dt)) + + return query.exists() + except Exception: + return False + + @classmethod + def _is_currently_running(cls, schedule_id): + """Check if schedule is currently running""" + try: + return ScheduleAgentRun.select().where((ScheduleAgentRun.schedule_id == schedule_id) & (ScheduleAgentRun.finished_at.is_null(True))).exists() + except Exception: + return False + + @classmethod + def _has_successful_run(cls, schedule_id): + """Check if schedule has any successful run""" + return cls._run_exists(schedule_id, success_filter=True) + + @classmethod + def _has_any_run(cls, schedule_id): + """Check if schedule has any run""" + return cls._run_exists(schedule_id) + + @classmethod + def _has_run_today(cls, schedule_id): + """Check if schedule ran successfully today""" + today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) + today_end = today_start.replace(hour=23, minute=59, second=59, microsecond=999999) + time_range = (today_start, today_end) + return cls._run_exists(schedule_id, success_filter=True, time_range=time_range) + + @classmethod + def _has_run_this_month(cls, schedule_id): + """Check if schedule ran successfully this month""" + current_date = datetime.now() + month_start = current_date.replace(day=1, hour=0, minute=0, second=0, microsecond=0) + + if current_date.month == 12: + next_month = current_date.replace(year=current_date.year + 1, month=1, day=1) + else: + next_month = current_date.replace(month=current_date.month + 1, day=1) + month_end = next_month - timedelta(microseconds=1) + + time_range = (month_start, month_end) + return cls._run_exists(schedule_id, success_filter=True, time_range=time_range) + + @classmethod + def _get_last_successful_run(cls, schedule_id): + """Get the last successful run for a schedule""" + try: + return ScheduleAgentRun.select().where((ScheduleAgentRun.schedule_id == schedule_id) & (ScheduleAgentRun.success)).order_by(ScheduleAgentRun.started_at.desc()).first() + except Exception: + return None + + @classmethod + def _get_run_count(cls, schedule_id, success_only=True): + """Get run count for a schedule""" + try: + query = ScheduleAgentRun.select().where(ScheduleAgentRun.schedule_id == schedule_id) + if success_only: + query = query.where(ScheduleAgentRun.success) + return query.count() + except Exception: + return 0 + + @classmethod + def get_schedule_execution_history(cls, schedule_id, limit=10): + """Get execution history for a schedule""" + try: + return list(ScheduleAgentRun.select().where(ScheduleAgentRun.schedule_id == schedule_id).order_by(ScheduleAgentRun.started_at.desc()).limit(limit)) + except Exception: + return [] + + @classmethod + def validate_schedule_data(cls, **kwargs): + """Validate schedule data""" + frequency_type = kwargs.get("frequency_type", "once") + validation_rules = {"once": lambda k: k.get("execute_date"), "weekly": lambda k: k.get("days_of_week") and all(1 <= day <= 7 for day in k.get("days_of_week", [])), "monthly": lambda k: k.get("day_of_month") and 1 <= k.get("day_of_month", 0) <= 31} + + try: + # Validate frequency-specific requirements + if frequency_type in validation_rules: + if not validation_rules[frequency_type](kwargs): + raise ValueError(f"Invalid data for {frequency_type} schedule") + + # Validate time format + execute_time = kwargs.get("execute_time", "00:00:00") + if execute_time: + cls._validate_time_format(execute_time) + + return True + + except Exception as e: + logging.error(f"Schedule validation failed: {e}") + raise + + @classmethod + def _validate_time_format(cls, execute_time): + """Validate time format""" + try: + time_parts = execute_time.split(":") + hour, minute = int(time_parts[0]), int(time_parts[1]) + second = int(time_parts[2]) if len(time_parts) > 2 else 0 + + if not (0 <= hour <= 23 and 0 <= minute <= 59 and 0 <= second <= 59): + raise ValueError("Invalid time format") + except (ValueError, IndexError): + raise ValueError("Time must be in HH:MM:SS format") + + @classmethod + def _get_execute_datetime(cls, execute_date, execute_time): + """Convert execute_date and execute_time to datetime""" + if isinstance(execute_date, str): + date_obj = datetime.strptime(execute_date, "%Y-%m-%d").date() + else: + date_obj = execute_date + + if isinstance(execute_time, str): + time_parts = execute_time.split(":") + hour, minute = int(time_parts[0]), int(time_parts[1]) + second = int(time_parts[2]) if len(time_parts) > 2 else 0 + time_obj = datetime.min.time().replace(hour=hour, minute=minute, second=second) + else: + time_obj = execute_time + + return datetime.combine(date_obj, time_obj) + + @classmethod + def _get_today_execute_time(cls, current_datetime, execute_time): + """Get today's execution time""" + if isinstance(execute_time, str): + time_parts = execute_time.split(":") + hour, minute = int(time_parts[0]), int(time_parts[1]) + second = int(time_parts[2]) if len(time_parts) > 2 else 0 + else: + hour, minute, second = execute_time.hour, execute_time.minute, execute_time.second + + return current_datetime.replace(hour=hour, minute=minute, second=second, microsecond=0) diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index 7570b73dd5c..04cd72090bd 100755 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -6,30 +6,34 @@ set -e # Usage and command-line argument parsing # ----------------------------------------------------------------------------- function usage() { - echo "Usage: $0 [--disable-webserver] [--disable-taskexecutor] [--consumer-no-beg=] [--consumer-no-end=] [--workers=] [--host-id=]" + echo "Usage: $0 [--disable-webserver] [--disable-taskexecutor] [--disable-agentexecutor] [--consumer-no-beg=] [--consumer-no-end=] [--workers=] [--agent-workers=] [--host-id=]" echo echo " --disable-webserver Disables the web server (nginx + ragflow_server)." echo " --disable-taskexecutor Disables task executor workers." + echo " --disable-agentexecutor Disables agent executor workers." echo " --enable-mcpserver Enables the MCP server." echo " --consumer-no-beg= Start range for consumers (if using range-based)." echo " --consumer-no-end= End range for consumers (if using range-based)." echo " --workers= Number of task executors to run (if range is not used)." + echo " --agent-workers= Number of agent executors to run." echo " --host-id= Unique ID for the host (defaults to \`hostname\`)." echo echo "Examples:" echo " $0 --disable-taskexecutor" echo " $0 --disable-webserver --consumer-no-beg=0 --consumer-no-end=5" - echo " $0 --disable-webserver --workers=2 --host-id=myhost123" + echo " $0 --disable-webserver --workers=2 --agent-workers=1 --host-id=myhost123" echo " $0 --enable-mcpserver" exit 1 } ENABLE_WEBSERVER=1 # Default to enable web server ENABLE_TASKEXECUTOR=1 # Default to enable task executor +ENABLE_AGENTEXECUTOR=1 # Default to enable agent executor ENABLE_MCP_SERVER=0 CONSUMER_NO_BEG=0 CONSUMER_NO_END=0 WORKERS=1 +AGENT_WORKERS=1 MCP_HOST="127.0.0.1" MCP_PORT=9382 @@ -66,6 +70,10 @@ for arg in "$@"; do ENABLE_TASKEXECUTOR=0 shift ;; + --disable-agentexecutor) + ENABLE_AGENTEXECUTOR=0 + shift + ;; --enable-mcpserver) ENABLE_MCP_SERVER=1 shift @@ -118,6 +126,10 @@ for arg in "$@"; do WORKERS="${arg#*=}" shift ;; + --agent-workers=*) + AGENT_WORKERS="${arg#*=}" + shift + ;; --host-id=*) HOST_ID="${arg#*=}" shift @@ -142,7 +154,6 @@ done < "${TEMPLATE_FILE}" export LD_LIBRARY_PATH="/usr/lib/x86_64-linux-gnu/" PY=python3 - # ----------------------------------------------------------------------------- # Function(s) # ----------------------------------------------------------------------------- @@ -158,6 +169,17 @@ function task_exe() { done } +function agent_exe() { + local consumer_id="$1" + local host_id="$2" + + JEMALLOC_PATH="$(pkg-config --variable=libdir jemalloc)/libjemalloc.so" + while true; do + LD_PRELOAD="$JEMALLOC_PATH" \ + "$PY" rag/svr/agent_executor.py "${host_id}_${consumer_id}" + done +} + function start_mcp_server() { echo "Starting MCP Server on ${MCP_HOST}:${MCP_PORT} with base URL ${MCP_BASE_URL}..." "$PY" "${MCP_SCRIPT_PATH}" \ @@ -207,4 +229,13 @@ if [[ "${ENABLE_TASKEXECUTOR}" -eq 1 ]]; then fi fi + +if [[ "${ENABLE_AGENTEXECUTOR}" -eq 1 ]]; then + echo "Starting ${AGENT_WORKERS} agent executor(s) on host '${HOST_ID}'..." + for (( i=0; i 0: + REDIS_CONN.zpopmin(CONSUMER_NAME, expired) + + # Clean expired workers + if redis_lock.acquire(): + _clean_expired_workers(now) + + except Exception as e: + logging.error(f"Error in status reporting: {e}") + finally: + redis_lock.release() + + await trio.sleep(30) + + +def _clean_expired_workers(now): + """Clean expired agent executors from Redis""" + try: + agent_executors = REDIS_CONN.smembers("AGENTEXE") + + for consumer_name in agent_executors: + if consumer_name == CONSUMER_NAME: + continue + + expired = REDIS_CONN.zcount( + consumer_name, + now.timestamp() - WORKER_HEARTBEAT_TIMEOUT, + now.timestamp() + 10 + ) + + if expired == 0: + REDIS_CONN.srem("AGENTEXE", consumer_name) + REDIS_CONN.delete(consumer_name) + logging.info(f"Cleaned expired worker: {consumer_name}") + + except Exception as e: + logging.error(f"Error cleaning expired workers: {e}") + + +async def main(): + """Main execution loop""" + logging.info(f""" + ___ __ ______ __ + / _ | ___ ___ ___ __ / /_ / ____/ _____ _______ __/ /_____ _____ + / __ |/ _ \/ -_) _ \/ // __/ / __/ | |/_/ _ \/ ___/ / / / __/ __ \/ ___/ +/_/ |_|\_, /\__/_//_/\_,_\__/ / /____> { + const { data, isLoading } = useQuery({ + queryKey: ['frequencyOptions'], + queryFn: async () => { + const { data } = await agentService.getFrequencyOptions(); + return data?.data ?? {}; + }, + staleTime: 5 * 60 * 1000, // Cache for 5 minutes + }); + + return { data, loading: isLoading }; +}; + +export const useFetchSchedules = ( + canvas_id = '', + page = 1, + pageSize = 20, + keywords = '', +) => { + const { data, isLoading, refetch } = useQuery({ + queryKey: ['schedules', canvas_id, page, pageSize, keywords], + queryFn: async () => { + const { data } = await agentService.listSchedules({ + page, + page_size: pageSize, + canvas_id, + keywords, + }); + return data?.data ?? { schedules: [], total: 0 }; + }, + }); + + return { + schedules: data?.schedules ?? [], + total: data?.total ?? 0, + loading: isLoading, + refetch, + }; +}; + +export const useCreateSchedule = () => { + const queryClient = useQueryClient(); + const { t } = useTranslation(); + + const { mutateAsync, isPending } = useMutation({ + mutationFn: async (params: ICreateScheduleRequest) => { + const { data } = await agentService.createSchedule(params); + return data; + }, + onSuccess: () => { + message.success(t('flow.schedule.createSuccess')); + queryClient.invalidateQueries({ queryKey: ['schedules'] }); + }, + onError: (error: any) => { + message.error( + error?.response?.data?.message || t('flow.schedule.createError'), + ); + }, + }); + + return { createSchedule: mutateAsync, loading: isPending }; +}; + +export const useUpdateSchedule = () => { + const queryClient = useQueryClient(); + const { t } = useTranslation(); + + const { mutateAsync, isPending } = useMutation({ + mutationFn: async (params: IUpdateScheduleRequest) => { + const { data } = await agentService.updateSchedule(params); + return data; + }, + onSuccess: () => { + message.success(t('flow.schedule.updateSuccess')); + queryClient.invalidateQueries({ queryKey: ['schedules'] }); + }, + onError: (error: any) => { + console.error('Update schedule error:', error); + message.error( + error?.response?.data?.message || t('flow.schedule.updateError'), + ); + }, + }); + + return { updateSchedule: mutateAsync, loading: isPending }; +}; + +export const useToggleSchedule = () => { + const queryClient = useQueryClient(); + const { t } = useTranslation(); + + const { mutateAsync, isPending } = useMutation({ + mutationFn: async (id: string) => { + const { data } = await toggleScheduleById({}, id); + return data; + }, + onSuccess: () => { + message.success(t('flow.schedule.toggleSuccess')); + queryClient.invalidateQueries({ queryKey: ['schedules'] }); + }, + onError: (error: any) => { + message.error( + error?.response?.data?.message || t('flow.schedule.toggleError'), + ); + }, + }); + + return { toggleSchedule: mutateAsync, loading: isPending }; +}; + +export const useDeleteSchedule = () => { + const queryClient = useQueryClient(); + const { t } = useTranslation(); + + const { mutateAsync, isPending } = useMutation({ + mutationFn: async (id: string) => { + const { data } = await deleteScheduleById({}, id); + return data; + }, + onSuccess: () => { + message.success(t('flow.schedule.deleteSuccess')); + queryClient.invalidateQueries({ queryKey: ['schedules'] }); + }, + onError: (error: any) => { + message.error( + error?.response?.data?.message || t('flow.schedule.deleteError'), + ); + }, + }); + + return { deleteSchedule: mutateAsync, loading: isPending }; +}; + +export const useFetchScheduleHistory = (scheduleId: string, limit = 20) => { + const { data, isLoading, refetch } = useQuery({ + queryKey: ['scheduleHistory', scheduleId, limit], + queryFn: async () => { + const { data } = await getScheduleHistoryById( + { + limit, + }, + scheduleId, + ); + return data?.data ?? []; + }, + enabled: !!scheduleId, + }); + + return { + history: data ?? [], + loading: isLoading, + refetch, + }; +}; + +export const useFetchScheduleStats = (scheduleId: string) => { + const { data, isLoading, refetch } = useQuery({ + queryKey: ['scheduleStats', scheduleId], + queryFn: async () => { + const { data } = await getScheduleStatsById({}, scheduleId); + return data?.data ?? {}; + }, + enabled: !!scheduleId, + refetchInterval: 30000, // Refresh every 30 seconds + }); + + return { + stats: data ?? {}, + loading: isLoading, + refetch, + }; +}; diff --git a/web/src/interfaces/database/schedule.ts b/web/src/interfaces/database/schedule.ts new file mode 100644 index 00000000000..ac1022862dc --- /dev/null +++ b/web/src/interfaces/database/schedule.ts @@ -0,0 +1,69 @@ +export interface ISchedule { + id: string; + tenant_id: string; + canvas_id: string; + name: string; + description?: string; + frequency_type: 'once' | 'daily' | 'weekly' | 'monthly'; + execute_time?: string; + execute_date?: string; + days_of_week?: number[]; + day_of_month?: number; + enabled: boolean; + input_params?: Record; + created_by: string; + status: string; + create_time: number; + update_time: number; + canvas_title?: string; +} + +export interface IScheduleRun { + id: string; + schedule_id: string; + started_at: Date; + finished_at?: Date; + success?: boolean; + error_message?: string; + conversation_id?: string; +} + +export interface IScheduleStats { + total_runs: number; + successful_runs: number; + failed_runs: number; + last_successful_run?: IScheduleRun; + is_currently_running: boolean; +} + +export interface IFrequencyOptions { + frequency_types: Array<{ + value: string; + label: string; + description: string; + required_fields: string[]; + }>; + days_of_week: Array<{ + value: number; + label: string; + }>; + time_format: string; + date_format: string; +} + +export interface ICreateScheduleRequest { + canvas_id: string; + name: string; + description?: string; + frequency_type: string; + execute_time?: string; + execute_date?: string; + days_of_week?: number[]; + day_of_month?: number; + input_params?: Record; +} + +export interface IUpdateScheduleRequest + extends Omit { + id: string; +} diff --git a/web/src/locales/en.ts b/web/src/locales/en.ts index 8d9a9c3f16e..d4beab69fcf 100644 --- a/web/src/locales/en.ts +++ b/web/src/locales/en.ts @@ -1496,6 +1496,71 @@ This delimiter is used to split the input text into several text pieces echo of task: 'task', beginInputTip: 'By defining input parameters, this content can be accessed by other components in subsequent processes.', + schedule: { + title: 'Schedule', + name: 'Schedule name', + nameRequired: 'Please enter a schedule name', + namePlaceholder: 'Please enter a schedule name', + frequency: 'Frequency', + runCount: 'Run count', + status: 'Status', + for: 'For', + create: 'Create', + enabled: 'Enabled', + disabled: 'Disabled', + toggleSuccess: 'Schedule status updated successfully', + updateError: 'Failed to update schedule', + description: 'Description', + descriptionPlaceholder: 'Please enter a description', + executeTime: 'Execute time', + executeDate: 'Execute date', + daysOfWeek: 'Days of week', + daysOfWeekRequired: 'Please select at least one day of the week', + daysOfWeekPlaceholder: 'Please select days of the week', + updateSuccess: 'Schedule updated successfully', + createSuccess: 'Schedule created successfully', + deleteConfirm: 'Are you sure you want to delete this schedule?', + deleteSuccess: 'Schedule deleted successfully', + at: 'at', + executeDateRequired: 'Please select an execute date', + executeDatePlaceholder: 'Please select an execute date', + dayOfMonth: 'Day of month', + edit: 'Edit', + totalRuns: 'Total runs', + successfulRuns: 'Successful runs', + failedRuns: 'Failed runs', + currentStatus: 'Current status', + idle: 'Idle', + executionHistory: 'Execution history', + startTime: 'Start time', + endTime: 'End time', + duration: 'Duration', + errorMessage: 'Error message', + statistics: 'Statistics', + runInfo: 'Run info', + viewRuns: 'View runs', + lastSuccessfulRun: 'Last successful run', + runs: 'runs', + success: 'Success', + currentlyRunning: 'Currently running', + currentlyRunningDesc: 'The schedule is currently executing.', + running: 'Running', + failed: 'Failed', + }, + common: { + action: 'Action', + cancel: 'Cancel', + create: 'Create', + delete: 'Delete', + edit: 'Edit', + save: 'Save', + update: 'Update', + yes: 'Yes', + no: 'No', + confirm: 'Confirm', + of: 'of', + refresh: 'Refresh', + }, query: 'Query variables', queryTip: 'Select the variable you want to use', agent: 'Agent', diff --git a/web/src/pages/agent/index.tsx b/web/src/pages/agent/index.tsx index 4c49c6daa01..a3faeaec4dd 100644 --- a/web/src/pages/agent/index.tsx +++ b/web/src/pages/agent/index.tsx @@ -29,6 +29,7 @@ import { Logs, ScreenShare, Settings, + Timer, Upload, } from 'lucide-react'; import { ComponentPropsWithoutRef, useCallback } from 'react'; @@ -44,6 +45,7 @@ import { useSaveGraphBeforeOpeningDebugDrawer, useWatchAgentChange, } from './hooks/use-save-graph'; +import { ScheduleModal, useScheduleModal } from './schedule-modal'; import { SettingDialog } from './setting-dialog'; import { useAgentHistoryManager } from './use-agent-history-manager'; import { VersionDialog } from './version-dialog'; @@ -87,6 +89,11 @@ export default function Agent() { hideModal: hideVersionDialog, showModal: showVersionDialog, } = useSetModalState(); + const { + visible: scheduleVisible, + showModal: showScheduleModal, + hideModal: hideScheduleModal, + } = useScheduleModal(); const { visible: settingDialogVisible, @@ -155,6 +162,10 @@ export default function Agent() { {t('flow.export')} + + + {t('flow.schedule.title')} + @@ -196,6 +207,14 @@ export default function Agent() { )} + {scheduleVisible && ( + + )} {settingDialogVisible && ( )} diff --git a/web/src/pages/agent/schedule-modal/index.tsx b/web/src/pages/agent/schedule-modal/index.tsx new file mode 100644 index 00000000000..2dab3d929a4 --- /dev/null +++ b/web/src/pages/agent/schedule-modal/index.tsx @@ -0,0 +1,1270 @@ +import { Calendar } from '@/components/originui/calendar'; +import { + AlertDialog, + AlertDialogAction, + AlertDialogCancel, + AlertDialogContent, + AlertDialogDescription, + AlertDialogFooter, + AlertDialogHeader, + AlertDialogTitle, + AlertDialogTrigger, +} from '@/components/ui/alert-dialog'; +import { Badge } from '@/components/ui/badge'; +import { Button } from '@/components/ui/button'; +import { Card, CardContent, CardHeader, CardTitle } from '@/components/ui/card'; +import { + Dialog, + DialogContent, + DialogFooter, + DialogHeader, + DialogTitle, +} from '@/components/ui/dialog'; +import { + Form, + FormControl, + FormField, + FormItem, + FormLabel, + FormMessage, +} from '@/components/ui/form'; +import { Input } from '@/components/ui/input'; +import { Label } from '@/components/ui/label'; +import { + Popover, + PopoverContent, + PopoverTrigger, +} from '@/components/ui/popover'; +import { + Select, + SelectContent, + SelectItem, + SelectTrigger, + SelectValue, +} from '@/components/ui/select'; +import { Separator } from '@/components/ui/separator'; +import { + Sheet, + SheetContent, + SheetHeader, + SheetTitle, +} from '@/components/ui/sheet'; +import { Switch } from '@/components/ui/switch'; +import { + Table, + TableBody, + TableCell, + TableHead, + TableHeader, + TableRow, +} from '@/components/ui/table'; +import { Textarea } from '@/components/ui/textarea'; +import { + Tooltip, + TooltipContent, + TooltipProvider, + TooltipTrigger, +} from '@/components/ui/tooltip'; +import { useTranslate } from '@/hooks/common-hooks'; +import { + useCreateSchedule, + useDeleteSchedule, + useFetchFrequencyOptions, + useFetchScheduleHistory, + useFetchSchedules, + useFetchScheduleStats, + useToggleSchedule, + useUpdateSchedule, +} from '@/hooks/schedule-hooks'; +import { + ICreateScheduleRequest, + ISchedule, + IScheduleRun, + IScheduleStats, +} from '@/interfaces/database/schedule'; +import { cn } from '@/lib/utils'; +import { + CheckCircleOutlined, + ClockCircleOutlined, + CloseCircleOutlined, + DeleteOutlined, + EditOutlined, + HistoryOutlined, + ReloadOutlined, +} from '@ant-design/icons'; +import { zodResolver } from '@hookform/resolvers/zod'; +import dayjs from 'dayjs'; +import timezone from 'dayjs/plugin/timezone'; +import utc from 'dayjs/plugin/utc'; +import { CalendarIcon, Clock, Loader2 } from 'lucide-react'; +import React, { useCallback, useState } from 'react'; +import { useForm } from 'react-hook-form'; +import * as z from 'zod'; + +// Configure dayjs plugins +dayjs.extend(utc); +dayjs.extend(timezone); + +const scheduleFormSchema = z.object({ + name: z.string().min(1, 'Name is required'), + description: z.string().optional(), + frequency_type: z.string().min(1, 'Frequency type is required'), + execute_time: z.date().optional(), + execute_date: z.date().optional(), + days_of_week: z.array(z.number()).optional(), + day_of_month: z.number().optional(), +}); + +interface ScheduleFormModalProps { + visible: boolean; + onCancel: () => void; + onSave: () => void; + editingSchedule: ISchedule | null; + canvasId: string; + loading: boolean; +} + +function ScheduleFormModal({ + visible, + onCancel, + onSave, + editingSchedule, + canvasId, + loading, +}: ScheduleFormModalProps) { + const { t } = useTranslate('flow'); + const [timePickerOpen, setTimePickerOpen] = useState(false); + const [datePickerOpen, setDatePickerOpen] = useState(false); + + const { data: frequencyOptions, loading: loadingOptions } = + useFetchFrequencyOptions(); + const { createSchedule, loading: creating } = useCreateSchedule(); + const { updateSchedule, loading: updating } = useUpdateSchedule(); + + const form = useForm>({ + resolver: zodResolver(scheduleFormSchema), + defaultValues: { + name: '', + description: '', + frequency_type: 'once', + execute_time: dayjs().add(1, 'hour').toDate(), + execute_date: dayjs().add(1, 'hour').toDate(), + days_of_week: [], + day_of_month: undefined, + }, + }); + + const frequencyType = form.watch('frequency_type'); + + const getRequiredFields = useCallback(() => { + if (!frequencyOptions?.frequency_types || !frequencyType) return []; + + const option = frequencyOptions.frequency_types.find( + (type) => type.value === frequencyType, + ); + return option?.required_fields || []; + }, [frequencyOptions, frequencyType]); + + const handleSave = useCallback( + async (values: z.infer) => { + try { + + + + + + // Ensure frequency_type is always present - get from form if missing + const formFrequencyType = form.getValues('frequency_type'); + if (!values.frequency_type && !formFrequencyType) { + form.setError('frequency_type', { + message: 'Frequency type is required', + }); + return; + } + + // Use the value from form state if not in values + const finalFrequencyType = values.frequency_type || formFrequencyType; + + const payload: ICreateScheduleRequest = { + canvas_id: canvasId, + name: values.name.trim(), + description: values.description?.trim() || '', + frequency_type: finalFrequencyType, + }; + + // Get required fields for current frequency type + const currentRequiredFields = getRequiredFields(); + + // Handle time conversion + if ( + currentRequiredFields.includes('execute_time') && + values.execute_time + ) { + payload.execute_time = dayjs(values.execute_time).format('HH:mm:ss'); + } + + // Handle date conversion + if ( + currentRequiredFields.includes('execute_date') && + values.execute_date + ) { + payload.execute_date = dayjs(values.execute_date).toISOString(); + } + + // Handle days of week + if ( + currentRequiredFields.includes('days_of_week') && + values.days_of_week && + values.days_of_week.length > 0 + ) { + payload.days_of_week = values.days_of_week; + } + + // Handle day of month + if ( + currentRequiredFields.includes('day_of_month') && + values.day_of_month + ) { + payload.day_of_month = values.day_of_month; + } + + console.log('Final payload:', payload); + + if (editingSchedule) { + const updatePayload = { id: editingSchedule.id, ...payload }; + console.log('Update payload:', updatePayload); + await updateSchedule(updatePayload); + } else { + await createSchedule(payload); + } + + form.reset(); + onSave(); + } catch (error) { + console.error('Save failed:', error); + } + }, + [ + canvasId, + editingSchedule, + createSchedule, + updateSchedule, + onSave, + form, + getRequiredFields, + ], + ); + // Set form values when editing schedule changes + React.useEffect(() => { + console.log('=== FORM EFFECT START ==='); + console.log('visible:', visible); + console.log('editingSchedule:', editingSchedule); + console.log( + 'frequencyOptions loaded:', + !!frequencyOptions?.frequency_types, + ); + + if (visible && editingSchedule && frequencyOptions?.frequency_types) { + console.log('Setting form values for editing:', editingSchedule); + + const formData: any = { + name: editingSchedule.name || '', + description: editingSchedule.description || '', + frequency_type: editingSchedule.frequency_type || 'once', + days_of_week: editingSchedule.days_of_week || [], + day_of_month: editingSchedule.day_of_month || undefined, + }; + + // Handle time conversion + if (editingSchedule.execute_time) { + try { + const timeStr = editingSchedule.execute_time; + const timeParts = timeStr.split(':'); + const hours = parseInt(timeParts[0], 10); + const minutes = parseInt(timeParts[1], 10); + const seconds = parseInt(timeParts[2] || '0', 10); + + formData.execute_time = dayjs() + .hour(hours) + .minute(minutes) + .second(seconds) + .toDate(); + } catch (error) { + console.warn( + 'Failed to parse execute_time:', + editingSchedule.execute_time, + ); + formData.execute_time = dayjs().toDate(); + } + } + + // Handle date conversion + if (editingSchedule.execute_date) { + try { + formData.execute_date = dayjs(editingSchedule.execute_date).toDate(); + } catch (error) { + console.warn( + 'Failed to parse execute_date:', + editingSchedule.execute_date, + ); + formData.execute_date = dayjs().toDate(); + } + } + + console.log('Form data to set:', formData); + + // Reset form with proper values + form.reset(formData); + + // Trigger validation after form reset + setTimeout(() => { + console.log('Form values after reset:', form.getValues()); + console.log( + 'frequency_type after reset:', + form.getValues('frequency_type'), + ); + form.trigger(); + }, 200); + } else if (visible && !editingSchedule) { + // Set default values for new schedule + const defaultTime = dayjs().add(1, 'hour').toDate(); + const defaultData = { + name: '', + description: '', + frequency_type: 'once', + execute_time: defaultTime, + execute_date: defaultTime, + days_of_week: [], + day_of_month: undefined, + }; + + console.log('Setting default form data:', defaultData); + form.reset(defaultData); + setTimeout(() => { + form.trigger(); + }, 100); + } + }, [visible, editingSchedule, form, frequencyOptions]); + + const requiredFields = getRequiredFields(); + + return ( + !open && onCancel()}> + + + + {editingSchedule ? t('schedule.edit') : t('schedule.create')} + + + +
+ +
+ ( + + {t('schedule.name')} + + + + + + )} + /> + + { + console.log('Frequency field render - value:', field.value); + return ( + + {t('schedule.frequency')} + + + + ); + }} + /> +
+ + ( + + {t('schedule.description')} + +