Skip to content

Commit a2f1180

Browse files
authored
Add result_backend_transport_options for Redis Sentinel support in Celery (#59498)
* Add result_backend_transport_options for Redis Sentinel support in Celery * Fix language check * Fix static checks * Added info
1 parent 22e75b5 commit a2f1180

File tree

4 files changed

+155
-0
lines changed

4 files changed

+155
-0
lines changed

providers/celery/provider.yaml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,3 +380,32 @@ config:
380380
sensitive: true
381381
example: '{"password": "password_for_redis_server"}'
382382
default: ~
383+
celery_result_backend_transport_options:
384+
description: |
385+
This section is for specifying options which can be passed to the
386+
underlying celery result backend transport. This is particularly useful when using
387+
Redis Sentinel as the result backend. See:
388+
https://docs.celeryq.dev/en/latest/userguide/configuration.html#std:setting-result_backend_transport_options
389+
options:
390+
master_name:
391+
description: |
392+
The name of the Redis Sentinel primary node to connect to.
393+
Required when using Redis Sentinel as the result backend.
394+
version_added: ~
395+
type: string
396+
example: "mymaster"
397+
default: ~
398+
sentinel_kwargs:
399+
description: |
400+
The sentinel_kwargs parameter allows passing additional options to the Sentinel client
401+
for the result backend. In a typical scenario where Redis Sentinel is used as the
402+
result backend and Redis servers are password-protected, the password needs to be
403+
passed through this parameter. Although its type is string, it is required to pass
404+
a string that conforms to the dictionary format.
405+
See:
406+
https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#configuration
407+
version_added: ~
408+
type: string
409+
sensitive: true
410+
example: '{"password": "password_for_redis_server"}'
411+
default: ~

providers/celery/src/airflow/providers/celery/executors/default_celery.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,20 @@ def _broker_supports_visibility_timeout(url):
7070
log.debug("Value for celery result_backend not found. Using sql_alchemy_conn with db+ prefix.")
7171
result_backend = f"db+{conf.get('database', 'SQL_ALCHEMY_CONN')}"
7272

73+
# Handle result backend transport options (for Redis Sentinel support)
74+
result_backend_transport_options: dict = conf.getsection("celery_result_backend_transport_options") or {}
75+
if "sentinel_kwargs" in result_backend_transport_options:
76+
try:
77+
result_sentinel_kwargs = json.loads(result_backend_transport_options["sentinel_kwargs"])
78+
if not isinstance(result_sentinel_kwargs, dict):
79+
raise ValueError
80+
result_backend_transport_options["sentinel_kwargs"] = result_sentinel_kwargs
81+
except Exception:
82+
raise AirflowException(
83+
"sentinel_kwargs in [celery_result_backend_transport_options] should be written "
84+
"in the correct dictionary format."
85+
)
86+
7387
extra_celery_config = conf.getjson("celery", "extra_celery_config", fallback={})
7488

7589
DEFAULT_CELERY_CONFIG = {
@@ -86,6 +100,7 @@ def _broker_supports_visibility_timeout(url):
86100
"celery", "broker_connection_retry_on_startup", fallback=True
87101
),
88102
"result_backend": result_backend,
103+
"result_backend_transport_options": result_backend_transport_options,
89104
"database_engine_options": conf.getjson(
90105
"celery", "result_backend_sqlalchemy_engine_options", fallback={}
91106
),

providers/celery/src/airflow/providers/celery/get_provider_info.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,5 +260,25 @@ def get_provider_info():
260260
},
261261
},
262262
},
263+
"celery_result_backend_transport_options": {
264+
"description": "This section is for specifying options which can be passed to the\nunderlying celery result backend transport. This is particularly useful when using\nRedis Sentinel as the result backend. See:\nhttps://docs.celeryq.dev/en/latest/userguide/configuration.html#std:setting-result_backend_transport_options\n",
265+
"options": {
266+
"master_name": {
267+
"description": "The name of the Redis Sentinel primary node to connect to.\nRequired when using Redis Sentinel as the result backend.\n",
268+
"version_added": None,
269+
"type": "string",
270+
"example": "mymaster",
271+
"default": None,
272+
},
273+
"sentinel_kwargs": {
274+
"description": "The sentinel_kwargs parameter allows passing additional options to the Sentinel client\nfor the result backend. In a typical scenario where Redis Sentinel is used as the\nresult backend and Redis servers are password-protected, the password needs to be\npassed through this parameter. Although its type is string, it is required to pass\na string that conforms to the dictionary format.\nSee:\nhttps://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#configuration\n",
275+
"version_added": None,
276+
"type": "string",
277+
"sensitive": True,
278+
"example": '{"password": "password_for_redis_server"}',
279+
"default": None,
280+
},
281+
},
282+
},
263283
},
264284
}

