Skip to content

Commit d8c355a

Browse files
authored
Merge pull request #6578 from MetRonnie/graphql-err-handling
Improve server-client communication error handling: * Ensure GraphQL errors are forwarded on to the client * Print a better message for the `cylc trigger` incompatibility between Cylc 8.3 and 8.4 * Move the network module tests into their own directory
1 parent 0076ffc commit d8c355a

26 files changed

+428
-251
lines changed

changes.d/6578.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Improved handling of any internal errors when executing commands against a running workflow.

cylc/flow/exceptions.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,18 @@
1818

1919
from textwrap import wrap
2020
from typing import (
21+
TYPE_CHECKING,
2122
Dict,
2223
Optional,
2324
Sequence,
2425
Set,
2526
Union,
26-
TYPE_CHECKING,
2727
)
2828

29+
from cylc.flow import __version__ as CYLC_VERSION
2930
from cylc.flow.util import format_cmd
3031

32+
3133
if TYPE_CHECKING:
3234
from cylc.flow.subprocctx import SubFuncContext
3335

@@ -285,6 +287,24 @@ def __str__(self) -> str:
285287
return ret
286288

287289

290+
class RequestError(ClientError):
291+
"""Represents an error handling a request, returned by the server."""
292+
293+
def __init__(
294+
self, message: str, workflow_cylc_version: Optional[str] = None
295+
):
296+
ClientError.__init__(
297+
self,
298+
message,
299+
traceback=(
300+
f"(Workflow is running in Cylc {workflow_cylc_version})"
301+
if workflow_cylc_version
302+
and workflow_cylc_version != CYLC_VERSION
303+
else None
304+
),
305+
)
306+
307+
288308
class WorkflowStopped(ClientError):
289309
"""The Cylc scheduler you attempted to connect to is stopped."""
290310

cylc/flow/network/__init__.py

Lines changed: 63 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,12 @@
1818
import asyncio
1919
import getpass
2020
import json
21-
from typing import Optional, Tuple
21+
from typing import (
22+
TYPE_CHECKING,
23+
Optional,
24+
Tuple,
25+
Union,
26+
)
2227

2328
import zmq
2429
import zmq.asyncio
@@ -30,32 +35,77 @@
3035
CylcError,
3136
CylcVersionError,
3237
ServiceFileError,
33-
WorkflowStopped
38+
WorkflowStopped,
3439
)
3540
from cylc.flow.hostuserutil import get_fqdn_by_host
3641
from cylc.flow.workflow_files import (
3742
ContactFileFields,
38-
KeyType,
39-
KeyOwner,
4043
KeyInfo,
44+
KeyOwner,
45+
KeyType,
46+
get_workflow_srv_dir,
4147
load_contact_file,
42-
get_workflow_srv_dir
4348
)
4449

50+
51+
if TYPE_CHECKING:
52+
# BACK COMPAT: typing_extensions.TypedDict
53+
# FROM: Python 3.7
54+
# TO: Python 3.11
55+
from typing_extensions import TypedDict
56+
57+
4558
API = 5 # cylc API version
4659
MSG_TIMEOUT = "TIMEOUT"
4760

61+
if TYPE_CHECKING:
62+
class ResponseDict(TypedDict, total=False):
63+
"""Structure of server response messages.
4864
49-
def encode_(message):
50-
"""Convert the structure holding a message field from JSON to a string."""
51-
try:
52-
return json.dumps(message)
53-
except TypeError as exc:
54-
return json.dumps({'errors': [{'message': str(exc)}]})
65+
Confusingly, this has a similar format to a GraphQL execution result.
66+
But if we change this now we could break compatibility for
67+
issuing commands to/receiving responses from workflows running in
68+
different versions of Cylc 8.
69+
"""
70+
data: object
71+
"""For most Cylc commands that issue GQL mutations, the data field will
72+
look like:
73+
data: {
74+
<mutationName1>: {
75+
result: [
76+
{
77+
id: <workflow/task ID>,
78+
response: [<success_bool>, <message>]
79+
},
80+
...
81+
]
82+
}
83+
}
84+
but unfortunately this is not 100% consistent.
85+
"""
86+
error: Union[Exception, str, dict]
87+
"""If an error occurred that could not be handled.
88+
(usually a dict {message: str, traceback?: str}).
89+
"""
90+
user: str
91+
cylc_version: str
92+
"""Server (i.e. running workflow) Cylc version.
93+
94+
Going forward, we include this so we can more easily handle any future
95+
back-compat issues."""
96+
97+
98+
def serialize(data: object) -> str:
99+
"""Convert the structure holding a message to a JSON message string."""
100+
# Abstract out the transport format in order to allow it to be changed
101+
# in future.
102+
return json.dumps(data)
55103

