Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 7 additions & 14 deletions cylc/uiserver/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@


INFO_FILES_DIR = Path(USER_CONF_ROOT / "info_files")
DELTA_PROCESSING_INTERVAL = 0.5


class PathType(TraitType):
Expand Down Expand Up @@ -321,16 +322,6 @@ class CylcUIServer(ExtensionApp):
''',
default_value=1
)
max_threads = Int(
config=True,
help='''
Set the maximum number of threads the Cylc UI Server can use.

This determines the maximum number of active workflows that the
server can track.
''',
default_value=100,
)
profile = Bool(
config=True,
help='''
Expand Down Expand Up @@ -425,7 +416,6 @@ def __init__(self, *args, **kwargs):
self.data_store_mgr = DataStoreMgr(
self.workflows_mgr,
self.log,
self.max_threads,
)
# sub_status dictionary storing status of subscriptions
self.sub_statuses = {}
Expand Down Expand Up @@ -491,6 +481,12 @@ def initialize_settings(self):
self.scan_interval * 1000
).start()

# process incoming ZeroMQ deltas from the scheduler(s)
ioloop.PeriodicCallback(
self.data_store_mgr.process_incoming_deltas,
DELTA_PROCESSING_INTERVAL * 1000
).start()

def initialize_handlers(self):
self.authobj = self.set_auth()
self.set_sub_server()
Expand Down Expand Up @@ -620,9 +616,6 @@ async def stop_extension(self):
for sub in self.data_store_mgr.w_subs.values():
sub.stop()

# Shutdown the thread pool executor (used for subscription processing)
self.data_store_mgr.executor.shutdown(wait=False)

# stop the process pool (used for background commands)
self.executor.shutdown()

Expand Down
65 changes: 31 additions & 34 deletions cylc/uiserver/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,16 @@

Reconciliation on failed verification is done by requesting all elements of a
topic and replacing the respective data-store elements with the result.

Subscriptions are currently run in a different thread (via ThreadPoolExecutor).

"""

import asyncio
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy
from pathlib import Path
import time
from typing import TYPE_CHECKING, Dict, Optional, Set, cast

import zmq

from cylc.flow.exceptions import WorkflowStopped
from cylc.flow.id import Tokens
from cylc.flow.network.server import PB_METHOD_MAP
Expand Down Expand Up @@ -82,30 +80,20 @@ class DataStoreMgr:
Service that scans for workflows.
log:
Application logger.
max_threads:
Max number of threads to use for subscriptions.

Note, this determines the maximum number of active workflows that
can be updated.

This should be overridden for real use in the UIS app. The
default is here for test purposes.

"""

INIT_DATA_WAIT_TIME = 5. # seconds
INIT_DATA_RETRY_DELAY = 0.5 # seconds
RECONCILE_TIMEOUT = 5. # seconds
PENDING_DELTA_CHECK_INTERVAL = 0.5

def __init__(self, workflows_mgr, log, max_threads=10):
def __init__(self, workflows_mgr, log):
self.workflows_mgr = workflows_mgr
self.log = log
self.data = {}
self.w_subs: Dict[str, WorkflowSubscriber] = {}
self.topics = {ALL_DELTAS.encode('utf-8'), b'shutdown'}
self.loop = None
self.executor = ThreadPoolExecutor(max_threads)
self.delta_queues = {}

@log_call
Expand Down Expand Up @@ -148,11 +136,6 @@ async def connect_workflow(self, w_id, contact_data):
"""Initiate workflow subscriptions.

Call this when a workflow has started.

Subscriptions and sync management is instantiated and run in
a separate thread for each workflow. This is to avoid the sync loop
blocking the main loop.

"""
if self.loop is None:
self.loop = asyncio.get_running_loop()
Expand All @@ -163,10 +146,7 @@ async def connect_workflow(self, w_id, contact_data):

self.delta_queues[w_id] = {}

# Might be options other than threads to achieve
# non-blocking subscriptions, but this works.
self.executor.submit(
self._start_subscription,
self._start_subscription(
w_id,
contact_data['name'],
contact_data[CFF.HOST],
Expand Down Expand Up @@ -256,11 +236,30 @@ def _start_subscription(self, w_id, reg, host, port):
context=self.workflows_mgr.context,
topics=self.topics
)
self.w_subs[w_id].loop.run_until_complete(
self.w_subs[w_id].subscribe(
process_delta_msg,
func=self._update_workflow_data,
w_id=w_id))

async def process_incoming_deltas(self):
    """Receive and process deltas until there are none for each workflow.

    Each subscriber socket in ``self.w_subs`` is polled without blocking
    (``zmq.NOBLOCK``). Every ``(topic, delta)`` message received is handed
    to ``process_delta_msg`` with ``self._update_workflow_data`` as the
    callback. A workflow is dropped from the current run as soon as its
    socket has nothing more to read (or fails to read); the method
    returns once every workflow has been dropped.

    """
    # Work from a snapshot so that subscriptions added or removed while
    # this coroutine is running cannot invalidate the iteration or leave
    # the loop waiting on a workflow that no longer exists.
    pending = dict(self.w_subs)
    while pending:
        for w_id, sub in list(pending.items()):
            if self.w_subs.get(w_id) is not sub:
                # The subscription was removed or replaced since the
                # snapshot was taken: do not read from a stale socket.
                del pending[w_id]
                continue
            try:
                topic, delta = await sub.socket.recv_multipart(
                    flags=zmq.NOBLOCK
                )
            except zmq.Again:
                # Nothing (more) is waiting on this socket.
                del pending[w_id]
                continue
            except zmq.ZMQError as exc:
                # Any other socket error: stop reading this workflow for
                # this run (as before), but report it rather than hide it.
                self.log.warning(
                    f'Failed to receive deltas for {w_id}: {exc}'
                )
                del pending[w_id]
                continue

            process_delta_msg(
                topic,
                delta,
                func=self._update_workflow_data,
                w_id=w_id
            )

def _update_workflow_data(self, topic, delta, w_id):
"""Manage and apply incoming data-store deltas.
Expand Down Expand Up @@ -303,7 +302,6 @@ def _apply_all_delta(self, w_id, delta):
if sub_delta.reloaded:
self._clear_data_field(w_id, field.name)
self.data[w_id]['delta_times'][field.name] = 0.0
# hard to catch errors in a threaded async app, so use try-except.
try:
# Apply the delta if newer than the previously applied.
if delta_time >= self.data[w_id]['delta_times'][field.name]:
Expand Down Expand Up @@ -346,16 +344,15 @@ def _reconcile_update(self, topic, delta, w_id):
self.log.debug(
f'Out of sync with {topic} of {w_id}... Reconciling.')
try:
# use threadsafe as client socket is in main loop thread.
future = asyncio.run_coroutine_threadsafe(
task = asyncio.create_task(
workflow_request(
self.workflows_mgr.workflows[w_id]['req_client'],
'pb_data_elements',
args={'element_type': topic}
),
self.loop
)
new_delta_msg = future.result(self.RECONCILE_TIMEOUT)
new_delta_msg = task.result(self.RECONCILE_TIMEOUT)
new_delta = DELTAS_MAP[topic]()
new_delta.ParseFromString(new_delta_msg)
self._clear_data_field(w_id, topic)
Expand All @@ -366,7 +363,7 @@ def _reconcile_update(self, topic, delta, w_id):
f'The reconcile update coroutine {w_id} {topic}'
f'took too long, cancelling the subscription/sync.'
)
future.cancel()
task.cancel()
except Exception as exc:
self.log.exception(exc)

Expand Down
Loading