Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions procrastinate/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,49 @@ def configure_worker_parser(subparsers: argparse._SubParsersAction):
envvar_help="use 0/n/f or 1/y/t",
envvar_type=env_bool,
)

# Reload options (similar to uvicorn)
add_argument(
worker_parser,
"--reload",
action="store_true",
help="Enable auto-reload for development",
envvar="WORKER_RELOAD",
envvar_help="use 0/n/f or 1/y/t",
envvar_type=env_bool,
)
add_argument(
worker_parser,
"--reload-dir",
dest="reload_dirs",
action="append",
help="Set reload directories explicitly, instead of using current working directory. May be used multiple times",
envvar="WORKER_RELOAD_DIRS",
)
add_argument(
worker_parser,
"--reload-include",
dest="reload_includes",
action="append",
help="Set glob patterns to include while watching for files. Includes '*.py' by default. May be used multiple times",
envvar="WORKER_RELOAD_INCLUDES",
)
add_argument(
worker_parser,
"--reload-exclude",
dest="reload_excludes",
action="append",
help="Set glob patterns to exclude while watching for files. May be used multiple times",
envvar="WORKER_RELOAD_EXCLUDES",
)
add_argument(
worker_parser,
"--reload-delay",
type=float,
default=0.25,
help="Delay between previous and next check if application needs to be reloaded",
envvar="WORKER_RELOAD_DELAY",
)
add_argument(
worker_parser,
"--delete-jobs",
Expand Down Expand Up @@ -551,6 +594,11 @@ async def worker_(
Launch a worker, listening on the given queues (or all queues).
Values default to App.worker_defaults and then App.run_worker() defaults values.
"""
# Handle reload mode
if kwargs.get("reload"):
return run_worker_with_reload(app, **kwargs)

# Standard worker mode
queues = kwargs.get("queues")
print_stderr(
f"Launching a worker on {'all queues' if not queues else ', '.join(queues)}"
Expand Down Expand Up @@ -674,6 +722,61 @@ async def shell_(app: procrastinate.App, shell_command: list[str]):
await utils.sync_to_async(shell_obj.cmdloop)


def run_worker_with_reload(app: procrastinate.App, **kwargs) -> int:
"""
Run worker with auto-reload functionality.

This function switches from async context to sync and uses subprocess
to enable file watching and process restarting.
"""
# Import here to avoid import errors if watchfiles not installed
from .reloader import run_with_reload

# Extract reload configuration
reload_config = {
"reload_dirs": kwargs.get("reload_dirs"),
"reload_includes": kwargs.get("reload_includes"),
"reload_excludes": kwargs.get("reload_excludes"),
"reload_delay": kwargs.get("reload_delay", 0.25),
}

# Build command to run worker without reload
# We need to reconstruct the CLI command

# Start with base command
cmd = [sys.executable, "-m", "procrastinate"]

# Add app parameter if available
app_import_string = getattr(app, "_import_string", None)
if app_import_string:
cmd.extend(["--app", app_import_string])

# Add worker subcommand
cmd.append("worker")

# Add worker-specific arguments (excluding reload options)
worker_args = {
k: v
for k, v in kwargs.items()
if not k.startswith("reload") and k != "reload" and v is not None
}

for key, value in worker_args.items():
cli_key = key.replace("_", "-")
if isinstance(value, bool):
if value:
cmd.append(f"--{cli_key}")
elif isinstance(value, list):
for item in value:
cmd.extend([f"--{cli_key}", str(item)])
else:
cmd.extend([f"--{cli_key}", str(value)])

# Run with reload
print_stderr("πŸ”„ Starting Procrastinate worker with auto-reload enabled")
return run_with_reload(cmd, **reload_config)


def main():
if os.name == "nt":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
Expand Down
Loading
Loading