|
15 | 15 |
|
16 | 16 | import json |
17 | 17 | from textwrap import dedent |
| 18 | +from time import time |
18 | 19 | from urllib.parse import urlencode |
19 | 20 |
|
20 | 21 | import pytest |
21 | 22 | from tornado.httpclient import HTTPClientError |
22 | 23 |
|
23 | 24 | from cylc.flow.id import Tokens |
| 25 | +from cylc.uiserver.data_store_mgr import DataStoreMgr, ALL_DELTAS |
24 | 26 |
|
25 | 27 |
|
26 | 28 | @pytest.fixture |
@@ -421,3 +423,140 @@ async def test_subscription(gql_subscription, dummy_workflow): |
421 | 423 | await ws.write_message(json.dumps({"id": sub_id, "type": "stop"})) |
422 | 424 |
|
423 | 425 | ws.close() |
| 426 | + |
| 427 | + |
| 428 | +async def test_subscription_deltas( |
| 429 | + cylc_uis, gql_subscription, make_all_delta): |
| 430 | + """Test deltas being processesed and recieved by a GraphQL subscription.""" |
| 431 | + |
| 432 | + data_store_mgr = cylc_uis.data_store_mgr |
| 433 | + |
| 434 | + # Start subscription |
| 435 | + sub_id = "1" |
| 436 | + ws = await gql_subscription( |
| 437 | + *('cylc', 'subscriptions'), |
| 438 | + sub={ |
| 439 | + "id": sub_id, |
| 440 | + "type": "start", |
| 441 | + "payload": { |
| 442 | + "query": """ |
| 443 | + subscription { |
| 444 | + deltas (stripNull: true){ |
| 445 | + id |
| 446 | + shutdown |
| 447 | + added { |
| 448 | + workflow { |
| 449 | + status |
| 450 | + } |
| 451 | + taskProxies { |
| 452 | + id |
| 453 | + state |
| 454 | + } |
| 455 | + } |
| 456 | + updated { |
| 457 | + workflow { |
| 458 | + status |
| 459 | + } |
| 460 | + taskProxies { |
| 461 | + id |
| 462 | + state |
| 463 | + } |
| 464 | + } |
| 465 | + } |
| 466 | + } |
| 467 | + """ |
| 468 | + } |
| 469 | + } |
| 470 | + ) |
| 471 | + |
| 472 | + w_tokens = Tokens(user='user', workflow='this') |
| 473 | + w_id = w_tokens.id |
| 474 | + |
| 475 | + # Register for data-store entry |
| 476 | + await data_store_mgr.register_workflow(w_id=w_id, is_active=False) |
| 477 | + |
| 478 | + # Receive first message |
| 479 | + response = json.loads(await ws.read_message()) |
| 480 | + assert response == { |
| 481 | + 'id': sub_id, |
| 482 | + 'type': 'data', |
| 483 | + 'payload': { |
| 484 | + 'data': { |
| 485 | + 'deltas': { |
| 486 | + 'id': w_id, |
| 487 | + 'shutdown': False, |
| 488 | + 'added': { |
| 489 | + 'workflow': {'status': 'stopped'} |
| 490 | + }, |
| 491 | + 'updated': {} |
| 492 | + } |
| 493 | + } |
| 494 | + } |
| 495 | + } |
| 496 | + |
| 497 | + # Create added delta. |
| 498 | + tp_id = w_tokens.duplicate(cycle='1', task='foo').id |
| 499 | + all_added_delta = make_all_delta(w_id, 'added', tp_id, 'waiting', time()) |
| 500 | + all_added_delta.workflow.added.status = 'running' |
| 501 | + all_added_delta.workflow.reloaded = True |
| 502 | + |
| 503 | + # Process added delta, creating gql subscription delta as a result. |
| 504 | + data_store_mgr._update_workflow_data(ALL_DELTAS, all_added_delta, w_id) |
| 505 | + |
| 506 | + # Receive next message. |
| 507 | + response = json.loads(await ws.read_message()) |
| 508 | + assert response == { |
| 509 | + 'id': sub_id, |
| 510 | + 'type': 'data', |
| 511 | + 'payload': { |
| 512 | + 'data': { |
| 513 | + 'deltas': { |
| 514 | + 'id': w_id, |
| 515 | + 'shutdown': False, |
| 516 | + 'added': { |
| 517 | + 'workflow': {'status': 'running'}, |
| 518 | + 'taskProxies': [ |
| 519 | + { |
| 520 | + 'id': tp_id, |
| 521 | + 'state': 'waiting' |
| 522 | + } |
| 523 | + ] |
| 524 | + }, |
| 525 | + 'updated': {} |
| 526 | + } |
| 527 | + } |
| 528 | + } |
| 529 | + } |
| 530 | + |
| 531 | + # Create update delta. |
| 532 | + all_updated_delta = make_all_delta( |
| 533 | + w_id, 'updated', tp_id, 'running', time()) |
| 534 | + all_updated_delta.workflow.updated.status = 'stopping' |
| 535 | + all_updated_delta.workflow.reloaded = False |
| 536 | + |
| 537 | + # Process updated delta. |
| 538 | + data_store_mgr._update_workflow_data(ALL_DELTAS, all_updated_delta, w_id) |
| 539 | + |
| 540 | + # Receive next message. |
| 541 | + response = json.loads(await ws.read_message()) |
| 542 | + assert response['payload']['data']['deltas']['updated'][ |
| 543 | + 'workflow' |
| 544 | + ]['status'] == 'stopping' |
| 545 | + assert response['payload']['data']['deltas']['updated'][ |
| 546 | + 'taskProxies' |
| 547 | + ][0]['state'] == 'running' |
| 548 | + |
| 549 | + # Shutdown delta |
| 550 | + data_store_mgr._update_workflow_data( |
| 551 | + 'shutdown', 'this is the end'.encode('utf-8'), w_id) |
| 552 | + |
| 553 | + # Receive messages until shutdown message. |
| 554 | + # The workflow manager sends a stopped delta when scanning, |
| 555 | + # because of the test setup. |
| 556 | + while not response['payload']['data']['deltas']['shutdown']: |
| 557 | + response = json.loads(await ws.read_message()) |
| 558 | + |
| 559 | + # Run the stop/cleanup code |
| 560 | + await ws.write_message(json.dumps({"id": sub_id, "type": "stop"})) |
| 561 | + |
| 562 | + ws.close() |
0 commit comments