Skip to content

Commit cd6e04c

Browse files
author
Joe Stubbs
committed
merge 1.2.0
2 parents 543aa0f + 75c6c7b commit cd6e04c

29 files changed

+1467
-209
lines changed

CHANGELOG.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,24 @@
11
# Change Log
22
All notable changes to this project will be documented in this file.
33

4+
## 1.2.0 - 2019-07-15
5+
### Added
6+
- Added actor events subsystem with events agent that reads from the events queue.
7+
- Added support for actor links to send an actor's events to another actor.
8+
- Added support for an actor webhook property for sending an actor's events as an HTTP POST to an endpoint.
9+
- Added timing data to messages POST processing.
10+
11+
### Changed
12+
- Executions now change to status "RUNNING" as soon as a worker starts the corresponing actor container.
13+
- Force halting an execution fails if the status is not RUNNING.
14+
- Reading and managing nonces associated with aliases requires permissions on both the alias and the actor.
15+
- Spawner now sets actor to READY state before setting worker to READY state to prevent autoscaler from stopping worker before actor is update to READY.
16+
- Updated ActorMsgQueue to use a new, simpler class, TaskQueue, removing dependency on channelpy.
17+
18+
### Removed
19+
- No change.
20+
21+
422
## 1.1.0 - 2019-06-18
523
### Added
624
- Added support for sending synchronous messages to an actor.

Dockerfile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ RUN pip3 install -r /requirements.txt
1313

1414
RUN touch /var/log/abaco.log
1515

16+
# set default threads for gunicorn
17+
ENV threads=3
18+
1619
# todo -- add/remove to toggle between local channelpy and github instance
1720
#ADD channelpy /channelpy
1821
#RUN pip3 install /channelpy

abaco.conf

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ dd: unix://var/run/docker.sock
6464
# number of worker containers to initially start when an actor is created
6565
init_count: 1
6666

67+
# set whether autoscaling is enabled
68+
autoscaling = false
69+
6770
# max length of time, in seconds, an actor container is allowed to execute before being killed.
6871
# set to -1 for indefinite execution time.
6972
max_run_time: -1
@@ -116,6 +119,11 @@ show_traceback: false
116119
# Here we set the to 12 hours.
117120
log_ex: 43200
118121

122+
# Max length (in bytes) to store an actor execution's log. If a log exceeds this length, the log will be truncated.
123+
# Note: max_log_length must not exceed the maximum document length for the log store.
124+
# here we default it to 1 MB
125+
max_log_length: 1000000
126+
119127
# Either camel or snake: Whether to return responses in camel case or snake. Default is snake.
120128
case: snake
121129

actors/auth.py

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -206,12 +206,32 @@ def authorization():
206206
if '/actors/aliases' in request.url_rule.rule:
207207
alias_id = get_alias_id()
208208
noun = 'alias'
209-
if request.method == 'GET':
210-
# GET requests require READ access
211-
has_pem = check_permissions(user=g.user, identifier=alias_id, level=codes.READ)
212-
# all other requests require UPDATE access
213-
elif request.method in ['DELETE', 'POST', 'PUT']:
214-
has_pem = check_permissions(user=g.user, identifier=alias_id, level=codes.UPDATE)
209+
# we need to compute the db_id since it is not computed in the general case for
210+
# alias endpoints
211+
db_id, _ = get_db_id()
212+
# reading/creating/updating nonces for an alias requires permissions for both the
213+
# alias itself and the underlying actor
214+
if 'nonce' in request.url_rule.rule:
215+
noun = 'alias and actor'
216+
# logger.debug("checking user {} has permissions for "
217+
# "alias: {} and actor: {}".format(g.user, alias_id, db_id))
218+
if request.method == 'GET':
219+
# GET requests require READ access
220+
221+
has_pem = check_permissions(user=g.user, identifier=alias_id, level=codes.READ)
222+
has_pem = has_pem and check_permissions(user=g.user, identifier=db_id, level=codes.READ)
223+
elif request.method in ['DELETE', 'POST', 'PUT']:
224+
has_pem = check_permissions(user=g.user, identifier=alias_id, level=codes.UPDATE)
225+
has_pem = has_pem and check_permissions(user=g.user, identifier=db_id, level=codes.UPDATE)
226+
227+
# otherwise, this is a request to manage the alias itself; only requires permissions on the alias
228+
else:
229+
if request.method == 'GET':
230+
# GET requests require READ access
231+
has_pem = check_permissions(user=g.user, identifier=alias_id, level=codes.READ)
232+
# all other requests require UPDATE access
233+
elif request.method in ['DELETE', 'POST', 'PUT']:
234+
has_pem = check_permissions(user=g.user, identifier=alias_id, level=codes.UPDATE)
215235
else:
216236
# all other checks are based on actor-id:
217237
noun = 'actor'
@@ -318,14 +338,21 @@ def check_permissions(user, identifier, level):
318338

