|
46 | 46 | TASK_OUTPUT_FAILED,
|
47 | 47 | TASK_OUTPUT_SUCCEEDED,
|
48 | 48 | )
|
| 49 | +from cylc.flow.task_pool import TaskPool |
49 | 50 | from cylc.flow.task_state import (
|
50 | 51 | TASK_STATUS_EXPIRED,
|
51 | 52 | TASK_STATUS_FAILED,
|
@@ -2464,3 +2465,75 @@ async def test_start_tasks(
|
2464 | 2465 | "2050/baz",
|
2465 | 2466 | ])
|
2466 | 2467 | )
|
| 2468 | + |
| 2469 | + |
| 2470 | +async def test_add_new_flow_rows_on_spawn( |
| 2471 | + flow, |
| 2472 | + scheduler, |
| 2473 | + run, |
| 2474 | + complete, |
| 2475 | + db_select, |
| 2476 | + capcall, |
| 2477 | +) -> None: |
| 2478 | + """Task suicide should not override previously completed outputs. |
| 2479 | +
|
| 2480 | + See https://github.com/cylc/cylc-flow/pull/6821 |
| 2481 | + """ |
| 2482 | + # capture all TaskPool.spawn_task() calls |
| 2483 | + spawn_task_calls = capcall( |
| 2484 | + 'cylc.flow.task_pool.TaskPool.spawn_task', TaskPool.spawn_task |
| 2485 | + ) |
| 2486 | + |
| 2487 | + def list_spawn_task_calls(): |
| 2488 | + """Return a list of the names of tasks which have been run through the |
| 2489 | + "spawn_tasks" function so far.""" |
| 2490 | + return [ |
| 2491 | + args[1] for args, _kwargs in spawn_task_calls |
| 2492 | + ] |
| 2493 | + |
| 2494 | + id_ = flow({ |
| 2495 | + 'scheduling': { |
| 2496 | + 'graph': { |
| 2497 | + 'R1': ''' |
| 2498 | + slow:fail? => foo |
| 2499 | + slow? => !foo |
| 2500 | + foo:x => x |
| 2501 | + ''', |
| 2502 | + }, |
| 2503 | + }, |
| 2504 | + 'runtime': { |
| 2505 | + 'foo': { |
| 2506 | + 'outputs': {'x': 'xxx'} |
| 2507 | + }, |
| 2508 | + }, |
| 2509 | + }) |
| 2510 | + |
| 2511 | + schd = scheduler(id_, paused_start=False) |
| 2512 | + async with run(schd): |
| 2513 | + # 1/slow should spawn on startup |
| 2514 | + assert list_spawn_task_calls() == ['slow'] |
| 2515 | + |
| 2516 | + # set foo:x |
| 2517 | + await commands.run_cmd( |
| 2518 | + commands.set_prereqs_and_outputs( |
| 2519 | + schd, ['1/foo'], ['1'], ['x'], None |
| 2520 | + ) |
| 2521 | + ) |
| 2522 | + # 1/foo:x should be recorded in the DB: |
| 2523 | + assert db_select( |
| 2524 | + schd, True, 'task_outputs', 'outputs', cycle='1', name='foo' |
| 2525 | + ) == [('{"x": "(manually completed)"}',)] |
| 2526 | + # and 1/x should spawn: |
| 2527 | + assert list_spawn_task_calls() == ['slow', 'x'] |
| 2528 | + |
| 2529 | + # run the workflow until completion |
| 2530 | + await complete(schd, timeout=5) |
| 2531 | + |
| 2532 | + # 1/foo should spawn as a result of the suicide trigger |
| 2533 | + assert list_spawn_task_calls() == ['slow', 'x', 'foo'] |
| 2534 | + |
| 2535 | + # the manually completed output should not have been overwritten by the |
| 2536 | + # suicide trigger |
| 2537 | + assert db_select( |
| 2538 | + schd, True, 'task_outputs', 'outputs', cycle='1', name='foo' |
| 2539 | + ) == [('{"x": "(manually completed)"}',)] |
0 commit comments