Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,24 @@

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [3.31.0] - 2026-01-14

### Added

- Modal simulation API support alongside GCP Workflows for economy calculations
- `SimulationAPIModal` class for HTTP-based job submission and polling
- Factory function to select between GCP and Modal backends via `USE_MODAL_SIMULATION_API` env var
- Status constants for both GCP (`ACTIVE`, `SUCCEEDED`, `FAILED`) and Modal (`running`, `complete`, `failed`)
- Unit tests for Modal client, factory, and status handling

### Changed

- `EconomyService` now handles both GCP and Modal execution status values
- Added `httpx` dependency for Modal HTTP client

## [3.30.4] - 2026-01-13 13:30:17

### Changed
Expand Down
10 changes: 10 additions & 0 deletions changelog_entry.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
- bump: minor
changes:
added:
- Modal simulation API support alongside GCP Workflows for economy calculations
- SimulationAPIModal class for HTTP-based job submission and polling
- Factory function to select between GCP and Modal backends via USE_MODAL_SIMULATION_API env var
- Status constants for both GCP and Modal execution states
- Unit tests for Modal client, factory, and status handling
changed:
- EconomyService now handles both GCP and Modal execution status values
28 changes: 28 additions & 0 deletions policyengine_api/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,32 @@
],
}

# Simulation execution status constants
# GCP Workflow execution states (from google.cloud.workflows.executions_v1.Execution.State)
GCP_EXECUTION_STATUS_ACTIVE = "ACTIVE"
GCP_EXECUTION_STATUS_SUCCEEDED = "SUCCEEDED"
GCP_EXECUTION_STATUS_FAILED = "FAILED"
GCP_EXECUTION_STATUS_CANCELLED = "CANCELLED"

# Modal simulation API status values
MODAL_EXECUTION_STATUS_SUBMITTED = "submitted"
MODAL_EXECUTION_STATUS_RUNNING = "running"
MODAL_EXECUTION_STATUS_COMPLETE = "complete"
MODAL_EXECUTION_STATUS_FAILED = "failed"

# Status groupings for EconomyService._handle_execution_state()
EXECUTION_STATUSES_SUCCESS = (
GCP_EXECUTION_STATUS_SUCCEEDED,
MODAL_EXECUTION_STATUS_COMPLETE,
)
EXECUTION_STATUSES_FAILURE = (
GCP_EXECUTION_STATUS_FAILED,
MODAL_EXECUTION_STATUS_FAILED,
)
EXECUTION_STATUSES_PENDING = (
GCP_EXECUTION_STATUS_ACTIVE,
MODAL_EXECUTION_STATUS_SUBMITTED,
MODAL_EXECUTION_STATUS_RUNNING,
)

__version__ = VERSION
60 changes: 60 additions & 0 deletions policyengine_api/libs/simulation_api_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""
Factory for selecting the appropriate Simulation API implementation.

This module provides a factory function that returns either the GCP Workflows-based
SimulationAPI or the Modal-based SimulationAPIModal, depending on environment
configuration.

Environment Variables
---------------------
USE_MODAL_SIMULATION_API : str
Set to "true" to use the Modal simulation API. Defaults to "false" (GCP).
"""

import os
from typing import Union

from policyengine_api.gcp_logging import logger


def get_simulation_api() -> (
Union["SimulationAPI", "SimulationAPIModal"] # noqa: F821
):
"""
Get the appropriate simulation API client based on environment configuration.

Returns the Modal-based client if USE_MODAL_SIMULATION_API is set to "true",
otherwise returns the GCP Workflows-based client.

Returns
-------
SimulationAPI or SimulationAPIModal
The simulation API client instance.

Raises
------
ValueError
If GCP client is requested but GOOGLE_APPLICATION_CREDENTIALS is not set.
"""
use_modal = (
os.environ.get("USE_MODAL_SIMULATION_API", "false").lower() == "true"
)

if use_modal:
logger.log_struct(
{"message": "Using Modal simulation API"},
severity="INFO",
)
from policyengine_api.libs.simulation_api_modal import (
simulation_api_modal,
)

return simulation_api_modal
else:
logger.log_struct(
{"message": "Using GCP Workflows simulation API"},
severity="INFO",
)
from policyengine_api.libs.simulation_api import SimulationAPI

return SimulationAPI()
237 changes: 237 additions & 0 deletions policyengine_api/libs/simulation_api_modal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
"""
HTTP client for the Modal Simulation API.

This module provides a client for submitting simulation jobs to the
Modal-based simulation API and polling for results. It implements
the same interface as SimulationAPI (GCP) to allow for easy switching
between backends.
"""

import os
from dataclasses import dataclass
from typing import Optional

import httpx

from policyengine_api.gcp_logging import logger


@dataclass
class ModalSimulationExecution:
"""
Represents a Modal simulation job execution.

