Skip to content

Commit 1c13b2c

Browse files
committed
Merge branch 'dev'
2 parents 7165a83 + 691d8ef commit 1c13b2c

File tree

17 files changed

+492
-40
lines changed

17 files changed

+492
-40
lines changed

.vscode/settings.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"python.pythonPath": "/usr/bin/python3"
3+
}

CHANGELOG.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,22 @@
22
All notable changes to this project will be documented in this file.
33

44

5+
## 1.7.0 - 2020-11-10
6+
### Added
7+
- Added support for a cron scheduling feature for actors. The cron schedule feature allows users to
8+
instruct Abaco to automatically execute actors based on a schedule provided by the user. More
9+
information available from the docs (https://tacc-cloud.readthedocs.io/projects/abaco/en/latest/technical/messages.html#cron-schedule).
10+
- Added support for configuring Abaco with a DockerHub credential to be used when pulling images
11+
from DockerHub. In particular, Abaco can be configured with the credentials of a licensed account
12+
with increased pull quota to avoid "toomanyrequests: You have reached your pull rate limit" errors from the Docker daemon.
13+
14+
### Changed
15+
- No change.
16+
17+
### Removed
18+
- No change.
19+
20+
521
## 1.6.0 - 2020-04-30
622
### Added
723
- Added the `GET /actors/search/{search_type}?{search_terms}` endpoint for mongo database full-text search and matching

Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ deploy:
4545
build-core:
4646
@docker build -t abaco/core:$$TAG ./
4747

48+
# Builds prometheus locally
49+
build-prom:
50+
@docker build -t abaco/prom:$$TAG prometheus/.
51+
4852

4953
# Builds nginx
5054
build-nginx:

abaco.conf

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,21 @@ show_traceback: false
109109
# Here we set it to 12 hours.
110110
log_ex: 43200
111111

112+
# Max amount of time, in seconds, to which any log can be set.
113+
log_ex_limit: 86400
114+
115+
# Amount of time, in seconds, to which sd2e's logs will be set.
116+
sd2e_log_ex: 10800
117+
118+
# Max amount of time, in seconds, to which sd2e's logs can be set.
119+
sd2e_log_ex_limit: 21600
120+
121+
# Max amount of time, in seconds, to which this DEV's logs can be set.
122+
DEV_log_ex_limit: 30000
123+
124+
# Amount of time, in seconds, to which this DEV's logs will be set.
125+
DEV_log_ex: 15000
126+
112127
# Max length (in bytes) to store an actor execution's log. If a log exceeds this length, the log will be truncated.
113128
# Note: max_log_length must not exceed the maximum document length for the log store.
114129
# here we default it to 1 MB

abaco.log

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+

actors/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212

1313
def read_config():
14-
parser = ConfigParser()
14+
parser = ConfigParser(interpolation=None)
1515
places = ['/service.conf',
1616
'/etc/service.conf']
1717
place = places[0]

actors/controllers.py

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from flask_restful import Resource, Api, inputs
1313
from werkzeug.exceptions import BadRequest
1414
from agaveflask.utils import RequestParser, ok
15+
from parse import parse
1516

1617
from auth import check_permissions, get_tas_data, tenant_can_use_tas, get_uid_gid_homedir, get_token_default
1718
from channels import ActorMsgChannel, CommandChannel, ExecutionResultsChannel, WorkerChannel
@@ -65,6 +66,68 @@ def get(self, search_type):
6566
return ok(result=result, msg="Search completed successfully.")
6667

6768

69+
class CronResource(Resource):
    """Resource behind ``GET /cron``; queues executions for actors whose cron schedule is due."""

    def get(self):
        """
        Check every actor in the actors store and queue a cron execution where due.

        An actor is due when ``cron_execution_datetime`` returns True for it. For a
        due actor with ``cron_on`` set, an execution is added and a message is put on
        the actor's message channel. For every due actor, whether or not ``cron_on``
        is set, ``cron_next_ex`` is advanced with ``Actor.set_next_ex``.

        A failure while processing one actor is logged and does not prevent the
        remaining actors from being processed.
        """
        logger.debug("HERE I AM IN GET /cron")
        actor_ids = [actor['db_id'] for actor in actors_store.items()]
        logger.debug(f"actor ids are {actor_ids}")
        # Loop through all actor ids to check for cron schedules
        for actor_id in actor_ids:
            actor = actors_store[actor_id]
            logger.debug(f"cron_on equals {actor.get('cron_on')} for actor {actor_id}")
            try:
                # Check if next execution == UTC current time
                if not self.cron_execution_datetime(actor):
                    logger.debug("now is not the time")
                    continue
                # Check if cron switch is on
                if actor.get('cron_on'):
                    d = {}
                    logger.debug("the current time is the same as the next cron scheduled, adding execution")
                    # Execute actor
                    before_exc_time = timeit.default_timer()
                    exc = Execution.add_execution(actor_id, {'cpu': 0,
                                                             'io': 0,
                                                             'runtime': 0,
                                                             'status': codes.SUBMITTED,
                                                             'executor': 'cron'})
                    logger.debug("execution has been added, now making message")
                    # Create & add message to the queue
                    d['Time_msg_queued'] = before_exc_time
                    d['_abaco_execution_id'] = exc
                    d['_abaco_Content_Type'] = 'str'
                    ch = ActorMsgChannel(actor_id=actor_id)
                    try:
                        ch.put_msg(message="This is your cron execution", d=d)
                    finally:
                        # always release the channel, even when queuing the message fails
                        ch.close()
                    logger.debug("Message added to actor inbox. id: {}.".format(actor_id))
                else:
                    logger.debug("Actor's cron is not activated, but next execution will be incremented")
                # Update the actor's next execution; done for every due actor,
                # regardless of whether its cron switch is on.
                actors_store[actor_id, 'cron_next_ex'] = Actor.set_next_ex(actor, actor_id)
            except Exception as e:
                # Best-effort per actor: an actor without a usable cron_next_ex ends up here,
                # as does any failure while queuing its execution. Exception (not a bare
                # except) is caught so KeyboardInterrupt/SystemExit still propagate.
                logger.debug(f"Actor has no cron setup or its cron processing failed; "
                             f"actor: {actor_id}; exception: {e}")

    def cron_execution_datetime(self, actor):
        """
        Tell whether the actor's next cron execution equals the current UTC hour.

        :param actor: actor record; its ``cron_next_ex`` value is read and parsed with
            the pattern ``"{}-{}-{} {}"`` into year, month, day and hour.
        :return: True if the parsed datetime equals the current UTC time truncated to
            the hour, False otherwise.
        :raises Exception: nothing is caught here, so a missing or unparsable
            ``cron_next_ex`` raises; ``get`` handles these.
        """
        logger.debug("inside cron_execution_datetime method")
        now = get_current_utc_time()
        # keep only year/month/day/hour so the comparison is made at hour granularity
        now = datetime.datetime(now.year, now.month, now.day, now.hour)
        logger.debug(f"the current utc time is {now}")
        # Get cron execution datetime
        cron = actor['cron_next_ex']
        logger.debug(f"cron_next_ex is {cron}")
        # Parse the next execution into a list of the form: [year,month,day,hour]
        cron_datetime = parse("{}-{}-{} {}", cron)
        logger.debug(f"cron datetime is {cron_datetime}")
        # Create a datetime out of cron_datetime
        cron_execution = datetime.datetime(int(cron_datetime[0]),
                                           int(cron_datetime[1]),
                                           int(cron_datetime[2]),
                                           int(cron_datetime[3]))
        logger.debug(f"cron execution is {cron_execution}")
        # Return true/false comparing now with the next cron execution
        is_due = cron_execution == now
        logger.debug(f"does cron == now? {is_due}")
        return is_due
130+
68131
class MetricsResource(Resource):
69132
def get(self):
70133
enable_autoscaling = Config.get('workers', 'autoscaling')
@@ -769,6 +832,36 @@ def post(self):
769832
else:
770833
token = get_token_default()
771834
args['token'] = token
835+
# adding check for 'log_ex'
836+
if 'logEx' in args and args.get('logEx') is not None:
837+
log_ex = int(args.get('logEx'))
838+
args['log_ex'] = log_ex
839+
# cron attribute
840+
cron = None
841+
if Config.get('web', 'case') == 'camel':
842+
logger.debug("Case is camel")
843+
if 'cronSchedule' in args and args.get('cronSchedule') is not None:
844+
cron = args.get('cronSchedule')
845+
else:
846+
if 'cron_schedule' in args and args.get('cron_schedule') is not None:
847+
logger.debug("Case is snake")
848+
cron = args.get('cron_schedule')
849+
if cron is not None:
850+
logger.debug("Cron has been posted")
851+
# set_cron checks for the 'now' alias
852+
# It also checks that the cron schedule is greater than or equal to the current UTC time
853+
r = Actor.set_cron(cron)
854+
logger.debug(f"r is {r}")
855+
if r.fixed[2] in ['hours', 'hour', 'days', 'day', 'weeks', 'week', 'months', 'month']:
856+
args['cron_schedule'] = cron
857+
logger.debug(f"setting cron_next_ex to {r.fixed[0]}")
858+
args['cron_next_ex'] = r.fixed[0]
859+
args['cron_on'] = True
860+
else:
861+
raise BadRequest(f'{r.fixed[2]} is an invalid unit of time')
862+
args['cron_on'] = False
863+
else:
864+
logger.debug("Cron schedule was not sent in")
772865
if Config.get('web', 'case') == 'camel':
773866
max_workers = args.get('maxWorkers')
774867
args['max_workers'] = max_workers
@@ -891,6 +984,36 @@ def put(self, actor_id):
891984
args = self.validate_put(actor)
892985
logger.debug("PUT args validated successfully.")
893986
args['tenant'] = g.tenant
987+
cron = None
988+
if 'logEx' in args and args.get('logEx') is not None:
989+
log_ex = int(args.get('logEx'))
990+
logger.debug(f"log_ex in args; using: {log_ex}")
991+
args['log_ex'] = log_ex
992+
# Check for both camel and snake case
993+
if Config.get('web', 'case') == 'camel':
994+
if 'cronSchedule' in args and args.get('cronSchedule') is not None:
995+
cron = args.get('cronSchedule')
996+
if 'cronOn' in args and args.get('cronOn') is not None:
997+
actor['cron_on'] = args.get('cronOn')
998+
else:
999+
if 'cron_schedule' in args and args.get('cron_schedule') is not None:
1000+
cron = args.get('cron_schedule')
1001+
if 'cron_on' in args and args.get('cron_on') is not None:
1002+
actor['cron_on'] = args.get('cron_on')
1003+
if cron is not None:
1004+
# set_cron checks for the 'now' alias
1005+
# It also checks that the cron schedule is greater than or equal to the current UTC time
1006+
# Check for proper unit of time
1007+
r = Actor.set_cron(cron)
1008+
if r.fixed[2] in ['hours', 'hour', 'days', 'day', 'weeks', 'week', 'months', 'month']:
1009+
args['cron_schedule'] = cron
1010+
logger.debug(f"setting cron_next_ex to {r.fixed[0]}")
1011+
args['cron_next_ex'] = r.fixed[0]
1012+
else:
1013+
raise BadRequest(f'{r.fixed[2]} is an invalid unit of time')
1014+
args['cron_on'] = False
1015+
else:
1016+
logger.debug("No cron schedule has been sent")
8941017
if args['queue']:
8951018
queues_list = Config.get('spawner', 'host_queues').replace(' ', '')
8961019
valid_queues = queues_list.split(',')
@@ -970,6 +1093,7 @@ def validate_put(self, actor):
9701093
actor.pop('max_workers')
9711094
actor.pop('mem_limit')
9721095
actor.pop('max_cpus')
1096+
actor.pop('log_ex')
9731097

9741098
# this update overrides all required and optional attributes
9751099
try:

actors/docker_utils.py

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import time
66
import timeit
77
import datetime
8+
import random
89

910
import docker
1011
from requests.packages.urllib3.exceptions import ReadTimeoutError
@@ -17,7 +18,7 @@
1718
from config import Config
1819
from codes import BUSY, READY, RUNNING
1920
import globals
20-
from models import Execution, get_current_utc_time, display_time
21+
from models import Actor, Execution, get_current_utc_time, display_time
2122
from stores import workers_store
2223

2324

@@ -52,13 +53,66 @@ class DockerStartContainerError(DockerError):
5253
class DockerStopContainerError(DockerError):
5354
pass
5455

56+
def get_docker_credentials():
    """
    Get the docker credentials from the config.

    Credentials are read from the ``docker`` section as numbered pairs:
    ``dockerhub_username_1``/``dockerhub_password_1``,
    ``dockerhub_username_2``/``dockerhub_password_2``, and so on. Reading stops at
    the first index for which either option cannot be read or is empty.

    :return: list of dicts, each with a ``username`` and a ``password`` key; empty
        when no credentials are configured.
    """
    # we try to get as many credentials as have been configured
    creds = []
    cnt = 1
    while True:
        try:
            username = Config.get('docker', f'dockerhub_username_{cnt}')
            password = Config.get('docker', f'dockerhub_password_{cnt}')
        except Exception:
            # An option that cannot be read marks the end of the configured credentials.
            # Exception (not a bare except) is caught so that KeyboardInterrupt and
            # SystemExit are not swallowed.
            break
        if not username or not password:
            break
        creds.append({'username': username, 'password': password})
        cnt += 1
    return creds
74+
75+
76+
# Module-level list of dockerhub credentials, populated once from the config.
dockerhub_creds = get_docker_credentials()
77+
78+
79+
def get_random_dockerhub_cred():
    """
    Choose one of the loaded dockerhub credentials at random.

    :return: a ``(username, password)`` tuple, or ``(None, None)`` when no
        credentials are loaded or the chosen entry cannot be read.
    """
    if not dockerhub_creds:
        return None, None
    chosen = random.choice(dockerhub_creds)
    try:
        return chosen['username'], chosen['password']
    except Exception:
        logger.debug("Got exception trying to get dockerhub credentials")
        return None, None
93+
94+
95+
def cli_login(cli, username, password):
    """
    Try to login a dockerhub cli with a username and password.

    A failed login is logged and not re-raised.

    :param cli: docker client object exposing ``login(username=..., password=...)``.
    :param username: dockerhub username to log in with.
    :param password: dockerhub password to log in with.
    :return: None
    """
    try:
        cli.login(username=username, password=password)
    except Exception as e:
        # trailing space keeps the two adjacent f-strings from running together in the log
        logger.error(f"Could not login using dockerhub creds; username: {username}. "
                     f"Exception: {e}")
104+
105+
55106
def rm_container(cid):
56107
"""
57108
Remove a container.
58109
:param cid:
59110
:return:
60111
"""
61112
cli = docker.APIClient(base_url=dd, version="auto")
113+
username, password = get_random_dockerhub_cred()
114+
if username and password:
115+
cli_login(cli, username, password)
62116
try:
63117
rsp = cli.remove_container(cid, force=True)
64118
except Exception as e:
@@ -74,6 +128,9 @@ def pull_image(image):
74128
"""
75129
logger.debug("top of pull_image()")
76130
cli = docker.APIClient(base_url=dd, version="auto")
131+
username, password = get_random_dockerhub_cred()
132+
if username and password:
133+
cli_login(cli, username, password)
77134
try:
78135
rsp = cli.pull(repository=image)
79136
except Exception as e:
@@ -607,6 +664,7 @@ def execute_actor(actor_id,
607664
# a counter of the number of iterations through the main "running" loop;
608665
# this counter is used to determine when less frequent actions, such as log aggregation, need to run.
609666
loop_idx = 0
667+
log_ex = Actor.get_actor_log_ttl(actor_id)
610668
while running and not globals.force_quit:
611669
loop_idx += 1
612670
logger.debug("top of while running loop; loop_idx: {}".format(loop_idx))
@@ -667,7 +725,7 @@ def execute_actor(actor_id,
667725
# grab the logs every 5th iteration --
668726
if loop_idx % 5 == 0:
669727
logs = cli.logs(container.get('Id'))
670-
Execution.set_logs(execution_id, logs, actor_id, tenant, worker_id)
728+
Execution.set_logs(execution_id, logs, actor_id, tenant, worker_id, log_ex)
671729
logs = None
672730

673731
# checking the container status to see if it is still running ----

actors/metrics_api.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from agaveflask.utils import AgaveApi, handle_error
66

7-
from controllers import MetricsResource
7+
from controllers import MetricsResource, CronResource
88

99
from errors import errors
1010

@@ -28,6 +28,7 @@
2828

2929
# Resources
3030
api.add_resource(MetricsResource, '/metrics')
31+
api.add_resource(CronResource, '/cron')
3132

3233
if __name__ == '__main__':
3334
app.run(host='0.0.0.0', debug=True)

0 commit comments

Comments
 (0)