|
| 1 | +import json |
1 | 2 | import logging |
2 | | -from pdb import Pdb |
| 3 | +import os |
3 | 4 | import sys |
4 | 5 | import time |
| 6 | +from datetime import datetime |
5 | 7 | from pathlib import Path |
6 | | -from typing import List |
| 8 | +from pdb import Pdb |
7 | 9 | from pprint import pformat |
| 10 | +from typing import Dict, List |
8 | 11 |
|
9 | 12 | import docker |
10 | 13 | import yaml |
| 14 | +from tenacity import ( |
| 15 | + RetryError, |
| 16 | + Retrying, |
| 17 | + before_sleep_log, |
| 18 | + stop_after_attempt, |
| 19 | + wait_fixed, |
| 20 | +) |
11 | 21 |
|
12 | 22 | logger = logging.getLogger(__name__) |
13 | 23 |
|
14 | 24 | current_dir = Path(sys.argv[0] if __name__ == "__main__" else __file__).resolve().parent |
15 | 25 |
|
16 | | -WAIT_TIME_SECS = 20 |
17 | | -RETRY_COUNT = 7 |
| 26 | +WAIT_BEFORE_RETRY = 10 |
| 27 | +MAX_RETRY_COUNT = 10 |
18 | 28 | MAX_WAIT_TIME = 240 |
19 | 29 |
|
20 | | -# https://docs.docker.com/engine/swarm/how-swarm-mode-works/swarm-task-states/ |
21 | | -pre_states = ["NEW", "PENDING", "ASSIGNED", "PREPARING", "STARTING"] |
22 | | - |
23 | | -failed_states = [ |
24 | | - "COMPLETE", |
25 | | - "FAILED", |
26 | | - "SHUTDOWN", |
27 | | - "REJECTED", |
28 | | - "ORPHANED", |
29 | | - "REMOVE", |
30 | | - "CREATED", |
31 | | -] |
32 | | -# UTILS -------------------------------- |
| 30 | +# SEE https://docs.docker.com/engine/swarm/how-swarm-mode-works/swarm-task-states/ |
33 | 31 |
|
| 32 | +PRE_STATES = [ |
| 33 | + "new", # The task was initialized. |
| 34 | + "pending", # Resources for the task were allocated. |
| 35 | + "assigned", # Docker assigned the task to nodes. |
| 36 | + "accepted", # The task was accepted by a worker node. If a worker node rejects the task, the state changes to REJECTED. |
| 37 | + "preparing", # Docker is preparing the task. |
| 38 | + "starting", # Docker is starting the task. |
| 39 | +] |
34 | 40 |
|
35 | | -def get_tasks_summary(tasks): |
36 | | - msg = "" |
37 | | - for t in tasks: |
38 | | - t["Status"].setdefault("Err", "") |
39 | | - msg += "- task ID:{ID}, STATE: {Status[State]}, ERROR: '{Status[Err]}' \n".format( |
40 | | - **t |
41 | | - ) |
42 | | - return msg |
43 | | - |
| 41 | +RUNNING_STATE = "running" # The task is executing. |
44 | 42 |
|
45 | | -def get_failed_tasks_logs(service, docker_client): |
46 | | - failed_logs = "" |
47 | | - for t in service.tasks(): |
48 | | - if t["Status"]["State"].upper() in failed_states: |
49 | | - cid = t["Status"]["ContainerStatus"]["ContainerID"] |
50 | | - failed_logs += "{2} {0} - {1} BEGIN {2}\n".format( |
51 | | - service.name, t["ID"], "=" * 10 |
52 | | - ) |
53 | | - if cid: |
54 | | - container = docker_client.containers.get(cid) |
55 | | - failed_logs += container.logs().decode("utf-8") |
56 | | - else: |
57 | | - failed_logs += " log unavailable. container does not exists\n" |
58 | | - failed_logs += "{2} {0} - {1} END {2}\n".format( |
59 | | - service.name, t["ID"], "=" * 10 |
60 | | - ) |
| 43 | +FAILED_STATES = [ |
| 44 | + "complete", # The task exited without an error code. |
| 45 | + "failed", # The task exited with an error code. |
| 46 | + "shutdown", # Docker requested the task to shut down. |
| 47 | + "rejected", # The worker node rejected the task. |
| 48 | + "orphaned", # The node was down for too long. |
| 49 | + "remove", # The task is not terminal but the associated service was removed or scaled down. |
| 50 | +] |
61 | 51 |
|
62 | | - return failed_logs |
63 | 52 |
|
| 53 | +def get_tasks_summary(service_tasks): |
| 54 | + msg = "" |
| 55 | + for task in service_tasks: |
| 56 | + status: Dict = task["Status"] |
| 57 | + msg += f"- task ID:{task['ID']}, CREATED: {task['CreatedAt']}, UPDATED: {task['UpdatedAt']}, DESIRED_STATE: {task['DesiredState']}, STATE: {status['State']}" |
| 58 | + error = status.get("Err") |
| 59 | + if error: |
| 60 | + msg += f", ERROR: {error}" |
| 61 | + msg += "\n" |
64 | 62 |
|
65 | | -# -------------------------------------------------------------------------------- |
| 63 | + return msg |
66 | 64 |
|
67 | 65 |
|
68 | 66 | def osparc_simcore_root_dir() -> Path: |
@@ -100,52 +98,74 @@ def ops_services() -> List[str]: |
100 | 98 | return [x for x in dc_specs["services"].keys()] |
101 | 99 |
|
102 | 100 |
|
103 | | -def wait_for_services() -> None: |
104 | | - # get all services |
105 | | - services = core_services() + ops_services() |
| 101 | +def to_datetime(datetime_str: str) -> datetime: |
| 102 | + # datetime_str is typically '2020-10-09T12:28:14.771034099Z' |
| 103 | + # - The T separates the date portion from the time-of-day portion |
| 104 | + # - The Z on the end means UTC, that is, an offset-from-UTC |
| 105 | +    # The trailing digits before the Z are nanoseconds, which %f (microseconds) cannot parse, so truncate them |
| 106 | + N = len("2020-10-09T12:28:14.771034") |
| 107 | + if len(datetime_str) > N: |
| 108 | + datetime_str = datetime_str[:N] |
| 109 | + return datetime.strptime(datetime_str, "%Y-%m-%dT%H:%M:%S.%f") |
| 110 | + |
| 111 | + |
| 112 | +def by_service_creation(service): |
| 113 | + datetime_str = service.attrs["CreatedAt"] |
| 114 | + return to_datetime(datetime_str) |
| 115 | + |
| 116 | + |
| 117 | +def wait_for_services() -> int: |
| 118 | + expected_services = core_services() + ops_services() |
106 | 119 |
|
107 | 120 | client = docker.from_env() |
108 | | - running_services = [ |
109 | | - x for x in client.services.list() if x.name.split("_")[-1] in services |
110 | | - ] |
111 | | - |
112 | | - # check all services are in |
113 | | - assert len(running_services), "no services started!" |
114 | | - assert len(services) == len( |
115 | | - running_services |
116 | | - ), f"Some services are missing or unexpected:\nexpected: {len(services)} {services}\ngot: {len(running_services)} {[service.name for service in running_services]}" |
117 | | - # now check they are in running mode |
118 | | - for service in running_services: |
119 | | - task = None |
120 | | - for n in range(RETRY_COUNT): |
121 | | - # get last updated task |
122 | | - sorted_tasks = sorted(service.tasks(), key=lambda task: task["UpdatedAt"]) |
123 | | - task = sorted_tasks[-1] |
124 | | - |
125 | | - if task["Status"]["State"].upper() in pre_states: |
126 | | - print( |
127 | | - "Waiting [{}/{}] for {}...\n{}".format( |
128 | | - n, RETRY_COUNT, service.name, get_tasks_summary(service.tasks()) |
| 121 | + started_services = sorted( |
| 122 | + [ |
| 123 | + s |
| 124 | + for s in client.services.list() |
| 125 | + if s.name.split("_")[-1] in expected_services |
| 126 | + ], |
| 127 | + key=by_service_creation, |
| 128 | + ) |
| 129 | + |
| 130 | + assert len(started_services), "no services started!" |
| 131 | + assert len(expected_services) == len(started_services), ( |
| 132 | + f"Some services are missing or unexpected:\n" |
| 133 | +        f"expected: {len(expected_services)} {expected_services}\n" |
| 134 | +        f"got: {len(started_services)} {[s.name for s in started_services]}" |
| 135 | + ) |
| 136 | + |
| 137 | + for service in started_services: |
| 138 | + |
| 139 | + expected_replicas = service.attrs["Spec"]["Mode"]["Replicated"]["Replicas"] |
| 140 | + print(f"Service: {service.name} expects {expected_replicas} replicas", "-" * 10) |
| 141 | + |
| 142 | + try: |
| 143 | + for attempt in Retrying( |
| 144 | + stop=stop_after_attempt(MAX_RETRY_COUNT), |
| 145 | + wait=wait_fixed(WAIT_BEFORE_RETRY), |
| 146 | + ): |
| 147 | + with attempt: |
| 148 | + service_tasks: List[Dict] = service.tasks() # freeze |
| 149 | + print(get_tasks_summary(service_tasks)) |
| 150 | + |
| 151 | + # |
| 152 | + # NOTE: a service could set 'ready' as desired-state instead of 'running' if |
| 153 | +                    # it constantly breaks and the swarm decides to "stop trying". |
| 154 | + # |
| 155 | + valid_replicas = sum( |
| 156 | + task["Status"]["State"] == RUNNING_STATE |
| 157 | + for task in service_tasks |
129 | 158 | ) |
130 | | - ) |
131 | | - time.sleep(WAIT_TIME_SECS) |
132 | | - elif task["Status"]["State"].upper() in failed_states: |
133 | | - print( |
134 | | - f"Waiting [{n}/{RETRY_COUNT}] Service {service.name} failed once...\n{get_tasks_summary(service.tasks())}" |
135 | | - ) |
136 | | - time.sleep(WAIT_TIME_SECS) |
137 | | - else: |
138 | | - break |
139 | | - assert task |
140 | | - assert ( |
141 | | - task["Status"]["State"].upper() == "RUNNING" |
142 | | - ), "Expected running, got \n{}\n{}".format( |
143 | | - pformat(task), get_tasks_summary(service.tasks()) |
144 | | - ) |
145 | | - # get_failed_tasks_logs(service, client)) |
| 159 | + assert valid_replicas == expected_replicas |
| 160 | + except RetryError: |
| 161 | + print( |
| 162 | + f"ERROR: Service {service.name} failed to start {expected_replicas} replica/s" |
| 163 | + ) |
| 164 | + print(json.dumps(service.attrs, indent=1)) |
| 165 | + return os.EX_SOFTWARE |
| 166 | + |
| 167 | + return os.EX_OK |
146 | 168 |
|
147 | 169 |
|
148 | 170 | if __name__ == "__main__": |
149 | | - # get retry parameters |
150 | | - # wait for the services |
151 | 171 | sys.exit(wait_for_services()) |
0 commit comments