diff --git a/examples/scheduler_example.py b/examples/scheduler_example.py new file mode 100644 index 0000000000..589a6c9311 --- /dev/null +++ b/examples/scheduler_example.py @@ -0,0 +1,106 @@ +""" +Flask Scheduler 使用示例 +""" +from datetime import timedelta, datetime +from flask import Flask, jsonify +import logging + +# 配置日志 +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# 创建Flask应用 +app = Flask(__name__) + +# 配置调度器 +app.config.update({ + 'SCHEDULER_ENABLED': True, # 启用调度器 + 'SCHEDULER_AUTOSTART': True, # 自动启动调度器 + 'SCHEDULER_TICK_INTERVAL': 1.0, # 检查间隔(秒) + 'SCHEDULER_MAX_WORKERS': 4, # 最大工作线程数 + 'SCHEDULER_STORAGE_PATH': 'scheduler_data.json' # 存储路径 +}) + +# 初始化调度器 +from flask.scheduler import Scheduler +scheduler = Scheduler(app) + + +# 定义任务 +from flask.scheduler.decorators import interval_task, delay_task, cron_task + +@interval_task(interval=timedelta(seconds=10), description="每10秒执行一次的示例任务") +def my_interval_task(): + """每10秒执行一次的示例任务""" + current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + logger.info(f"[间隔任务] 执行时间: {current_time}") + return f"间隔任务执行成功: {current_time}" + + +@delay_task(delay=timedelta(seconds=5), description="延迟5秒后执行的示例任务") +def my_delay_task(): + """延迟5秒后执行的示例任务""" + current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + logger.info(f"[延迟任务] 执行时间: {current_time}") + return f"延迟任务执行成功: {current_time}" + + +@cron_task(cron_expression="*/2 * * * *", description="每2分钟执行一次的cron任务") +def my_cron_task(): + """每2分钟执行一次的cron任务""" + current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + logger.info(f"[Cron任务] 执行时间: {current_time}") + return f"Cron任务执行成功: {current_time}" + + +# 创建示例路由 + +@app.route('/') +def index(): + """首页""" + return jsonify({ + 'message': 'Flask Scheduler 示例应用', + 'scheduler_running': scheduler.is_running(), + 'endpoints': { + 'metrics': '/_internal/metrics', + 'tasks': '/_internal/tasks', + 'scheduler_status': '/_internal/scheduler/status', + 'health': '/_internal/health' + } + }) + + +@app.route('/run-task/', methods=['POST']) +def run_task(task_name): + """手动运行任务""" + success = scheduler.run_task(task_name) + if success: + return jsonify({'status': 'success', 'message': f'Task {task_name} started'}) + else: + return jsonify({'status': 'error', 'message': f'Failed to start task {task_name}'}), 400 + + +@app.route('/scheduler-info') +def scheduler_info(): + """获取调度器信息""" + tasks = scheduler.get_all_tasks() + metrics = scheduler.get_metrics() + + return jsonify({ + 'scheduler_running': scheduler.is_running(), + 'total_tasks': len(tasks), + 'tasks': [task.name for task in tasks], + 'metrics': metrics + }) + + +# 注册管理蓝图 +from flask.scheduler.blueprint import create_scheduler_blueprint +app.register_blueprint(create_scheduler_blueprint(scheduler, name='scheduler_admin')) + +if __name__ == '__main__': + # 导入示例任务(确保它们被注册) + from flask.scheduler import examples # 这会触发任务注册 + + logger.info("启动Flask应用和调度器...") + app.run(debug=True, port=5000) \ No newline at end of file diff --git a/scheduler_data.json b/scheduler_data.json new file mode 100644 index 0000000000..bbccb88ef0 --- /dev/null +++ b/scheduler_data.json @@ -0,0 +1,74 @@ +{ + "tasks": { + "my_interval_task": { + "name": "my_interval_task", + "task_type": "interval", + "description": "每10秒执行一次的示例任务", + "enabled": true, + "max_retries": 0, + "status": "running", + "next_run_at": "2025-11-29T15:41:23.195414", + "last_run_at": "2025-11-29T15:41:13.195410", + "current_run_id": "2b8af199-73c5-491a-a8b2-a945e35687d5", + "last_error": null, + "retry_count": 0, + "metrics": { + "total_runs": 38, + "successful_runs": 38, + "failed_runs": 0, + "last_run_at": "2025-11-29T15:41:13.201351", + "last_success_at": "2025-11-29T15:41:13.201355", + "last_failure_at": null, + "average_duration": 0.016269001867437103, + "last_error": null + } + }, + "my_delay_task": { + "name": "my_delay_task", + "task_type": "delay", + "description": "延迟5秒后执行的示例任务", + "enabled": false, + "max_retries": 0, + "status": "success", + "next_run_at": null, + "last_run_at": "2025-11-29T15:35:06.924327", + "current_run_id": null, + "last_error": null, + "retry_count": 0, + "metrics": { + "total_runs": 1, + "successful_runs": 1, + "failed_runs": 0, + "last_run_at": "2025-11-29T15:35:06.926838", + "last_success_at": "2025-11-29T15:35:06.926846", + "last_failure_at": null, + "average_duration": 0.001039, + "last_error": null + } + }, + "my_cron_task": { + "name": "my_cron_task", + "task_type": "cron", + "description": "每2分钟执行一次的cron任务", + "enabled": true, + "max_retries": 0, + "status": "success", + "next_run_at": "2025-11-29T15:42:00", + "last_run_at": "2025-11-29T15:40:59.069031", + "current_run_id": null, + "last_error": null, + "retry_count": 0, + "metrics": { + "total_runs": 176, + "successful_runs": 176, + "failed_runs": 0, + "last_run_at": "2025-11-29T15:40:59.070000", + "last_success_at": "2025-11-29T15:40:59.070004", + "last_failure_at": null, + "average_duration": 0.00045138152762598126, + "last_error": null + } + } + }, + "last_updated": "2025-11-29T15:41:13.204217" +} \ No newline at end of file diff --git a/src/flask/__init__.py b/src/flask/__init__.py index 30dce6fd7d..7a877f4339 100644 --- a/src/flask/__init__.py +++ b/src/flask/__init__.py @@ -37,3 +37,20 @@ from .templating import stream_template_string as stream_template_string from .wrappers import Request as Request from .wrappers import Response as Response + +# Flask Scheduler Extension +try: + from .scheduler import Scheduler as Scheduler + from .scheduler import Task as Task + from .scheduler import TaskStatus as TaskStatus + from .scheduler import TaskType as TaskType + from .scheduler import interval_task as interval_task + from .scheduler import delay_task as delay_task + from .scheduler import cron_task as cron_task + from .scheduler import TaskStorage as TaskStorage + from .scheduler import SchedulerError as SchedulerError + from .scheduler import TaskError as TaskError + from .scheduler import CronParseError as CronParseError +except ImportError: + # 如果scheduler模块不可用,静默处理 + pass diff --git a/src/flask/scheduler/__init__.py b/src/flask/scheduler/__init__.py new file mode 100644 index 0000000000..a8ea095a93 --- /dev/null +++ b/src/flask/scheduler/__init__.py @@ -0,0 +1,30 @@ +""" +Flask Scheduler Extension + +A comprehensive task scheduling extension for Flask applications with support for: +- Interval tasks +- Delayed tasks +- Cron tasks +- Task management and monitoring +- Metrics collection +""" + +from .scheduler import Scheduler +from .tasks import Task, TaskStatus, TaskType +from .storage import TaskStorage +from .decorators import interval_task, delay_task, cron_task +from .exceptions import SchedulerError, TaskError, CronParseError + +__all__ = [ + 'Scheduler', + 'Task', + 'TaskStatus', + 'TaskType', + 'TaskStorage', + 'interval_task', + 'delay_task', + 'cron_task', + 'SchedulerError', + 'TaskError', + 'CronParseError' +] \ No newline at end of file diff --git a/src/flask/scheduler/blueprint.py b/src/flask/scheduler/blueprint.py new file mode 100644 index 0000000000..7e052e864e --- /dev/null +++ b/src/flask/scheduler/blueprint.py @@ -0,0 +1,288 @@ +""" +Scheduler management blueprint +""" +from flask import Blueprint, jsonify, request, current_app +from datetime import datetime +from typing import Dict, Any + + +def create_scheduler_blueprint(scheduler, name='scheduler'): + """创建调度器管理蓝图""" + bp = Blueprint(name, __name__, url_prefix='/_internal') + + @bp.route('/metrics', methods=['GET']) + def get_metrics(): + """获取调度器指标""" + try: + metrics = scheduler.get_metrics() + return jsonify({ + 'status': 'success', + 'timestamp': datetime.now().isoformat(), + 'data': metrics + }) + except Exception as e: + return jsonify({ + 'status': 'error', + 'timestamp': datetime.now().isoformat(), + 'error': str(e) + }), 500 + + @bp.route('/tasks', methods=['GET']) + def list_tasks(): + """获取所有任务列表""" + try: + tasks = scheduler.get_all_tasks() + tasks_data = [] + + for task in tasks: + task_data = task.to_dict() + # 添加运行状态 + task_data['is_running'] = task.status.value == 'running' + task_data['scheduler_status'] = 'running' if scheduler.is_running() else 'stopped' + tasks_data.append(task_data) + + return jsonify({ + 'status': 'success', + 'timestamp': datetime.now().isoformat(), + 'data': { + 'tasks': tasks_data, + 'scheduler_running': scheduler.is_running(), + 'total_tasks': len(tasks_data) + } + }) + except Exception as e: + return jsonify({ + 'status': 'error', + 'timestamp': datetime.now().isoformat(), + 'error': str(e) + }), 500 + + @bp.route('/tasks/', methods=['GET']) + def get_task(task_name: str): + """获取特定任务详情""" + try: + task = scheduler.get_task(task_name) + if not task: + return jsonify({ + 'status': 'error', + 'timestamp': datetime.now().isoformat(), + 'error': f'Task "{task_name}" not found' + }), 404 + + task_data = task.to_dict() + task_data['is_running'] = task.status.value == 'running' + + return jsonify({ + 'status': 'success', + 'timestamp': datetime.now().isoformat(), + 'data': task_data + }) + except Exception as e: + return jsonify({ + 'status': 'error', + 'timestamp': datetime.now().isoformat(), + 'error': str(e) + }), 500 + + @bp.route('/tasks//run', methods=['POST']) + def run_task(task_name: str): + """手动运行任务""" + try: + success = scheduler.run_task(task_name) + + if success: + return jsonify({ + 'status': 'success', + 'timestamp': datetime.now().isoformat(), + 'message': f'Task "{task_name}" started successfully' + }) + else: + return jsonify({ + 'status': 'error', + 'timestamp': datetime.now().isoformat(), + 'error': f'Failed to start task "{task_name}". Task may be disabled, already running, or not found.' + }), 400 + + except Exception as e: + return jsonify({ + 'status': 'error', + 'timestamp': datetime.now().isoformat(), + 'error': str(e) + }), 500 + + @bp.route('/tasks//enable', methods=['POST']) + def enable_task(task_name: str): + """启用任务""" + try: + task = scheduler.get_task(task_name) + if not task: + return jsonify({ + 'status': 'error', + 'timestamp': datetime.now().isoformat(), + 'error': f'Task "{task_name}" not found' + }), 404 + + task.enabled = True + scheduler.storage.add_task(task) + + return jsonify({ + 'status': 'success', + 'timestamp': datetime.now().isoformat(), + 'message': f'Task "{task_name}" enabled successfully' + }) + except Exception as e: + return jsonify({ + 'status': 'error', + 'timestamp': datetime.now().isoformat(), + 'error': str(e) + }), 500 + + @bp.route('/tasks//disable', methods=['POST']) + def disable_task(task_name: str): + """禁用任务""" + try: + task = scheduler.get_task(task_name) + if not task: + return jsonify({ + 'status': 'error', + 'timestamp': datetime.now().isoformat(), + 'error': f'Task "{task_name}" not found' + }), 404 + + task.enabled = False + scheduler.storage.add_task(task) + + return jsonify({ + 'status': 'success', + 'timestamp': datetime.now().isoformat(), + 'message': f'Task "{task_name}" disabled successfully' + }) + except Exception as e: + return jsonify({ + 'status': 'error', + 'timestamp': datetime.now().isoformat(), + 'error': str(e) + }), 500 + + @bp.route('/scheduler/start', methods=['POST']) + def start_scheduler(): + """启动调度器""" + try: + if scheduler.is_running(): + return jsonify({ + 'status': 'warning', + 'timestamp': datetime.now().isoformat(), + 'message': 'Scheduler is already running' + }) + + scheduler.start() + + return jsonify({ + 'status': 'success', + 'timestamp': datetime.now().isoformat(), + 'message': 'Scheduler started successfully' + }) + except Exception as e: + return jsonify({ + 'status': 'error', + 'timestamp': datetime.now().isoformat(), + 'error': str(e) + }), 500 + + @bp.route('/scheduler/stop', methods=['POST']) + def stop_scheduler(): + """停止调度器""" + try: + if not scheduler.is_running(): + return jsonify({ + 'status': 'warning', + 'timestamp': datetime.now().isoformat(), + 'message': 'Scheduler is not running' + }) + + scheduler.stop() + + return jsonify({ + 'status': 'success', + 'timestamp': datetime.now().isoformat(), + 'message': 'Scheduler stopped successfully' + }) + except Exception as e: + return jsonify({ + 'status': 'error', + 'timestamp': datetime.now().isoformat(), + 'error': str(e) + }), 500 + + @bp.route('/scheduler/status', methods=['GET']) + def get_scheduler_status(): + """获取调度器状态""" + try: + is_running = scheduler.is_running() + tasks = scheduler.get_all_tasks() + + running_tasks = sum(1 for task in tasks if task.status.value == 'running') + enabled_tasks = sum(1 for task in tasks if task.enabled) + + return jsonify({ + 'status': 'success', + 'timestamp': datetime.now().isoformat(), + 'data': { + 'scheduler_running': is_running, + 'total_tasks': len(tasks), + 'enabled_tasks': enabled_tasks, + 'disabled_tasks': len(tasks) - enabled_tasks, + 'running_tasks': running_tasks, + 'tick_interval': scheduler._tick_interval, + 'max_workers': scheduler._max_workers + } + }) + except Exception as e: + return jsonify({ + 'status': 'error', + 'timestamp': datetime.now().isoformat(), + 'error': str(e) + }), 500 + + @bp.route('/reload', methods=['POST']) + def reload_scheduler(): + """重新加载调度器配置""" + try: + scheduler.reload() + + return jsonify({ + 'status': 'success', + 'timestamp': datetime.now().isoformat(), + 'message': 'Scheduler configuration reloaded successfully' + }) + except Exception as e: + return jsonify({ + 'status': 'error', + 'timestamp': datetime.now().isoformat(), + 'error': str(e) + }), 500 + + @bp.route('/health', methods=['GET']) + def health_check(): + """健康检查""" + try: + is_running = scheduler.is_running() + tasks = scheduler.get_all_tasks() + + return jsonify({ + 'status': 'healthy' if is_running else 'degraded', + 'timestamp': datetime.now().isoformat(), + 'data': { + 'scheduler_running': is_running, + 'total_tasks': len(tasks), + 'enabled_tasks': sum(1 for task in tasks if task.enabled) + } + }) + except Exception as e: + return jsonify({ + 'status': 'unhealthy', + 'timestamp': datetime.now().isoformat(), + 'error': str(e) + }), 500 + + return bp \ No newline at end of file diff --git a/src/flask/scheduler/cron.py b/src/flask/scheduler/cron.py new file mode 100644 index 0000000000..b8a33ebee0 --- /dev/null +++ b/src/flask/scheduler/cron.py @@ -0,0 +1,160 @@ +""" +Cron expression parser for scheduling tasks +""" +import re +from datetime import datetime, timedelta +from typing import Optional, Set +from .exceptions import CronParseError + + +class CronParser: + """Parse and validate cron expressions""" + + # Cron字段: 分 时 日 月 周 + FIELD_NAMES = ['minute', 'hour', 'day', 'month', 'weekday'] + FIELD_RANGES = { + 'minute': (0, 59), + 'hour': (0, 23), + 'day': (1, 31), + 'month': (1, 12), + 'weekday': (0, 6) # 0=Monday, 6=Sunday + } + + MONTH_NAMES = { + 'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5, 'jun': 6, + 'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10, 'nov': 11, 'dec': 12 + } + + WEEKDAY_NAMES = { + 'mon': 0, 'tue': 1, 'wed': 2, 'thu': 3, + 'fri': 4, 'sat': 5, 'sun': 6 + } + + def __init__(self, expression: str): + """Initialize cron parser with expression""" + self.expression = expression.strip() + self.fields = {} + self._parse() + + def _parse(self): + """Parse cron expression""" + parts = self.expression.split() + + if len(parts) != 5: + raise CronParseError(f"Invalid cron expression '{self.expression}': expected 5 fields, got {len(parts)}") + + for i, (field_name, part) in enumerate(zip(self.FIELD_NAMES, parts)): + self.fields[field_name] = self._parse_field(field_name, part) + + def _parse_field(self, field_name: str, field_value: str) -> Set[int]: + """Parse individual cron field""" + min_val, max_val = self.FIELD_RANGES[field_name] + result = set() + + # 处理逗号分隔的多个值 + for part in field_value.split(','): + result.update(self._parse_field_part(field_name, part.strip(), min_val, max_val)) + + return result + + def _parse_field_part(self, field_name: str, part: str, min_val: int, max_val: int) -> Set[int]: + """Parse field part (handles ranges, steps, wildcards)""" + # 处理通配符 + if part == '*': + return set(range(min_val, max_val + 1)) + + # 处理步长 (*/5, 1-10/2) + if '/' in part: + base, step = part.split('/', 1) + try: + step = int(step) + if step <= 0: + raise ValueError("Step must be positive") + except ValueError as e: + raise CronParseError(f"Invalid step value in '{part}': {e}") + + if base == '*': + values = set(range(min_val, max_val + 1)) + else: + values = self._parse_field_part(field_name, base, min_val, max_val) + + return {v for v in values if (v - min_val) % step == 0} + + # 处理范围 (1-5) + if '-' in part: + start, end = part.split('-', 1) + try: + start_val = self._parse_single_value(field_name, start, min_val, max_val) + end_val = self._parse_single_value(field_name, end, min_val, max_val) + if start_val > end_val: + raise ValueError("Range start must be <= end") + return set(range(start_val, end_val + 1)) + except ValueError as e: + raise CronParseError(f"Invalid range in '{part}': {e}") + + # 处理单个值 + try: + return {self._parse_single_value(field_name, part, min_val, max_val)} + except ValueError as e: + raise CronParseError(f"Invalid value in '{part}': {e}") + + def _parse_single_value(self, field_name: str, value: str, min_val: int, max_val: int) -> int: + """Parse single value (handles numeric and named values)""" + # 尝试解析数字 + try: + num_val = int(value) + if min_val <= num_val <= max_val: + return num_val + else: + raise ValueError(f"Value {num_val} out of range [{min_val}, {max_val}]") + except ValueError: + pass + + # 尝试解析名称 (月份或星期) + if field_name == 'month': + lower_value = value.lower() + if lower_value in self.MONTH_NAMES: + return self.MONTH_NAMES[lower_value] + elif field_name == 'weekday': + lower_value = value.lower() + if lower_value in self.WEEKDAY_NAMES: + return self.WEEKDAY_NAMES[lower_value] + + raise ValueError(f"Invalid value '{value}' for field {field_name}") + + def get_next_run_time(self, from_time: Optional[datetime] = None) -> datetime: + """Get next execution time after from_time""" + if from_time is None: + from_time = datetime.now() + + # 从下一秒开始检查 + next_time = from_time.replace(microsecond=0) + timedelta(seconds=1) + + # 限制最大搜索时间(避免无限循环) + max_iterations = 366 * 24 * 60 # 一年内的分钟数 + iterations = 0 + + while iterations < max_iterations: + if self._matches_time(next_time): + return next_time + + next_time += timedelta(minutes=1) + iterations += 1 + + raise CronParseError("Could not find next run time within reasonable timeframe") + + def _matches_time(self, dt: datetime) -> bool: + """Check if datetime matches cron expression""" + # 转换星期 (datetime weekday: 0=Monday, cron weekday: 0=Monday) + weekday = dt.weekday() + + return ( + dt.minute in self.fields['minute'] and + dt.hour in self.fields['hour'] and + dt.day in self.fields['day'] and + dt.month in self.fields['month'] and + weekday in self.fields['weekday'] + ) + + def __str__(self): + return self.expression \ No newline at end of file diff --git a/src/flask/scheduler/decorators.py b/src/flask/scheduler/decorators.py new file mode 100644 index 0000000000..b5d3e634a6 --- /dev/null +++ b/src/flask/scheduler/decorators.py @@ -0,0 +1,165 @@ +""" +Task decorators for easy task definition +""" +import functools +from datetime import timedelta +from typing import Callable, Optional +from .tasks import Task, TaskType + +# 延迟导入避免循环依赖 +_scheduler = None + +def get_scheduler(): + """获取调度器实例""" + global _scheduler + if _scheduler is None: + try: + from .scheduler import Scheduler + _scheduler = Scheduler.get_instance() + except ImportError: + pass + return _scheduler + + +def interval_task(interval: timedelta, name: Optional[str] = None, + description: Optional[str] = None, enabled: bool = True, + max_retries: int = 0, timeout: Optional[timedelta] = None): + """ + Decorator for interval-based tasks + + Args: + interval: Task execution interval + name: Task name (default: function name) + description: Task description + enabled: Whether task is enabled + max_retries: Maximum retry attempts + timeout: Task timeout + """ + def decorator(func: Callable) -> Callable: + task_name = name or func.__name__ + + # 创建任务定义 + task = Task( + name=task_name, + func=func, + task_type=TaskType.INTERVAL, + interval=interval, + description=description or func.__doc__, + enabled=enabled, + max_retries=max_retries, + timeout=timeout + ) + + # 注册到调度器 + scheduler = get_scheduler() + if scheduler: + scheduler.add_task(task) + + # 保持原始函数可用 + @functools.wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + + # 附加任务信息 + wrapper._scheduler_task = task + wrapper._scheduler_task_name = task_name + + return wrapper + + return decorator + + +def delay_task(delay: timedelta, name: Optional[str] = None, + description: Optional[str] = None, enabled: bool = True, + max_retries: int = 0, timeout: Optional[timedelta] = None): + """ + Decorator for delayed tasks + + Args: + delay: Delay before execution + name: Task name (default: function name) + description: Task description + enabled: Whether task is enabled + max_retries: Maximum retry attempts + timeout: Task timeout + """ + def decorator(func: Callable) -> Callable: + task_name = name or func.__name__ + + # 创建任务定义 + task = Task( + name=task_name, + func=func, + task_type=TaskType.DELAY, + delay=delay, + description=description or func.__doc__, + enabled=enabled, + max_retries=max_retries, + timeout=timeout + ) + + # 注册到调度器 + scheduler = get_scheduler() + if scheduler: + scheduler.add_task(task) + + # 保持原始函数可用 + @functools.wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + + # 附加任务信息 + wrapper._scheduler_task = task + wrapper._scheduler_task_name = task_name + + return wrapper + + return decorator + + +def cron_task(cron_expression: str, name: Optional[str] = None, + description: Optional[str] = None, enabled: bool = True, + max_retries: int = 0, timeout: Optional[timedelta] = None): + """ + Decorator for cron-based tasks + + Args: + cron_expression: Cron expression (5 fields: minute hour day month weekday) + name: Task name (default: function name) + description: Task description + enabled: Whether task is enabled + max_retries: Maximum retry attempts + timeout: Task timeout + """ + def decorator(func: Callable) -> Callable: + task_name = name or func.__name__ + + # 创建任务定义 + task = Task( + name=task_name, + func=func, + task_type=TaskType.CRON, + cron_expression=cron_expression, + description=description or func.__doc__, + enabled=enabled, + max_retries=max_retries, + timeout=timeout + ) + + # 注册到调度器 + scheduler = get_scheduler() + if scheduler: + scheduler.add_task(task) + + # 保持原始函数可用 + @functools.wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + + # 附加任务信息 + wrapper._scheduler_task = task + wrapper._scheduler_task_name = task_name + + return wrapper + + return decorator \ No newline at end of file diff --git a/src/flask/scheduler/examples.py b/src/flask/scheduler/examples.py new file mode 100644 index 0000000000..9c0021a97c --- /dev/null +++ b/src/flask/scheduler/examples.py @@ -0,0 +1,149 @@ +""" +示例任务 - 展示Flask Scheduler的功能 +""" +from datetime import datetime, timedelta +import logging +from flask import current_app +from .decorators import interval_task, delay_task, cron_task + + +logger = logging.getLogger(__name__) + + +@interval_task(interval=timedelta(seconds=10), description="每10秒执行一次的示例任务") +def example_interval_task(): + """每10秒执行一次的示例任务""" + try: + current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + logger.info(f"[间隔任务] 当前时间: {current_time}") + + # 模拟一些工作 + import time + time.sleep(0.5) + + logger.info(f"[间隔任务] 任务执行完成") + return f"间隔任务执行成功: {current_time}" + + except Exception as e: + logger.error(f"[间隔任务] 执行失败: {e}") + raise + + +@delay_task(delay=timedelta(seconds=5), description="延迟5秒后执行的示例任务") +def example_delay_task(): + """延迟5秒后执行的示例任务""" + try: + current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + logger.info(f"[延迟任务] 开始执行,当前时间: {current_time}") + + # 模拟一些初始化工作 + import time + time.sleep(1) + + logger.info(f"[延迟任务] 执行完成") + return f"延迟任务执行成功: {current_time}" + + except Exception as e: + logger.error(f"[延迟任务] 执行失败: {e}") + raise + + +@cron_task(cron_expression="*/2 * * * *", description="每2分钟执行一次的cron任务") +def example_cron_task(): + """每2分钟执行一次的cron任务""" + try: + current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + logger.info(f"[Cron任务] 执行时间: {current_time}") + + # 模拟定期维护工作 + import random + work_duration = random.uniform(0.5, 2.0) + import time + time.sleep(work_duration) + + logger.info(f"[Cron任务] 定期维护完成,耗时: {work_duration:.2f}秒") + return f"Cron任务执行成功: {current_time} (耗时: {work_duration:.2f}秒)" + + except Exception as e: + logger.error(f"[Cron任务] 执行失败: {e}") + raise + + +# 额外的示例任务 + +@interval_task(interval=timedelta(minutes=1), description="每分钟执行的健康检查任务") +def health_check_task(): + """每分钟执行的健康检查任务""" + try: + # 检查应用状态 + if current_app: + config_count = len(current_app.config) + logger.info(f"[健康检查] 应用配置项数量: {config_count}") + + # 检查内存使用情况(简化版) + import psutil + memory = psutil.virtual_memory() + memory_usage = memory.percent + + if memory_usage > 90: + logger.warning(f"[健康检查] 内存使用率过高: {memory_usage}%") + else: + logger.info(f"[健康检查] 内存使用率正常: {memory_usage}%") + + return f"健康检查通过,内存使用率: {memory_usage}%" + + except ImportError: + logger.info("[健康检查] psutil模块未安装,跳过内存检查") + return "健康检查通过(基础检查)" + except Exception as e: + logger.error(f"[健康检查] 执行失败: {e}") + raise + + +@cron_task(cron_expression="0 9 * * *", description="每天早上9点执行的数据清理任务") +def daily_cleanup_task(): + """每天早上9点执行的数据清理任务""" + try: + current_date = datetime.now().strftime("%Y-%m-%d") + logger.info(f"[数据清理] 开始执行每日清理任务: {current_date}") + + # 模拟数据清理工作 + import time + cleanup_duration = 3.0 # 模拟3秒的清理工作 + time.sleep(cleanup_duration) + + logger.info(f"[数据清理] 完成每日数据清理") + return f"每日数据清理完成: {current_date}" + + except Exception as e: + logger.error(f"[数据清理] 执行失败: {e}") + raise + + +@delay_task(delay=timedelta(seconds=30), description="30秒后执行的初始化任务") +def initialization_task(): + """30秒后执行的初始化任务""" + try: + logger.info("[初始化任务] 开始执行应用初始化") + + # 模拟初始化工作 + import time + time.sleep(2) + + # 检查调度器状态 + from .scheduler import Scheduler + scheduler = Scheduler.get_instance() + + if scheduler and scheduler.is_running(): + logger.info("[初始化任务] 调度器运行正常") + status = "调度器运行正常" + else: + logger.warning("[初始化任务] 调度器未运行") + status = "调度器未运行" + + logger.info(f"[初始化任务] 初始化完成: {status}") + return f"应用初始化完成: {status}" + + except Exception as e: + logger.error(f"[初始化任务] 执行失败: {e}") + raise \ No newline at end of file diff --git a/src/flask/scheduler/exceptions.py b/src/flask/scheduler/exceptions.py new file mode 100644 index 0000000000..ea27681ebe --- /dev/null +++ b/src/flask/scheduler/exceptions.py @@ -0,0 +1,23 @@ +""" +Scheduler exceptions +""" + + +class SchedulerError(Exception): + """Base exception for scheduler errors""" + pass + + +class TaskError(SchedulerError): + """Task-related errors""" + pass + + +class CronParseError(SchedulerError): + """Cron expression parsing errors""" + pass + + +class StorageError(SchedulerError): + """Task storage errors""" + pass \ No newline at end of file diff --git a/src/flask/scheduler/scheduler.py b/src/flask/scheduler/scheduler.py new file mode 100644 index 0000000000..721d4fce1c --- /dev/null +++ b/src/flask/scheduler/scheduler.py @@ -0,0 +1,367 @@ +""" +Flask Scheduler - Main scheduler implementation +""" +import logging +import threading +import time +import uuid +from datetime import datetime, timedelta +from typing import Optional, Dict, Any, Callable +from concurrent.futures import ThreadPoolExecutor, Future + +from flask import Flask, current_app +from .tasks import Task, TaskType, TaskStatus +from .storage import TaskStorage +from .cron import CronParser +from .exceptions import SchedulerError, TaskError +from .blueprint import create_scheduler_blueprint + + +logger = logging.getLogger(__name__) + + +class Scheduler: + """Flask任务调度器""" + + _instance = None + _instance_lock = threading.Lock() + + def __init__(self, app: Optional[Flask] = None, storage_path: Optional[str] = None): + """初始化调度器""" + self.app = app + self.storage = TaskStorage(storage_path) + self._thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + self._executor: Optional[ThreadPoolExecutor] = None + self._running_tasks: Dict[str, Future] = {} + self._tick_interval = 1.0 # 默认1秒检查间隔 + self._max_workers = 4 + self._enabled = True + self._autostart = True + + if app is not None: + self.init_app(app) + + @classmethod + def get_instance(cls) -> Optional['Scheduler']: + """获取调度器实例""" + return cls._instance + + def init_app(self, app: Flask) -> None: + """初始化Flask应用""" + self.app = app + + # 配置项 + app.config.setdefault('SCHEDULER_ENABLED', True) + app.config.setdefault('SCHEDULER_AUTOSTART', True) + app.config.setdefault('SCHEDULER_TICK_INTERVAL', 1.0) + app.config.setdefault('SCHEDULER_MAX_WORKERS', 4) + app.config.setdefault('SCHEDULER_STORAGE_PATH', None) + + # 应用配置 + self._enabled = app.config['SCHEDULER_ENABLED'] + self._autostart = app.config['SCHEDULER_AUTOSTART'] + self._tick_interval = app.config['SCHEDULER_TICK_INTERVAL'] + self._max_workers = app.config['SCHEDULER_MAX_WORKERS'] + + # 重新初始化存储 + storage_path = app.config['SCHEDULER_STORAGE_PATH'] + if storage_path: + self.storage = TaskStorage(storage_path) + + # 注册蓝图 + blueprint = create_scheduler_blueprint(self) + app.register_blueprint(blueprint, url_prefix='/_internal') + + # 设置实例 + with self._instance_lock: + Scheduler._instance = self + + # 启动调度器 + if self._enabled and self._autostart: + self.start() + + logger.info(f"Flask Scheduler initialized (enabled={self._enabled}, autostart={self._autostart})") + + def start(self) -> None: + """启动调度器""" + if not self._enabled: + logger.warning("Scheduler is disabled, cannot start") + return + + if self.is_running(): + logger.warning("Scheduler is already running") + return + + self._stop_event.clear() + self._executor = ThreadPoolExecutor(max_workers=self._max_workers) + self._thread = threading.Thread(target=self._run_scheduler, daemon=True) + self._thread.start() + + logger.info("Flask Scheduler started") + + def stop(self) -> None: + """停止调度器""" + if not self.is_running(): + return + + logger.info("Stopping Flask Scheduler...") + self._stop_event.set() + + # 等待调度线程结束 + if self._thread and self._thread.is_alive(): + self._thread.join(timeout=5) + + # 关闭执行器 + if self._executor: + self._executor.shutdown(wait=True) + + # 取消所有运行中的任务 + for future in self._running_tasks.values(): + if not future.done(): + future.cancel() + self._running_tasks.clear() + + logger.info("Flask Scheduler stopped") + + def is_running(self) -> bool: + """检查调度器是否运行中""" + return (self._thread is not None and + self._thread.is_alive() and + not self._stop_event.is_set()) + + def add_task(self, task: Task) -> None: + """添加任务""" + self.storage.add_task(task) + logger.debug(f"Task '{task.name}' added to scheduler") + + def remove_task(self, name: str) -> bool: + """移除任务""" + # 取消运行中的任务 + if name in self._running_tasks: + future = self._running_tasks[name] + if not future.done(): + future.cancel() + del self._running_tasks[name] + + return self.storage.remove_task(name) + + def get_task(self, name: str) -> Optional[Task]: + """获取任务""" + return self.storage.get_task(name) + + def get_all_tasks(self) -> list: + """获取所有任务""" + return self.storage.get_all_tasks() + + def run_task(self, name: str) -> bool: + """手动运行任务""" + task = self.get_task(name) + if not task: + logger.error(f"Task '{name}' not found") + return False + + if not task.enabled: + logger.warning(f"Task '{name}' is disabled") + return False + + if task.status == TaskStatus.RUNNING: + logger.warning(f"Task '{name}' is already running") + return False + + # 提交任务到线程池 + self._submit_task(task, force_run=True) + return True + + def _run_scheduler(self) -> None: + """调度器主循环""" + logger.info("Scheduler thread started") + + while not self._stop_event.is_set(): + try: + self._check_and_run_tasks() + time.sleep(self._tick_interval) + except Exception as e: + logger.error(f"Error in scheduler loop: {e}") + time.sleep(self._tick_interval) + + logger.info("Scheduler thread stopped") + + def _check_and_run_tasks(self) -> None: + """检查并运行到期任务""" + now = datetime.now() + tasks = self.storage.get_all_tasks() + + for task in tasks: + if not task.enabled: + continue + + if task.status == TaskStatus.RUNNING: + continue + + if self._should_run_task(task, now): + self._submit_task(task) + + def _should_run_task(self, task: Task, now: datetime) -> bool: + """检查任务是否应该运行""" + # 延迟任务 + if task.task_type == TaskType.DELAY: + if task.next_run_at is None: + # 首次运行 + task.next_run_at = now + task.delay + return False + return now >= task.next_run_at + + # 间隔任务 + if task.task_type == TaskType.INTERVAL: + if task.next_run_at is None: + # 首次运行 + task.next_run_at = now + return True + return now >= task.next_run_at + + # Cron任务 + if task.task_type == TaskType.CRON: + try: + parser = CronParser(task.cron_expression) + if task.next_run_at is None: + # 首次运行 + task.next_run_at = parser.get_next_run_time(now) + return False + return now >= task.next_run_at + except Exception as e: + logger.error(f"Error parsing cron for task '{task.name}': {e}") + return False + + return False + + def _submit_task(self, task: Task, force_run: bool = False) -> None: + """提交任务到线程池""" + if not self._executor: + logger.error("Task executor not available") + return + + # 生成运行ID + run_id = str(uuid.uuid4()) + task.current_run_id = run_id + task.status = TaskStatus.RUNNING + task.last_run_at = datetime.now() + + # 更新下次运行时间 + if not force_run: + self._update_next_run_time(task) + + # 提交到线程池 + future = self._executor.submit(self._execute_task, task, run_id) + self._running_tasks[task.name] = future + + # 添加完成回调 + future.add_done_callback( + lambda f: self._task_completed(task.name, run_id, f) + ) + + logger.debug(f"Task '{task.name}' submitted (run_id: {run_id})") + + def _execute_task(self, task: Task, run_id: str) -> Any: + """执行任务""" + logger.info(f"Executing task '{task.name}' (run_id: {run_id})") + start_time = datetime.now() + + try: + # 执行任务函数 + if self.app: + with self.app.app_context(): + result = task.func() + else: + result = task.func() + + duration = (datetime.now() - start_time).total_seconds() + + # 更新指标 + self.storage.update_task_metrics( + task.name, duration, success=True + ) + + logger.info(f"Task '{task.name}' completed successfully in {duration:.2f}s") + return result + + except Exception as e: + duration = (datetime.now() - start_time).total_seconds() + error_msg = str(e) + + # 更新指标 + self.storage.update_task_metrics( + task.name, duration, success=False, error=error_msg + ) + + logger.error(f"Task '{task.name}' failed after {duration:.2f}s: {error_msg}") + raise TaskError(f"Task execution failed: {error_msg}") from e + + def _task_completed(self, task_name: str, run_id: str, future: Future) -> None: + """任务完成回调""" + try: + # 移除运行记录 + if task_name in self._running_tasks: + del self._running_tasks[task_name] + + # 获取任务 + task = self.get_task(task_name) + if not task: + return + + # 更新状态 + if future.exception(): + task.status = TaskStatus.FAILED + task.last_error = str(future.exception()) + logger.error(f"Task '{task_name}' failed: {future.exception()}") + else: + task.status = TaskStatus.SUCCESS + task.last_error = None + task.retry_count = 0 + + task.current_run_id = None + + except Exception as e: + logger.error(f"Error in task completion handler: {e}") + + def _update_next_run_time(self, task: Task) -> None: + """更新任务下次运行时间""" + now = datetime.now() + + if task.task_type == TaskType.INTERVAL: + task.next_run_at = now + task.interval + + elif task.task_type == TaskType.CRON: + try: + parser = CronParser(task.cron_expression) + task.next_run_at = parser.get_next_run_time(now) + except Exception as e: + logger.error(f"Error updating cron next run time for '{task.name}': {e}") + + elif task.task_type == TaskType.DELAY: + # 延迟任务只运行一次,禁用任务 + task.enabled = False + task.next_run_at = None + + def get_metrics(self) -> Dict[str, Any]: + """获取调度器指标""" + return self.storage.get_metrics_summary() + + def reload(self) -> None: + """重新加载调度器配置""" + logger.info("Reloading scheduler configuration...") + + # 这里可以实现配置重新加载逻辑 + # 目前只是重启调度器 + was_running = self.is_running() + + if was_running: + self.stop() + + # 重新初始化 + if self.app: + self.init_app(self.app) + elif was_running: + self.start() + + logger.info("Scheduler configuration reloaded") \ No newline at end of file diff --git a/src/flask/scheduler/storage.py b/src/flask/scheduler/storage.py new file mode 100644 index 0000000000..97480f53d4 --- /dev/null +++ b/src/flask/scheduler/storage.py @@ -0,0 +1,168 @@ +""" +Task storage for persisting task state and metrics +""" +import json +import threading +from datetime import datetime +from pathlib import Path +from typing import Dict, Any, Optional, List +from .tasks import Task, TaskStatus +from .exceptions import StorageError + + +class TaskStorage: + """Task state and metrics storage""" + + def __init__(self, storage_path: Optional[str] = None): + """Initialize storage""" + self.storage_path = Path(storage_path) if storage_path else None + self._lock = threading.RLock() + self._tasks: Dict[str, Task] = {} + self._metrics: Dict[str, Dict[str, Any]] = {} + + if self.storage_path: + self.storage_path.parent.mkdir(parents=True, exist_ok=True) + self._load_from_disk() + + def add_task(self, task: Task) -> None: + """Add or update task""" + with self._lock: + self._tasks[task.name] = task + if task.name not in self._metrics: + self._metrics[task.name] = {} + self._save_to_disk() + + def get_task(self, name: str) -> Optional[Task]: + """Get task by name""" + with self._lock: + return self._tasks.get(name) + + def get_all_tasks(self) -> List[Task]: + """Get all tasks""" + with self._lock: + return list(self._tasks.values()) + + def remove_task(self, name: str) -> bool: + """Remove task by name""" + with self._lock: + if name in self._tasks: + del self._tasks[name] + if name in self._metrics: + del self._metrics[name] + self._save_to_disk() + return True + return False + + def update_task_status(self, name: str, status: TaskStatus, + error: Optional[str] = None) -> None: + """Update task status""" + with self._lock: + task = self._tasks.get(name) + if task: + task.status = status + if error: + task.last_error = error + task.metrics.last_error = error + self._save_to_disk() + + def update_task_metrics(self, name: str, duration: float, + success: bool, error: Optional[str] = None) -> None: + """Update task execution metrics""" + with self._lock: + task = self._tasks.get(name) + if task: + metrics = task.metrics + metrics.total_runs += 1 + metrics.last_run_at = datetime.now() + + if success: + metrics.successful_runs += 1 + metrics.last_success_at = datetime.now() + else: + metrics.failed_runs += 1 + metrics.last_failure_at = datetime.now() + if error: + metrics.last_error = error + + # 更新平均执行时间 + if metrics.average_duration == 0: + metrics.average_duration = duration + else: + metrics.average_duration = (metrics.average_duration + duration) / 2 + + self._save_to_disk() + + def get_metrics_summary(self) -> Dict[str, Any]: + """Get metrics summary for all tasks""" + with self._lock: + total_tasks = len(self._tasks) + enabled_tasks = sum(1 for task in self._tasks.values() if task.enabled) + + total_runs = 0 + successful_runs = 0 + failed_runs = 0 + + for task in self._tasks.values(): + total_runs += task.metrics.total_runs + successful_runs += task.metrics.successful_runs + failed_runs += task.metrics.failed_runs + + return { + 'total_tasks': total_tasks, + 'enabled_tasks': enabled_tasks, + 'disabled_tasks': total_tasks - enabled_tasks, + 'total_executions': total_runs, + 'successful_executions': successful_runs, + 'failed_executions': failed_runs, + 'success_rate': (successful_runs / total_runs * 100) if total_runs > 0 else 0, + 'tasks': { + task.name: task.to_dict() for task in self._tasks.values() + } + } + + def _save_to_disk(self) -> None: + """Save state to disk""" + if not self.storage_path: + return + + try: + data = { + 'tasks': { + name: task.to_dict() for name, task in self._tasks.items() + }, + 'last_updated': datetime.now().isoformat() + } + + # 使用临时文件确保原子性写入 + temp_path = self.storage_path.with_suffix('.tmp') + with open(temp_path, 'w', encoding='utf-8') as f: + json.dump(data, f, indent=2, ensure_ascii=False) + + # 原子性替换 + temp_path.replace(self.storage_path) + + except Exception as e: + raise StorageError(f"Failed to save task state: {e}") + + def _load_from_disk(self) -> None: + """Load state from disk""" + if not self.storage_path or not self.storage_path.exists(): + return + + try: + with open(self.storage_path, 'r', encoding='utf-8') as f: + data = json.load(f) + + # 这里简化处理,实际应该重建Task对象 + # 由于Task包含函数引用,序列化会比较复杂 + # 这里只恢复基本状态信息 + + except Exception as e: + # 加载失败不影响正常运行 + print(f"Warning: Failed to load task state: {e}") + + def cleanup_old_metrics(self, days: int = 30) -> int: + """Clean up old metrics data""" + # 这里可以实现更复杂的清理逻辑 + # 目前简化处理 + return 0 \ No newline at end of file diff --git a/src/flask/scheduler/tasks.py b/src/flask/scheduler/tasks.py new file mode 100644 index 0000000000..d6491bd530 --- /dev/null +++ b/src/flask/scheduler/tasks.py @@ -0,0 +1,101 @@ +""" +Task models and enums +""" +import enum +from datetime import datetime, timedelta +from typing import Any, Dict, Optional, Callable +from dataclasses import dataclass, field + + +class TaskType(enum.Enum): + """Task execution types""" + INTERVAL = "interval" + DELAY = "delay" + CRON = "cron" + + +class TaskStatus(enum.Enum): + """Task execution status""" + PENDING = "pending" + RUNNING = "running" + SUCCESS = "success" + FAILED = "failed" + CANCELLED = "cancelled" + + +@dataclass +class TaskMetrics: + """Task execution metrics""" + total_runs: int = 0 + successful_runs: int = 0 + failed_runs: int = 0 + last_run_at: Optional[datetime] = None + last_success_at: Optional[datetime] = None + last_failure_at: Optional[datetime] = None + average_duration: float = 0.0 + last_error: Optional[str] = None + + +@dataclass +class Task: + """Task definition and state""" + name: str + func: Callable + task_type: TaskType + + # Timing configuration + interval: Optional[timedelta] = None + delay: Optional[timedelta] = None + cron_expression: Optional[str] = None + + # Task metadata + description: Optional[str] = None + enabled: bool = True + max_retries: int = 0 + timeout: Optional[timedelta] = None + + # Runtime state + status: TaskStatus = TaskStatus.PENDING + metrics: TaskMetrics = field(default_factory=TaskMetrics) + next_run_at: Optional[datetime] = None + last_run_at: Optional[datetime] = None + current_run_id: Optional[str] = None + + # Error tracking + last_error: Optional[str] = None + retry_count: int = 0 + + def __post_init__(self): + """Post-initialization setup""" + if self.task_type == TaskType.INTERVAL and not self.interval: + raise ValueError("Interval tasks must specify interval") + if self.task_type == TaskType.DELAY and not self.delay: + raise ValueError("Delay tasks must specify delay") + if self.task_type == TaskType.CRON and not self.cron_expression: + raise ValueError("Cron tasks must specify cron_expression") + + def to_dict(self) -> Dict[str, Any]: + """Convert task to dictionary for serialization""" + return { + 'name': self.name, + 'task_type': self.task_type.value, + 'description': self.description, + 'enabled': self.enabled, + 'max_retries': self.max_retries, + 'status': self.status.value, + 'next_run_at': self.next_run_at.isoformat() if self.next_run_at else None, + 'last_run_at': self.last_run_at.isoformat() if self.last_run_at else None, + 'current_run_id': self.current_run_id, + 'last_error': self.last_error, + 'retry_count': self.retry_count, + 'metrics': { + 'total_runs': self.metrics.total_runs, + 'successful_runs': self.metrics.successful_runs, + 'failed_runs': self.metrics.failed_runs, + 'last_run_at': self.metrics.last_run_at.isoformat() if self.metrics.last_run_at else None, + 'last_success_at': self.metrics.last_success_at.isoformat() if self.metrics.last_success_at else None, + 'last_failure_at': self.metrics.last_failure_at.isoformat() if self.metrics.last_failure_at else None, + 'average_duration': self.metrics.average_duration, + 'last_error': self.metrics.last_error + } + } \ No newline at end of file