Skip to content

Commit 70a8637

Browse files
authored
Merge pull request #5763: Add publisher to ActionAlias
2 parents 8d2304e + 76891f3 commit 70a8637

File tree

6 files changed

+72
-3
lines changed

6 files changed

+72
-3
lines changed

CHANGELOG.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ Added
2020

2121
* Added a joint index to solve the problem of slow mongo queries for scheduled executions. #5805
2222

23+
* Added publisher to ActionAlias to enable streaming ActionAlias create/update/delete events. #5763
24+
Contributed by @ubaumann
25+
2326

2427
3.8.0 - November 18, 2022
2528
-------------------------

st2common/st2common/persistence/actionalias.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,24 @@
1414
# limitations under the License.
1515

1616
from __future__ import absolute_import
17+
from st2common import transport
1718
from st2common.models.db.actionalias import actionalias_access
18-
from st2common.persistence import base as persistence
19+
from st2common.persistence.base import Access
1920

21+
__all__ = [
22+
"ActionAlias",
23+
]
2024

21-
class ActionAlias(persistence.Access):
25+
26+
class ActionAlias(Access):
2227
impl = actionalias_access
2328

2429
@classmethod
2530
def _get_impl(cls):
2631
return cls.impl
32+
33+
@classmethod
34+
def _get_publisher(cls):
35+
if not cls.publisher:
36+
cls.publisher = transport.actionalias.ActionAliasPublisher()
37+
return cls.publisher

st2common/st2common/stream/listener.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@
2121
from kombu.mixins import ConsumerMixin
2222
from oslo_config import cfg
2323

24-
from st2common.models.api.action import LiveActionAPI
24+
from st2common.models.api.action import LiveActionAPI, ActionAliasAPI
2525
from st2common.models.api.execution import ActionExecutionAPI
2626
from st2common.models.api.execution import ActionExecutionOutputAPI
2727
from st2common.transport import utils as transport_utils
28+
from st2common.transport.queues import STREAM_ACTIONALIAS_QUEUE
2829
from st2common.transport.queues import STREAM_ANNOUNCEMENT_WORK_QUEUE
2930
from st2common.transport.queues import STREAM_EXECUTION_ALL_WORK_QUEUE
3031
from st2common.transport.queues import STREAM_EXECUTION_UPDATE_WORK_QUEUE
@@ -201,6 +202,11 @@ class StreamListener(BaseListener):
201202

202203
def get_consumers(self, consumer, channel):
203204
return [
205+
consumer(
206+
queues=[STREAM_ACTIONALIAS_QUEUE],
207+
accept=["pickle"],
208+
callbacks=[self.processor(ActionAliasAPI)],
209+
),
204210
consumer(
205211
queues=[STREAM_ANNOUNCEMENT_WORK_QUEUE],
206212
accept=["pickle"],
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Copyright 2022 The StackStorm Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# All Exchanges and Queues related to liveaction.
16+
17+
from __future__ import absolute_import
18+
from kombu import Exchange, Queue
19+
from st2common.transport import publishers
20+
21+
__all__ = [
22+
"ActionAliasPublisher",
23+
"get_queue",
24+
]
25+
26+
ACTIONALIAS_XCHG = Exchange("st2.actionalias", type="topic")
27+
28+
29+
class ActionAliasPublisher(publishers.CUDPublisher):
30+
def __init__(self):
31+
super(ActionAliasPublisher, self).__init__(exchange=ACTIONALIAS_XCHG)
32+
33+
34+
def get_queue(name=None, routing_key=None, exclusive=False, auto_delete=False):
35+
return Queue(
36+
name,
37+
ACTIONALIAS_XCHG,
38+
routing_key=routing_key,
39+
exclusive=exclusive,
40+
auto_delete=auto_delete,
41+
)

st2common/st2common/transport/bootstrap_utils.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
from st2common import log as logging
2929
from st2common.transport import utils as transport_utils
30+
from st2common.transport.actionalias import ACTIONALIAS_XCHG
3031
from st2common.transport.actionexecutionstate import ACTIONEXECUTIONSTATE_XCHG
3132
from st2common.transport.announcement import ANNOUNCEMENT_XCHG
3233
from st2common.transport.connection_retry_wrapper import ConnectionRetryWrapper
@@ -62,6 +63,7 @@
6263

6364
# List of exchanges which are pre-declared on service set up.
6465
EXCHANGES = [
66+
ACTIONALIAS_XCHG,
6567
ACTIONEXECUTIONSTATE_XCHG,
6668
ANNOUNCEMENT_XCHG,
6769
EXECUTION_XCHG,

st2common/st2common/transport/queues.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from kombu import Queue
2626

2727
from st2common.constants import action as action_constants
28+
from st2common.transport import actionalias
2829
from st2common.transport import actionexecutionstate
2930
from st2common.transport import announcement
3031
from st2common.transport import execution
@@ -42,6 +43,7 @@
4243
"NOTIFIER_ACTIONUPDATE_WORK_QUEUE",
4344
"RESULTSTRACKER_ACTIONSTATE_WORK_QUEUE",
4445
"RULESENGINE_WORK_QUEUE",
46+
"STREAM_ACTIONALIAS_QUEUE",
4547
"STREAM_ANNOUNCEMENT_WORK_QUEUE",
4648
"STREAM_EXECUTION_ALL_WORK_QUEUE",
4749
"STREAM_EXECUTION_UPDATE_WORK_QUEUE",
@@ -94,6 +96,10 @@
9496

9597

9698
# Used by the stream service
99+
STREAM_ACTIONALIAS_QUEUE = actionalias.get_queue(
100+
routing_key=publishers.ANY_RK, exclusive=True, auto_delete=True
101+
)
102+
97103
STREAM_ANNOUNCEMENT_WORK_QUEUE = announcement.get_queue(
98104
routing_key=publishers.ANY_RK, exclusive=True, auto_delete=True
99105
)

0 commit comments

Comments
 (0)