Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 87 additions & 12 deletions backend/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,27 +109,41 @@ async def startup_event() -> None:
)
app.state.eval_storage = None

# Initialize and start schedulers
# Initialize schedulers (but don't start background jobs on Cloud Run)
# Background APScheduler jobs get killed by Cloud Run when idle
# Use Cloud Scheduler to call /scheduler/trigger-forecast instead
forecast_output_dir = Path(os.getenv("FORECAST_OUTPUT_DIR", "forecasts"))
app.state.forecast_scheduler = ForecastScheduler(
engine=app.state.engine,
storage=app.state.eval_storage,
output_dir=forecast_output_dir,
)
app.state.forecast_scheduler.start()
logger.info("Forecast scheduler started (runs hourly at :15)")

if app.state.eval_storage:
app.state.eval_scheduler = EvaluationScheduler(storage=app.state.eval_storage)
app.state.eval_scheduler.start()
logger.info("Evaluation scheduler started (runs daily at 00:30 UTC)")
# Only start background scheduler if explicitly enabled (for local dev)
enable_background_scheduler = (
os.getenv("ENABLE_BACKGROUND_SCHEDULER", "false").lower() == "true"
)
if enable_background_scheduler:
app.state.forecast_scheduler.start()
logger.info("Background forecast scheduler started (runs hourly at :15)")

if app.state.eval_storage:
app.state.eval_scheduler = EvaluationScheduler(
storage=app.state.eval_storage
)
app.state.eval_scheduler.start()
logger.info(
"Background evaluation scheduler started (runs daily at 00:30 UTC)"
)
else:
logger.warning("Evaluation scheduler disabled (BigQuery not available)")
logger.info("Background schedulers DISABLED (Cloud Scheduler mode)")
logger.info("Forecasts triggered via: POST /scheduler/trigger-forecast")
logger.info(
"Evaluations triggered via: POST /scheduler/trigger-evaluation (TODO)"
)

logger.info("=" * 80)
logger.info("GACA Early Warning System is online")
logger.info("Automated forecasting: ENABLED (hourly at :15)")
logger.info("Automated evaluation: ENABLED (daily at 00:30 UTC)")
logger.info("=" * 80)


Expand All @@ -139,11 +153,17 @@ async def shutdown_event() -> None:
logger = get_logger()
logger.info("Shutting down GACA Early Warning System...")

if hasattr(app.state, "forecast_scheduler"):
if (
hasattr(app.state, "forecast_scheduler")
and app.state.forecast_scheduler.scheduler.running
):
app.state.forecast_scheduler.stop()
logger.info("Forecast scheduler stopped")

if hasattr(app.state, "eval_scheduler"):
if (
hasattr(app.state, "eval_scheduler")
and app.state.eval_scheduler.scheduler.running
):
app.state.eval_scheduler.stop()
logger.info("Evaluation scheduler stopped")

Expand Down Expand Up @@ -444,6 +464,61 @@ async def get_static_evaluation() -> dict[str, Any]:
) from e


@app.post("/scheduler/trigger-forecast")
async def trigger_forecast() -> dict[str, Any]:
"""Manually trigger a forecast run via HTTP.

This endpoint is designed to be called by Cloud Scheduler to ensure
the forecast job completes within an HTTP request context, preventing
Cloud Run from killing the instance mid-execution.

Returns
-------
dict[str, Any]
Status of the forecast run including duration and records generated
"""
if not hasattr(app.state, "forecast_scheduler"):
raise HTTPException(
status_code=503, detail="Forecast scheduler not initialized"
)

logger = get_logger()

# Check if already running
if app.state.forecast_scheduler.is_running:
logger.warning("Forecast job already running, rejecting duplicate request")
return {
"status": "already_running",
"message": "A forecast job is already in progress",
}

try:
# Run the forecast job directly (not via scheduler)
await app.state.forecast_scheduler._run_forecast_job()

return {
"status": "success",
"message": "Forecast completed successfully",
"last_run_timestamp": (
app.state.forecast_scheduler.last_run_timestamp.isoformat()
if app.state.forecast_scheduler.last_run_timestamp
else None
),
"last_data_timestamp": (
app.state.forecast_scheduler.last_data_timestamp.isoformat()
if app.state.forecast_scheduler.last_data_timestamp
else None
),
}

except Exception as e:
logger.error(f"Triggered forecast failed: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"Forecast execution failed: {str(e)}",
) from e


@app.get("/logs/forecast-runs")
async def get_forecast_logs(limit: int = 100) -> dict[str, Any]:
"""Get recent forecast run logs.
Expand Down