diff --git a/airflow-core/docs/security/secrets/secrets-backend/index.rst b/airflow-core/docs/security/secrets/secrets-backend/index.rst index a4ae1547a103d..139de75e4537c 100644 --- a/airflow-core/docs/security/secrets/secrets-backend/index.rst +++ b/airflow-core/docs/security/secrets/secrets-backend/index.rst @@ -39,13 +39,15 @@ When looking up a connection/variable, by default Airflow will search environmen database second. If you enable an alternative secrets backend, it will be searched first, followed by environment variables, -then metastore. This search ordering is not configurable. Though, in some alternative secrets backend you might have +then metastore. Though, in some alternative secrets backend you might have the option to filter which connection/variable/config is searched in the secret backend. Please look at the documentation of the secret backend you are using to see if such option is available. On the other hand, if a workers secrets backend is defined, the order of lookup has higher priority for the workers secrets backend and then the secrets backend. +The secrets backends search ordering is also configurable via the configuration option ``[secrets]backends_order``. + .. warning:: When using environment variables or an alternative secrets backend to store secrets or variables, it is possible to create key collisions. @@ -64,12 +66,21 @@ The ``[secrets]`` section has the following options: [secrets] backend = backend_kwargs = + backends_order = Set ``backend`` to the fully qualified class name of the backend you want to enable. You can provide ``backend_kwargs`` with json and it will be passed as kwargs to the ``__init__`` method of your secrets backend. +``backends_order`` is a comma-separated list of secret backends. These backends will be used in the order they are specified. +Please note that the ``environment_variable`` and ``metastore`` are required values and cannot be removed +from the list. Supported values are: + +* ``custom``: Custom secret backend specified in the ``secrets[backend]`` configuration option. +* ``environment_variable``: Standard environment variable backend ``airflow.secrets.environment_variables.EnvironmentVariablesBackend``. +* ``metastore``: Standard metastore backend ``airflow.secrets.metastore.MetastoreBackend``. + If you want to check which secret backend is currently set, you can use ``airflow config get-value secrets backend`` command as in the example below. @@ -89,13 +100,21 @@ configure separate secrets backend for workers, you can do that using: [workers] secrets_backend = secrets_backend_kwargs = - + backends_order = Set ``secrets_backend`` to the fully qualified class name of the backend you want to enable. You can provide ``secrets_backend_kwargs`` with json and it will be passed as kwargs to the ``__init__`` method of your secrets backend for the workers. +``backends_order`` is a comma-separated list of secret backends for workers. These backends will be used in the order they are specified. +Please note that the ``environment_variable`` and ``execution_api`` are required values and cannot be removed +from the list. Supported values are: + +* ``custom``: Custom secret backend specified in the ``workers[secrets_backend]`` configuration option. +* ``environment_variable``: Standard environment variable backend ``airflow.secrets.environment_variables.EnvironmentVariablesBackend``. +* ``execution_api``: Standard execution_api backend ``airflow.sdk.execution_time.secrets.execution_api.ExecutionAPISecretsBackend``. + If you want to check which secret backend is currently set, you can use ``airflow config get-value workers secrets_backend`` command as in the example below. diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml index 31e2a351036d9..68f7e0bc5d4bf 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml @@ -94,6 +94,52 @@ paths: security: - OAuth2PasswordBearer: [] - HTTPBearer: [] + /ui/backends_order: + get: + tags: + - Config + summary: Get Backends Order Value + operationId: get_backends_order_value + security: + - OAuth2PasswordBearer: [] + - HTTPBearer: [] + parameters: + - name: accept + in: header + required: false + schema: + type: string + enum: + - application/json + - text/plain + - '*/*' + default: '*/*' + title: Accept + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/Config' + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '406': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Acceptable + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /ui/connections/hook_meta: get: tags: @@ -1312,6 +1358,41 @@ components: - count title: CalendarTimeRangeResponse description: Represents a summary of DAG runs for a specific calendar time range. + Config: + properties: + sections: + items: + $ref: '#/components/schemas/ConfigSection' + type: array + title: Sections + additionalProperties: false + type: object + required: + - sections + title: Config + description: List of config sections with their options. + ConfigOption: + properties: + key: + type: string + title: Key + value: + anyOf: + - type: string + - prefixItems: + - type: string + - type: string + type: array + maxItems: 2 + minItems: 2 + title: Value + additionalProperties: false + type: object + required: + - key + - value + title: ConfigOption + description: Config option. ConfigResponse: properties: page_size: @@ -1370,6 +1451,23 @@ components: - theme title: ConfigResponse description: configuration serializer. + ConfigSection: + properties: + name: + type: string + title: Name + options: + items: + $ref: '#/components/schemas/ConfigOption' + type: array + title: Options + additionalProperties: false + type: object + required: + - name + - options + title: ConfigSection + description: Config Section Schema. ConnectionHookFieldBehavior: properties: hidden: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/config.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/config.py index 28009f8343ee1..a3bc5ed261e3f 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/config.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/config.py @@ -19,13 +19,20 @@ from json import loads from typing import Any -from fastapi import Depends, status +from fastapi import Depends, HTTPException, status +from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.common.types import UIAlert +from airflow.api_fastapi.core_api.datamodels.config import ( + Config, + ConfigOption, + ConfigSection, +) from airflow.api_fastapi.core_api.datamodels.ui.config import ConfigResponse from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.api_fastapi.core_api.security import requires_authenticated +from airflow.api_fastapi.core_api.services.public.config import _response_based_on_accept from airflow.configuration import conf from airflow.settings import DASHBOARD_UIALERTS from airflow.utils.log.log_reader import TaskLogReader @@ -66,3 +73,32 @@ def get_configs() -> ConfigResponse: config.update({key: value for key, value in additional_config.items()}) return ConfigResponse.model_validate(config) + + +@config_router.get( + "/backends_order", + responses={ + **create_openapi_http_exception_doc( + [ + status.HTTP_404_NOT_FOUND, + status.HTTP_406_NOT_ACCEPTABLE, + ] + ), + }, + response_model=Config, + dependencies=[Depends(requires_authenticated())], +) +def get_backends_order_value( + accept: HeaderAcceptJsonOrText, +): + section, option = "secrets", "backends_order" + if not conf.has_option(section, option): + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Option [{section}/{option}] not found.", + ) + + value = conf.get(section, option) + + config = Config(sections=[ConfigSection(name=section, options=[ConfigOption(key=option, value=value)])]) + return _response_based_on_accept(accept, config) diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index dac144f54cbf0..0f7b0efeac1ce 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -1323,6 +1323,20 @@ secrets: sensitive: true example: ~ default: "" + backends_order: + description: | + Comma-separated list of secret backends. These backends will be used in the order they are specified. + Please note that the `environment_variable` and `metastore` are required values and cannot be removed + from the list. Supported values are: + + * ``custom``: Custom secret backend specified in the ``secrets[backend]`` configuration option. + * ``environment_variable``: Standard environment variable backend + ``airflow.secrets.environment_variables.EnvironmentVariablesBackend``. + * ``metastore``: Standard metastore backend ``airflow.secrets.metastore.MetastoreBackend``. + version_added: 3.2.0 + type: string + example: ~ + default: "custom,environment_variable,metastore" use_cache: description: | .. note:: |experimental| @@ -1623,6 +1637,22 @@ workers: sensitive: true example: ~ default: "" + backends_order: + description: | + Comma-separated list of secret backends for workers. These backends will be used in the order they are + specified. Please note that the ``environment_variable`` and ``execution_api`` are required values and + cannot be removed from the list. Supported values are: + + * ``custom``: Custom secret backend specified in the ``workers[secrets_backend]`` configuration + option. + * ``environment_variable``: Standard environment variable backend + ``airflow.secrets.environment_variables.EnvironmentVariablesBackend``. + * ``execution_api``: Standard execution_api backend + ``airflow.sdk.execution_time.secrets.execution_api.ExecutionAPISecretsBackend``. + version_added: 3.2.0 + type: string + example: ~ + default: "custom,environment_variable,execution_api" min_heartbeat_interval: description: | The minimum interval (in seconds) at which the worker checks the task instance's diff --git a/airflow-core/src/airflow/configuration.py b/airflow-core/src/airflow/configuration.py index 7bc34bec47a34..d892752d14bde 100644 --- a/airflow-core/src/airflow/configuration.py +++ b/airflow-core/src/airflow/configuration.py @@ -31,6 +31,7 @@ from collections.abc import Callable from configparser import ConfigParser from copy import deepcopy +from enum import Enum from inspect import ismodule from io import StringIO from re import Pattern @@ -46,7 +47,6 @@ ) from airflow._shared.module_loading import import_string from airflow.exceptions import AirflowConfigException, RemovedInAirflow4Warning -from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH from airflow.task.weight_rule import WeightRule from airflow.utils import yaml @@ -837,7 +837,7 @@ def make_group_other_inaccessible(file_path: str): def ensure_secrets_loaded( - default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH, + default_backends: list[str] | None = None, ) -> list[BaseSecretsBackend]: """ Ensure that all secrets backends are loaded. @@ -846,9 +846,8 @@ def ensure_secrets_loaded( """ # Check if the secrets_backend_list contains only 2 default backends. - # Check if we are loading the backends for worker too by checking if the default_backends is equal - # to DEFAULT_SECRETS_SEARCH_PATH. - if len(secrets_backend_list) == 2 or default_backends != DEFAULT_SECRETS_SEARCH_PATH: + # Check if we are loading the backends for worker too by checking if the default_backends is not None + if len(secrets_backend_list) == 2 or default_backends is not None: return initialize_secrets_backends(default_backends=default_backends) return secrets_backend_list @@ -864,8 +863,25 @@ def get_custom_secret_backend(worker_mode: bool = False) -> BaseSecretsBackend | return conf._get_custom_secret_backend(worker_mode=worker_mode) +def get_importable_secret_backend(class_name: str | None) -> BaseSecretsBackend | None: + """Get secret backend defined in the given class name.""" + if class_name is not None: + secrets_backend_cls = import_string(class_name) + return secrets_backend_cls() + return None + + +class Backends(Enum): + """Type of the secrets backend.""" + + ENVIRONMENT_VARIABLE = "environment_variable" + EXECUTION_API = "execution_api" + CUSTOM = "custom" + METASTORE = "metastore" + + def initialize_secrets_backends( - default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH, + default_backends: list[str] | None = None, ) -> list[BaseSecretsBackend]: """ Initialize secrets backend. @@ -873,19 +889,78 @@ def initialize_secrets_backends( * import secrets backend classes * instantiate them and return them in a list """ - backend_list = [] worker_mode = False - if default_backends != DEFAULT_SECRETS_SEARCH_PATH: + search_section = "secrets" + environment_variable_args: str | None = ( + "airflow.secrets.environment_variables.EnvironmentVariablesBackend" + ) + metastore_args: str | None = "airflow.secrets.metastore.MetastoreBackend" + execution_args: str | None = None + + if default_backends is not None: worker_mode = True + search_section = "workers" + environment_variable_args = ( + environment_variable_args if environment_variable_args in default_backends else None + ) + metastore_args = metastore_args if metastore_args in default_backends else None + execution_args = ( + "airflow.sdk.execution_time.secrets.execution_api.ExecutionAPISecretsBackend" + if "airflow.sdk.execution_time.secrets.execution_api.ExecutionAPISecretsBackend" + in default_backends + else None + ) + + backends_map: dict[str, dict[str, Any]] = { + "environment_variable": { + "callback": get_importable_secret_backend, + "args": (environment_variable_args,), + }, + "metastore": { + "callback": get_importable_secret_backend, + "args": (metastore_args,), + }, + "custom": { + "callback": get_custom_secret_backend, + "args": (worker_mode,), + }, + "execution_api": { + "callback": get_importable_secret_backend, + "args": (execution_args,), + }, + } - custom_secret_backend = get_custom_secret_backend(worker_mode) + backends_order = conf.getlist(search_section, "backends_order", delimiter=",") - if custom_secret_backend is not None: - backend_list.append(custom_secret_backend) + required_backends = ( + [Backends.ENVIRONMENT_VARIABLE, Backends.EXECUTION_API] + if worker_mode + else [Backends.METASTORE, Backends.ENVIRONMENT_VARIABLE] + ) - for class_name in default_backends: - secrets_backend_cls = import_string(class_name) - backend_list.append(secrets_backend_cls()) + if missing_backends := [b.value for b in required_backends if b.value not in backends_order]: + raise AirflowConfigException( + f"The configuration option [{search_section}]backends_order is misconfigured. " + f"The following backend types are missing: {missing_backends}", + search_section, + missing_backends, + ) + + if unsupported_backends := [b for b in backends_order if b not in backends_map.keys()]: + raise AirflowConfigException( + f"The configuration option [{search_section}]backends_order is misconfigured. " + f"The following backend types are unsupported: {unsupported_backends}", + search_section, + unsupported_backends, + ) + + backend_list = [] + for backend_type in backends_order: + backend_item = backends_map[backend_type] + callback, args = backend_item["callback"], backend_item["args"] + backend = callback(*args) if args else callback() + if backend: + backend_list.append(backend) return backend_list diff --git a/airflow-core/src/airflow/secrets/__init__.py b/airflow-core/src/airflow/secrets/__init__.py index 09c35ce7d2162..91a7ea4191e4f 100644 --- a/airflow-core/src/airflow/secrets/__init__.py +++ b/airflow-core/src/airflow/secrets/__init__.py @@ -29,15 +29,11 @@ from airflow.utils.deprecation_tools import add_deprecated_classes -__all__ = ["BaseSecretsBackend", "DEFAULT_SECRETS_SEARCH_PATH"] - -from airflow.secrets.base_secrets import BaseSecretsBackend - -DEFAULT_SECRETS_SEARCH_PATH = [ - "airflow.secrets.environment_variables.EnvironmentVariablesBackend", - "airflow.secrets.metastore.MetastoreBackend", +__all__ = [ + "BaseSecretsBackend", ] +from airflow.secrets.base_secrets import BaseSecretsBackend __deprecated_classes = { "cache": { diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 7e60175fbd026..96cbc4accf8ec 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -231,6 +231,12 @@ export type ConfigServiceGetConfigsDefaultResponse = Awaited = UseQueryResult; export const useConfigServiceGetConfigsKey = "ConfigServiceGetConfigs"; export const UseConfigServiceGetConfigsKeyFn = (queryKey?: Array) => [useConfigServiceGetConfigsKey, ...(queryKey ?? [])]; +export type ConfigServiceGetBackendsOrderValueDefaultResponse = Awaited>; +export type ConfigServiceGetBackendsOrderValueQueryResult = UseQueryResult; +export const useConfigServiceGetBackendsOrderValueKey = "ConfigServiceGetBackendsOrderValue"; +export const UseConfigServiceGetBackendsOrderValueKeyFn = ({ accept }: { + accept?: "application/json" | "text/plain" | "*/*"; +} = {}, queryKey?: Array) => [useConfigServiceGetBackendsOrderValueKey, ...(queryKey ?? [{ accept }])]; export type DagWarningServiceListDagWarningsDefaultResponse = Awaited>; export type DagWarningServiceListDagWarningsQueryResult = UseQueryResult; export const useDagWarningServiceListDagWarningsKey = "DagWarningServiceListDagWarnings"; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index 6d051aaabc0ae..2dd7a9e127f08 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -431,6 +431,16 @@ export const ensureUseConfigServiceGetConfigValueData = (queryClient: QueryClien */ export const ensureUseConfigServiceGetConfigsData = (queryClient: QueryClient) => queryClient.ensureQueryData({ queryKey: Common.UseConfigServiceGetConfigsKeyFn(), queryFn: () => ConfigService.getConfigs() }); /** +* Get Backends Order Value +* @param data The data for the request. +* @param data.accept +* @returns Config Successful Response +* @throws ApiError +*/ +export const ensureUseConfigServiceGetBackendsOrderValueData = (queryClient: QueryClient, { accept }: { + accept?: "application/json" | "text/plain" | "*/*"; +} = {}) => queryClient.ensureQueryData({ queryKey: Common.UseConfigServiceGetBackendsOrderValueKeyFn({ accept }), queryFn: () => ConfigService.getBackendsOrderValue({ accept }) }); +/** * List Dag Warnings * Get a list of DAG warnings. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index 1020640d3a912..14e75c5513cca 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -431,6 +431,16 @@ export const prefetchUseConfigServiceGetConfigValue = (queryClient: QueryClient, */ export const prefetchUseConfigServiceGetConfigs = (queryClient: QueryClient) => queryClient.prefetchQuery({ queryKey: Common.UseConfigServiceGetConfigsKeyFn(), queryFn: () => ConfigService.getConfigs() }); /** +* Get Backends Order Value +* @param data The data for the request. +* @param data.accept +* @returns Config Successful Response +* @throws ApiError +*/ +export const prefetchUseConfigServiceGetBackendsOrderValue = (queryClient: QueryClient, { accept }: { + accept?: "application/json" | "text/plain" | "*/*"; +} = {}) => queryClient.prefetchQuery({ queryKey: Common.UseConfigServiceGetBackendsOrderValueKeyFn({ accept }), queryFn: () => ConfigService.getBackendsOrderValue({ accept }) }); +/** * List Dag Warnings * Get a list of DAG warnings. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 13668d9b5fa10..0ac6dc8278549 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -431,6 +431,16 @@ export const useConfigServiceGetConfigValue = = unknown[]>(queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseConfigServiceGetConfigsKeyFn(queryKey), queryFn: () => ConfigService.getConfigs() as TData, ...options }); /** +* Get Backends Order Value +* @param data The data for the request. +* @param data.accept +* @returns Config Successful Response +* @throws ApiError +*/ +export const useConfigServiceGetBackendsOrderValue = = unknown[]>({ accept }: { + accept?: "application/json" | "text/plain" | "*/*"; +} = {}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseConfigServiceGetBackendsOrderValueKeyFn({ accept }, queryKey), queryFn: () => ConfigService.getBackendsOrderValue({ accept }) as TData, ...options }); +/** * List Dag Warnings * Get a list of DAG warnings. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts index 0f2ca446f9644..5d694bd7bf57e 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts @@ -431,6 +431,16 @@ export const useConfigServiceGetConfigValueSuspense = = unknown[]>(queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseConfigServiceGetConfigsKeyFn(queryKey), queryFn: () => ConfigService.getConfigs() as TData, ...options }); /** +* Get Backends Order Value +* @param data The data for the request. +* @param data.accept +* @returns Config Successful Response +* @throws ApiError +*/ +export const useConfigServiceGetBackendsOrderValueSuspense = = unknown[]>({ accept }: { + accept?: "application/json" | "text/plain" | "*/*"; +} = {}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseConfigServiceGetBackendsOrderValueKeyFn({ accept }, queryKey), queryFn: () => ConfigService.getBackendsOrderValue({ accept }) as TData, ...options }); +/** * List Dag Warnings * Get a list of DAG warnings. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index d45b2560e2bcd..d5b15af4a0d56 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -3,7 +3,7 @@ import type { CancelablePromise } from './core/CancelablePromise'; import { OpenAPI } from './core/OpenAPI'; import { request as __request } from './core/request'; -import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, WaitDagRunUntilFinishedData, WaitDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetLatestRunInfoData, GetLatestRunInfoResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, UpdateHitlDetailData, UpdateHitlDetailResponse, GetHitlDetailData, GetHitlDetailResponse, GetHitlDetailsData, GetHitlDetailsResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, DeleteXcomEntryData, DeleteXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutResponse, GetAuthMenusResponse, GetCurrentUserInfoResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, StructureDataData, StructureDataResponse2, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesData, GetGridTiSummariesResponse, GetCalendarData, GetCalendarResponse, ListTeamsData, ListTeamsResponse } from './types.gen'; +import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, WaitDagRunUntilFinishedData, WaitDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, GetBackendsOrderValueData, GetBackendsOrderValueResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetLatestRunInfoData, GetLatestRunInfoResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, UpdateHitlDetailData, UpdateHitlDetailResponse, GetHitlDetailData, GetHitlDetailResponse, GetHitlDetailsData, GetHitlDetailsResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, DeleteXcomEntryData, DeleteXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutResponse, GetAuthMenusResponse, GetCurrentUserInfoResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, StructureDataData, StructureDataResponse2, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesData, GetGridTiSummariesResponse, GetCalendarData, GetCalendarResponse, ListTeamsData, ListTeamsResponse } from './types.gen'; export class AssetService { /** @@ -1328,6 +1328,28 @@ export class ConfigService { }); } + /** + * Get Backends Order Value + * @param data The data for the request. + * @param data.accept + * @returns Config Successful Response + * @throws ApiError + */ + public static getBackendsOrderValue(data: GetBackendsOrderValueData = {}): CancelablePromise { + return __request(OpenAPI, { + method: 'GET', + url: '/ui/backends_order', + headers: { + accept: data.accept + }, + errors: { + 404: 'Not Found', + 406: 'Not Acceptable', + 422: 'Validation Error' + } + }); + } + } export class DagWarningService { diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 69aeeda532f3c..26b0a7bceb556 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -2490,6 +2490,12 @@ export type GetConfigValueResponse = Config; export type GetConfigsResponse = ConfigResponse; +export type GetBackendsOrderValueData = { + accept?: 'application/json' | 'text/plain' | '*/*'; +}; + +export type GetBackendsOrderValueResponse = Config; + export type ListDagWarningsData = { dagId?: string | null; limit?: number; @@ -4679,6 +4685,29 @@ export type $OpenApiTs = { }; }; }; + '/ui/backends_order': { + get: { + req: GetBackendsOrderValueData; + res: { + /** + * Successful Response + */ + 200: Config; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Not Acceptable + */ + 406: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; '/api/v2/dagWarnings': { get: { req: ListDagWarningsData; diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/admin.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/admin.json index ad231ceb31059..c6eb4d107daf5 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/admin.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/admin.json @@ -122,6 +122,7 @@ }, "variables": { "add": "Add Variable", + "backendsOrder": "Secret backends order", "columns": { "isEncrypted": "Is Encrypted" }, diff --git a/airflow-core/src/airflow/ui/src/pages/Variables/BackendsOrderButton.tsx b/airflow-core/src/airflow/ui/src/pages/Variables/BackendsOrderButton.tsx new file mode 100644 index 0000000000000..03f033c98f3f8 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/Variables/BackendsOrderButton.tsx @@ -0,0 +1,76 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Box, HStack, Skeleton, Text } from "@chakra-ui/react"; +import { FiChevronRight } from "react-icons/fi"; +import { Link as RouterLink } from "react-router-dom"; + +import type { TaskInstanceState } from "openapi/requests/types.gen"; +import { StateBadge } from "src/components/StateBadge"; + +export const BackendsOrderButton = ({ + colorScheme, + icon, + isLoading = false, + label, + link, + onClick, + state, +}: { + readonly colorScheme: string; + readonly icon?: React.ReactNode; + readonly isLoading?: boolean; + readonly label: string; + readonly link?: string; + readonly onClick?: () => void; + readonly state?: TaskInstanceState | null; +}) => { + if (isLoading) { + return ; + } + + const content = ( + + + {icon} + + + + {label} + + + + ); + + if (onClick) { + return ( + + {content} + + ); + } + + return {content}; +}; diff --git a/airflow-core/src/airflow/ui/src/pages/Variables/BackendsOrderCard.tsx b/airflow-core/src/airflow/ui/src/pages/Variables/BackendsOrderCard.tsx new file mode 100644 index 0000000000000..8988190ae4ecf --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/Variables/BackendsOrderCard.tsx @@ -0,0 +1,42 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Box, useDisclosure } from "@chakra-ui/react"; +import { useTranslation } from "react-i18next"; +import { LuSettings } from "react-icons/lu"; + +import { BackendsOrderButton } from "src/pages/Variables/BackendsOrderButton"; +import { BackendsOrderModal } from "src/pages/Variables/BackendsOrderModal"; + +export const BackendsOrderCard = () => { + const { t: translate } = useTranslation("admin"); + const { onClose, onOpen, open } = useDisclosure(); + + return ( + + } + isLoading={false} + label={translate("variables.backendsOrder")} + onClick={onOpen} + /> + + + ); +}; diff --git a/airflow-core/src/airflow/ui/src/pages/Variables/BackendsOrderModal.tsx b/airflow-core/src/airflow/ui/src/pages/Variables/BackendsOrderModal.tsx new file mode 100644 index 0000000000000..bb5fa993945df --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/Variables/BackendsOrderModal.tsx @@ -0,0 +1,67 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Heading, Text, HStack } from "@chakra-ui/react"; +import { useEffect, useState } from "react"; +import { useTranslation } from "react-i18next"; +import { LuSettings } from "react-icons/lu"; + +import { useConfigServiceGetBackendsOrderValue } from "openapi/queries"; +import { ErrorAlert } from "src/components/ErrorAlert"; +import { Dialog } from "src/components/ui"; + +type BackendsOrderModalProps = { + onClose: () => void; + open: boolean; +}; + +export const BackendsOrderModal: React.FC = ({ onClose, open }) => { + const { t: translate } = useTranslation("admin"); + const [backendsOrder, setBackendsOrder] = useState | string>(); + const { data, error } = useConfigServiceGetBackendsOrderValue(); + + const onOpenChange = () => { + onClose(); + }; + + useEffect(() => { + setBackendsOrder(data?.sections[0]?.options[0]?.value ?? ""); + }, [data, open]); + + return ( + + + + + + {translate("variables.backendsOrder")} + + + + + + + {Boolean(error) ? : null} + + {backendsOrder} + + + + + ); +}; diff --git a/airflow-core/src/airflow/ui/src/pages/Variables/Variables.tsx b/airflow-core/src/airflow/ui/src/pages/Variables/Variables.tsx index 4271ebbc8d819..5fd8c2082208d 100644 --- a/airflow-core/src/airflow/ui/src/pages/Variables/Variables.tsx +++ b/airflow-core/src/airflow/ui/src/pages/Variables/Variables.tsx @@ -36,6 +36,7 @@ import { Checkbox } from "src/components/ui/Checkbox"; import { SearchParamsKeys, type SearchParamsKeysType } from "src/constants/searchParams"; import { TrimText } from "src/utils/TrimText"; +import { BackendsOrderCard } from "./BackendsOrderCard"; import DeleteVariablesButton from "./DeleteVariablesButton"; import ImportVariablesButton from "./ImportVariablesButton"; import AddVariableButton from "./ManageVariable/AddVariableButton"; @@ -172,6 +173,7 @@ export const Variables = () => { /> 0} /> + 0} /> diff --git a/airflow-core/tests/unit/always/test_secrets.py b/airflow-core/tests/unit/always/test_secrets.py index 3aabc5d19a174..7c0cf3330e8c8 100644 --- a/airflow-core/tests/unit/always/test_secrets.py +++ b/airflow-core/tests/unit/always/test_secrets.py @@ -22,6 +22,7 @@ import pytest from airflow.configuration import ensure_secrets_loaded, initialize_secrets_backends +from airflow.exceptions import AirflowConfigException from airflow.models import Connection, Variable from airflow.sdk import SecretCache @@ -119,6 +120,128 @@ def test_backend_fallback_to_env_var(self, mock_get_connection): assert conn.get_uri() == "mysql://airflow:airflow@host:5432/airflow" + @conf_vars( + { + ( + "secrets", + "backend", + ): "airflow.providers.amazon.aws.secrets.systems_manager.SystemsManagerParameterStoreBackend", + ("secrets", "backend_kwargs"): '{"connections_prefix": "/airflow", "profile_name": null}', + ("secrets", "backends_order"): "custom,environment_variable,metastore", + } + ) + def test_backends_order(self): + backends = ensure_secrets_loaded() + backend_classes = [backend.__class__.__name__ for backend in backends] + assert backend_classes == [ + "SystemsManagerParameterStoreBackend", + "EnvironmentVariablesBackend", + "MetastoreBackend", + ] + + @pytest.mark.parametrize( + ("backends_order", "expected_backends_order"), + [ + pytest.param( + "custom,environment_variable,execution_api", + [ + "SystemsManagerParameterStoreBackend", + "EnvironmentVariablesBackend", + "ExecutionAPISecretsBackend", + ], + ), + pytest.param( + "environment_variable,execution_api,custom", + [ + "EnvironmentVariablesBackend", + "ExecutionAPISecretsBackend", + "SystemsManagerParameterStoreBackend", + ], + ), + pytest.param( + "execution_api,environment_variable,custom", + [ + "ExecutionAPISecretsBackend", + "EnvironmentVariablesBackend", + "SystemsManagerParameterStoreBackend", + ], + ), + ], + ) + def test_workers_backends_order_param(self, backends_order, expected_backends_order): + with conf_vars( + { + ( + "workers", + "secrets_backend", + ): "airflow.providers.amazon.aws.secrets.systems_manager.SystemsManagerParameterStoreBackend", + ( + "workers", + "secrets_backend_kwargs", + ): '{"connections_prefix": "/airflow", "profile_name": null}', + ("workers", "backends_order"): backends_order, + } + ): + backends = ensure_secrets_loaded( + default_backends=[ + "airflow.providers.amazon.aws.secrets.systems_manager.SystemsManagerParameterStoreBackend", + "airflow.secrets.environment_variables.EnvironmentVariablesBackend", + "airflow.sdk.execution_time.secrets.execution_api.ExecutionAPISecretsBackend", + ] + ) + backend_classes = [backend.__class__.__name__ for backend in backends] + assert backend_classes == expected_backends_order + + @pytest.mark.parametrize( + "backends_order", + [ + pytest.param("custom,metastore", id="no_environment_variable_backend"), + pytest.param("environment_variable", id="no_metastore_backend"), + pytest.param("metastore,environment_variable,unsupported", id="unsupported_backend"), + ], + ) + def test_backends_order_invalid_cases(self, backends_order): + with conf_vars({("secrets", "backends_order"): backends_order}): + with pytest.raises(AirflowConfigException): + ensure_secrets_loaded() + + @pytest.mark.parametrize( + ("backends_order", "exc_msg"), + [ + pytest.param( + "custom,environment_variable", + "The configuration option [workers]backends_order is misconfigured. The following backend types are missing: ['execution_api']", + id="no_execution_api_backend", + ), + pytest.param( + "execution_api", + "The configuration option [workers]backends_order is misconfigured. The following backend types are missing: ['environment_variable']", + id="no_environment_variable_backend", + ), + pytest.param( + "custom", + "The configuration option [workers]backends_order is misconfigured. The following backend types are missing: ['environment_variable', 'execution_api']", + id="no_environment_variable_and_execution_api_backend", + ), + pytest.param( + "execution_api,environment_variable,unsupported", + "The configuration option [workers]backends_order is misconfigured. The following backend types are unsupported: ['unsupported']", + id="unsupported_backend", + ), + ], + ) + def test_workers_backends_order_invalid_cases(self, backends_order, exc_msg): + with conf_vars({("workers", "backends_order"): backends_order}): + with pytest.raises(AirflowConfigException) as exc_info: + ensure_secrets_loaded( + default_backends=[ + "test_worker_backend_1", + "test_worker_backend_2", + "test_worker_backend_3", + ] + ) + assert exc_info.value.args[0] == exc_msg + @skip_if_force_lowest_dependencies_marker @pytest.mark.db_test diff --git a/task-sdk/src/airflow/sdk/configuration.py b/task-sdk/src/airflow/sdk/configuration.py index e514940fda78e..e539d27abfa8c 100644 --- a/task-sdk/src/airflow/sdk/configuration.py +++ b/task-sdk/src/airflow/sdk/configuration.py @@ -23,12 +23,13 @@ import os import pathlib from configparser import ConfigParser +from enum import Enum from io import StringIO from typing import Any +from airflow.exceptions import AirflowConfigException from airflow.sdk import yaml from airflow.sdk._shared.configuration.parser import AirflowConfigParser as _SharedAirflowConfigParser -from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH log = logging.getLogger(__name__) @@ -184,8 +185,27 @@ def get_custom_secret_backend(worker_mode: bool = False): return conf._get_custom_secret_backend(worker_mode=worker_mode) +class Backends(Enum): + """Type of the secrets backend.""" + + ENVIRONMENT_VARIABLE = "environment_variable" + EXECUTION_API = "execution_api" + CUSTOM = "custom" + METASTORE = "metastore" + + +def get_importable_secret_backend(class_name: str | None) -> Any | None: + """Get secret backend defined in the given class name.""" + from airflow.sdk._shared.module_loading import import_string + + if class_name is not None: + secrets_backend_cls = import_string(class_name) + return secrets_backend_cls() + return None + + def initialize_secrets_backends( - default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH, + default_backends: list[str] | None = None, ): """ Initialize secrets backend. @@ -195,32 +215,88 @@ def initialize_secrets_backends( Uses SDK's conf instead of Core's conf. """ - from airflow.sdk._shared.module_loading import import_string - - backend_list = [] - worker_mode = False # Determine worker mode - if default_backends is not the server default, it's worker mode # This is a simplified check; in practice, worker mode is determined by the caller - if default_backends != [ - "airflow.secrets.environment_variables.EnvironmentVariablesBackend", - "airflow.secrets.metastore.MetastoreBackend", - ]: + from airflow.sdk.configuration import conf + + worker_mode = False + search_section = "secrets" + environment_variable_args: str | None = ( + "airflow.secrets.environment_variables.EnvironmentVariablesBackend" + ) + metastore_args: str | None = "airflow.secrets.metastore.MetastoreBackend" + execution_args: str | None = None + + if default_backends is not None: worker_mode = True + search_section = "workers" + environment_variable_args = ( + environment_variable_args if environment_variable_args in default_backends else None + ) + metastore_args = metastore_args if metastore_args in default_backends else None + execution_args = ( + "airflow.sdk.execution_time.secrets.execution_api.ExecutionAPISecretsBackend" + if "airflow.sdk.execution_time.secrets.execution_api.ExecutionAPISecretsBackend" + in default_backends + else None + ) - custom_secret_backend = get_custom_secret_backend(worker_mode) + backends_map: dict[str, dict[str, Any]] = { + "environment_variable": { + "callback": get_importable_secret_backend, + "args": (environment_variable_args,), + }, + "metastore": { + "callback": get_importable_secret_backend, + "args": (metastore_args,), + }, + "custom": { + "callback": get_custom_secret_backend, + "args": (worker_mode,), + }, + "execution_api": { + "callback": get_importable_secret_backend, + "args": (execution_args,), + }, + } + + backends_order = conf.getlist(search_section, "backends_order", delimiter=",") + + required_backends = ( + [Backends.ENVIRONMENT_VARIABLE, Backends.EXECUTION_API] + if worker_mode + else [Backends.METASTORE, Backends.ENVIRONMENT_VARIABLE] + ) - if custom_secret_backend is not None: - backend_list.append(custom_secret_backend) + if missing_backends := [b.value for b in required_backends if b.value not in backends_order]: + raise AirflowConfigException( + f"The configuration option [{search_section}]backends_order is misconfigured. " + f"The following backend types are missing: {missing_backends}", + search_section, + missing_backends, + ) - for class_name in default_backends: - secrets_backend_cls = import_string(class_name) - backend_list.append(secrets_backend_cls()) + if unsupported_backends := [b for b in backends_order if b not in backends_map.keys()]: + raise AirflowConfigException( + f"The configuration option [{search_section}]backends_order is misconfigured. " + f"The following backend types are unsupported: {unsupported_backends}", + search_section, + unsupported_backends, + ) + + backend_list = [] + for backend_type in backends_order: + backend_item = backends_map[backend_type] + callback, args = backend_item["callback"], backend_item["args"] + backend = callback(*args) if args else callback() + if backend: + backend_list.append(backend) return backend_list def ensure_secrets_loaded( - default_backends: list[str] = DEFAULT_SECRETS_SEARCH_PATH, + default_backends: list[str] | None = None, ) -> list: """ Ensure that all secrets backends are loaded. @@ -229,10 +305,9 @@ def ensure_secrets_loaded( """ # Check if the secrets_backend_list contains only 2 default backends. - # Check if we are loading the backends for worker too by checking if the default_backends is equal - # to DEFAULT_SECRETS_SEARCH_PATH. + # Check if we are loading the backends for worker too by checking if the default_backends is not None secrets_backend_list = initialize_secrets_backends() - if len(secrets_backend_list) == 2 or default_backends != DEFAULT_SECRETS_SEARCH_PATH: + if len(secrets_backend_list) == 2 or default_backends is not None: return initialize_secrets_backends(default_backends=default_backends) return secrets_backend_list diff --git a/task-sdk/tests/task_sdk/execution_time/test_context.py b/task-sdk/tests/task_sdk/execution_time/test_context.py index f4a8f2f14a03f..e0341885f2276 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_context.py +++ b/task-sdk/tests/task_sdk/execution_time/test_context.py @@ -908,16 +908,6 @@ def test_execution_api_backend_in_worker_chain(self): in DEFAULT_SECRETS_SEARCH_PATH_WORKERS ) - def test_metastore_backend_in_server_chain(self): - """Test that MetastoreBackend is in the API server search path.""" - from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH - - assert "airflow.secrets.metastore.MetastoreBackend" in DEFAULT_SECRETS_SEARCH_PATH - assert ( - "airflow.sdk.execution_time.secrets.execution_api.ExecutionAPISecretsBackend" - not in DEFAULT_SECRETS_SEARCH_PATH - ) - def test_get_connection_uses_backend_chain(self, mock_supervisor_comms): """Test that _get_connection properly iterates through backends.""" from airflow.sdk.api.datamodels._generated import ConnectionResponse