diff --git a/cylc/uiserver/app.py b/cylc/uiserver/app.py index 9914268a..9537b4d6 100644 --- a/cylc/uiserver/app.py +++ b/cylc/uiserver/app.py @@ -111,6 +111,7 @@ INFO_FILES_DIR = Path(USER_CONF_ROOT / "info_files") +DELTA_PROCESSING_INTERVAL = 0.5 class PathType(TraitType): @@ -321,16 +322,6 @@ class CylcUIServer(ExtensionApp): ''', default_value=1 ) - max_threads = Int( - config=True, - help=''' - Set the maximum number of threads the Cylc UI Server can use. - - This determines the maximum number of active workflows that the - server can track. - ''', - default_value=100, - ) profile = Bool( config=True, help=''' @@ -425,7 +416,6 @@ def __init__(self, *args, **kwargs): self.data_store_mgr = DataStoreMgr( self.workflows_mgr, self.log, - self.max_threads, ) # sub_status dictionary storing status of subscriptions self.sub_statuses = {} @@ -491,6 +481,12 @@ def initialize_settings(self): self.scan_interval * 1000 ).start() + # process incoming ZeroMQ deltas from the scheduler(s) + ioloop.PeriodicCallback( + self.data_store_mgr.process_incoming_deltas, + DELTA_PROCESSING_INTERVAL * 1000 + ).start() + def initialize_handlers(self): self.authobj = self.set_auth() self.set_sub_server() @@ -620,9 +616,6 @@ async def stop_extension(self): for sub in self.data_store_mgr.w_subs.values(): sub.stop() - # Shutdown the thread pool executor (used for subscription processing) - self.data_store_mgr.executor.shutdown(wait=False) - # stop the process pool (used for background commands) self.executor.shutdown() diff --git a/cylc/uiserver/data_store_mgr.py b/cylc/uiserver/data_store_mgr.py index db066e08..9aefc3aa 100644 --- a/cylc/uiserver/data_store_mgr.py +++ b/cylc/uiserver/data_store_mgr.py @@ -26,18 +26,16 @@ Reconciliation on failed verification is done by requesting all elements of a topic, and replacing the respective data-store elements with this. - -Subscriptions are currently run in a different thread (via ThreadPoolExecutor). 
- """ import asyncio -from concurrent.futures import ThreadPoolExecutor from copy import deepcopy from pathlib import Path import time from typing import TYPE_CHECKING, Dict, Optional, Set, cast +import zmq + from cylc.flow.exceptions import WorkflowStopped from cylc.flow.id import Tokens from cylc.flow.network.server import PB_METHOD_MAP @@ -82,15 +80,6 @@ class DataStoreMgr: Service that scans for workflows. log: Application logger. - max_threads: - Max number of threads to use for subscriptions. - - Note, this determines the maximum number of active workflows that - can be updated. - - This should be overridden for real use in the UIS app. The - default is here for test purposes. - """ INIT_DATA_WAIT_TIME = 5. # seconds @@ -98,14 +87,13 @@ class DataStoreMgr: RECONCILE_TIMEOUT = 5. # seconds PENDING_DELTA_CHECK_INTERVAL = 0.5 - def __init__(self, workflows_mgr, log, max_threads=10): + def __init__(self, workflows_mgr, log): self.workflows_mgr = workflows_mgr self.log = log self.data = {} self.w_subs: Dict[str, WorkflowSubscriber] = {} self.topics = {ALL_DELTAS.encode('utf-8'), b'shutdown'} self.loop = None - self.executor = ThreadPoolExecutor(max_threads) self.delta_queues = {} @log_call @@ -148,11 +136,6 @@ async def connect_workflow(self, w_id, contact_data): """Initiate workflow subscriptions. Call this when a workflow has started. - - Subscriptions and sync management is instantiated and run in - a separate thread for each workflow. This is to avoid the sync loop - blocking the main loop. - """ if self.loop is None: self.loop = asyncio.get_running_loop() @@ -163,10 +146,7 @@ async def connect_workflow(self, w_id, contact_data): self.delta_queues[w_id] = {} - # Might be options other than threads to achieve - # non-blocking subscriptions, but this works. 
- self.executor.submit( - self._start_subscription, + self._start_subscription( w_id, contact_data['name'], contact_data[CFF.HOST], @@ -256,11 +236,30 @@ def _start_subscription(self, w_id, reg, host, port): context=self.workflows_mgr.context, topics=self.topics ) - self.w_subs[w_id].loop.run_until_complete( - self.w_subs[w_id].subscribe( - process_delta_msg, - func=self._update_workflow_data, - w_id=w_id)) + + async def process_incoming_deltas(self): + """Receive and process deltas until there are none for each workflow. + """ + # poll a snapshot, as handling a delta may add/remove subscribers + pending = dict(self.w_subs) + while pending: + for w_id, sub in list(pending.items()): + if self.w_subs.get(w_id) is not sub: + del pending[w_id] + continue + try: + [topic, delta] = await sub.socket.recv_multipart( + flags=zmq.NOBLOCK + ) + except zmq.ZMQError: + del pending[w_id] + continue + process_delta_msg( + topic, + delta, + func=self._update_workflow_data, + w_id=w_id + ) def _update_workflow_data(self, topic, delta, w_id): """Manage and apply incoming data-store deltas. @@ -303,7 +302,6 @@ def _apply_all_delta(self, w_id, delta): if sub_delta.reloaded: self._clear_data_field(w_id, field.name) self.data[w_id]['delta_times'][field.name] = 0.0 - # hard to catch errors in a threaded async app, so use try-except. try: # Apply the delta if newer than the previously applied. if delta_time >= self.data[w_id]['delta_times'][field.name]: @@ -346,8 +344,7 @@ def _reconcile_update(self, topic, delta, w_id): self.log.debug( f'Out of sync with {topic} of {w_id}... Reconciling.') try: - # use threadsafe as client socket is in main loop thread. 
- future = asyncio.run_coroutine_threadsafe( + task = asyncio.create_task( workflow_request( self.workflows_mgr.workflows[w_id]['req_client'], 'pb_data_elements', @@ -355,7 +352,7 @@ def _reconcile_update(self, topic, delta, w_id): ), self.loop ) - new_delta_msg = future.result(self.RECONCILE_TIMEOUT) + new_delta_msg = task.result(self.RECONCILE_TIMEOUT) new_delta = DELTAS_MAP[topic]() new_delta.ParseFromString(new_delta_msg) self._clear_data_field(w_id, topic) @@ -366,7 +363,7 @@ def _reconcile_update(self, topic, delta, w_id): f'The reconcile update coroutine {w_id} {topic}' f'took too long, cancelling the subscription/sync.' ) - future.cancel() + task.cancel() except Exception as exc: self.log.exception(exc)