|
9 | 9 | import random |
10 | 10 | from datetime import timedelta |
11 | 11 | from typing import Final |
| 12 | +from urllib.parse import urlencode |
12 | 13 | from uuid import UUID |
13 | 14 |
|
14 | 15 | import jsf |
@@ -94,20 +95,40 @@ def map_function(self) -> None: |
94 | 95 | job_collection_uuid = response.json().get("uid") |
95 | 96 |
|
96 | 97 | # wait for the job to complete |
| 98 | + query_params = dict( |
| 99 | + include_status=True, function_job_collection_id=job_collection_uuid |
| 100 | + ) |
97 | 101 | for attempt in Retrying( |
98 | 102 | stop=stop_after_delay(max_delay=max_poll_time), |
99 | 103 | wait=wait_exponential(multiplier=1, min=1, max=10), |
100 | 104 | reraise=True, |
101 | 105 | retry=retry_if_exception_type(ValueError), |
102 | 106 | ): |
103 | 107 | with attempt: |
104 | | - job_status_response = self.authenticated_get( |
105 | | - f"/v0/function_job_collections/{job_collection_uuid}/status", |
106 | | - name="/v0/function_job_collections/[job_collection_uuid]/status", |
107 | | - ) |
108 | | - job_status_response.raise_for_status() |
109 | | - all_job_statuses = job_status_response.json().get("status") |
110 | | - assert isinstance(all_job_statuses, list) |
| 108 | + # list all jobs in the collection with status |
| 109 | + next_page_url = "/v0/function_jobs?" + urlencode(query_params) |
| 110 | + all_job_statuses = [] |
| 111 | + while next_page_url is not None: |
| 112 | + response = self.authenticated_get( |
| 113 | + next_page_url, |
| 114 | + name="/v0/function_jobs", |
| 115 | + ) |
| 116 | + response.raise_for_status() |
| 117 | + items = response.json().get("items", []) |
| 118 | + statuses = [item.get("status", {}) for item in items] |
| 119 | + all_job_statuses.extend( |
| 120 | + [status.get("status", None) for status in statuses if status] |
| 121 | + ) |
| 122 | + assert not any( |
| 123 | + status is None for status in all_job_statuses |
| 124 | + ), f"Test misconfiguration: Function job collection ({job_collection_uuid=}) listed {statuses=} with missing status" |
| 125 | + links = response.json().get("links", {}) |
| 126 | + assert isinstance(links, dict) |
| 127 | + next_page_url = links.get("next", None) |
| 128 | + assert ( |
| 129 | + len(all_job_statuses) == n_jobs |
| 130 | + ), f"Expected {n_jobs} jobs, got {len(all_job_statuses)} for {job_collection_uuid=}" |
| 131 | + |
111 | 132 | if any(status != "SUCCESS" for status in all_job_statuses): |
112 | 133 | raise ValueError( |
113 | 134 | f"Function job ({job_collection_uuid=}) for function ({function_uuid=}) returned {all_job_statuses=}" |
|
0 commit comments