providers/celery/tests/unit/celery/executors/test_celery_executor.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,3 +466,94 @@ def test_celery_extra_celery_config_loaded_from_string():
466466
# reload celery conf to apply the new config
467467
importlib.reload(default_celery)
468468
assert default_celery.DEFAULT_CELERY_CONFIG["worker_max_tasks_per_child"] == 10
469+
470+
471+
@conf_vars({("celery_result_backend_transport_options", "sentinel_kwargs"): '{"password": "redis_password"}'})
472+
def test_result_backend_sentinel_kwargs_loaded_from_string():
473+
"""Test that sentinel_kwargs for result backend transport options is correctly parsed."""
474+
import importlib
475+
476+
# reload celery conf to apply the new config
477+
importlib.reload(default_celery)
478+
assert "result_backend_transport_options" in default_celery.DEFAULT_CELERY_CONFIG
479+
assert default_celery.DEFAULT_CELERY_CONFIG["result_backend_transport_options"]["sentinel_kwargs"] == {
480+
"password": "redis_password"
481+
}
482+
483+
484+
@conf_vars({("celery_result_backend_transport_options", "master_name"): "mymaster"})
485+
def test_result_backend_master_name_loaded():
486+
"""Test that master_name for result backend transport options is correctly loaded."""
487+
import importlib
488+
489+
# reload celery conf to apply the new config
490+
importlib.reload(default_celery)
491+
assert "result_backend_transport_options" in default_celery.DEFAULT_CELERY_CONFIG
492+
assert (
493+
default_celery.DEFAULT_CELERY_CONFIG["result_backend_transport_options"]["master_name"] == "mymaster"
494+
)
495+
496+
497+
@conf_vars(
498+
{
499+
("celery_result_backend_transport_options", "sentinel_kwargs"): '{"password": "redis_password"}',
500+
("celery_result_backend_transport_options", "master_name"): "mymaster",
501+
}
502+
)
503+
def test_result_backend_transport_options_with_multiple_options():
504+
"""Test that multiple result backend transport options are correctly loaded."""
505+
import importlib
506+
507+
# reload celery conf to apply the new config
508+
importlib.reload(default_celery)
509+
result_backend_opts = default_celery.DEFAULT_CELERY_CONFIG["result_backend_transport_options"]
510+
assert result_backend_opts["sentinel_kwargs"] == {"password": "redis_password"}
511+
assert result_backend_opts["master_name"] == "mymaster"
512+
513+
514+
@conf_vars({("celery_result_backend_transport_options", "sentinel_kwargs"): "invalid_json"})
515+
def test_result_backend_sentinel_kwargs_invalid_json():
516+
"""Test that invalid JSON in sentinel_kwargs raises an error."""
517+
import importlib
518+
519+
from airflow.providers.common.compat.sdk import AirflowException
520+
521+
with pytest.raises(
522+
AirflowException, match="sentinel_kwargs.*should be written in the correct dictionary format"
523+
):
524+
importlib.reload(default_celery)
525+
526+
527+
@conf_vars({("celery_result_backend_transport_options", "sentinel_kwargs"): '"not_a_dict"'})
528+
def test_result_backend_sentinel_kwargs_not_dict():
529+
"""Test that non-dict sentinel_kwargs raises an error."""
530+
import importlib
531+
532+
from airflow.providers.common.compat.sdk import AirflowException
533+
534+
with pytest.raises(
535+
AirflowException, match="sentinel_kwargs.*should be written in the correct dictionary format"
536+
):
537+
importlib.reload(default_celery)
538+
539+
540+
@conf_vars(
541+
{
542+
("celery", "result_backend"): "sentinel://sentinel1:26379;sentinel://sentinel2:26379",
543+
("celery_result_backend_transport_options", "sentinel_kwargs"): '{"password": "redis_pass"}',
544+
("celery_result_backend_transport_options", "master_name"): "mymaster",
545+
}
546+
)
547+
def test_result_backend_sentinel_full_config():
548+
"""Test full Redis Sentinel configuration for result backend."""
549+
import importlib
550+
551+
# reload celery conf to apply the new config
552+
importlib.reload(default_celery)
553+
554+
assert default_celery.DEFAULT_CELERY_CONFIG["result_backend"] == (
555+
"sentinel://sentinel1:26379;sentinel://sentinel2:26379"
556+
)
557+
result_backend_opts = default_celery.DEFAULT_CELERY_CONFIG["result_backend_transport_options"]
558+
assert result_backend_opts["sentinel_kwargs"] == {"password": "redis_pass"}
559+
assert result_backend_opts["master_name"] == "mymaster"

0 commit comments

Comments
 (0)