
Commit 36cca3a

Cleanup of variables in settings.py
1 parent 24d95ec commit 36cca3a

File tree

18 files changed (+2272, -2279 lines)
Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-5f382d018612b61b547d05402c6988bc5b9959caa7e494dbb8775a3a5c425e31
+1c79db933fe961f2d23605e1dcf73125923ea39bbd719c800e8adc1aa5bedb52

airflow-core/docs/img/airflow_erd.svg

Lines changed: 2158 additions & 2158 deletions

airflow-core/src/airflow/api_fastapi/core_api/app.py

Lines changed: 5 additions & 5 deletions
@@ -33,11 +33,11 @@
from airflow.api_fastapi.auth.tokens import get_signing_key
from airflow.configuration import conf
from airflow.exceptions import AirflowException
-from airflow.settings import AIRFLOW_PATH

log = logging.getLogger(__name__)

-PY313 = sys.version_info >= (3, 13)
+_PY313 = sys.version_info >= (3, 13)
+_AIRFLOW_PATH = Path(__file__).parents[3]


def init_views(app: FastAPI) -> None:
@@ -50,7 +50,7 @@ def init_views(app: FastAPI) -> None:

    dev_mode = os.environ.get("DEV_MODE", str(False)) == "true"

-    directory = Path(AIRFLOW_PATH) / ("airflow/ui/dev" if dev_mode else "airflow/ui/dist")
+    directory = _AIRFLOW_PATH / ("airflow/ui/dev" if dev_mode else "airflow/ui/dist")

    # During python tests or when the backend is run without having the frontend build
    # those directories might not exist. App should not fail initializing in those scenarios.
@@ -61,7 +61,7 @@ def init_views(app: FastAPI) -> None:
    if dev_mode:
        app.mount(
            "/static/i18n/locales",
-            StaticFiles(directory=Path(AIRFLOW_PATH) / "airflow/ui/public/i18n/locales"),
+            StaticFiles(directory=_AIRFLOW_PATH / "airflow/ui/public/i18n/locales"),
            name="dev_i18n_static",
        )

@@ -124,7 +124,7 @@ def init_flask_plugins(app: FastAPI) -> None:
    try:
        from airflow.providers.fab.www.app import create_app
    except ImportError:
-        if PY313:
+        if _PY313:
            log.info(
                "Some Airflow 2 plugins have been detected in your environment. Currently FAB provider "
                "does not support Python 3.13, so you cannot use Airflow 2 plugins with Airflow 3 until "

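For context on the first hunk above: the new module-local _AIRFLOW_PATH points at the same directory as the removed settings.AIRFLOW_PATH. A minimal sketch, assuming the source layout visible in this commit's file paths (the /opt/airflow prefix is purely illustrative):

from pathlib import Path

# Illustrative locations of the two modules inside the source tree.
app_py = Path("/opt/airflow/src/airflow/api_fastapi/core_api/app.py")
settings_py = Path("/opt/airflow/src/airflow/settings.py")

new_root = app_py.parents[3]          # core_api -> api_fastapi -> airflow -> src
old_root = settings_py.parent.parent  # airflow -> src, i.e. the removed AIRFLOW_PATH

assert new_root == old_root == Path("/opt/airflow/src")

Both resolve to the directory containing the airflow package, so directory = _AIRFLOW_PATH / "airflow/ui/dist" behaves exactly as before.
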
airflow-core/src/airflow/cli/commands/backfill_command.py

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@
@providers_configuration_loaded
def create_backfill(args) -> None:
    """Create backfill job or dry run for a DAG or list of DAGs using regex."""
-    logging.basicConfig(level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT)
+    logging.basicConfig(level=logging.INFO, format=settings.SIMPLE_LOG_FORMAT)
    signal.signal(signal.SIGTERM, sigint_handler)
    console = AirflowConsole()

airflow-core/src/airflow/cli/commands/task_command.py

Lines changed: 1 addition & 1 deletion
@@ -468,7 +468,7 @@ def task_render(args, dag: DAG | None = None) -> None:
@providers_configuration_loaded
def task_clear(args) -> None:
    """Clear all task instances or only those matched by regex for a DAG(s)."""
-    logging.basicConfig(level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT)
+    logging.basicConfig(level=logging.INFO, format=settings.SIMPLE_LOG_FORMAT)
    if args.dag_id and not args.bundle_name and not args.dag_regex and not args.task_regex:
        dags = [get_dag_by_file_location(args.dag_id)]
    else:
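The two CLI hunks above (create_backfill and task_clear) are like-for-like swaps: the removed settings.LOGGING_LEVEL was a hardcoded alias for logging.INFO (see its deletion in the settings.py diff below), so the equivalent call is simply:

import logging

# settings.LOGGING_LEVEL was defined as logging.INFO, so passing logging.INFO directly
# changes nothing; the format argument (settings.SIMPLE_LOG_FORMAT) is untouched.
logging.basicConfig(level=logging.INFO)
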

airflow-core/src/airflow/dag_processing/collection.py

Lines changed: 6 additions & 2 deletions
@@ -208,17 +208,21 @@ def _serialize_dag_capturing_errors(

    We can't place them directly in import_errors, as this may be retried, and work the next time
    """
-    from airflow import settings
    from airflow.models.dagcode import DagCode
    from airflow.models.serialized_dag import SerializedDagModel

+    # Updating serialized DAG can not be faster than a minimum interval to reduce database write rate.
+    MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint(
+        "core", "min_serialized_dag_update_interval", fallback=30
+    )
+
    try:
        # We can't use bulk_write_to_db as we want to capture each error individually
        dag_was_updated = SerializedDagModel.write_dag(
            dag,
            bundle_name=bundle_name,
            bundle_version=bundle_version,
-            min_update_interval=settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
+            min_update_interval=MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
            session=session,
        )
        if not dag_was_updated:
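Here the interval moves from a settings-module constant to a config lookup at the point of use. A minimal sketch of the lookup semantics, assuming the documented default of 30 seconds:

from airflow.configuration import conf

# Returns [core] min_serialized_dag_update_interval from airflow.cfg or the environment;
# the fallback of 30 seconds only applies when the option is not set at all.
min_interval = conf.getint("core", "min_serialized_dag_update_interval", fallback=30)
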

airflow-core/src/airflow/jobs/scheduler_job_runner.py

Lines changed: 1 addition & 1 deletion
@@ -1612,7 +1612,7 @@ def _do_scheduling(self, session: Session) -> int:
        """
        # Put a check in place to make sure we don't commit unexpectedly
        with prohibit_commit(session) as guard:
-            if settings.USE_JOB_SCHEDULE:
+            if conf.getboolean("scheduler", "use_job_schedule", fallback=True):
                self._create_dagruns_for_dags(guard, session)

            self._start_queued_dagruns(session)

airflow-core/src/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py

Lines changed: 3 additions & 2 deletions
@@ -34,7 +34,6 @@
from alembic import context, op
from sqlalchemy import text

-from airflow import settings
from airflow.migrations.db_types import TIMESTAMP

# revision identifiers, used by Alembic.
@@ -71,6 +70,8 @@ def upgrade():


def _move_offending_dagruns():
+    from airflow.utils.db import AIRFLOW_MOVED_TABLE_PREFIX
+
    select_null_logical_date_query = "select * from dag_run where logical_date is null"

    conn = op.get_bind()
@@ -83,7 +84,7 @@ def _move_offending_dagruns():

    # Copy offending data to a new table. This can be done directly in Postgres
    # and SQLite with create-from-select; MySQL needs a special case.
-    offending_table_name = f"{settings.AIRFLOW_MOVED_TABLE_PREFIX}__3_0_0__offending_dag_run"
+    offending_table_name = f"{AIRFLOW_MOVED_TABLE_PREFIX}__3_0_0__offending_dag_run"
    if conn.dialect.name == "mysql":
        op.execute(f"create table {offending_table_name} like dag_run")
        op.execute(f"insert into {offending_table_name} {select_null_logical_date_query}")
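The prefix constant now comes from airflow.utils.db rather than airflow.settings; the resulting table name is unchanged. A quick sketch using the literal value this commit removes from settings.py below:

# "_airflow_moved" is the value deleted from settings.py later in this diff; it is
# hard-coded here only to illustrate the resulting table name.
AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"

offending_table_name = f"{AIRFLOW_MOVED_TABLE_PREFIX}__3_0_0__offending_dag_run"
print(offending_table_name)  # _airflow_moved__3_0_0__offending_dag_run
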

airflow-core/src/airflow/models/serialized_dag.py

Lines changed: 7 additions & 3 deletions
@@ -34,6 +34,7 @@
from sqlalchemy_utils import UUIDType

from airflow._shared.timezones import timezone
+from airflow.configuration import conf
from airflow.models.asset import (
    AssetAliasModel,
    AssetModel,
@@ -46,7 +47,7 @@
from airflow.serialization.dag_dependency import DagDependency
from airflow.serialization.definitions.assets import SerializedAssetUniqueKey as UKey
from airflow.serialization.serialized_objects import DagSerialization
-from airflow.settings import COMPRESS_SERIALIZED_DAGS, json
+from airflow.settings import json
from airflow.utils.hashlib_wrapper import md5
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import UtcDateTime, get_dialect_name, mapped_column
@@ -62,6 +63,9 @@

log = logging.getLogger(__name__)

+# If set to True, serialized DAGs is compressed before writing to DB,
+_COMPRESS_SERIALIZED_DAGS = conf.getboolean("core", "compress_serialized_dags", fallback=False)
+

class _DagDependenciesResolver:
    """Resolver that resolves dag dependencies to include asset id and assets link to asset aliases."""
@@ -329,7 +333,7 @@ def __init__(self, dag: LazyDeserializedDAG) -> None:
        # partially ordered json data
        dag_data_json = json.dumps(dag_data, sort_keys=True).encode("utf-8")

-        if COMPRESS_SERIALIZED_DAGS:
+        if _COMPRESS_SERIALIZED_DAGS:
            self._data = None
            self._data_compressed = zlib.compress(dag_data_json)
        else:
@@ -620,7 +624,7 @@ def get_dag_dependencies(cls, session: Session = NEW_SESSION) -> dict[str, list[
        """
        load_json: Callable
        data_col_to_select: ColumnElement[Any] | InstrumentedAttribute[bytes | None]
-        if COMPRESS_SERIALIZED_DAGS is False:
+        if _COMPRESS_SERIALIZED_DAGS is False:
            dialect = get_dialect_name(session)
            if dialect in ["sqlite", "mysql"]:
                data_col_to_select = func.json_extract(cls._data, "$.dag.dag_dependencies")
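The compression toggle itself is unchanged; it is now read once at import of this module via conf.getboolean instead of being re-exported from settings. A minimal sketch of the round trip guarded by _COMPRESS_SERIALIZED_DAGS, using stdlib json purely for illustration:

import json
import zlib

dag_data = {"dag": {"dag_id": "example_dag", "dag_dependencies": []}}
dag_data_json = json.dumps(dag_data, sort_keys=True).encode("utf-8")

# When [core] compress_serialized_dags is True, only the compressed payload is stored.
data_compressed = zlib.compress(dag_data_json)
assert json.loads(zlib.decompress(data_compressed)) == dag_data
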

airflow-core/src/airflow/settings.py

Lines changed: 45 additions & 64 deletions
@@ -58,15 +58,15 @@
from airflow.utils.orm_event_handlers import setup_event_handlers
from airflow.utils.sqlalchemy import is_sqlalchemy_v1

-USE_PSYCOPG3: bool
+_USE_PSYCOPG3: bool
try:
    from importlib.util import find_spec

    is_psycopg3 = find_spec("psycopg") is not None

-    USE_PSYCOPG3 = is_psycopg3 and not is_sqlalchemy_v1()
+    _USE_PSYCOPG3 = is_psycopg3 and not is_sqlalchemy_v1()
except (ImportError, ModuleNotFoundError):
-    USE_PSYCOPG3 = False
+    _USE_PSYCOPG3 = False

if TYPE_CHECKING:
    from sqlalchemy.engine import Engine
@@ -107,23 +107,13 @@
]
)

-LOGGING_LEVEL = logging.INFO
-
-LOG_FORMAT = conf.get("logging", "log_format")
SIMPLE_LOG_FORMAT = conf.get("logging", "simple_log_format")

SQL_ALCHEMY_CONN: str | None = None
SQL_ALCHEMY_CONN_ASYNC: str | None = None
PLUGINS_FOLDER: str | None = None
DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core", "DAGS_FOLDER"))

-AIO_LIBS_MAPPING = {"sqlite": "aiosqlite", "postgresql": "asyncpg", "mysql": "aiomysql"}
-"""
-Mapping of sync scheme to async scheme.
-
-:meta private:
-"""
-
engine: Engine | None = None
Session: scoped_session | None = None
# NonScopedSession creates global sessions and is not safe to use in multi-threaded environment without
@@ -246,6 +236,9 @@ def load_policy_plugins(pm: pluggy.PluginManager):


def _get_async_conn_uri_from_sync(sync_uri):
+    AIO_LIBS_MAPPING = {"sqlite": "aiosqlite", "postgresql": "asyncpg", "mysql": "aiomysql"}
+    """Mapping of sync scheme to async scheme."""
+
    scheme, rest = sync_uri.split(":", maxsplit=1)
    scheme = scheme.split("+", maxsplit=1)[0]
    aiolib = AIO_LIBS_MAPPING.get(scheme)
@@ -327,9 +320,6 @@ def get_bind(
        pass


-AIRFLOW_PATH = os.path.dirname(os.path.dirname(__file__))
-
-
def _is_sqlite_db_path_relative(sqla_conn_str: str) -> bool:
    """Determine whether the database connection URI specifies a relative path."""
    # Check for non-empty connection string:
@@ -455,22 +445,21 @@ def clean_in_fork():
    register_at_fork(after_in_child=clean_in_fork)


-DEFAULT_ENGINE_ARGS: dict[str, dict[str, Any]] = {
-    "postgresql": (
-        {
-            "executemany_values_page_size" if is_sqlalchemy_v1() else "insertmanyvalues_page_size": 10000,
-        }
-        | (
-            {}
-            if USE_PSYCOPG3
-            else {"executemany_mode": "values_plus_batch", "executemany_batch_page_size": 2000}
-        )
-    )
-}
-
-
def prepare_engine_args(disable_connection_pool=False, pool_class=None):
    """Prepare SQLAlchemy engine args."""
+    DEFAULT_ENGINE_ARGS: dict[str, dict[str, Any]] = {
+        "postgresql": (
+            {
+                "executemany_values_page_size" if is_sqlalchemy_v1() else "insertmanyvalues_page_size": 10000,
+            }
+            | (
+                {}
+                if _USE_PSYCOPG3
+                else {"executemany_mode": "values_plus_batch", "executemany_batch_page_size": 2000}
+            )
+        )
+    }
+
    default_args = {}
    for dialect, default in DEFAULT_ENGINE_ARGS.items():
        if SQL_ALCHEMY_CONN.startswith(dialect):
@@ -654,16 +643,39 @@ def prepare_syspath_for_config_and_plugins():

def __getattr__(name: str):
    """Handle deprecated module attributes."""
-    if name == "MASK_SECRETS_IN_LOGS":
-        import warnings
+    import warnings
+
+    from airflow.exceptions import RemovedInAirflow4Warning

+    if name == "MASK_SECRETS_IN_LOGS":
        warnings.warn(
            "settings.MASK_SECRETS_IN_LOGS has been removed. This shim returns default value of False. "
            "Use SecretsMasker.enable_log_masking(), disable_log_masking(), or is_log_masking_enabled() instead.",
-            DeprecationWarning,
+            RemovedInAirflow4Warning,
            stacklevel=2,
        )
        return False
+    if name == "WEB_COLORS":
+        warnings.warn(
+            "settings.WEB_COLORS has been removed. This shim returns default value. "
+            "Please upgrade your provider or integration.",
+            RemovedInAirflow4Warning,
+            stacklevel=2,
+        )
+        return {"LIGHTBLUE": "#4d9de0", "LIGHTORANGE": "#FF9933"}
+    if name == "EXECUTE_TASKS_NEW_PYTHON_INTERPRETER":
+        warnings.warn(
+            "settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER has been removed. This shim returns default value. "
+            "Please upgrade your provider or integration.",
+            RemovedInAirflow4Warning,
+            stacklevel=2,
+        )
+        return not hasattr(os, "fork") or conf.getboolean(
+            "core",
+            "execute_tasks_new_python_interpreter",
+            fallback=False,
+        )
+
    raise AttributeError(f"module '{__name__}' has no attribute '{name}'")


@@ -736,26 +748,6 @@ def initialize():
    atexit.register(dispose_orm)


-# Const stuff
-WEB_COLORS = {"LIGHTBLUE": "#4d9de0", "LIGHTORANGE": "#FF9933"}
-
-# Updating serialized DAG can not be faster than a minimum interval to reduce database
-# write rate.
-MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint("core", "min_serialized_dag_update_interval", fallback=30)
-
-# If set to True, serialized DAGs is compressed before writing to DB,
-COMPRESS_SERIALIZED_DAGS = conf.getboolean("core", "compress_serialized_dags", fallback=False)
-
-CAN_FORK = hasattr(os, "fork")
-
-EXECUTE_TASKS_NEW_PYTHON_INTERPRETER = not CAN_FORK or conf.getboolean(
-    "core",
-    "execute_tasks_new_python_interpreter",
-    fallback=False,
-)
-
-USE_JOB_SCHEDULE = conf.getboolean("scheduler", "use_job_schedule", fallback=True)
-
# By default Airflow plugins are lazily-loaded (only loaded when required). Set it to False,
# if you want to load plugins whenever 'airflow' is invoked via cli or loaded from module.
LAZY_LOAD_PLUGINS: bool = conf.getboolean("core", "lazy_load_plugins", fallback=True)
@@ -765,15 +757,4 @@ def initialize():
# loaded from module.
LAZY_LOAD_PROVIDERS: bool = conf.getboolean("core", "lazy_discover_providers", fallback=True)

-# Executors can set this to true to configure logging correctly for
-# containerized executors.
-IS_EXECUTOR_CONTAINER = bool(os.environ.get("AIRFLOW_IS_EXECUTOR_CONTAINER", ""))
-IS_K8S_EXECUTOR_POD = bool(os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD", ""))
-"""Will be True if running in kubernetes executor pod."""
-
-HIDE_SENSITIVE_VAR_CONN_FIELDS = conf.getboolean("core", "hide_sensitive_var_conn_fields")
-
-# Prefix used to identify tables holding data moved during migration.
-AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"
-
DAEMON_UMASK: str = conf.get("core", "daemon_umask", fallback="0o077")
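Net effect of this file: constants with a single consumer move next to that consumer, while names that third-party code may still import (MASK_SECRETS_IN_LOGS, WEB_COLORS, EXECUTE_TASKS_NEW_PYTHON_INTERPRETER) are served by the module-level __getattr__ with a RemovedInAirflow4Warning. A hedged sketch of what a not-yet-migrated caller would see with this commit applied:

import warnings

from airflow import settings

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    colors = settings.WEB_COLORS  # no longer a real attribute; resolved by __getattr__

print(colors["LIGHTBLUE"])           # "#4d9de0" (the shim's default value)
print(caught[0].category.__name__)   # RemovedInAirflow4Warning
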
