diff --git a/CHANGELOG.rst b/CHANGELOG.rst index c2ae5ad0ed..b31dbcf566 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -20,6 +20,9 @@ Added * Added a joint index to solve the problem of slow mongo queries for scheduled executions. #5805 +* Added publisher to ActionAlias to enable streaming ActionAlias create/update/delete events. #5763 + Contributed by @ubaumann + 3.8.0 - November 18, 2022 ------------------------- diff --git a/st2common/st2common/persistence/actionalias.py b/st2common/st2common/persistence/actionalias.py index 1782e4422c..facbe140a1 100644 --- a/st2common/st2common/persistence/actionalias.py +++ b/st2common/st2common/persistence/actionalias.py @@ -14,13 +14,24 @@ # limitations under the License. from __future__ import absolute_import +from st2common import transport from st2common.models.db.actionalias import actionalias_access -from st2common.persistence import base as persistence +from st2common.persistence.base import Access +__all__ = [ + "ActionAlias", +] -class ActionAlias(persistence.Access): + +class ActionAlias(Access): impl = actionalias_access @classmethod def _get_impl(cls): return cls.impl + + @classmethod + def _get_publisher(cls): + if not cls.publisher: + cls.publisher = transport.actionalias.ActionAliasPublisher() + return cls.publisher diff --git a/st2common/st2common/stream/listener.py b/st2common/st2common/stream/listener.py index 347c4cfc75..759133f96f 100644 --- a/st2common/st2common/stream/listener.py +++ b/st2common/st2common/stream/listener.py @@ -21,10 +21,11 @@ from kombu.mixins import ConsumerMixin from oslo_config import cfg -from st2common.models.api.action import LiveActionAPI +from st2common.models.api.action import LiveActionAPI, ActionAliasAPI from st2common.models.api.execution import ActionExecutionAPI from st2common.models.api.execution import ActionExecutionOutputAPI from st2common.transport import utils as transport_utils +from st2common.transport.queues import STREAM_ACTIONALIAS_QUEUE from st2common.transport.queues import STREAM_ANNOUNCEMENT_WORK_QUEUE from st2common.transport.queues import STREAM_EXECUTION_ALL_WORK_QUEUE from st2common.transport.queues import STREAM_EXECUTION_UPDATE_WORK_QUEUE @@ -201,6 +202,11 @@ class StreamListener(BaseListener): def get_consumers(self, consumer, channel): return [ + consumer( + queues=[STREAM_ACTIONALIAS_QUEUE], + accept=["pickle"], + callbacks=[self.processor(ActionAliasAPI)], + ), consumer( queues=[STREAM_ANNOUNCEMENT_WORK_QUEUE], accept=["pickle"], diff --git a/st2common/st2common/transport/actionalias.py b/st2common/st2common/transport/actionalias.py new file mode 100644 index 0000000000..33fff1e92d --- /dev/null +++ b/st2common/st2common/transport/actionalias.py @@ -0,0 +1,41 @@ +# Copyright 2022 The StackStorm Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# All Exchanges and Queues related to liveaction. + +from __future__ import absolute_import +from kombu import Exchange, Queue +from st2common.transport import publishers + +__all__ = [ + "ActionAliasPublisher", + "get_queue", +] + +ACTIONALIAS_XCHG = Exchange("st2.actionalias", type="topic") + + +class ActionAliasPublisher(publishers.CUDPublisher): + def __init__(self): + super(ActionAliasPublisher, self).__init__(exchange=ACTIONALIAS_XCHG) + + +def get_queue(name=None, routing_key=None, exclusive=False, auto_delete=False): + return Queue( + name, + ACTIONALIAS_XCHG, + routing_key=routing_key, + exclusive=exclusive, + auto_delete=auto_delete, + ) diff --git a/st2common/st2common/transport/bootstrap_utils.py b/st2common/st2common/transport/bootstrap_utils.py index 2eea9ad64b..b2eb8c7c2d 100644 --- a/st2common/st2common/transport/bootstrap_utils.py +++ b/st2common/st2common/transport/bootstrap_utils.py @@ -27,6 +27,7 @@ from st2common import log as logging from st2common.transport import utils as transport_utils +from st2common.transport.actionalias import ACTIONALIAS_XCHG from st2common.transport.actionexecutionstate import ACTIONEXECUTIONSTATE_XCHG from st2common.transport.announcement import ANNOUNCEMENT_XCHG from st2common.transport.connection_retry_wrapper import ConnectionRetryWrapper @@ -62,6 +63,7 @@ # List of exchanges which are pre-declared on service set up. EXCHANGES = [ + ACTIONALIAS_XCHG, ACTIONEXECUTIONSTATE_XCHG, ANNOUNCEMENT_XCHG, EXECUTION_XCHG, diff --git a/st2common/st2common/transport/queues.py b/st2common/st2common/transport/queues.py index 0b19c0ff1d..33a3f26d03 100644 --- a/st2common/st2common/transport/queues.py +++ b/st2common/st2common/transport/queues.py @@ -25,6 +25,7 @@ from kombu import Queue from st2common.constants import action as action_constants +from st2common.transport import actionalias from st2common.transport import actionexecutionstate from st2common.transport import announcement from st2common.transport import execution @@ -42,6 +43,7 @@ "NOTIFIER_ACTIONUPDATE_WORK_QUEUE", "RESULTSTRACKER_ACTIONSTATE_WORK_QUEUE", "RULESENGINE_WORK_QUEUE", + "STREAM_ACTIONALIAS_QUEUE", "STREAM_ANNOUNCEMENT_WORK_QUEUE", "STREAM_EXECUTION_ALL_WORK_QUEUE", "STREAM_EXECUTION_UPDATE_WORK_QUEUE", @@ -94,6 +96,10 @@ # Used by the stream service +STREAM_ACTIONALIAS_QUEUE = actionalias.get_queue( + routing_key=publishers.ANY_RK, exclusive=True, auto_delete=True +) + STREAM_ANNOUNCEMENT_WORK_QUEUE = announcement.get_queue( routing_key=publishers.ANY_RK, exclusive=True, auto_delete=True )