|
| 1 | +import functools |
1 | 2 | import json |
2 | 3 | import logging |
3 | 4 | import os |
|
56 | 57 |
|
57 | 58 | log = logging.getLogger("cmg.tests.integration") |
58 | 59 |
|
| 60 | +cms_compose: DockerCompose = None |
| 61 | +cms_mlflow_compose: DockerCompose = None |
| 62 | + |
| 63 | + |
| 64 | +def dump_cms_logs_on_failure(func): |
| 65 | + @functools.wraps(func) |
| 66 | + def wrapper(*args, **kwargs): |
| 67 | + try: |
| 68 | + return func(*args, **kwargs) |
| 69 | + except Exception as e: |
| 70 | + logs = cms_compose.get_logs() |
| 71 | + log.error("CogStack Model Serve Logs:\nstdout: %s\nstderr: %s", logs[0], logs[1]) |
| 72 | + |
| 73 | + mlflow_logs = cms_mlflow_compose.get_logs() |
| 74 | + log.error("MLflow Logs:\nstdout: %s\nstderr: %s", mlflow_logs[0], mlflow_logs[1]) |
| 75 | + |
| 76 | + raise e |
| 77 | + |
| 78 | + return wrapper |
| 79 | + |
59 | 80 |
|
60 | 81 | def setup_testcontainers(request: pytest.FixtureRequest): |
61 | 82 | postgres = PostgresContainer(POSTGRES_IMAGE) |
@@ -211,29 +232,29 @@ def start_cogstack_model_serve(model_services: list[str]) -> list[DockerCompose] |
211 | 232 |
|
212 | 233 | log.debug(f"CogStack Model Serve environment file: {env_file_path}") |
213 | 234 |
|
214 | | - compose: DockerCompose = None |
215 | | - compose_mlflow: DockerCompose = None |
| 235 | + global cms_compose, cms_mlflow_compose |
216 | 236 |
|
217 | 237 | try: |
218 | | - compose = DockerCompose( |
| 238 | + cms_compose = DockerCompose( |
219 | 239 | context=COGSTACK_MODEL_SERVE_LOCAL_PATH, |
220 | 240 | compose_file_name=COGSTACK_MODEL_SERVE_COMPOSE, |
221 | 241 | env_file=".env", |
222 | 242 | services=model_services, |
223 | 243 | ) |
224 | | - compose.start() |
| 244 | + cms_compose.start() |
225 | 245 |
|
226 | | - compose_mlflow = DockerCompose( |
| 246 | + cms_mlflow_compose = DockerCompose( |
227 | 247 | context=COGSTACK_MODEL_SERVE_LOCAL_PATH, |
228 | 248 | compose_file_name=COGSTACK_MODEL_SERVE_COMPOSE_MLFLOW, |
229 | 249 | env_file=".env", |
230 | 250 | services=["mlflow-ui", "mlflow-db", "minio", "model-bucket-init"], |
231 | 251 | ) |
232 | | - compose_mlflow.start() |
233 | | - return [compose, compose_mlflow] |
| 252 | + cms_mlflow_compose.start() |
| 253 | + |
| 254 | + return [cms_compose, cms_mlflow_compose] |
234 | 255 | except subprocess.CalledProcessError as e: |
235 | 256 | log.info(e.stderr) |
236 | | - stop_cogstack_model_serve([env for env in (compose, compose_mlflow) if env]) |
| 257 | + stop_cogstack_model_serve([env for env in (cms_compose, cms_mlflow_compose) if env]) |
237 | 258 | raise |
238 | 259 |
|
239 | 260 |
|
@@ -281,11 +302,12 @@ def verify_task_submitted_successfully(task_uuid: str, tm: TaskManager): |
281 | 302 | assert tm.get_task(task_uuid), "Failed to submit task: not found in the database" |
282 | 303 |
|
283 | 304 |
|
| 305 | +@dump_cms_logs_on_failure |
284 | 306 | def wait_for_task_completion(task_uuid: str, tm: TaskManager, expected_status: Status) -> Task: |
285 | 307 | """Wait for a task to complete and verify its results.""" |
286 | 308 | while (task := tm.get_task(task_uuid)).status != expected_status: |
287 | 309 | if task.status in [Status.FAILED, Status.SUCCEEDED, Status.REQUEUED]: |
288 | | - pytest.fail(f"Task '{task_uuid}' completed with unexpected status '{task.status}'") |
| 310 | + raise ValueError(f"Task '{task_uuid}' completed with unexpected status '{task.status}'") |
289 | 311 |
|
290 | 312 | # Verify task results |
291 | 313 | if expected_status == Status.SUCCEEDED: |
|
0 commit comments