319339
def get_db_id():
320340
"""Get the db_id and actor_identifier from the request path."""
321-
# logger.debug("top of get_db_id. request.path: {}".format(request.path))
341+
# the location of the actor identifier is different for aliases vs actor_id's.
342+
# for actors, it is in index 2:
343+
# /actors/<actor_id>
344+
# for aliases, it is in index 3:
345+
# /actors/aliases/<alias_id>
346+
idx = 2
347+
if 'aliases' in request.path:
348+
idx = 3
322349
path_split = request.path.split("/")
323350
if len(path_split) < 3:
324351
logger.error("Unrecognized request -- could not find the actor id. path_split: {}".format(path_split))
325352
raise PermissionsException("Not authorized.")
326-
# logger.debug("path_split: {}".format(path_split))
327-
actor_identifier = path_split[2]
328-
# logger.debug("actor_identifier: {}; tenant: {}".format(actor_identifier, g.tenant))
353+
logger.debug("path_split: {}".format(path_split))
354+
actor_identifier = path_split[idx]
355+
logger.debug("actor_identifier: {}; tenant: {}".format(actor_identifier, g.tenant))
329356
try:
330357
actor_id = Actor.get_actor_id(g.tenant, actor_identifier)
331358
except KeyError:

actors/channels.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,26 @@ def put_cmd(self, actor_id, worker_id, image, tenant, stop_existing=True):
9696
self.put(msg)
9797

9898

99+
class EventsChannel(Channel):
100+
"""Work with events on the events channel."""
101+
102+
event_queue_names = ('default',
103+
)
104+
105+
def __init__(self, name='default'):
106+
self.uri = Config.get('rabbit', 'uri')
107+
if name not in EventsChannel.event_queue_names:
108+
raise Exception('Invalid Events Channel Queue name.')
109+
110+
super().__init__(name='events_channel_{}'.format(name),
111+
connection_type=RabbitConnection,
112+
uri=self.uri)
113+
114+
def put_event(self, json_data):
115+
"""Put a new event on the events channel."""
116+
self.put(json_data)
117+
118+
99119
class BinaryChannel(BasicChannel):
100120
"""Override BaseChannel methods to handle binary messages."""
101121

@@ -131,8 +151,21 @@ def get_one(self):
131151
return self._process(msg.body), msg
132152

133153

154+
from queues import BinaryTaskQueue
155+
156+
157+
class ActorMsgChannel(BinaryTaskQueue):
158+
def __init__(self, actor_id):
159+
super().__init__(name='actor_msg_{}'.format(actor_id))
160+
161+
def put_msg(self, message, d={}, **kwargs):
162+
d['message'] = message
163+
for k, v in kwargs:
164+
d[k] = v
165+
self.put(d)
166+
134167

135-
class ActorMsgChannel(BinaryChannel):
168+
class ActorMSSgChannel(BinaryChannel):
136169
"""Work with messages sent to a specific actor.
137170
"""
138171
def __init__(self, actor_id):

actors/clients.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ def run(self):
7575
message, msg_obj = self.ch.get_one()
7676
# we directly ack messages from the clients channel because caller expects direct reply_to
7777
msg_obj.ack()
78-
logger.info("cleintg processing message: {}".format(message))
78+
logger.info("clientg processing message: {}".format(message))
7979
anon_ch = message['reply_to']
8080
cmd = message['value']
8181
if cmd.get('command') == 'new':

actors/codes.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
PROCESSING = 'PROCESSING'
1414
COMPLETE = 'COMPLETE'
1515
SUBMITTED = 'SUBMITTED'
16+
RUNNING = 'RUNNING'
1617
READY = 'READY'
1718
ERROR = 'ERROR'
1819
BUSY = 'BUSY'

0 commit comments

Comments
 (0)