Skip to content

Commit 5e944e4

Browse files
Add queue and remove queue cli commands for EdgeExecutor (#53505)
* Add queue and remove queue cli commands for EdgeExecutor * Add queue cli commands to docs
1 parent 11a6361 commit 5e944e4

File tree

3 files changed

+101
-0
lines changed

3 files changed

+101
-0
lines changed

providers/edge3/docs/deployment.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,3 +190,5 @@ instance. The commands are:
190190
- ``airflow edge remote-edge-worker-exit-maintenance``: Request a remote edge worker to exit maintenance mode
191191
- ``airflow edge shutdown-remote-edge-worker``: Shuts down a remote edge worker gracefully
192192
- ``airflow edge remove-remote-edge-worker``: Remove a worker instance from the cluster
193+
- ``airflow edge add-worker-queues``: Add queues to an edge worker
194+
- ``airflow edge remove-worker-queues``: Remove queues from an edge worker

providers/edge3/src/airflow/providers/edge3/cli/edge_command.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,48 @@ def remote_worker_request_shutdown(args) -> None:
351351
logger.info("Requested shutdown of Edge Worker host %s by %s.", args.edge_hostname, getuser())
352352

353353

354+
@cli_utils.action_cli(check_db=False)
355+
@providers_configuration_loaded
356+
def add_worker_queues(args) -> None:
357+
"""Add queues to an edge worker."""
358+
_check_valid_db_connection()
359+
_check_if_registered_edge_host(hostname=args.edge_hostname)
360+
from airflow.providers.edge3.models.edge_worker import add_worker_queues
361+
362+
queues = args.queues.split(",") if args.queues else []
363+
if not queues:
364+
raise SystemExit("Error: No queues specified to add.")
365+
366+
try:
367+
add_worker_queues(args.edge_hostname, queues)
368+
logger.info("Added queues %s to Edge Worker host %s by %s.", queues, args.edge_hostname, getuser())
369+
except TypeError as e:
370+
logger.error(str(e))
371+
raise SystemExit
372+
373+
374+
@cli_utils.action_cli(check_db=False)
375+
@providers_configuration_loaded
376+
def remove_worker_queues(args) -> None:
377+
"""Remove queues from an edge worker."""
378+
_check_valid_db_connection()
379+
_check_if_registered_edge_host(hostname=args.edge_hostname)
380+
from airflow.providers.edge3.models.edge_worker import remove_worker_queues
381+
382+
queues = args.queues.split(",") if args.queues else []
383+
if not queues:
384+
raise SystemExit("Error: No queues specified to remove.")
385+
386+
try:
387+
remove_worker_queues(args.edge_hostname, queues)
388+
logger.info(
389+
"Removed queues %s from Edge Worker host %s by %s.", queues, args.edge_hostname, getuser()
390+
)
391+
except TypeError as e:
392+
logger.error(str(e))
393+
raise SystemExit
394+
395+
354396
ARG_CONCURRENCY = Arg(
355397
("-c", "--concurrency"),
356398
type=int,
@@ -380,6 +422,11 @@ def remote_worker_request_shutdown(args) -> None:
380422
help="Maintenance comments to report reason. Required if enabling maintenance",
381423
required=True,
382424
)
425+
ARG_QUEUES_MANAGE = Arg(
426+
("-q", "--queues"),
427+
help="Comma delimited list of queues to add or remove.",
428+
required=True,
429+
)
383430
ARG_WAIT_MAINT = Arg(
384431
("-w", "--wait"),
385432
default=False,
@@ -516,4 +563,22 @@ def remote_worker_request_shutdown(args) -> None:
516563
func=remote_worker_request_shutdown,
517564
args=(ARG_REQUIRED_EDGE_HOSTNAME,),
518565
),
566+
ActionCommand(
567+
name="add-worker-queues",
568+
help=add_worker_queues.__doc__,
569+
func=add_worker_queues,
570+
args=(
571+
ARG_REQUIRED_EDGE_HOSTNAME,
572+
ARG_QUEUES_MANAGE,
573+
),
574+
),
575+
ActionCommand(
576+
name="remove-worker-queues",
577+
help=remove_worker_queues.__doc__,
578+
func=remove_worker_queues,
579+
args=(
580+
ARG_REQUIRED_EDGE_HOSTNAME,
581+
ARG_QUEUES_MANAGE,
582+
),
583+
),
519584
]

providers/edge3/src/airflow/providers/edge3/models/edge_worker.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,3 +283,37 @@ def request_shutdown(worker_name: str, session: Session = NEW_SESSION) -> None:
283283
EdgeWorkerState.UNKNOWN,
284284
):
285285
worker.state = EdgeWorkerState.SHUTDOWN_REQUEST
286+
287+
288+
@provide_session
289+
def add_worker_queues(worker_name: str, queues: list[str], session: Session = NEW_SESSION) -> None:
290+
"""Add queues to an edge worker."""
291+
query = select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name == worker_name)
292+
worker: EdgeWorkerModel = session.scalar(query)
293+
if worker.state in (
294+
EdgeWorkerState.OFFLINE,
295+
EdgeWorkerState.OFFLINE_MAINTENANCE,
296+
EdgeWorkerState.UNKNOWN,
297+
):
298+
error_message = f"Cannot add queues to edge worker {worker_name} as it is in {worker.state} state!"
299+
logger.error(error_message)
300+
raise TypeError(error_message)
301+
worker.add_queues(queues)
302+
303+
304+
@provide_session
305+
def remove_worker_queues(worker_name: str, queues: list[str], session: Session = NEW_SESSION) -> None:
306+
"""Remove queues from an edge worker."""
307+
query = select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name == worker_name)
308+
worker: EdgeWorkerModel = session.scalar(query)
309+
if worker.state in (
310+
EdgeWorkerState.OFFLINE,
311+
EdgeWorkerState.OFFLINE_MAINTENANCE,
312+
EdgeWorkerState.UNKNOWN,
313+
):
314+
error_message = (
315+
f"Cannot remove queues from edge worker {worker_name} as it is in {worker.state} state!"
316+
)
317+
logger.error(error_message)
318+
raise TypeError(error_message)
319+
worker.remove_queues(queues)

0 commit comments

Comments
 (0)