Skip to content

Commit c0a9e57

Browse files
authored
Improve Fray job name in remote & StepSpec (#3076)
* follow up to #3051 * if `name` is specified in `remote` don't change it * if name is not specified, best effort to get base name + short uuid * I don't think arg hashing is worth the complexity + on restart would collide, and if the user wants name lock, they can handle it * `StepSpec` uses the name with hash + short uuid This way: * afaiu you can get name "lock" via explicit name * restart of steps should just work
1 parent 9c372ee commit c0a9e57

File tree

2 files changed

+18
-11
lines changed

2 files changed

+18
-11
lines changed

lib/marin/src/marin/execution/remote.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@ def train(...): ... # explicit resources
1717

1818
from __future__ import annotations
1919

20-
import hashlib
20+
import dataclasses
2121
import re
22+
import uuid
2223
from collections.abc import Callable
2324
from dataclasses import dataclass, field
2425
from typing import Generic, ParamSpec, TypeVar
@@ -29,7 +30,7 @@ def train(...): ... # explicit resources
2930
P = ParamSpec("P")
3031
R = TypeVar("R")
3132

32-
DEFAULT_JOB_NAME = "fray_exec_job"
33+
DEFAULT_JOB_NAME = "remote_job"
3334

3435

3536
def _sanitize_job_name(name: str) -> str:
@@ -39,12 +40,6 @@ def _sanitize_job_name(name: str) -> str:
3940
return sanitized or DEFAULT_JOB_NAME
4041

4142

42-
def _args_hash(*args: object, **kwargs: object) -> str:
43-
"""Short hash of call arguments to disambiguate jobs with the same function name."""
44-
content = repr(args) + repr(sorted(kwargs.items()))
45-
return hashlib.sha256(content.encode()).hexdigest()[:8]
46-
47-
4843
@dataclass(frozen=True)
4944
class RemoteCallable(Generic[P, R]):
5045
"""A callable wrapper that submits its function to Fray when called.
@@ -60,12 +55,21 @@ class RemoteCallable(Generic[P, R]):
6055
pip_dependency_groups: list[str] = field(default_factory=list)
6156
name: str | None = None
6257

58+
def named(self, name: str) -> RemoteCallable:
59+
"""Noop if already has a name. Otherwise use provided name."""
60+
if self.name:
61+
return self
62+
return dataclasses.replace(self, name=name)
63+
6364
# TODO: JobHandle doesn't have this option now, but we could make this return the R
6465
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> None:
6566
"""Submit fn to Fray and block until completion."""
6667

67-
base_name = self.name or getattr(self.fn, "__name__", None) or DEFAULT_JOB_NAME
68-
name = f"{base_name}-{_args_hash(*args, **kwargs)}"
68+
if self.name:
69+
name = self.name
70+
else:
71+
fn_name = getattr(self.fn, "__name__", None) or DEFAULT_JOB_NAME
72+
name = f"{fn_name}-{uuid.uuid4().hex[:8]}"
6973
c = fray_client.current_client()
7074
handle = c.submit(
7175
JobRequest(

lib/marin/src/marin/execution/step_spec.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import dataclasses
77
import hashlib
88
import json
9+
import uuid
910
from collections.abc import Callable
1011
from dataclasses import dataclass
1112
from functools import cached_property
@@ -92,6 +93,8 @@ def executable_fn(self) -> Callable[[str], Any]:
9293
)
9394

9495
if isinstance(self.fn, RemoteCallable):
95-
wrapped = dataclasses.replace(self.fn, fn=wrapped)
96+
job_name = f"{self.name_with_hash}-{uuid.uuid4().hex[:8]}"
97+
remote_callable = self.fn.named(job_name)
98+
wrapped = dataclasses.replace(remote_callable, fn=wrapped)
9699

97100
return wrapped

0 commit comments

Comments
 (0)