56104

57-
def decode_(message):
58-
"""Convert an encoded message string to JSON with an added 'user' field."""
105+
def deserialize(message: str) -> 'ResponseDict':
106+
"""Convert a JSON message string to dict with an added 'user' field."""
107+
# Abstract out the transport format in order to allow it to be changed
108+
# in future.
59109
msg = json.loads(message)
60110
msg['user'] = getpass.getuser() # assume this is the user
61111
return msg

cylc/flow/network/client.py

Lines changed: 41 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,37 +15,51 @@
1515
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1616
"""Client for workflow runtime API."""
1717

18-
from abc import ABCMeta, abstractmethod
18+
from abc import (
19+
ABCMeta,
20+
abstractmethod,
21+
)
1922
import asyncio
2023
import os
2124
from shutil import which
2225
import socket
2326
import sys
24-
from typing import Any, Optional, Union, Dict
27+
from typing import (
28+
TYPE_CHECKING,
29+
Any,
30+
Dict,
31+
Optional,
32+
Union,
33+
)
2534

2635
import zmq
2736
import zmq.asyncio
2837

29-
from cylc.flow import LOG
38+
from cylc.flow import (
39+
LOG,
40+
__version__ as CYLC_VERSION,
41+
)
3042
from cylc.flow.exceptions import (
31-
ClientError,
3243
ClientTimeout,
3344
ContactFileExists,
3445
CylcError,
46+
RequestError,
3547
WorkflowStopped,
3648
)
3749
from cylc.flow.hostuserutil import get_fqdn_by_host
3850
from cylc.flow.network import (
39-
encode_,
40-
decode_,
51+
ZMQSocketBase,
52+
deserialize,
4153
get_location,
42-
ZMQSocketBase
54+
serialize,
4355
)
4456
from cylc.flow.network.client_factory import CommsMeth
4557
from cylc.flow.network.server import PB_METHOD_MAP
46-
from cylc.flow.workflow_files import (
47-
detect_old_contact_file,
48-
)
58+
from cylc.flow.workflow_files import detect_old_contact_file
59+
60+
61+
if TYPE_CHECKING:
62+
from cylc.flow.network import ResponseDict
4963

5064

