Conversation

@benclifford
Collaborator

Description

this is most immediately in the context of issue #3534 - but @WardLT might also be interested

this very rough PR:

  • moves more checkpoint/memo out of the data flow kernel into the existing memoizer implementation class

  • makes the DFK use an abstract Memoizer class, with the existing implementation now in BasicMemoizer

  • adds a test to demo, for issue #3534 (Method for overriding cached/checkpointed results), how checkpoint/memo lookup can look at the args of the function it has been passed to decide whether to ask an underlying BasicMemoizer to look up a result, or to not return a memoized result without even asking the basic memoizer

This PR is intended for experimentation with this kind of API, but a lot of it drives towards a cleaner codebase, so for the most part it should find its way into the master branch
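To make the delegation idea concrete, here is a minimal stand-alone sketch in plain Python. These are not Parsl's actual classes: `BasicMemoizer`, `ArgFilteringMemoizer`, the dict-based task records and the `ignore_cache` kwarg are all invented stand-ins for the pattern described, where a wrapper inspects a task's arguments before deciding whether to consult the underlying memoizer at all.

```python
class BasicMemoizer:
    """Stand-in for a hash-keyed memoizer (not Parsl's real class)."""
    def __init__(self):
        self._table = {}

    def check_memo(self, task):
        # Return a memoized result, or None if there is none.
        return self._table.get((task["func_name"], task["args"]))

    def update_memo(self, task, result):
        self._table[(task["func_name"], task["args"])] = result


class ArgFilteringMemoizer:
    """Consults the wrapped memoizer only when the task's arguments
    permit it; here the (invented) kwarg ignore_cache opts a single
    invocation out of memoized results."""
    def __init__(self, inner):
        self.inner = inner

    def check_memo(self, task):
        if task.get("kwargs", {}).get("ignore_cache"):
            return None  # refuse without even asking the inner memoizer
        return self.inner.check_memo(task)

    def update_memo(self, task, result):
        self.inner.update_memo(task, result)


memoizer = ArgFilteringMemoizer(BasicMemoizer())
task = {"func_name": "my_app", "args": (7,), "kwargs": {}}
memoizer.update_memo(task, 49)
assert memoizer.check_memo(task) == 49
opted_out = dict(task, kwargs={"ignore_cache": True})
assert memoizer.check_memo(opted_out) is None
```

The point of the shape is that the outer memoizer never needs to know how the inner one stores or hashes anything; it only decides whether to delegate.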

Type of change

Choose which options apply, and delete the ones which do not apply.

  • New feature


@drewoldag drewoldag left a comment


Overall, this looks great and seems to be exactly what we would want to be able to programmatically avoid using cached/memoized data. I left one small question spurred by a comment in test_memoize_plugin.py, but it's not critical. Thanks a bunch for putting this together.

# TODO: this .result() needs to be here, not in the loop
# because otherwise we race to complete... and then
# we might sometimes get a memoization before the loop
# and sometimes not...


Is this generally something that users should keep in mind when working with caching and memoization in Parsl? Namely that there could be race conditions that crop up for functions that are in a loop and return very quickly?
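The scenario behind this question can be modelled outside Parsl with plain `concurrent.futures`: if the memo table is only populated when a task completes, two quick identical submissions both miss the cache and both execute. This is a toy model; `my_app`, the `memo` dict and the `submit` helper are invented for the sketch and are not Parsl code.

```python
from concurrent.futures import ThreadPoolExecutor
import time

memo = {}    # populated only when a task finishes
calls = []   # records actual executions

def my_app(x):
    calls.append(x)
    time.sleep(0.1)
    return x * x

def submit(pool, x):
    if x in memo:            # memo check at submit time
        return memo[x]
    fut = pool.submit(my_app, x)
    # the memo table only learns the result at completion time
    fut.add_done_callback(lambda f: memo.setdefault(x, f.result()))
    return fut

with ThreadPoolExecutor() as pool:
    f1 = submit(pool, 7)     # no memo yet: executes
    f2 = submit(pool, 7)     # still no memo: executes again

assert len(calls) == 2       # both submissions ran
assert memo[7] == 49
```

Calling `f1.result()` before the second `submit` would have made the second call a memo hit, which is exactly why the test's `.result()` placement matters.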

Collaborator Author


This will happen if you launch two apps with the same parameters at "the same time", where "the same time" means without waiting for one of them to complete:

  1. invoke my_app(7)

     there's no checkpoint/memo so we submit it to be executed, as task 1

  2. invoke my_app(7)

     there's no checkpoint/memo so we submit it to be executed, as task 2

  3. task 1 completes and its result is memoized as my_app 7

  4. task 2 completes and its result is memoized as my_app 7, replacing the result from step 3.

One possible way to avoid this (again, low priority for me) is something more like Haskell thunks:

  1. invoke my_app(7)

     there's no checkpoint/memo so we submit it to be executed, as task 1. we memoize its unpopulated future as my_app 7.

  2. invoke my_app(7)

     there's a memo future for it (with no result yet) - so use that for the result

  3. task 1 completes, populating its result future. This causes the 2nd invocation's memo future to be populated too, and that completes.

so my_app is only run once.
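The thunk-like scheme above, modelled again with plain `concurrent.futures` rather than Parsl internals: storing the still-unfinished future in the memo table at submission time means the second invocation shares the first's future, and the app body runs once.

```python
from concurrent.futures import ThreadPoolExecutor
import time

memo = {}
calls = []

def my_app(x):
    calls.append(x)
    time.sleep(0.1)
    return x * x

def submit(pool, x):
    if x in memo:        # a future (possibly unfinished) is already there
        return memo[x]
    fut = pool.submit(my_app, x)
    memo[x] = fut        # memoize the future before it completes
    return fut

with ThreadPoolExecutor() as pool:
    f1 = submit(pool, 7)
    f2 = submit(pool, 7)   # reuses f1's future

assert f1 is f2
assert f2.result() == 49
assert len(calls) == 1     # my_app ran exactly once
```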


Oh, ok. I see what you mean. From a selfish perspective, I don't think the extra effort of memoizing a future is necessary for my work.

Thanks a bunch for the explanation.

@benclifford
Collaborator Author

@drewoldag I'm interested to see any code you write that makes use of this interface - link it here if anything goes online. It doesn't need to look beautiful - I'm more interested in realistic practical use.

@benclifford benclifford force-pushed the benc-checkpoint-plugins branch from 12b2168 to 1846145 Compare July 30, 2024 20:04
@benclifford
Collaborator Author

By chance a different checkpoint-related question/use case came up and I've added a bit more to this demo to address it. The question was: can checkpointing optionally also checkpoint exceptions (so that they will be treated as permanent failures even across runs)? Almost all of the machinery to do that is already there - including the file format having a space for exceptions (which was unused and should have been tidied away long ago). This PR now makes use of that file format and lets a user specify policies for which completed apps should be checkpointed to disk.

@PfeifferMicha

PfeifferMicha commented Aug 1, 2024

As mentioned by @benclifford, I wanted to cache certain exceptions as "valid" results so that these tasks would not be rerun. I've made some preliminary tests with this PR and it seems to work as expected.

I used this custom Memoizer to cache tasks throwing SampleProcessingException or SampleValidationException:

class ExceptionSafeMemoizer(BasicMemoizer):
    def filter_for_checkpoint(self, app_fu):

        # task record is available from app_fu.task_record
        assert app_fu.task_record is not None

        # Checkpoint either if there's a result, or if the exception is a
        # SampleProcessingException or SampleValidationException:
        exp = app_fu.exception()
        if exp is not None:
            # Check if the exception type is one of our internal pipeline
            # exceptions. If so, do cache this result, because this is
            # expected behaviour and we do not need to re-run this task
            # in the future.
            if isinstance(exp, (SampleProcessingException, SampleValidationException)):
                return True
            # If an unexpected exception occurred, do _not_ cache anything.
            return False
        # If no exception occurred, cache the result.
        return True

(Not sure about the line assert app_fu.task_record is not None, I copied that from the example.)

As mentioned on Slack, the only thing that was a bit inconvenient was that exceptions raised within the filter_for_checkpoint call - even coding errors - were silently ignored (i.e. they were only written into parsl.log, but did not propagate back to the user's main process or abort the program as I'd expect them to).

@benclifford
Collaborator Author

another good example of checkpointing is an out-of-memory store using sqlite3 - that would make a nice alternate implementation
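A rough sketch of what such an sqlite3-backed store could look like. The class name, methods and schema here are invented for illustration and are not Parsl's API: results are pickled and keyed by a task hash sum, and a fresh connection is opened per access.

```python
import os
import pickle
import sqlite3
import tempfile

class SqliteCheckpointStore:
    """Illustrative sqlite3-backed checkpoint store (invented API)."""

    def __init__(self, path):
        self.path = path
        conn = sqlite3.connect(self.path)
        with conn:
            conn.execute("CREATE TABLE IF NOT EXISTS checkpoints "
                         "(hashsum TEXT PRIMARY KEY, result BLOB)")
        conn.close()

    def store(self, hashsum, result):
        conn = sqlite3.connect(self.path)
        with conn:  # commits on success
            conn.execute("INSERT OR REPLACE INTO checkpoints VALUES (?, ?)",
                         (hashsum, pickle.dumps(result)))
        conn.close()

    def lookup(self, hashsum):
        conn = sqlite3.connect(self.path)
        row = conn.execute("SELECT result FROM checkpoints WHERE hashsum = ?",
                           (hashsum,)).fetchone()
        conn.close()
        return pickle.loads(row[0]) if row else None


path = os.path.join(tempfile.mkdtemp(), "checkpoints.db")
store = SqliteCheckpointStore(path)
store.store("abc123", 49)
assert store.lookup("abc123") == 49
assert store.lookup("missing") is None
```

Unlike the dict-plus-pickle-file scheme, lookups here go straight to disk, so no in-memory memo table needs repopulating at start-up.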

@benclifford benclifford force-pushed the benc-checkpoint-plugins branch 7 times, most recently from 23ff9ce to 4726c81 Compare August 22, 2024 15:57
@benclifford benclifford force-pushed the benc-checkpoint-plugins branch 3 times, most recently from 697ee3d to fd32707 Compare January 27, 2025 17:59
@benclifford benclifford force-pushed the benc-checkpoint-plugins branch from fd32707 to 57fcfc6 Compare March 14, 2025 14:05
@benclifford benclifford force-pushed the benc-checkpoint-plugins branch from 57fcfc6 to dbba605 Compare September 3, 2025 10:34
@benclifford benclifford force-pushed the benc-checkpoint-plugins branch 2 times, most recently from 6b0ac2b to 779b001 Compare September 29, 2025 18:02
@benclifford
Collaborator Author

I made a blog post about this work: https://parsl-project.org/2025/09/30/checkpointing.html

@PfeifferMicha

I made a blog post about this work: https://parsl-project.org/2025/09/30/checkpointing.html

Thanks, I hope to check it out soon! Both database-checkpointing and the pluggable filter sound like they'll make my use-case a lot easier.

@benclifford benclifford force-pushed the benc-checkpoint-plugins branch 2 times, most recently from 3f2464a to 3e74939 Compare October 1, 2025 14:05
@benclifford benclifford changed the title demo of what checkpointing plugins might look like Checkpointing plugins Oct 1, 2025
@benclifford benclifford force-pushed the benc-checkpoint-plugins branch 2 times, most recently from d1d157f to e1178be Compare October 7, 2025 19:22
benclifford added a commit that referenced this pull request Oct 10, 2025
This should not change behaviour and only move code around and
add in more wiring.

This is working towards all of the checkpointing/memoizer code being
pluggable, with the DFK having no knowledge of the specifics - for
example, the DFK no longer imports `pickle`, because that is now the
business of the memoizer.

Because of that goal, the parameters for Memoizer are arranged
in two ways:

Constructor parameters for Memoizer are parameters that the future user
will supply at configuration when Memoizer is exposed as a user plugin,
like other plugins.

DFK internals that should be injected into the (eventually pluggable)
memoizer (small m) are set as startup-time attributes, as for other
plugins.

Right now these two things happen right next to each other, but the
Memoizer constructor call will move into user configuration space in
a subsequent PR.

As before this PR, there is still a separation between "checkpointing"
and "memoization" that is slightly artificial. Subsequent PRs will
merge these actions together more in this Memoizer implementation.

This PR preserves a user-facing dfk.checkpoint() call, that becomes a
passthrough to Memoizer.checkpoint().  I think that is likely to
disappear in a future PR: manual checkpoint triggering will become a
feature (or non-feature) of a specific memoizer implementation and so
a method directly on that memoizer (or not).

To go alongside the existing update_memo call, called by the DFK when
a task is ready for memoization, this PR adds update_checkpoint, which
captures the slightly different notion of a task being ready for
checkpointing -- the current implementation can memoize a task that is
not yet complete because it memoizes the future, not the result, while
a checkpoint needs the actual result to be ready and so must be called
later. This latter call happens in the race-condition-prone
handle_app_update. A later PR will remove this distinction and move
everything to around the same place as update_memo. See PR #3979 for
description of fixing a related race condition related to update_memo.

See PR #3535 for broader context.
The new complete_task_exception and the older complete_task_result
share a lot of code -- but upcoming PRs in my current checkpoint
work will make them behave a bit differently later, so I don't want
to merge them now and unmerge them later.

The order of task completion operations is not preserved by this PR:
before this PR, different instances of the exception completion
boilerplate ran the same operations in different orders.

I don't expect that to be a problem, but if you've bisected to this
PR with a strange ordering/race condition, there's a clue.

This PR removes a comment from one instance of the boilerplate that
asks a question fixed by a previous PR #4004 that also accidentally
introduced the question while tidying it.
benclifford added a commit that referenced this pull request Oct 17, 2025
@benclifford benclifford force-pushed the benc-checkpoint-plugins branch from 5593ade to 34592a4 Compare October 17, 2025 15:02
This PR is not trying to simplify the whole nested structure,
just this one layer, as an incremental simplification.

It relies on:

if a:
  x
else:
  if b:
    y
  else:
    z

being the same as:

if a:
  x
elif b:
  y
else:
  z
This moves code around a little bit to make error handling in the
dependency failure case more consistent with other failures. This
unlocks potential for refactoring of the error handling code.

Rough principles for error handling that are true for other errors
and that this PR makes true for dependency failures:

* _launch_if_ready_async creates a Future for the task, and does not
  modify the TaskRecord state or send monitoring information

* handle_exec_update decides if a task has failed by whether the
  Future contains an exception or not. It updates the TaskRecord
  state and sends monitoring information.

Removing one call of _send_task_log_info (in _launch_if_ready_async)
and instead relying on the existing call in handle_exec_update
removes a duplicate dep_fail entry in the monitoring database, that
was previously unnoticed.

$ pytest parsl/tests/test_python_apps/test_depfail_propagation.py --config parsl/tests/configs/htex_local_alternate.py

Before this PR:

sqlite> select * from status where task_status_name = 'dep_fail';
1|dep_fail|2025-10-16 09:29:20.171145|1d65bd94-8639-4ebf-90f9-2b61cca36763|0
1|dep_fail|2025-10-16 09:29:20.171472|1d65bd94-8639-4ebf-90f9-2b61cca36763|0
3|dep_fail|2025-10-16 09:29:20.255851|1d65bd94-8639-4ebf-90f9-2b61cca36763|0
3|dep_fail|2025-10-16 09:29:20.256067|1d65bd94-8639-4ebf-90f9-2b61cca36763|0
4|dep_fail|2025-10-16 09:29:20.256353|1d65bd94-8639-4ebf-90f9-2b61cca36763|0
4|dep_fail|2025-10-16 09:29:20.256452|1d65bd94-8639-4ebf-90f9-2b61cca36763|0
5|dep_fail|2025-10-16 09:29:20.256732|1d65bd94-8639-4ebf-90f9-2b61cca36763|0
5|dep_fail|2025-10-16 09:29:20.256856|1d65bd94-8639-4ebf-90f9-2b61cca36763|0
7|dep_fail|2025-10-16 09:29:20.335936|1d65bd94-8639-4ebf-90f9-2b61cca36763|0
7|dep_fail|2025-10-16 09:29:20.336110|1d65bd94-8639-4ebf-90f9-2b61cca36763|0
8|dep_fail|2025-10-16 09:29:20.336398|1d65bd94-8639-4ebf-90f9-2b61cca36763|0
8|dep_fail|2025-10-16 09:29:20.336522|1d65bd94-8639-4ebf-90f9-2b61cca36763|0

After this PR:

sqlite> select * from status where task_status_name = 'dep_fail';
1|dep_fail|2025-10-16 09:31:35.782225|be2cd195-b13b-4b03-ac2f-95b73cc9a200|0
3|dep_fail|2025-10-16 09:31:35.865129|be2cd195-b13b-4b03-ac2f-95b73cc9a200|0
4|dep_fail|2025-10-16 09:31:35.865509|be2cd195-b13b-4b03-ac2f-95b73cc9a200|0
5|dep_fail|2025-10-16 09:31:35.865873|be2cd195-b13b-4b03-ac2f-95b73cc9a200|0
7|dep_fail|2025-10-16 09:31:35.945858|be2cd195-b13b-4b03-ac2f-95b73cc9a200|0
8|dep_fail|2025-10-16 09:31:35.946355|be2cd195-b13b-4b03-ac2f-95b73cc9a200|0
(These counts are not generally tested, but this PR is not introducing
a general test for them; instead it is about testing dependency handling
behaviour more, based on regressions encountered during development of
adjacent PRs)
The "else" case of the erstwhile-if statement was not defined properly:
What are the circumstances and behaviour if an exec_fu is not set by
this point?

All don't-launch-yet behaviour is covered by `return` statements earlier
on and the second part of the method should always proceed with making
an exec_fu of some kind.
This PR adds _send_task_log_info as a mandatory part of task completion,
which is already manually enforced. In one place, around line 500, one
_send_task_log_info is split into two: one which then disappears into
complete_task_result, and the other which remains in place for the
non-factored error handling path.

This PR renames _complete_task to _complete_task_result to reflect that
it is for the result path not the exception path (taking naming from
the Future concepts of the same name), and to open up the way for
factorisation of the exception path into _complete_task_exception.
This is set again 2 lines later, and the only potential
use is a call to update_task_state, which does not use
time_returned. The time is set for the purposes of the
subsequent _send_task_log_info.
Prior to this PR, the memoizer was told about the result in the
AppFuture result callback.

By the time this happens, the user workflow code can also observe that
the task is completed and also observe that the memoizer has not got
a copy of the result: for example, by invoking the same app again
immediately and seeing a duplicate execution.

The AppFuture callback shouldn't have anything in it that is related
to user-observable task completion, but this PR does not remove other
stuff in there.

This PR moves update_memo earlier, to before setting the AppFuture
result, so that the memoizer is updated strictly before the future
changes state.
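A toy model of the ordering guarantee this gives, using plain `concurrent.futures` with invented names (not Parsl code): because the memoizer is notified before `set_result`, a user callback can never observe a completed task that the memoizer has not yet seen.

```python
from concurrent.futures import Future

order = []

class RecordingMemoizer:
    """Stand-in memoizer that just records when it was told."""
    def update_memo(self, task_id, result):
        order.append("memoizer")

def complete_task(memoizer, app_fu, task_id, result):
    memoizer.update_memo(task_id, result)  # memoizer notified first
    app_fu.set_result(result)              # then the user sees completion

app_fu = Future()
app_fu.add_done_callback(lambda f: order.append("user"))
complete_task(RecordingMemoizer(), app_fu, 1, 42)

assert order == ["memoizer", "user"]
```

Reversing the two lines in `complete_task` reproduces the pre-PR situation, where user code racing on the future could resubmit an identical app before the memoizer had the result.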
This was introduced in 2018 in commit fd8ce2e,
already commented out.
Prior to PR #2514, tasks were removed at completion when checkpointed,
rather than when completed, in the case that checkpointing was enabled.

This was racy behaviour and was removed in PR #2414.

This test also looks like it is subject to a related race condition
around task completion, described in issue #1279, for which there is a
sleep statement - this PR does not modify that but does add a reference
to the issue.
This was introduced in 52b6b3a and
looks like it was not used at that time either.
This comment talks about checkpointing taking priority over tasks,
but it is unclear: what priority, for what?

This comment was introduced in f2659c8
which implemented different checkpointing times.
update_memo is given the whole task record, which contains the relevant
Future to be memoizing.

The memoizer should never be updated with a future that does not come
from a coherent task record with aligned hash sum and future.
…e all this stuff happen inside those?

TODO: should these run in the app_fu update lock? there's a bunch of other completion stuff that is not
locked, so probably not? in general this lock isn't protecting against two completion actions happening
at once.

split update_memo into two notify-exception and notify-result forms, which run before the future is set,
so that checkpointing can run in that place rather than racing with the user workflow.

this is needed to address an existing race condition: task_exit-checkpointed tasks aren't checkpointed by the time
the workflow observes they have finished. it's debatable whether that is actually the intended functionality
or not, but I think it should be. otherwise, what's the behaviour? a little bit later?

this manifests as awkwardness in implementation in later checkpoint methods too.

and moving more checkpoint code into update_memo makes the checkpoint/memo API unification cleaner.

See PR #NNNN which talks about the app future being used internally when it should only be user-facing.
wipe_task now happens in complete task, the moral successor of handle_app_update

but it now happens *before* the user observes a task complete.

this makes the task be able to be removed from the tasks table before the
user observes the future is complete.

you might say that's a new race condition: the user cannot see the task table entry
at the moment of completion. but that was never strongly guaranteed: the callback
to remove the entry could happen before or after the user observed the completion.

now there's a stronger assertion: it will definitely happen before the user observes
task completion via an AppFuture.

TODO: are there already tests about this?

When I removed wipe_task calls entirely, no per-config test failed...

parsl/tests/test_python_apps/test_garbage_collect.py
  - this test was very slightly racy before, and this was noticeable
    in some race-condition fuzzing work I've done
by this time, all of the checkpointing and memoization code needs to
be moved out of the DFK (in an upcoming PR, for example)

fresh_config style templatisation works just fine here IMO
TODO: rewrite documentation that refers to any example checkpointing, app caching or memoization
configuration, making the documentation ready to receive documentation for the SQL plugin

this PR is the biggest user-facing change because it changes the configuration interface for anyone
configuring anything to do with app caching/memoization/checkpointing.
and is more reusable when it isn't

this isn't the only way to make a hash, though, and
hashing isn't the only way to compare checkpoint
entries for equality.
because different memoizers (and different BasicMemoizer configs) won't necessarily have the identity semantics tested here
goal: results should not (never? in a weak/small cache?) be stored in an
in-memory memo table, so that memo table should not be present in this
implementation. instead all memo questions go to the sqlite3 database.

this drives some blurring between in-memory caching and disk-based
checkpointing: the previous disk-based checkpoint model relied on
repopulating the in-memory memo table cache...

I hit some thread problems when using one sqlite3 connection across threads,
and the docs are unclear about what I can/cannot do, so I made this open
the sqlite3 database on every access. that probably carries quite a performance
hit, but it's probably enough for basically validating the idea.

the three use cases - sqlite3, in-memory and traditional on-disk checkpointing -
maybe need some thought about the completion semantics and data structures
to go with them.

TODO: this breaks:
  pytest  parsl/tests/test_python_apps/test_memoize_exception.py  --config parsl/tests/configs/htex_local_alternate.py
because the behaviour asserted there is only behaviour for the default
memoizer. So it should probably become a --config local test with
known configuration(s).
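For reference, the thread problem mentioned above is sqlite3's default `check_same_thread=True` behaviour: a connection object refuses use from any thread other than the one that created it, raising `sqlite3.ProgrammingError`. The snippet below demonstrates that; opening a fresh connection per access sidesteps it.

```python
import sqlite3
import threading

conn = sqlite3.connect(":memory:")  # created in the main thread
errors = []

def use_from_other_thread():
    try:
        conn.execute("SELECT 1")
    except sqlite3.ProgrammingError as exc:
        errors.append(exc)  # cross-thread use rejected by default

t = threading.Thread(target=use_from_other_thread)
t.start()
t.join()

assert len(errors) == 1
conn.close()
```

Passing `check_same_thread=False` to `sqlite3.connect` disables the check, but then the caller takes on responsibility for serialising access, so per-access connections are the simpler (if slower) option for a prototype.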
@benclifford benclifford force-pushed the benc-checkpoint-plugins branch from 34592a4 to a9cb522 Compare October 17, 2025 15:26