Skip to content

Commit d91b6c4

Browse files
authored
SQLA2: Address assorted mypy violations in airflow-core (#59746)
* Remove redefinition of `get` overloads in parser.py. Lines 285-289 have `@overload` declarations but no implementation, and line 980 re-defines `get` with overloads.
* Add type hint for `upgraded_values`.
* Fix mypy violations related to OTEL.
* Add type hints to datadog's `DogStatsd` kwargs.
* Address mypy violations related to structlog + refactoring.
* Address mypy violations in `scheduler_job_runner`.
1 parent bca0a63 commit d91b6c4

File tree

6 files changed

+39
-27
lines changed

6 files changed

+39
-27
lines changed

airflow-core/src/airflow/jobs/scheduler_job_runner.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2188,14 +2188,19 @@ def _schedule_dag_run(
21882188
):
21892189
dag_model.calculate_dagrun_date_fields(dag, get_run_data_interval(dag.timetable, dag_run))
21902190

2191-
dag_run = session.scalar(
2191+
dag_run_reloaded = session.scalar(
21922192
select(DagRun)
21932193
.where(DagRun.id == dag_run.id)
21942194
.options(
21952195
selectinload(DagRun.consumed_asset_events).selectinload(AssetEvent.asset),
21962196
selectinload(DagRun.consumed_asset_events).selectinload(AssetEvent.source_aliases),
21972197
)
21982198
)
2199+
if dag_run_reloaded is None:
2200+
# This should never happen since we just had the dag_run
2201+
self.log.error("DagRun %s was deleted unexpectedly", dag_run.id)
2202+
return None
2203+
dag_run = dag_run_reloaded
21992204
callback_to_execute = DagCallbackRequest(
22002205
filepath=dag_model.relative_fileloc or "",
22012206
dag_id=dag.dag_id,

shared/configuration/src/airflow_shared/configuration/parser.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ def __init__(
248248
self.configuration_description = configuration_description
249249
self._default_values = _default_values
250250
self._suppress_future_warnings = False
251-
self.upgraded_values = {}
251+
self.upgraded_values: dict[tuple[str, str], str] = {}
252252

253253
@functools.cached_property
254254
def inversed_deprecated_options(self):
@@ -282,12 +282,6 @@ def sensitive_config_values(self) -> set[tuple[str, str]]:
282282
sensitive.update(depr_section, depr_option)
283283
return sensitive
284284

285-
@overload # type: ignore[override]
286-
def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ...
287-
288-
@overload
289-
def get(self, section: str, key: str, **kwargs) -> str | None: ...
290-
291285
def _update_defaults_from_string(self, config_string: str) -> None:
292286
"""
293287
Update the defaults in _default_values based on values in config_string ("ini" format).

shared/logging/src/airflow_shared/logging/structlog.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from collections.abc import Callable, Iterable, Mapping, Sequence
2828
from functools import cache, cached_property, partial
2929
from pathlib import Path
30+
from types import ModuleType
3031
from typing import TYPE_CHECKING, Any, BinaryIO, Generic, TextIO, TypeVar, cast
3132

3233
import pygtrie
@@ -96,7 +97,7 @@ def meth(self: Any, event: str, *args: Any, **kw: Any) -> Any:
9697

9798
# See https://github.com/python/cpython/blob/3.13/Lib/logging/__init__.py#L307-L326 for reason
9899
if args and len(args) == 1 and isinstance(args[0], Mapping) and args[0]:
99-
args = args[0]
100+
return self._proxy_to_logger(name, event % args[0], **kw)
100101
return self._proxy_to_logger(name, event % args, **kw)
101102

102103
meth.__name__ = name
@@ -185,14 +186,14 @@ def __init__(self, name: str | None = None, file: TextIO | None = None):
185186
class LoggerFactory(Generic[LogOutputType]):
186187
def __init__(
187188
self,
188-
cls: Callable[[str | None, LogOutputType | None], WrappedLogger],
189+
cls: type[WrappedLogger],
189190
io: LogOutputType | None = None,
190191
):
191192
self.cls = cls
192193
self.io = io
193194

194195
def __call__(self, logger_name: str | None = None, *args: Any) -> WrappedLogger:
195-
return self.cls(logger_name, self.io)
196+
return self.cls(logger_name, self.io) # type: ignore[call-arg]
196197

197198

198199
def logger_name(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict:
@@ -282,17 +283,17 @@ def structlog_processors(
282283

283284
import click
284285

285-
suppress = (click, contextlib)
286+
suppress: tuple[ModuleType, ...] = (click, contextlib)
286287
try:
287288
import httpcore
288289

289-
suppress += (httpcore,)
290+
suppress = (*suppress, httpcore)
290291
except ImportError:
291292
pass
292293
try:
293294
import httpx
294295

295-
suppress += (httpx,)
296+
suppress = (*suppress, httpx)
296297
except ImportError:
297298
pass
298299

@@ -320,7 +321,8 @@ def json_dumps(msg, default):
320321
json = structlog.processors.JSONRenderer(serializer=json_dumps)
321322

322323
def json_processor(logger: Any, method_name: Any, event_dict: EventDict) -> str:
323-
return json(logger, method_name, event_dict).decode("utf-8")
324+
result = json(logger, method_name, event_dict)
325+
return result.decode("utf-8") if isinstance(result, bytes) else result
324326

325327
shared_processors.extend(
326328
(
@@ -331,6 +333,7 @@ def json_processor(logger: Any, method_name: Any, event_dict: EventDict) -> str:
331333

332334
return shared_processors, json_processor, json
333335

336+
exc_formatter: structlog.dev.RichTracebackFormatter | structlog.typing.ExceptionRenderer
334337
if os.getenv("DEV", "") != "":
335338
# Only use Rich in dev -- otherwise for "production" deployments it makes the logs harder to read as
336339
# it uses lots of ANSI escapes and non ASCII characters. Simpler is better for non-dev non-JSON
@@ -349,6 +352,7 @@ def json_processor(logger: Any, method_name: Any, event_dict: EventDict) -> str:
349352
if colors:
350353
my_styles["debug"] = structlog.dev.CYAN
351354

355+
console: PercentFormatRender | structlog.dev.ConsoleRenderer
352356
if log_format:
353357
console = PercentFormatRender(
354358
fmt=log_format,
@@ -479,15 +483,18 @@ def is_atty():
479483

480484
wrapper_class = cast("type[BindableLogger]", make_filtering_logger())
481485
if json_output:
482-
logger_factory = LoggerFactory(NamedBytesLogger, io=output)
486+
logger_factory: LoggerFactory[Any] = LoggerFactory(NamedBytesLogger, io=output)
483487
else:
484488
# There is no universal way of telling if a file-like-object is binary (and needs bytes) or text that
485489
# works for files, sockets and io.StringIO/BytesIO.
486490

487491
# If given a binary object, wrap it in a text mode wrapper
492+
text_output: TextIO | None = None
488493
if output is not None and not hasattr(output, "encoding"):
489-
output = io.TextIOWrapper(output, line_buffering=True)
490-
logger_factory = LoggerFactory(NamedWriteLogger, io=output)
494+
text_output = io.TextIOWrapper(cast("BinaryIO", output), line_buffering=True)
495+
elif output is not None:
496+
text_output = cast("TextIO", output)
497+
logger_factory = LoggerFactory(NamedWriteLogger, io=text_output)
491498

492499
structlog.configure(
493500
processors=shared_pre_chain + [for_structlog],

shared/observability/src/airflow_shared/observability/metrics/datadog_logger.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -175,12 +175,16 @@ def get_dogstatsd_logger(
175175
"""Get DataDog StatsD logger."""
176176
from datadog import DogStatsd
177177

178-
dogstatsd = DogStatsd(
179-
host,
180-
port,
181-
namespace,
182-
constant_tags=cls.get_constant_tags(),
183-
)
178+
dogstatsd_kwargs: dict[str, str | int | list[str]] = {
179+
"constant_tags": cls.get_constant_tags(),
180+
}
181+
if host is not None:
182+
dogstatsd_kwargs["host"] = host
183+
if port is not None:
184+
dogstatsd_kwargs["port"] = port
185+
if namespace is not None:
186+
dogstatsd_kwargs["namespace"] = namespace
187+
dogstatsd = DogStatsd(**dogstatsd_kwargs)
184188
metric_tags_validator = PatternBlockListValidator(statsd_disabled_tags)
185189
validator = get_validator(metrics_allow_list, metrics_block_list)
186190
return SafeDogStatsdLogger(

shared/observability/src/airflow_shared/observability/metrics/otel_logger.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,9 @@ def get_otel_logger(
388388
stat_name_handler: Callable[[str], str] | None = None,
389389
statsd_influxdb_enabled: bool = False,
390390
) -> SafeOtelLogger:
391-
resource = Resource.create(attributes={SERVICE_NAME: service_name})
391+
effective_service_name: str = service_name or "airflow"
392+
effective_prefix: str = prefix or DEFAULT_METRIC_NAME_PREFIX
393+
resource = Resource.create(attributes={SERVICE_NAME: effective_service_name})
392394
protocol = "https" if ssl_active else "http"
393395
# Allow transparent support for standard OpenTelemetry SDK environment variables.
394396
# https://opentelemetry.io/docs/specs/otel/protocol/exporter/#configuration-options
@@ -420,5 +422,5 @@ def get_otel_logger(
420422
validator = get_validator(metrics_allow_list, metrics_block_list)
421423

422424
return SafeOtelLogger(
423-
metrics.get_meter_provider(), prefix, validator, stat_name_handler, statsd_influxdb_enabled
425+
metrics.get_meter_provider(), effective_prefix, validator, stat_name_handler, statsd_influxdb_enabled
424426
)

shared/observability/src/airflow_shared/observability/traces/otel_tracer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def __init__(
7777
log.info("(otel_tracer.__init__) - [BatchSpanProcessor] is being used")
7878
self.span_processor = BatchSpanProcessor(self.span_exporter)
7979
self.tag_string = tag_string
80-
self.otel_service = otel_service
80+
self.otel_service: str = otel_service or "airflow"
8181
self.resource = Resource.create(attributes={SERVICE_NAME: self.otel_service})
8282
self.debug = debug
8383

0 commit comments

Comments (0)