diff --git a/backend/app/main.py b/backend/app/main.py index d2cab38..ad14434 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -109,27 +109,41 @@ async def startup_event() -> None: ) app.state.eval_storage = None - # Initialize and start schedulers + # Initialize schedulers (but don't start background jobs on Cloud Run) + # Background APScheduler jobs get killed by Cloud Run when idle + # Use Cloud Scheduler to call /scheduler/trigger-forecast instead forecast_output_dir = Path(os.getenv("FORECAST_OUTPUT_DIR", "forecasts")) app.state.forecast_scheduler = ForecastScheduler( engine=app.state.engine, storage=app.state.eval_storage, output_dir=forecast_output_dir, ) - app.state.forecast_scheduler.start() - logger.info("Forecast scheduler started (runs hourly at :15)") - if app.state.eval_storage: - app.state.eval_scheduler = EvaluationScheduler(storage=app.state.eval_storage) - app.state.eval_scheduler.start() - logger.info("Evaluation scheduler started (runs daily at 00:30 UTC)") + # Only start background scheduler if explicitly enabled (for local dev) + enable_background_scheduler = ( + os.getenv("ENABLE_BACKGROUND_SCHEDULER", "false").lower() == "true" + ) + if enable_background_scheduler: + app.state.forecast_scheduler.start() + logger.info("Background forecast scheduler started (runs hourly at :15)") + + if app.state.eval_storage: + app.state.eval_scheduler = EvaluationScheduler( + storage=app.state.eval_storage + ) + app.state.eval_scheduler.start() + logger.info( + "Background evaluation scheduler started (runs daily at 00:30 UTC)" + ) else: - logger.warning("Evaluation scheduler disabled (BigQuery not available)") + logger.info("Background schedulers DISABLED (Cloud Scheduler mode)") + logger.info("Forecasts triggered via: POST /scheduler/trigger-forecast") + logger.info( + "Evaluations triggered via: POST /scheduler/trigger-evaluation (TODO)" + ) logger.info("=" * 80) logger.info("GACA Early Warning System is online") - logger.info("Automated forecasting: ENABLED (hourly at :15)") - logger.info("Automated evaluation: ENABLED (daily at 00:30 UTC)") logger.info("=" * 80) @@ -139,11 +153,17 @@ async def shutdown_event() -> None: logger = get_logger() logger.info("Shutting down GACA Early Warning System...") - if hasattr(app.state, "forecast_scheduler"): + if ( + hasattr(app.state, "forecast_scheduler") + and app.state.forecast_scheduler.scheduler.running + ): app.state.forecast_scheduler.stop() logger.info("Forecast scheduler stopped") - if hasattr(app.state, "eval_scheduler"): + if ( + hasattr(app.state, "eval_scheduler") + and app.state.eval_scheduler.scheduler.running + ): app.state.eval_scheduler.stop() logger.info("Evaluation scheduler stopped") @@ -444,6 +464,61 @@ async def get_static_evaluation() -> dict[str, Any]: ) from e +@app.post("/scheduler/trigger-forecast") +async def trigger_forecast() -> dict[str, Any]: + """Manually trigger a forecast run via HTTP. + + This endpoint is designed to be called by Cloud Scheduler to ensure + the forecast job completes within an HTTP request context, preventing + Cloud Run from killing the instance mid-execution. + + Returns + ------- + dict[str, Any] + Status of the forecast run including duration and records generated + """ + if not hasattr(app.state, "forecast_scheduler"): + raise HTTPException( + status_code=503, detail="Forecast scheduler not initialized" + ) + + logger = get_logger() + + # Check if already running + if app.state.forecast_scheduler.is_running: + logger.warning("Forecast job already running, rejecting duplicate request") + return { + "status": "already_running", + "message": "A forecast job is already in progress", + } + + try: + # Run the forecast job directly (not via scheduler) + await app.state.forecast_scheduler._run_forecast_job() + + return { + "status": "success", + "message": "Forecast completed successfully", + "last_run_timestamp": ( + app.state.forecast_scheduler.last_run_timestamp.isoformat() + if app.state.forecast_scheduler.last_run_timestamp + else None + ), + "last_data_timestamp": ( + app.state.forecast_scheduler.last_data_timestamp.isoformat() + if app.state.forecast_scheduler.last_data_timestamp + else None + ), + } + + except Exception as e: + logger.error(f"Triggered forecast failed: {e}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"Forecast execution failed: {str(e)}", + ) from e + + @app.get("/logs/forecast-runs") async def get_forecast_logs(limit: int = 100) -> dict[str, Any]: """Get recent forecast run logs.