Conversation

@JinZhou5042
Member

Proposed Changes

We had only one mode (cloudpickle) for wrapping function arguments and sending them to workers.

However, arguments can sometimes also be passed in JSON format, and we should support that.

Merge Checklist

The following items must be completed before PRs can be merged.
Check these off to verify you have completed all steps.

  • make test Run local tests prior to pushing.
  • make format Format source code to comply with lint policies. Note that some lint errors can only be resolved manually (e.g., Python)
  • make lint Run lint on source code prior to pushing.
  • Manual Update: Update the manual to reflect user-visible changes.
  • Type Labels: Select a GitHub label for the type: bugfix, enhancement, etc.
  • Product Labels: Select a GitHub label for the product: TaskVine, Makeflow, etc.
  • PR RTM: Mark your PR as ready to merge.

@JinZhou5042 JinZhou5042 self-assigned this Oct 15, 2025
@JinZhou5042 JinZhou5042 requested a review from tphung3 November 5, 2025 20:32
@tphung3
Contributor

tphung3 commented Nov 5, 2025

What is the inspiration behind this change, or can you please provide more context or use case? I'm thinking about the corner case of a Python class as an argument, which AFAIK isn't JSON-serializable or -deserializable out of the box with the json module.

Can you also add tests to make sure this proposed feature works?
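To illustrate the corner case raised here, a quick editorial sketch (`pickle` stands in for cloudpickle, which exposes the same `dumps`/`loads` interface): the stdlib `json` module rejects an arbitrary class instance, while pickle-style serialization round-trips it out of the box.

```python
import json
import pickle  # stand-in for cloudpickle; both expose dumps()/loads()

class Point:
    """A plain user-defined class, not JSON-serializable by default."""
    def __init__(self, x, y):
        self.x, self.y = x, y

p = Point(1, 2)

# json.dumps raises TypeError for objects it has no encoder for
try:
    json.dumps(p)
except TypeError as err:
    print("json refused:", err)

# pickle (and cloudpickle) round-trip the instance without extra work
q = pickle.loads(pickle.dumps(p))
print(q.x, q.y)  # → 1 2
```

Supporting such objects via JSON would require custom encoders/decoders, which is exactly why the cloudpickle path cannot simply be replaced.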

@tphung3
Contributor

tphung3 commented Nov 5, 2025

If you want to support this feature you also need to add a flag somewhere around here to serialize Python objects by JSON too, as arguments via "infile" are always serialized with cloudpickle at the moment (see

    name = os.path.join(self.manager.staging_directory, "arguments", self._id)
    with open(name, "wb") as wf:
        cloudpickle.dump(self._event, wf)
    self._input_file = self.manager.declare_file(name, unlink_when_done=True, cache=False, peer_transfer=True)
    if self._tmp_output_enabled:
        self._output_file = self.manager.declare_temp()
    else:
        name = os.path.join(self.manager.staging_directory, "outputs", self._id)
        self._output_file = self.manager.declare_file(name, cache=self._cache_output, unlink_when_done=False)
    self._event = None  # free args memory. Once in a file they are not needed anymore.
    self.add_input(self._input_file, "infile")

)
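A minimal sketch of such a flag (hypothetical: the `write_infile` helper and `serialization_mode` parameter are illustrative names, not TaskVine API, and `pickle` stands in for cloudpickle, which has the same `dump()` signature):

```python
import json
import os
import pickle  # stand-in for cloudpickle here
import tempfile

def write_infile(path, event, serialization_mode="cloudpickle"):
    """Write the task's argument payload in the requested format.

    serialization_mode="json" only works for JSON-serializable events;
    the default branch mirrors the current cloudpickle-only behavior.
    """
    if serialization_mode == "json":
        with open(path, "w") as wf:   # JSON payloads are text
            json.dump(event, wf)
    else:
        with open(path, "wb") as wf:  # pickled payloads are binary
            pickle.dump(event, wf)

# usage sketch: a string-key payload fits JSON naturally
path = os.path.join(tempfile.gettempdir(), "args_a.json")
write_infile(path, {"args": ["a"], "kwargs": {}}, serialization_mode="json")
```

The worker side would need a matching flag to pick `json.load` or `cloudpickle.load` when reading the infile.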

@JinZhou5042
Member Author

> What is the inspiration behind this change, or can you please provide more context or use case? I'm thinking about the corner case of a Python class as an argument, which AFAIK isn't JSON-serializable or -deserializable out of the box with the json module.
>
> Can you also add tests to make sure this proposed feature works?

This feature enables the design of a new graph executor, where the entire graph is installed within the library. Each function call carries a string key identifying the function to execute. Since the key is a string, we can pass the text directly instead of serializing it via cloudpickle.

For example, a task graph can be defined as:

def add(x, y):
    return x + y

graph = {
    "a": (add, 1, 2),
    "b": (add, "a", 3),
    "c": (add, "a", "b"),
}

Here, graph is installed in the library; the executor takes a key, retrieves the corresponding task definition, and executes it. Thus, the infile for the three tasks would look like:

{"args": ["a"], "kwargs": {}}
{"args": ["b"], "kwargs": {}}
{"args": ["c"], "kwargs": {}}

Although a single argument would suffice to represent the key, JSON is used for flexibility.
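The executor loop described above can be sketched as follows (editorial sketch: the `execute` function and its recursive key resolution are a guess at the design, not code from this PR):

```python
def add(x, y):
    return x + y

# the graph installed in the library, as in the example above
graph = {
    "a": (add, 1, 2),
    "b": (add, "a", 3),
    "c": (add, "a", "b"),
}

def execute(key, graph, cache=None):
    """Resolve a task key: look up its definition, recursively
    evaluating any argument that is itself a key in the graph."""
    if cache is None:
        cache = {}
    if key in cache:
        return cache[key]
    fn, *args = graph[key]
    resolved = [execute(a, graph, cache) if a in graph else a for a in args]
    cache[key] = fn(*resolved)
    return cache[key]

print(execute("c", graph))  # → 9  (a=3, b=6, c=9)
```

With this shape, each worker-side call only needs the string key from the JSON infile; everything else lives in the installed graph.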

@JinZhou5042
Member Author

JinZhou5042 commented Nov 6, 2025

> If you want to support this feature you also need to add a flag somewhere around here to serialize Python objects by JSON too, as arguments via "infile" are always serialized with cloudpickle at the moment (see
>
>     name = os.path.join(self.manager.staging_directory, "arguments", self._id)
>     with open(name, "wb") as wf:
>         cloudpickle.dump(self._event, wf)
>     self._input_file = self.manager.declare_file(name, unlink_when_done=True, cache=False, peer_transfer=True)
>     if self._tmp_output_enabled:
>         self._output_file = self.manager.declare_temp()
>     else:
>         name = os.path.join(self.manager.staging_directory, "outputs", self._id)
>         self._output_file = self.manager.declare_file(name, cache=self._cache_output, unlink_when_done=False)
>     self._event = None  # free args memory. Once in a file they are not needed anymore.
>     self.add_input(self._input_file, "infile")
>
> )

The new feature doesn't go through task.py; instead, it creates tasks directly on the C side. I will add some examples in a later PR!
