Skip to content

Commit 80a4bc7

Browse files
author
Joe Stubbs
committed
health processes now check and clean up apim clients during every health check.
1 parent 9b344b1 commit 80a4bc7

File tree

2 files changed

+84
-2
lines changed

2 files changed

+84
-2
lines changed

actors/aga.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,3 +178,13 @@ def delete(self, clientName):
178178
return {}
179179
except Exception as e:
180180
raise AgaveError('Error creating client: {}'.format(e))
181+
182+
def list(self, timeout=60):
    """List all Agave OAuth2 clients.

    Issues an HTTP GET against ``<api_server>/clients/v2`` using HTTP basic
    auth built from the parent's ``username`` and ``password``.

    :param timeout: seconds to wait for the server before giving up; passed
        straight through to ``requests.get``. Without a timeout a stalled
        server would block the caller indefinitely.
    :return: the ``requests`` response object, after ``raise_for_status()``
        has succeeded. Callers read the payload themselves (e.g. via
        ``.json()``).
    :raises AgaveError: if the request fails, times out, or the server
        answers with an error status.
    """
    auth = requests.auth.HTTPBasicAuth(self.parent.username, self.parent.password)
    try:
        # NOTE(review): TLS verification settings are not forwarded on this
        # request; confirm whether the parent's verify option should be used.
        rsp = requests.get(url='{}/clients/v2'.format(self.parent.api_server),
                           auth=auth,
                           timeout=timeout)
        rsp.raise_for_status()
        return rsp
    except Exception as e:
        # Every failure mode (connection, timeout, HTTP status) is reported
        # to callers as a single AgaveError.
        raise AgaveError('Error listing clients: {}'.format(e))

actors/health.py

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,15 @@
1313
import shutil
1414
import time
1515

16-
from agaveflask.logs import get_log_file_strategy
16+
from agaveflask.auth import get_api_server
1717
import channelpy
1818

19+
from aga import Agave
20+
from auth import get_tenants, get_tenant_verify
1921
import codes
2022
from config import Config
2123
from docker_utils import rm_container, DockerError, container_running, run_container_with_docker
22-
from models import Actor, Worker
24+
from models import Actor, Worker, is_hashid
2325
from channels import ClientsChannel, CommandChannel, WorkerChannel
2426
from stores import actors_store, clients_store, executions_store, workers_store
2527
from worker import shutdown_worker
@@ -86,6 +88,72 @@ def clean_up_ipc_dirs():
8688
clean_up_socket_dirs()
8789
clean_up_fifo_dirs()
8890

91+
def delete_client(ag, client_name):
    """Remove a client from the APIM.

    This is a best-effort operation: any exception raised by the delete call
    is logged at ERROR level and swallowed so that the health check can keep
    going.

    :param ag: object exposing ``clients.delete(clientName=...)``.
    :param client_name: name of the APIM client to remove.
    :return: True if the delete call completed without raising, False if it
        raised (the exception has already been logged).
    """
    try:
        ag.clients.delete(clientName=client_name)
    except Exception as e:
        m = 'Not able to delete client from APIM. Got an exception: {}'.format(e)
        logger.error(m)
        return False
    return True


# Tenants for which missing service credentials are logged at ERROR level;
# for every other tenant the omission is only logged at INFO level.
_APIM_CLEANUP_ERROR_TENANTS = ('SD2E', 'TACC-PROD')


def _get_apim_credential(tenant, field):
    """Read one service credential (`username` or `password`) for a tenant.

    The value is looked up in the ``_abaco_<tenant>_<field>`` environment
    variable. When it is missing or empty, a message is logged (ERROR for the
    tenants in ``_APIM_CLEANUP_ERROR_TENANTS``, INFO otherwise).

    :return: the credential string, or '' when it is not configured.
    """
    value = os.environ.get('_abaco_{}_{}'.format(tenant, field), '')
    if not value:
        msg = "Health process did not get a {} for tenant {}; " \
              "returning from clean_up_apim_clients".format(field, tenant)
        if tenant in _APIM_CLEANUP_ERROR_TENANTS:
            logger.error(msg)
        else:
            logger.info(msg)
    return value


def clean_up_apim_clients(tenant):
    """Check the list of clients registered in APIM and remove any that are associated with retired workers.

    For the given tenant this:
      1. reads the service username/password from the environment and returns
         early (after logging) if either is missing;
      2. lists the tenant's APIM clients, returning early (after logging) if
         the listing or the decoding of its ``result`` field fails;
      3. for every client whose name is an abaco hash id, deletes the client
         when no worker with that id exists or when the worker's status is
         ``codes.ERROR``.

    :param tenant: tenant id used to look up credentials and the API server.
    :return: None in all cases.
    """
    # The username is checked first; a missing username returns before the
    # password is even looked at, so only one message is logged per call.
    username = _get_apim_credential(tenant, 'username')
    if not username:
        return None
    password = _get_apim_credential(tenant, 'password')
    if not password:
        return None

    api_server = get_api_server(tenant)
    verify = get_tenant_verify(tenant)
    ag = Agave(api_server=api_server,
               username=username,
               password=password,
               verify=verify)
    logger.debug("health process created an ag for tenant: {}".format(tenant))
    try:
        cs = ag.clients.list()
        clients = cs.json()['result']
    except Exception as e:
        msg = "Health process got an exception trying to retrieve clients; exception: {}".format(e)
        logger.error(msg)
        return None

    for client in clients:
        # check if the name of the client is an abaco hash (i.e., a worker id). if not, we ignore it from the beginning
        name = client.get('name')
        if not is_hashid(name):
            logger.debug("client {} is not an abaco hash id; skipping.".format(name))
            continue
        # we know this client came from a worker, so we need to check to see if the worker is still active;
        # first check if the worker even exists; if it does, the id will be the client name:
        worker = get_worker(name)
        if not worker:
            logger.info("no worker associated with id: {}; deleting client.".format(name))
        elif worker.get('status') == codes.ERROR:
            # the worker exists but is in ERROR status, so its client is stale too.
            logger.info("worker {} was in ERROR status so deleting client; worker: {}.".format(name, worker))
        else:
            logger.debug("worker {} still active; not deleting client.".format(worker))
            continue
        # Only report success when the delete actually went through;
        # delete_client has already logged the error when it did not.
        if delete_client(ag, name):
            logger.info("client {} deleted by health process.".format(name))
156+
89157
def clean_up_clients_store():
90158
logger.debug("top of clean_up_clients_store")
91159
secret = os.environ.get('_abaco_secret')
@@ -399,6 +467,10 @@ def main():
399467
for id in ids:
400468
# manage_workers(id)
401469
check_workers(id, ttl)
470+
tenants = get_tenants()
471+
for t in tenants:
472+
logger.debug("health process cleaning up apim_clients for tenant: {}".format(t))
473+
clean_up_apim_clients(t)
402474

403475
# TODO - turning off the check_workers_store for now. unclear that removing worker objects
404476
# check_workers_store(ttl)

0 commit comments

Comments
 (0)