|
46 | 46 | TASK_OUTPUT_FAILED,
|
47 | 47 | TASK_OUTPUT_SUCCEEDED,
|
48 | 48 | )
|
49 |
| -from cylc.flow.task_proxy import TaskProxy |
| 49 | +from cylc.flow.task_pool import TaskPool |
50 | 50 | from cylc.flow.task_state import (
|
51 | 51 | TASK_STATUS_EXPIRED,
|
52 | 52 | TASK_STATUS_FAILED,
|
|
62 | 62 | if TYPE_CHECKING:
|
63 | 63 | from cylc.flow.cycling import PointBase
|
64 | 64 | from cylc.flow.scheduler import Scheduler
|
| 65 | + from cylc.flow.task_proxy import TaskProxy |
65 | 66 |
|
66 | 67 | # NOTE: foo and bar have no parents so at start-up (even with the workflow
|
67 | 68 | # paused) they get spawned out to the runahead limit. 2/pub spawns
|
@@ -2438,3 +2439,75 @@ async def test_start_tasks(
|
2438 | 2439 | "2050/baz",
|
2439 | 2440 | ])
|
2440 | 2441 | )
|
| 2442 | + |
| 2443 | + |
| 2444 | +async def test_add_new_flow_rows_on_spawn( |
| 2445 | + flow, |
| 2446 | + scheduler, |
| 2447 | + run, |
| 2448 | + complete, |
| 2449 | + db_select, |
| 2450 | + capcall, |
| 2451 | +) -> None: |
| 2452 | + """Task suicide should not override previously completed outputs. |
| 2453 | +
|
| 2454 | + See https://github.com/cylc/cylc-flow/pull/6821 |
| 2455 | + """ |
| 2456 | + # capture all TaskPool.spawn_task() calls |
| 2457 | + spawn_task_calls = capcall( |
| 2458 | + 'cylc.flow.task_pool.TaskPool.spawn_task', TaskPool.spawn_task |
| 2459 | + ) |
| 2460 | + |
| 2461 | + def list_spawn_task_calls(): |
| 2462 | + """Return a list of the names of tasks which have been run through the |
| 2463 | + "spawn_tasks" function so far.""" |
| 2464 | + return [ |
| 2465 | + args[1] for args, _kwargs in spawn_task_calls |
| 2466 | + ] |
| 2467 | + |
| 2468 | + id_ = flow({ |
| 2469 | + 'scheduling': { |
| 2470 | + 'graph': { |
| 2471 | + 'R1': ''' |
| 2472 | + slow:fail? => foo |
| 2473 | + slow? => !foo |
| 2474 | + foo:x => x |
| 2475 | + ''', |
| 2476 | + }, |
| 2477 | + }, |
| 2478 | + 'runtime': { |
| 2479 | + 'foo': { |
| 2480 | + 'outputs': {'x': 'xxx'} |
| 2481 | + }, |
| 2482 | + }, |
| 2483 | + }) |
| 2484 | + |
| 2485 | + schd = scheduler(id_, paused_start=False) |
| 2486 | + async with run(schd): |
| 2487 | + # 1/slow should spawn on startup |
| 2488 | + assert list_spawn_task_calls() == ['slow'] |
| 2489 | + |
| 2490 | + # set foo:x |
| 2491 | + await commands.run_cmd( |
| 2492 | + commands.set_prereqs_and_outputs( |
| 2493 | + schd, ['1/foo'], ['1'], ['x'], None |
| 2494 | + ) |
| 2495 | + ) |
| 2496 | + # 1/foo:x should be recorded in the DB: |
| 2497 | + assert db_select( |
| 2498 | + schd, True, 'task_outputs', 'outputs', cycle='1', name='foo' |
| 2499 | + ) == [('{"x": "(manually completed)"}',)] |
| 2500 | + # and 1/x should spawn: |
| 2501 | + assert list_spawn_task_calls() == ['slow', 'x'] |
| 2502 | + |
| 2503 | + # run the workflow until completion |
| 2504 | + await complete(schd, timeout=5) |
| 2505 | + |
| 2506 | + # 1/foo should spawn as a result of the suicide trigger |
| 2507 | + assert list_spawn_task_calls() == ['slow', 'x', 'foo'] |
| 2508 | + |
| 2509 | + # the manually completed output should not have been overwritten by the |
| 2510 | + # suicide trigger |
| 2511 | + assert db_select( |
| 2512 | + schd, True, 'task_outputs', 'outputs', cycle='1', name='foo' |
| 2513 | + ) == [('{"x": "(manually completed)"}',)] |
0 commit comments