|  | 
| 9 | 9 | import random | 
| 10 | 10 | from datetime import timedelta | 
| 11 | 11 | from typing import Final | 
|  | 12 | +from urllib.parse import urlencode | 
| 12 | 13 | from uuid import UUID | 
| 13 | 14 | 
 | 
| 14 | 15 | import jsf | 
| @@ -94,20 +95,40 @@ def map_function(self) -> None: | 
| 94 | 95 |         job_collection_uuid = response.json().get("uid") | 
| 95 | 96 | 
 | 
| 96 | 97 |         # wait for the job to complete | 
|  | 98 | +        query_params = dict( | 
|  | 99 | +            include_status=True, function_job_collection_id=job_collection_uuid | 
|  | 100 | +        ) | 
| 97 | 101 |         for attempt in Retrying( | 
| 98 | 102 |             stop=stop_after_delay(max_delay=max_poll_time), | 
| 99 | 103 |             wait=wait_exponential(multiplier=1, min=1, max=10), | 
| 100 | 104 |             reraise=True, | 
| 101 | 105 |             retry=retry_if_exception_type(ValueError), | 
| 102 | 106 |         ): | 
| 103 | 107 |             with attempt: | 
| 104 |  | -                job_status_response = self.authenticated_get( | 
| 105 |  | -                    f"/v0/function_job_collections/{job_collection_uuid}/status", | 
| 106 |  | -                    name="/v0/function_job_collections/[job_collection_uuid]/status", | 
| 107 |  | -                ) | 
| 108 |  | -                job_status_response.raise_for_status() | 
| 109 |  | -                all_job_statuses = job_status_response.json().get("status") | 
| 110 |  | -                assert isinstance(all_job_statuses, list) | 
|  | 108 | +                # list all jobs in the collection with status | 
|  | 109 | +                next_page_url = "/v0/function_jobs?" + urlencode(query_params) | 
|  | 110 | +                all_job_statuses = [] | 
|  | 111 | +                while next_page_url is not None: | 
|  | 112 | +                    response = self.authenticated_get( | 
|  | 113 | +                        next_page_url, | 
|  | 114 | +                        name="/v0/function_jobs", | 
|  | 115 | +                    ) | 
|  | 116 | +                    response.raise_for_status() | 
|  | 117 | +                    items = response.json().get("items", []) | 
|  | 118 | +                    statuses = [item.get("status", {}) for item in items] | 
|  | 119 | +                    all_job_statuses.extend( | 
|  | 120 | +                        [status.get("status", None) for status in statuses if status] | 
|  | 121 | +                    ) | 
|  | 122 | +                    assert not any( | 
|  | 123 | +                        status is None for status in all_job_statuses | 
|  | 124 | +                    ), f"Test misconfiguration: Function job collection ({job_collection_uuid=}) listed {statuses=} with missing status" | 
|  | 125 | +                    links = response.json().get("links", {}) | 
|  | 126 | +                    assert isinstance(links, dict) | 
|  | 127 | +                    next_page_url = links.get("next", None) | 
|  | 128 | +                assert ( | 
|  | 129 | +                    len(all_job_statuses) == n_jobs | 
|  | 130 | +                ), f"Expected {n_jobs} jobs, got {len(all_job_statuses)} for {job_collection_uuid=}" | 
|  | 131 | + | 
| 111 | 132 |                 if any(status != "SUCCESS" for status in all_job_statuses): | 
| 112 | 133 |                     raise ValueError( | 
| 113 | 134 |                         f"Function job ({job_collection_uuid=}) for function ({function_uuid=}) returned {all_job_statuses=}" | 
|  | 
0 commit comments