diff --git a/src/huggingface_hub/_jobs_api.py b/src/huggingface_hub/_jobs_api.py index 2c9d76d645..1cddd2d423 100644 --- a/src/huggingface_hub/_jobs_api.py +++ b/src/huggingface_hub/_jobs_api.py @@ -85,7 +85,7 @@ class JobInfo: status: (`JobStatus` or `None`): Status of the Job, e.g. `JobStatus(stage="RUNNING", message=None)` See [`JobStage`] for possible stage values. - status: (`JobOwner` or `None`): + owner: (`JobOwner` or `None`): Owner of the Job, e.g. `JobOwner(id="5e9ecfc04957053f60648a3e", name="lhoestq", type="user")` Example: @@ -142,3 +142,122 @@ def __init__(self, **kwargs) -> None: # Inferred fields self.endpoint = kwargs.get("endpoint", constants.ENDPOINT) self.url = f"{self.endpoint}/jobs/{self.owner.name}/{self.id}" + + +@dataclass +class JobSpec: + docker_image: Optional[str] + space_id: Optional[str] + command: Optional[List[str]] + arguments: Optional[List[str]] + environment: Optional[Dict[str, Any]] + secrets: Optional[Dict[str, Any]] + flavor: Optional[SpaceHardware] + owner: JobOwner + timeout: Optional[int] + tags: Optional[List[str]] + arch: Optional[str] + + def __init__(self, **kwargs) -> None: + self.docker_image = kwargs.get("dockerImage") or kwargs.get("docker_image") + self.space_id = kwargs.get("spaceId") or kwargs.get("space_id") + self.command = kwargs.get("command") + self.arguments = kwargs.get("arguments") + self.environment = kwargs.get("environment") + self.secrets = kwargs.get("secrets") + self.flavor = kwargs.get("flavor") + owner = kwargs.get("owner", {}) + self.owner = JobOwner(id=owner["id"], name=owner["name"], type=owner["type"]) + self.timeout = kwargs.get("timeout") + self.tags = kwargs.get("tags") + self.arch = kwargs.get("arch") + + +@dataclass +class LastJobInfo: + id: str + at: datetime + + def __init__(self, **kwargs) -> None: + self.id = kwargs["id"] + self.at = parse_datetime(kwargs["at"]) + + +@dataclass +class ScheduledJobStatus: + last_job: Optional[LastJobInfo] + next_job_run_at: datetime + + def __init__(self, **kwargs) -> None: + last_job = kwargs.get("lastJob") or kwargs.get("last_job") + self.last_job = LastJobInfo(**last_job) if last_job else None + next_job_run_at = kwargs.get("nextJobRunAt") or kwargs.get("next_job_run_at") + self.next_job_run_at = parse_datetime(str(next_job_run_at)) + + +@dataclass +class ScheduledJobInfo: + """ + Contains information about a Job. + + Args: + id (`str`): + Scheduled Job ID. + created_at (`datetime` or `None`): + When the scheduled Job was created. + tags (`List[str]` or `None`): + The tags of the scheduled Job. + schedule (`str` or `None`): + One of "annually", "yearly", "monthly", "weekly", "daily", "hourly", or a + CRON schedule expression (e.g., '0 9 * * 1' for 9 AM every Monday). + suspend (`bool` or `None`): + Whether the the scheduled job is suspended (paused). + concurrency (`bool` or `None`): + Whether multiple instances of this Job can run concurrently. + status (`ScheduledJobStatus` or `None`): + Status of the scheduled Job. + owner: (`JobOwner` or `None`): + Owner of the scheduled Job, e.g. `JobOwner(id="5e9ecfc04957053f60648a3e", name="lhoestq", type="user")` + job_spec: (`JobSpec` or `None`): + Specifications of the Job. + + Example: + + ```python + >>> from huggingface_hub import run_job + >>> scheduled_job = create_scheduled_job( + ... image="python:3.12", + ... command=["python", "-c", "print('Hello from the cloud!')"], + ... schedule="hourly", + ... ) + >>> scheduled_job.id + '687fb701029421ae5549d999' + >>> scheduled_job.status.next_job_run_at + datetime.datetime(2025, 7, 22, 17, 6, 25, 79000, tzinfo=datetime.timezone.utc) + ``` + """ + + id: str + created_at: Optional[datetime] + job_spec: JobSpec + schedule: Optional[str] + suspend: Optional[bool] + concurrency: Optional[bool] + status: ScheduledJobStatus + owner: JobOwner + + def __init__(self, **kwargs) -> None: + self.id = kwargs["id"] + created_at = kwargs.get("createdAt") or kwargs.get("created_at") + self.created_at = parse_datetime(created_at) if created_at else None + self.job_spec = JobSpec(**(kwargs.get("job_spec") or kwargs.get("jobSpec"))) + self.schedule = kwargs.get("schedule") + self.suspend = kwargs.get("suspend") + self.concurrency = kwargs.get("concurrency") + status = kwargs.get("status", {}) + self.status = ScheduledJobStatus( + last_job=status.get("last_job") or status.get("lastJob"), + next_job_run_at=status.get("next_job_run_at") or status.get("nextJobRunAt"), + ) + owner = kwargs.get("owner", {}) + self.owner = JobOwner(id=owner["id"], name=owner["name"], type=owner["type"]) diff --git a/src/huggingface_hub/cli/jobs.py b/src/huggingface_hub/cli/jobs.py index bd88648c79..b4d99f13f3 100644 --- a/src/huggingface_hub/cli/jobs.py +++ b/src/huggingface_hub/cli/jobs.py @@ -68,6 +68,7 @@ def register_subcommand(parser: _SubParsersAction): RunCommand.register_subcommand(jobs_subparsers) CancelCommand.register_subcommand(jobs_subparsers) UvCommand.register_subcommand(jobs_subparsers) + ScheduledJobsCommands.register_subcommand(jobs_subparsers) class RunCommand(BaseHuggingfaceCLICommand): @@ -299,7 +300,7 @@ def run(self) -> None: command_str = " ".join(command) if command else "N/A" # Extract creation time - created_at = job.created_at or "N/A" + created_at = job.created_at.strftime("%Y-%m-%d %H:%M:%S") if job.created_at else "N/A" # Create a dict with all job properties for filtering job_properties = { @@ -548,3 +549,544 @@ def _get_extended_environ() -> Dict[str, str]: if (token := get_token()) is not None: extended_environ["HF_TOKEN"] = token return extended_environ + + +class ScheduledJobsCommands(BaseHuggingfaceCLICommand): + @staticmethod + def register_subcommand(parser: _SubParsersAction): + scheduled_jobs_parser = parser.add_parser("scheduled", help="Create and manage scheduled Jobs on the Hub.") + scheduled_jobs_subparsers = scheduled_jobs_parser.add_subparsers( + help="huggingface.co scheduled jobs related commands" + ) + + # Show help if no subcommand is provided + scheduled_jobs_parser.set_defaults(func=lambda args: scheduled_jobs_subparsers.print_help()) + + # Register commands + ScheduledRunCommand.register_subcommand(scheduled_jobs_subparsers) + ScheduledPsCommand.register_subcommand(scheduled_jobs_subparsers) + ScheduledInspectCommand.register_subcommand(scheduled_jobs_subparsers) + ScheduledDeleteCommand.register_subcommand(scheduled_jobs_subparsers) + ScheduledSuspendCommand.register_subcommand(scheduled_jobs_subparsers) + ScheduledResumeCommand.register_subcommand(scheduled_jobs_subparsers) + ScheduledUvCommand.register_subcommand(scheduled_jobs_subparsers) + + +class ScheduledRunCommand(BaseHuggingfaceCLICommand): + @staticmethod + def register_subcommand(parser: _SubParsersAction) -> None: + run_parser = parser.add_parser("run", help="Schedule a Job") + run_parser.add_argument( + "schedule", + type=str, + help="One of annually, yearly, monthly, weekly, daily, hourly, or a CRON schedule expression.", + ) + run_parser.add_argument("image", type=str, help="The Docker image to use.") + run_parser.add_argument( + "--suspend", + action="store_true", + help="Suspend (pause) the scheduled Job", + ) + run_parser.add_argument( + "--concurrency", + action="store_true", + help="Allow multiple instances of this Job to run concurrently", + ) + run_parser.add_argument("-e", "--env", action="append", help="Set environment variables. E.g. --env ENV=value") + run_parser.add_argument( + "-s", + "--secrets", + action="append", + help=( + "Set secret environment variables. E.g. --secrets SECRET=value " + "or `--secrets HF_TOKEN` to pass your Hugging Face token." + ), + ) + run_parser.add_argument("--env-file", type=str, help="Read in a file of environment variables.") + run_parser.add_argument("--secrets-file", type=str, help="Read in a file of secret environment variables.") + run_parser.add_argument( + "--flavor", + type=str, + help=f"Flavor for the hardware, as in HF Spaces. Defaults to `cpu-basic`. Possible values: {', '.join(SUGGESTED_FLAVORS)}.", + ) + run_parser.add_argument( + "--timeout", + type=str, + help="Max duration: int/float with s (seconds, default), m (minutes), h (hours) or d (days).", + ) + run_parser.add_argument( + "--namespace", + type=str, + help="The namespace where the scheduled Job will be created. Defaults to the current user's namespace.", + ) + run_parser.add_argument( + "--token", + type=str, + help="A User Access Token generated from https://huggingface.co/settings/tokens", + ) + run_parser.add_argument("command", nargs="...", help="The command to run.") + run_parser.set_defaults(func=ScheduledRunCommand) + + def __init__(self, args: Namespace) -> None: + self.schedule: str = args.schedule + self.image: str = args.image + self.command: List[str] = args.command + self.suspend: bool = args.suspend + self.concurrency: bool = args.concurrency + self.env: dict[str, Optional[str]] = {} + if args.env_file: + self.env.update(load_dotenv(Path(args.env_file).read_text(), environ=os.environ.copy())) + for env_value in args.env or []: + self.env.update(load_dotenv(env_value, environ=os.environ.copy())) + self.secrets: dict[str, Optional[str]] = {} + extended_environ = _get_extended_environ() + if args.secrets_file: + self.secrets.update(load_dotenv(Path(args.secrets_file).read_text(), environ=extended_environ)) + for secret in args.secrets or []: + self.secrets.update(load_dotenv(secret, environ=extended_environ)) + self.flavor: Optional[SpaceHardware] = args.flavor + self.timeout: Optional[str] = args.timeout + self.namespace: Optional[str] = args.namespace + self.token: Optional[str] = args.token + + def run(self) -> None: + api = HfApi(token=self.token) + scheduled_job = api.schedule_job( + image=self.image, + command=self.command, + schedule=self.schedule, + suspend=self.suspend, + concurrency=self.concurrency, + env=self.env, + secrets=self.secrets, + flavor=self.flavor, + timeout=self.timeout, + namespace=self.namespace, + ) + # Always print the scheduled job ID to the user + print(f"Scheduled Job created with ID: {scheduled_job.id}") + + +class ScheduledPsCommand(BaseHuggingfaceCLICommand): + @staticmethod + def register_subcommand(parser: _SubParsersAction) -> None: + run_parser = parser.add_parser("ps", help="List scheduled Jobs") + run_parser.add_argument( + "-a", + "--all", + action="store_true", + help="Show all scheduled Jobs (default hides suspended)", + ) + run_parser.add_argument( + "--namespace", + type=str, + help="The namespace from where it lists the jobs. Defaults to the current user's namespace.", + ) + run_parser.add_argument( + "--token", + type=str, + help="A User Access Token generated from https://huggingface.co/settings/tokens", + ) + # Add Docker-style filtering argument + run_parser.add_argument( + "-f", + "--filter", + action="append", + default=[], + help="Filter output based on conditions provided (format: key=value)", + ) + # Add option to format output + run_parser.add_argument( + "--format", + type=str, + help="Format output using a custom template", + ) + run_parser.set_defaults(func=ScheduledPsCommand) + + def __init__(self, args: Namespace) -> None: + self.all: bool = args.all + self.namespace: Optional[str] = args.namespace + self.token: Optional[str] = args.token + self.format: Optional[str] = args.format + self.filters: Dict[str, str] = {} + + # Parse filter arguments (key=value pairs) + for f in args.filter: + if "=" in f: + key, value = f.split("=", 1) + self.filters[key.lower()] = value + else: + print(f"Warning: Ignoring invalid filter format '{f}'. Use key=value format.") + + def run(self) -> None: + """ + Fetch and display scheduked job information for the current user. + Uses Docker-style filtering with -f/--filter flag and key=value pairs. + """ + try: + api = HfApi(token=self.token) + + # Fetch jobs data + scheduled_jobs = api.list_scheduled_jobs(namespace=self.namespace) + + # Define table headers + table_headers = [ + "ID", + "SCHEDULE", + "IMAGE/SPACE", + "COMMAND", + "LAST RUN", + "NEXT RUN", + "SUSPEND", + ] + + # Process jobs data + rows = [] + + for scheduled_job in scheduled_jobs: + # Extract job data for filtering + suspend = scheduled_job.suspend + + # Skip job if not all jobs should be shown and status doesn't match criteria + if not self.all and suspend: + continue + + # Extract job ID + scheduled_job_id = scheduled_job.id + + # Extract schedule + schedule = scheduled_job.schedule + + # Extract image or space information + image_or_space = scheduled_job.job_spec.docker_image or "N/A" + + # Extract and format command + command = scheduled_job.job_spec.command or [] + command_str = " ".join(command) if command else "N/A" + + # Extract status + last_job_at = ( + scheduled_job.status.last_job.at.strftime("%Y-%m-%d %H:%M:%S") + if scheduled_job.status.last_job + else "N/A" + ) + next_job_run_at = scheduled_job.status.next_job_run_at.strftime("%Y-%m-%d %H:%M:%S") + + # Create a dict with all job properties for filtering + job_properties = { + "id": scheduled_job_id, + "image": image_or_space, + "suspend": str(suspend), + "command": command_str, + } + + # Check if job matches all filters + if not self._matches_filters(job_properties): + continue + + # Create row + rows.append( + [ + scheduled_job_id, + schedule, + image_or_space, + command_str, + last_job_at, + next_job_run_at, + suspend, + ] + ) + + # Handle empty results + if not rows: + filters_msg = "" + if self.filters: + filters_msg = f" matching filters: {', '.join([f'{k}={v}' for k, v in self.filters.items()])}" + + print(f"No scheduled jobs found{filters_msg}") + return + + # Apply custom format if provided or use default tabular format + self._print_output(rows, table_headers) + + except requests.RequestException as e: + print(f"Error fetching scheduled jobs data: {e}") + except (KeyError, ValueError, TypeError) as e: + print(f"Error processing scheduled jobs data: {e}") + except Exception as e: + print(f"Unexpected error - {type(e).__name__}: {e}") + + def _matches_filters(self, job_properties: Dict[str, str]) -> bool: + """Check if scheduled job matches all specified filters.""" + for key, pattern in self.filters.items(): + # Check if property exists + if key not in job_properties: + return False + + # Support pattern matching with wildcards + if "*" in pattern or "?" in pattern: + # Convert glob pattern to regex + regex_pattern = pattern.replace("*", ".*").replace("?", ".") + if not re.search(f"^{regex_pattern}$", job_properties[key], re.IGNORECASE): + return False + # Simple substring matching + elif pattern.lower() not in job_properties[key].lower(): + return False + + return True + + def _print_output(self, rows, headers): + """Print output according to the chosen format.""" + if self.format: + # Custom template formatting (simplified) + template = self.format + for row in rows: + line = template + for i, field in enumerate( + ["id", "schedule", "image", "command", "last_job_at", "next_job_run_at", "suspend"] + ): + placeholder = f"{{{{.{field}}}}}" + if placeholder in line: + line = line.replace(placeholder, str(row[i])) + print(line) + else: + # Default tabular format + print( + _tabulate( + rows, + headers=headers, + ) + ) + + +class ScheduledInspectCommand(BaseHuggingfaceCLICommand): + @staticmethod + def register_subcommand(parser: _SubParsersAction) -> None: + run_parser = parser.add_parser("inspect", help="Display detailed information on one or more scheduled Jobs") + run_parser.add_argument( + "--namespace", + type=str, + help="The namespace where the scheduled job is. Defaults to the current user's namespace.", + ) + run_parser.add_argument( + "--token", type=str, help="A User Access Token generated from https://huggingface.co/settings/tokens" + ) + run_parser.add_argument("scheduled_job_ids", nargs="...", help="The scheduled jobs to inspect") + run_parser.set_defaults(func=ScheduledInspectCommand) + + def __init__(self, args: Namespace) -> None: + self.namespace: Optional[str] = args.namespace + self.token: Optional[str] = args.token + self.scheduled_job_ids: List[str] = args.scheduled_job_ids + + def run(self) -> None: + api = HfApi(token=self.token) + scheduled_jobs = [ + api.inspect_scheduled_job(scheduled_job_id=scheduled_job_id, namespace=self.namespace) + for scheduled_job_id in self.scheduled_job_ids + ] + print(json.dumps([asdict(scheduled_job) for scheduled_job in scheduled_jobs], indent=4, default=str)) + + +class ScheduledDeleteCommand(BaseHuggingfaceCLICommand): + @staticmethod + def register_subcommand(parser: _SubParsersAction) -> None: + run_parser = parser.add_parser("delete", help="Delete a scheduled Job") + run_parser.add_argument("scheduled_job_id", type=str, help="Scheduled Job ID") + run_parser.add_argument( + "--namespace", + type=str, + help="The namespace where the scheduled job is. Defaults to the current user's namespace.", + ) + run_parser.add_argument( + "--token", type=str, help="A User Access Token generated from https://huggingface.co/settings/tokens" + ) + run_parser.set_defaults(func=ScheduledDeleteCommand) + + def __init__(self, args: Namespace) -> None: + self.scheduled_job_id: str = args.scheduled_job_id + self.namespace = args.namespace + self.token: Optional[str] = args.token + + def run(self) -> None: + api = HfApi(token=self.token) + api.delete_scheduled_job(scheduled_job_id=self.scheduled_job_id, namespace=self.namespace) + + +class ScheduledSuspendCommand(BaseHuggingfaceCLICommand): + @staticmethod + def register_subcommand(parser: _SubParsersAction) -> None: + run_parser = parser.add_parser("suspend", help="Suspend (pause) a scheduled Job") + run_parser.add_argument("scheduled_job_id", type=str, help="Scheduled Job ID") + run_parser.add_argument( + "--namespace", + type=str, + help="The namespace where the scheduled job is. Defaults to the current user's namespace.", + ) + run_parser.add_argument( + "--token", type=str, help="A User Access Token generated from https://huggingface.co/settings/tokens" + ) + run_parser.set_defaults(func=ScheduledSuspendCommand) + + def __init__(self, args: Namespace) -> None: + self.scheduled_job_id: str = args.scheduled_job_id + self.namespace = args.namespace + self.token: Optional[str] = args.token + + def run(self) -> None: + api = HfApi(token=self.token) + api.suspend_scheduled_job(scheduled_job_id=self.scheduled_job_id, namespace=self.namespace) + + +class ScheduledResumeCommand(BaseHuggingfaceCLICommand): + @staticmethod + def register_subcommand(parser: _SubParsersAction) -> None: + run_parser = parser.add_parser("resume", help="Resume (unpause) a scheduled Job") + run_parser.add_argument("scheduled_job_id", type=str, help="Scheduled Job ID") + run_parser.add_argument( + "--namespace", + type=str, + help="The namespace where the scheduled job is. Defaults to the current user's namespace.", + ) + run_parser.add_argument( + "--token", type=str, help="A User Access Token generated from https://huggingface.co/settings/tokens" + ) + run_parser.set_defaults(func=ScheduledResumeCommand) + + def __init__(self, args: Namespace) -> None: + self.scheduled_job_id: str = args.scheduled_job_id + self.namespace = args.namespace + self.token: Optional[str] = args.token + + def run(self) -> None: + api = HfApi(token=self.token) + api.resume_scheduled_job(scheduled_job_id=self.scheduled_job_id, namespace=self.namespace) + + +class ScheduledUvCommand(BaseHuggingfaceCLICommand): + """Schedule UV scripts on Hugging Face infrastructure.""" + + @staticmethod + def register_subcommand(parser): + """Register UV run subcommand.""" + uv_parser = parser.add_parser( + "uv", + help="Schedule UV scripts (Python with inline dependencies) on HF infrastructure", + ) + + subparsers = uv_parser.add_subparsers(dest="uv_command", help="UV commands", required=True) + + # Run command only + run_parser = subparsers.add_parser( + "run", + help="Run a UV script (local file or URL) on HF infrastructure", + ) + run_parser.add_argument( + "schedule", + type=str, + help="One of annually, yearly, monthly, weekly, daily, hourly, or a CRON schedule expression.", + ) + run_parser.add_argument("script", help="UV script to run (local file or URL)") + run_parser.add_argument("script_args", nargs="...", help="Arguments for the script", default=[]) + run_parser.add_argument( + "--suspend", + action="store_true", + help="Suspend (pause) the scheduled Job", + ) + run_parser.add_argument( + "--concurrency", + action="store_true", + help="Allow multiple instances of this Job to run concurrently", + ) + run_parser.add_argument("--image", type=str, help="Use a custom Docker image with `uv` installed.") + run_parser.add_argument( + "--repo", + help="Repository name for the script (creates ephemeral if not specified)", + ) + run_parser.add_argument( + "--flavor", + type=str, + help=f"Flavor for the hardware, as in HF Spaces. Defaults to `cpu-basic`. Possible values: {', '.join(SUGGESTED_FLAVORS)}.", + ) + run_parser.add_argument("-e", "--env", action="append", help="Environment variables") + run_parser.add_argument( + "-s", + "--secrets", + action="append", + help=( + "Set secret environment variables. E.g. --secrets SECRET=value " + "or `--secrets HF_TOKEN` to pass your Hugging Face token." + ), + ) + run_parser.add_argument("--env-file", type=str, help="Read in a file of environment variables.") + run_parser.add_argument( + "--secrets-file", + type=str, + help="Read in a file of secret environment variables.", + ) + run_parser.add_argument("--timeout", type=str, help="Max duration (e.g., 30s, 5m, 1h)") + run_parser.add_argument("-d", "--detach", action="store_true", help="Run in background") + run_parser.add_argument( + "--namespace", + type=str, + help="The namespace where the Job will be created. Defaults to the current user's namespace.", + ) + run_parser.add_argument("--token", type=str, help="HF token") + # UV options + run_parser.add_argument("--with", action="append", help="Run with the given packages installed", dest="with_") + run_parser.add_argument( + "-p", "--python", type=str, help="The Python interpreter to use for the run environment" + ) + run_parser.set_defaults(func=ScheduledUvCommand) + + def __init__(self, args: Namespace) -> None: + """Initialize the command with parsed arguments.""" + self.schedule: str = args.schedule + self.script = args.script + self.script_args = args.script_args + self.suspend: bool = args.suspend + self.concurrency: bool = args.concurrency + self.dependencies = args.with_ + self.python = args.python + self.image = args.image + self.env: dict[str, Optional[str]] = {} + if args.env_file: + self.env.update(load_dotenv(Path(args.env_file).read_text(), environ=os.environ.copy())) + for env_value in args.env or []: + self.env.update(load_dotenv(env_value, environ=os.environ.copy())) + self.secrets: dict[str, Optional[str]] = {} + extended_environ = _get_extended_environ() + if args.secrets_file: + self.secrets.update(load_dotenv(Path(args.secrets_file).read_text(), environ=extended_environ)) + for secret in args.secrets or []: + self.secrets.update(load_dotenv(secret, environ=extended_environ)) + self.flavor: Optional[SpaceHardware] = args.flavor + self.timeout: Optional[str] = args.timeout + self.detach: bool = args.detach + self.namespace: Optional[str] = args.namespace + self.token: Optional[str] = args.token + self._repo = args.repo + + def run(self) -> None: + """Schedule UV command.""" + logging.set_verbosity(logging.INFO) + api = HfApi(token=self.token) + job = api.schedule_uv_job( + script=self.script, + script_args=self.script_args, + schedule=self.schedule, + suspend=self.suspend, + concurrency=self.concurrency, + dependencies=self.dependencies, + python=self.python, + image=self.image, + env=self.env, + secrets=self.secrets, + flavor=self.flavor, + timeout=self.timeout, + namespace=self.namespace, + _repo=self._repo, + ) + + # Always print the job ID to the user + print(f"Scheduled Job created with ID: {job.id}") diff --git a/src/huggingface_hub/hf_api.py b/src/huggingface_hub/hf_api.py index ec41ae26f2..973f12937c 100644 --- a/src/huggingface_hub/hf_api.py +++ b/src/huggingface_hub/hf_api.py @@ -67,7 +67,7 @@ _warn_on_overwriting_operations, ) from ._inference_endpoints import InferenceEndpoint, InferenceEndpointType -from ._jobs_api import JobInfo +from ._jobs_api import JobInfo, ScheduledJobInfo from ._space_api import SpaceHardware, SpaceRuntime, SpaceStorage, SpaceVariable from ._upload_large_folder import upload_large_folder_internal from .community import ( @@ -10087,7 +10087,7 @@ def fetch_job_logs( ```python >>> from huggingface_hub import fetch_job_logs, run_job >>> job = run_job("python:3.12", ["python", "-c" ,"print('Hello from HF compute!')"]) - >>> for log in fetch_job_logs(job.job_id): + >>> for log in fetch_job_logs(job.id): ... print(log) Hello from HF compute! ``` @@ -10211,7 +10211,7 @@ def inspect_job( ```python >>> from huggingface_hub import inspect_job, run_job >>> job = run_job("python:3.12", ["python", "-c" ,"print('Hello from HF compute!')"]) - >>> inspect_job(job.job_id) + >>> inspect_job(job.id) JobInfo( id='68780d00bbe36d38803f645f', created_at=datetime.datetime(2025, 7, 16, 20, 35, 12, 808000, tzinfo=datetime.timezone.utc), @@ -10375,14 +10375,14 @@ def run_uv_job( with open(script_path, "r") as f: script_content = f.read() - self.upload_file( + commit_hash = self.upload_file( path_or_fileobj=script_content.encode(), path_in_repo=filename, repo_id=repo_id, repo_type="dataset", - ) + ).oid - script_url = f"https://huggingface.co/datasets/{repo_id}/resolve/main/{filename}" + script_url = f"https://huggingface.co/datasets/{repo_id}/resolve/{commit_hash}/{filename}" repo_url = f"https://huggingface.co/datasets/{repo_id}" logger.debug(f"✓ Script uploaded to: {repo_url}/blob/main/{filename}") @@ -10453,6 +10453,518 @@ def run_uv_job( token=token, ) + def schedule_job( + self, + *, + image: str, + command: List[str], + schedule: str, + suspend: bool = False, + concurrency: bool = False, + env: Optional[Dict[str, Any]] = None, + secrets: Optional[Dict[str, Any]] = None, + flavor: Optional[SpaceHardware] = None, + timeout: Optional[Union[int, float, str]] = None, + namespace: Optional[str] = None, + token: Union[bool, str, None] = None, + ) -> ScheduledJobInfo: + """ + Create scheduled compute Jobs on Hugging Face infrastructure. + + Args: + image (`str`): + The Docker image to use. + Examples: `"ubuntu"`, `"python:3.12"`, `"pytorch/pytorch:2.6.0-cuda12.4-cudnn9-devel"`. + Example with an image from a Space: `"hf.co/spaces/lhoestq/duckdb"`. + + command (`List[str]`): + The command to run. Example: `["echo", "hello"]`. + + schedule (`str`): + One of "annually", "yearly", "monthly", "weekly", "daily", "hourly", or a + CRON schedule expression (e.g., '0 9 * * 1' for 9 AM every Monday). + + suspend (`bool`, *optional*): + If True, the scheduled Job is suspended (paused). + + concurrency (`bool`, *optional*): + If True, multiple instances of this Job can run concurrently. + + env (`Dict[str, Any]`, *optional*): + Defines the environment variables for the Job. + + secrets (`Dict[str, Any]`, *optional*): + Defines the secret environment variables for the Job. + + flavor (`str`, *optional*): + Flavor for the hardware, as in Hugging Face Spaces. See [`SpaceHardware`] for possible values. + Defaults to `"cpu-basic"`. + + timeout (`Union[int, float, str]`, *optional*): + Max duration for the Job: int/float with s (seconds, default), m (minutes), h (hours) or d (days). + Example: `300` or `"5m"` for 5 minutes. + + namespace (`str`, *optional*): + The namespace where the Job will be created. Defaults to the current user's namespace. + + token `(Union[bool, str, None]`, *optional*): + A valid user access token. If not provided, the locally saved token will be used, which is the + recommended authentication method. Set to `False` to disable authentication. + Refer to: https://huggingface.co/docs/huggingface_hub/quick-start#authentication. + + Example: + Create your first scheduled Job: + + ```python + >>> from huggingface_hub import schedule_job + >>> schedule_job("python:3.12", ["python", "-c" ,"print('Hello from HF compute!')"], schedule="hourly") + ``` + + Create a scheduled GPU Job: + + ```python + >>> from huggingface_hub import schedule_job + >>> image = "pytorch/pytorch:2.6.0-cuda12.4-cudnn9-devel" + >>> command = ["python", "-c", "import torch; print(f"This code ran with the following GPU: {torch.cuda.get_device_name()}")"] + >>> schedule_job(image, command, flavor="a10g-small", schedule="hourly") + ``` + + """ + if flavor is None: + flavor = SpaceHardware.CPU_BASIC + + # prepare job spec to send to HF Jobs API + job_spec: Dict[str, Any] = { + "command": command, + "arguments": [], + "env": env or {}, + "flavor": flavor, + } + # secrets are optional + if secrets: + job_spec["secrets"] = secrets + # timeout is optional + if timeout: + time_units_factors = {"s": 1, "m": 60, "h": 3600, "d": 3600 * 24} + if isinstance(timeout, str) and timeout[-1] in time_units_factors: + job_spec["timeoutSeconds"] = int(float(timeout[:-1]) * time_units_factors[timeout[-1]]) + else: + job_spec["timeoutSeconds"] = int(timeout) + # input is either from docker hub or from HF spaces + for prefix in ( + "https://huggingface.co/spaces/", + "https://hf.co/spaces/", + "huggingface.co/spaces/", + "hf.co/spaces/", + ): + if image.startswith(prefix): + job_spec["spaceId"] = image[len(prefix) :] + break + else: + job_spec["dockerImage"] = image + if namespace is None: + namespace = self.whoami(token=token)["name"] + + # prepare payload to send to HF Jobs API + input_json: Dict[str, Any] = { + "jobSpec": job_spec, + "schedule": schedule, + "suspend": suspend, + "concurrency": concurrency, + } + response = get_session().post( + f"https://huggingface.co/api/scheduled-jobs/{namespace}", + json=input_json, + headers=self._build_hf_headers(token=token), + ) + hf_raise_for_status(response) + scheduled_job_info = response.json() + return ScheduledJobInfo(**scheduled_job_info) + + def list_scheduled_jobs( + self, + *, + timeout: Optional[int] = None, + namespace: Optional[str] = None, + token: Union[bool, str, None] = None, + ) -> List[ScheduledJobInfo]: + """ + List scheduled compute Jobs on Hugging Face infrastructure. + + Args: + timeout (`float`, *optional*): + Whether to set a timeout for the request to the Hub. + + namespace (`str`, *optional*): + The namespace from where it lists the jobs. Defaults to the current user's namespace. + + token `(Union[bool, str, None]`, *optional*): + A valid user access token. If not provided, the locally saved token will be used, which is the + recommended authentication method. Set to `False` to disable authentication. + Refer to: https://huggingface.co/docs/huggingface_hub/quick-start#authentication. + """ + if namespace is None: + namespace = whoami(token=token)["name"] + response = get_session().get( + f"{self.endpoint}/api/scheduled-jobs/{namespace}", + headers=self._build_hf_headers(token=token), + timeout=timeout, + ) + response.raise_for_status() + return [ScheduledJobInfo(**scheduled_job_info) for scheduled_job_info in response.json()] + + def inspect_scheduled_job( + self, + *, + scheduled_job_id: str, + namespace: Optional[str] = None, + token: Union[bool, str, None] = None, + ) -> ScheduledJobInfo: + """ + Inspect a scheduled compute Job on Hugging Face infrastructure. + + Args: + scheduled_job_id (`str`): + ID of the scheduled Job. + + namespace (`str`, *optional*): + The namespace where the scheduled Job is. Defaults to the current user's namespace. + + token `(Union[bool, str, None]`, *optional*): + A valid user access token. If not provided, the locally saved token will be used, which is the + recommended authentication method. Set to `False` to disable authentication. + Refer to: https://huggingface.co/docs/huggingface_hub/quick-start#authentication. + + Example: + + ```python + >>> from huggingface_hub import inspect_job, run_scheduled_job + >>> scheduled_job = run_scheduled_job("python:3.12", ["python", "-c" ,"print('Hello from HF compute!')"], schedule="hourly") + >>> inspect_scheduled_job(scheduled_job.id) + ``` + """ + if namespace is None: + namespace = self.whoami(token=token)["name"] + response = get_session().get( + f"{self.endpoint}/api/scheduled-jobs/{namespace}/{scheduled_job_id}", + headers=self._build_hf_headers(token=token), + ) + response.raise_for_status() + return ScheduledJobInfo(**response.json()) + + def delete_scheduled_job( + self, + *, + scheduled_job_id: str, + namespace: Optional[str] = None, + token: Union[bool, str, None] = None, + ) -> None: + """ + Delete a scheduled compute Job on Hugging Face infrastructure. + + Args: + scheduled_job_id (`str`): + ID of the scheduled Job. + + namespace (`str`, *optional*): + The namespace where the scheduled Job is. Defaults to the current user's namespace. + + token `(Union[bool, str, None]`, *optional*): + A valid user access token. If not provided, the locally saved token will be used, which is the + recommended authentication method. Set to `False` to disable authentication. + Refer to: https://huggingface.co/docs/huggingface_hub/quick-start#authentication. + """ + if namespace is None: + namespace = self.whoami(token=token)["name"] + get_session().delete( + f"{self.endpoint}/api/scheduled-jobs/{namespace}/{scheduled_job_id}", + headers=self._build_hf_headers(token=token), + ).raise_for_status() + + def suspend_scheduled_job( + self, + *, + scheduled_job_id: str, + namespace: Optional[str] = None, + token: Union[bool, str, None] = None, + ) -> None: + """ + Suspend (pause) a scheduled compute Job on Hugging Face infrastructure. + + Args: + scheduled_job_id (`str`): + ID of the scheduled Job. + + namespace (`str`, *optional*): + The namespace where the scheduled Job is. Defaults to the current user's namespace. + + token `(Union[bool, str, None]`, *optional*): + A valid user access token. If not provided, the locally saved token will be used, which is the + recommended authentication method. Set to `False` to disable authentication. + Refer to: https://huggingface.co/docs/huggingface_hub/quick-start#authentication. + """ + if namespace is None: + namespace = self.whoami(token=token)["name"] + get_session().post( + f"{self.endpoint}/api/scheduled-jobs/{namespace}/{scheduled_job_id}/suspend", + headers=self._build_hf_headers(token=token), + ).raise_for_status() + + def resume_scheduled_job( + self, + *, + scheduled_job_id: str, + namespace: Optional[str] = None, + token: Union[bool, str, None] = None, + ) -> None: + """ + Resume (unpause) a scheduled compute Job on Hugging Face infrastructure. + + Args: + scheduled_job_id (`str`): + ID of the scheduled Job. + + namespace (`str`, *optional*): + The namespace where the scheduled Job is. Defaults to the current user's namespace. + + token `(Union[bool, str, None]`, *optional*): + A valid user access token. If not provided, the locally saved token will be used, which is the + recommended authentication method. Set to `False` to disable authentication. + Refer to: https://huggingface.co/docs/huggingface_hub/quick-start#authentication. + """ + if namespace is None: + namespace = self.whoami(token=token)["name"] + get_session().post( + f"{self.endpoint}/api/scheduled-jobs/{namespace}/{scheduled_job_id}/resume", + headers=self._build_hf_headers(token=token), + ).raise_for_status() + + @experimental + def schedule_uv_job( + self, + script: str, + *, + script_args: Optional[List[str]] = None, + schedule: str, + suspend: bool = False, + concurrency: bool = False, + dependencies: Optional[List[str]] = None, + python: Optional[str] = None, + image: Optional[str] = None, + env: Optional[Dict[str, Any]] = None, + secrets: Optional[Dict[str, Any]] = None, + flavor: Optional[SpaceHardware] = None, + timeout: Optional[Union[int, float, str]] = None, + namespace: Optional[str] = None, + token: Union[bool, str, None] = None, + _repo: Optional[str] = None, + ) -> ScheduledJobInfo: + """ + Run a UV script Job on Hugging Face infrastructure. + + Args: + script (`str`): + Path or URL of the UV script, or a command. + + script_args (`List[str]`, *optional*) + Arguments to pass to the script, or a command. + + schedule (`str`): + One of "annually", "yearly", "monthly", "weekly", "daily", "hourly", or a + CRON schedule expression (e.g., '0 9 * * 1' for 9 AM every Monday). + + suspend (`bool`, *optional*): + If True, the scheduled Job is suspended (paused). + + concurrency (`bool`, *optional*): + If True, multiple instances of this Job can run concurrently. + + dependencies (`List[str]`, *optional*) + Dependencies to use to run the UV script. + + python (`str`, *optional*) + Use a specific Python version. Default is 3.12. + + image (`str`, *optional*, defaults to "ghcr.io/astral-sh/uv:python3.12-bookworm"): + Use a custom Docker image with `uv` installed. + + env (`Dict[str, Any]`, *optional*): + Defines the environment variables for the Job. + + secrets (`Dict[str, Any]`, *optional*): + Defines the secret environment variables for the Job. + + flavor (`str`, *optional*): + Flavor for the hardware, as in Hugging Face Spaces. See [`SpaceHardware`] for possible values. + Defaults to `"cpu-basic"`. + + timeout (`Union[int, float, str]`, *optional*): + Max duration for the Job: int/float with s (seconds, default), m (minutes), h (hours) or d (days). + Example: `300` or `"5m"` for 5 minutes. + + namespace (`str`, *optional*): + The namespace where the Job will be created. Defaults to the current user's namespace. + + token `(Union[bool, str, None]`, *optional*): + A valid user access token. If not provided, the locally saved token will be used, which is the + recommended authentication method. Set to `False` to disable authentication. + Refer to: https://huggingface.co/docs/huggingface_hub/quick-start#authentication. + + Example: + + Schedule a script from a URL: + + ```python + >>> from huggingface_hub import schedule_uv_job + >>> script = "https://raw.githubusercontent.com/huggingface/trl/refs/heads/main/trl/scripts/sft.py" + >>> script_args = ["--model_name_or_path", "Qwen/Qwen2-0.5B", "--dataset_name", "trl-lib/Capybara", "--push_to_hub"] + >>> schedule_uv_job(script, script_args=script_args, dependencies=["trl"], flavor="a10g-small", schedule="weekly") + ``` + + Schedule a local script: + + ```python + >>> from huggingface_hub import schedule_uv_job + >>> script = "my_sft.py" + >>> script_args = ["--model_name_or_path", "Qwen/Qwen2-0.5B", "--dataset_name", "trl-lib/Capybara", "--push_to_hub"] + >>> schedule_uv_job(script, script_args=script_args, dependencies=["trl"], flavor="a10g-small", schedule="weekly") + ``` + + Schedule a command: + + ```python + >>> from huggingface_hub import schedule_uv_job + >>> script = "lighteval" + >>> script_args= ["endpoint", "inference-providers", "model_name=openai/gpt-oss-20b,provider=auto", "lighteval|gsm8k|0|0"] + >>> schedule_uv_job(script, script_args=script_args, dependencies=["lighteval"], flavor="a10g-small", schedule="weekly") + ``` + """ + image = image or "ghcr.io/astral-sh/uv:python3.12-bookworm" + env = env or {} + secrets = secrets or {} + + # Build command + uv_args = [] + if dependencies: + for dependency in dependencies: + uv_args += ["--with", dependency] + if python: + uv_args += ["--python", python] + script_args = script_args or [] + + if namespace is None: + namespace = self.whoami(token=token)["name"] + + if script.startswith("http://") or script.startswith("https://") or not script.endswith(".py"): + # Direct URL execution or command - no upload needed + command = ["uv", "run"] + uv_args + [script] + script_args + else: + # Local file - upload to HF + script_path = Path(script) + filename = script_path.name + # Parse repo + if _repo: + repo_id = _repo + if "/" not in repo_id: + repo_id = f"{namespace}/{repo_id}" + repo_id = _repo + else: + repo_id = f"{namespace}/hf-cli-jobs-uv-run-scripts" + + # Create repo if needed + try: + self.repo_info(repo_id, repo_type="dataset") + logger.debug(f"Using existing repository: {repo_id}") + except RepositoryNotFoundError: + logger.info(f"Creating repository: {repo_id}") + create_repo(repo_id, repo_type="dataset", private=True, exist_ok=True) + + # Upload script + logger.info(f"Uploading {script_path.name} to {repo_id}...") + with open(script_path, "r") as f: + script_content = f.read() + + commit_hash = self.upload_file( + path_or_fileobj=script_content.encode(), + path_in_repo=filename, + repo_id=repo_id, + repo_type="dataset", + ).oid + + script_url = f"https://huggingface.co/datasets/{repo_id}/resolve/{commit_hash}/{filename}" + repo_url = f"https://huggingface.co/datasets/{repo_id}" + + logger.debug(f"✓ Script uploaded to: {repo_url}/blob/main/{filename}") + + # Create and upload minimal README + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S UTC") + readme_content = dedent( + f""" + --- + tags: + - hf-cli-jobs-uv-script + - ephemeral + viewer: false + --- + + # UV Script: {filename} + + Executed via `hf jobs uv run` on {timestamp} + + ## Run this script + + ```bash + hf jobs uv run {filename} + ``` + + --- + *Created with [hf jobs](https://huggingface.co/docs/huggingface_hub/main/en/guides/jobs)* + """ + ) + self.upload_file( + path_or_fileobj=readme_content.encode(), + path_in_repo="README.md", + repo_id=repo_id, + repo_type="dataset", + ) + + secrets["UV_SCRIPT_HF_TOKEN"] = token or self.token or get_token() + env["UV_SCRIPT_URL"] = script_url + + pre_command = ( + dedent( + """ + import urllib.request + import os + from pathlib import Path + o = urllib.request.build_opener() + o.addheaders = [("Authorization", "Bearer " + os.environ["UV_SCRIPT_HF_TOKEN"])] + Path("/tmp/script.py").write_bytes(o.open(os.environ["UV_SCRIPT_URL"]).read()) + """ + ) + .strip() + .replace('"', r"\"") + .split("\n") + ) + pre_command = ["python", "-c", '"' + "; ".join(pre_command) + '"'] + command = ["uv", "run"] + uv_args + ["/tmp/script.py"] + script_args + command = ["bash", "-c", " ".join(pre_command) + " && " + " ".join(command)] + + # Create RunCommand args + return self.schedule_job( + image=image, + command=command, + schedule=schedule, + suspend=suspend, + concurrency=concurrency, + env=env, + secrets=secrets, + flavor=flavor, + timeout=timeout, + namespace=namespace, + token=token, + ) + def _parse_revision_from_pr_url(pr_url: str) -> str: """Safely parse revision number from a PR url. @@ -10617,3 +11129,10 @@ def _parse_revision_from_pr_url(pr_url: str) -> str: inspect_job = api.inspect_job cancel_job = api.cancel_job run_uv_job = api.run_uv_job +schedule_job = api.schedule_job +list_scheduled_jobs = api.list_scheduled_jobs +inspect_scheduled_job = api.inspect_scheduled_job +delete_scheduled_job = api.delete_scheduled_job +suspend_scheduled_job = api.suspend_scheduled_job +resume_scheduled_job = api.suspend_scheduled_job +schedule_uv_job = api.schedule_uv_job