This class mirrors the interface of GCP's executions_v1.Execution
to allow the EconomyService to work with either backend.
"""

job_id: str
status: str
result: Optional[dict] = None
error: Optional[str] = None

@property
def name(self) -> str:
"""Alias for job_id to match GCP Execution interface."""
return self.job_id


class SimulationAPIModal:
"""
HTTP client for the Modal Simulation API.

This class provides methods for submitting simulation jobs and
polling for their status/results via HTTP endpoints, replacing
the GCP Workflows SDK calls used in SimulationAPI.
"""

def __init__(self):
self.base_url = os.environ.get(
"SIMULATION_API_URL",
"https://policyengine--policyengine-simulation-gateway-web-app.modal.run",
)
self.client = httpx.Client(timeout=30.0)

def run(self, payload: dict) -> ModalSimulationExecution:
"""
Submit a simulation job to the Modal API.

Parameters
----------
payload : dict
The simulation parameters (country, reform, baseline, etc.)
Expected to match SimulationOptions schema.

Returns
-------
ModalSimulationExecution
Execution object with job_id and initial status.

Raises
------
httpx.HTTPStatusError
If the API returns an error response.
"""
try:
# Map field names from SimulationOptions to Modal API format
# SimulationOptions uses 'model_version', Modal expects 'version'
modal_payload = dict(payload)
if "model_version" in modal_payload:
modal_payload["version"] = modal_payload.pop("model_version")
# Remove data_version as Modal doesn't use it
modal_payload.pop("data_version", None)

response = self.client.post(
f"{self.base_url}/simulate/economy/comparison",
json=modal_payload,
)
response.raise_for_status()
data = response.json()

logger.log_struct(
{
"message": "Modal simulation job submitted",
"job_id": data.get("job_id"),
"status": data.get("status"),
},
severity="INFO",
)

return ModalSimulationExecution(
job_id=data["job_id"],
status=data["status"],
)

except httpx.HTTPStatusError as e:
logger.log_struct(
{
"message": f"Modal API HTTP error: {e.response.status_code}",
"response_text": e.response.text[:500],
},
severity="ERROR",
)
raise

except httpx.RequestError as e:
logger.log_struct(
{
"message": f"Modal API request error: {str(e)}",
},
severity="ERROR",
)
raise

def get_execution_id(self, execution: ModalSimulationExecution) -> str:
"""
Get the job ID from an execution.

Parameters
----------
execution : ModalSimulationExecution
The execution object returned from run().

Returns
-------
str
The job ID.
"""
return execution.job_id

def get_execution_by_id(self, job_id: str) -> ModalSimulationExecution:
"""
Poll the Modal API for the current status of a job.

Parameters
----------
job_id : str
The job ID returned from run().

Returns
-------
ModalSimulationExecution
Execution object with current status and result if complete.
"""
try:
response = self.client.get(f"{self.base_url}/jobs/{job_id}")
# Note: Modal returns 202 for running, 200 for complete, 500 for failed
# We handle all cases by checking the status field in the response
data = response.json()

return ModalSimulationExecution(
job_id=job_id,
status=data["status"],
result=data.get("result"),
error=data.get("error"),
)

except httpx.HTTPStatusError as e:
logger.log_struct(
{
"message": f"Modal API HTTP error polling job {job_id}: {e.response.status_code}",
"response_text": e.response.text[:500],
},
severity="ERROR",
)
raise

except httpx.RequestError as e:
logger.log_struct(
{
"message": f"Modal API request error polling job {job_id}: {str(e)}",
},
severity="ERROR",
)
raise

def get_execution_status(self, execution: ModalSimulationExecution) -> str:
"""
Get the status string from an execution.

Parameters
----------
execution : ModalSimulationExecution
The execution object.

Returns
-------
str
The status string ("submitted", "running", "complete", "failed").
"""
return execution.status

def get_execution_result(
self, execution: ModalSimulationExecution
) -> Optional[dict]:
"""
Get the result from a completed execution.

Parameters
----------
execution : ModalSimulationExecution
The execution object.

Returns
-------
dict or None
The simulation result if complete, None otherwise.
"""
return execution.result

def health_check(self) -> bool:
"""
Check if the Modal API is healthy.

Returns
-------
bool
True if the API is healthy, False otherwise.
"""
try:
response = self.client.get(f"{self.base_url}/health")
return response.status_code == 200
except Exception:
return False


# Global instance for use throughout the application
simulation_api_modal = SimulationAPIModal()
Loading
Loading