5165
class WorkflowRuntimeClientBase(metaclass=ABCMeta):
@@ -292,12 +306,12 @@ async def async_request(
292306
if req_meta:
293307
msg['meta'].update(req_meta)
294308
LOG.debug('zmq:send %s', msg)
295-
message = encode_(msg)
309+
message = serialize(msg)
296310
self.socket.send_string(message)
297311

298312
# receive response
299313
if self.poller.poll(timeout):
300-
res = await self.socket.recv()
314+
res: bytes = await self.socket.recv()
301315
else:
302316
self.timeout_handler()
303317
raise ClientTimeout(
@@ -307,25 +321,26 @@ async def async_request(
307321
' --comms-timeout option;'
308322
'\n* or check the workflow log.'
309323
)
324+
LOG.debug('zmq:recv %s', res)
310325

311-
if msg['command'] in PB_METHOD_MAP:
312-
response = {'data': res}
313-
else:
314-
response = decode_(
315-
res.decode() if isinstance(res, bytes) else res
316-
)
317-
LOG.debug('zmq:recv %s', response)
326+
if command in PB_METHOD_MAP:
327+
return res
328+
329+
response: ResponseDict = deserialize(res.decode())
318330

319331
try:
320332
return response['data']
321333
except KeyError:
322-
error = response.get(
323-
'error',
324-
{'message': f'Received invalid response: {response}'},
325-
)
326-
raise ClientError(
327-
error.get('message'), # type: ignore
328-
error.get('traceback'), # type: ignore
334+
error = response.get('error')
335+
if isinstance(error, dict):
336+
error = error.get('message', error)
337+
if not error:
338+
error = (
339+
f"Received invalid response for Cylc {CYLC_VERSION}: "
340+
f"{response}"
341+
)
342+
raise RequestError(
343+
str(error), response.get('cylc_version')
329344
) from None
330345

331346
def get_header(self) -> dict:

cylc/flow/network/multi.py

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,36 @@
1616

1717
import asyncio
1818
import sys
19-
from typing import Callable, Dict, List, Tuple, Optional, Union, Type
19+
from typing import (
20+
Callable,
21+
Dict,
22+
List,
23+
Optional,
24+
Tuple,
25+
Type,
26+
Union,
27+
)
2028

2129
from ansimarkup import ansiprint
2230

31+
from cylc.flow import __version__ as CYLC_VERSION
2332
from cylc.flow.async_util import unordered_map
24-
from cylc.flow.exceptions import CylcError, WorkflowStopped
33+
from cylc.flow.exceptions import (
34+
CylcError,
35+
WorkflowStopped,
36+
)
2537
import cylc.flow.flags
2638
from cylc.flow.id_cli import parse_ids_async
2739
from cylc.flow.terminal import DIM
2840

2941

42+
# Known error messages for incompatibilities between this version of Cylc (that
43+
# is running the command) and the version of Cylc running the workflow:
44+
KNOWN_INCOMPAT = {
45+
'Unknown argument "onResume" on field "trigger" of type "Mutations".',
46+
}
47+
48+
3049
def call_multi(*args, **kwargs):
3150
"""Call a function for each workflow in a list of IDs.
3251
@@ -220,21 +239,36 @@ def _process_response(
220239

221240

222241
def _report(
223-
response: dict,
242+
response: Union[dict, list],
224243
) -> Tuple[Optional[str], Optional[str], bool]:
225244
"""Report the result of a GraphQL operation.
226245
227246
This analyses GraphQL mutation responses to determine the outcome.
228247
229248
Args:
230-
response: The GraphQL response.
249+
response: The workflow server response (NOT necessarily conforming to
250+
GraphQL execution result spec).
231251
232252
Returns:
233253
(stdout, stderr, outcome)
234254
235255
"""
236256
try:
237257
ret: List[Tuple[Optional[str], Optional[str], bool]] = []
258+
if not isinstance(response, dict):
259+
if isinstance(response, list) and response[0].get('error'):
260+
# If operating on a workflow running in an older Cylc version,
261+
# may get an error response like [{'error': '...'}]
262+
if response[0]['error'].get('message') in KNOWN_INCOMPAT:
263+
raise Exception(
264+
"This command is no longer compatible with the "
265+
"version of Cylc running the workflow. Please stop & "
266+
f"restart the workflow with Cylc {CYLC_VERSION} "
267+
"or higher."
268+
f"\n\n{response}"
269+
)
270+
raise Exception(response)
271+
raise Exception(f"Unexpected response: {response}")
238272
for mutation_response in response.values():
239273
# extract the result of each mutation result in the response
240274
success, msg = mutation_response['result'][0]['response']
@@ -268,7 +302,7 @@ def _report(
268302
# response returned is not in the expected format - this shouldn't
269303
# happen but we need to protect against it
270304
err_msg = ''
271-
if cylc.flow.flags.verbosity > 1: # debug mode
305+
if cylc.flow.flags.verbosity > 0: # verbose mode
272306
# print the full result to stderr
273307
err_msg += f'\n <{DIM}>response={response}</{DIM}>'
274308
return (

0 commit comments

Comments
 (0)