Skip to content

Commit a18fc01

Browse files
authored
Fix scheduler crash with email notifications (apache#56429)
The ``EmailNotificationRequest`` class name (25 characters) exceeded the database constraint for ``DbCallbackRequest.callback_type column`` (20 characters), causing scheduler crashes when email notifications were triggered for task failures or retries. This fix renames the class to ``EmailRequest`` (12 characters) to fit within the constraint. A backwards compatibility alias ensures existing DB entries with `'EmailNotificationRequest'` can still be deserialized via getattr lookup. The 20-character limit is arbitrary and does not affect performance. In a follow-up PR for 3.2, we should increase this to 50+ characters to accommodate descriptive class names without requiring abbreviations. Fixes apache#56426
1 parent c636f47 commit a18fc01

File tree

5 files changed

+27
-26
lines changed

5 files changed

+27
-26
lines changed

airflow-core/src/airflow/callbacks/callback_requests.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def is_failure_callback(self) -> bool:
7777
}
7878

7979

80-
class EmailNotificationRequest(BaseCallbackRequest):
80+
class EmailRequest(BaseCallbackRequest):
8181
"""Email notification request for task failures/retries."""
8282

8383
ti: ti_datamodel.TaskInstance
@@ -86,7 +86,7 @@ class EmailNotificationRequest(BaseCallbackRequest):
8686
"""Whether this is for a failure or retry email"""
8787
context_from_server: ti_datamodel.TIRunContext
8888
"""Task execution context from the Server"""
89-
type: Literal["EmailNotificationRequest"] = "EmailNotificationRequest"
89+
type: Literal["EmailRequest"] = "EmailRequest"
9090

9191

9292
class DagRunContext(BaseModel):
@@ -108,6 +108,9 @@ class DagCallbackRequest(BaseCallbackRequest):
108108

109109

110110
CallbackRequest = Annotated[
111-
DagCallbackRequest | TaskCallbackRequest | EmailNotificationRequest,
111+
DagCallbackRequest | TaskCallbackRequest | EmailRequest,
112112
Field(discriminator="type"),
113113
]
114+
115+
# Backwards compatibility alias
116+
EmailNotificationRequest = EmailRequest

airflow-core/src/airflow/dag_processing/processor.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
from airflow.callbacks.callback_requests import (
3232
CallbackRequest,
3333
DagCallbackRequest,
34-
EmailNotificationRequest,
34+
EmailRequest,
3535
TaskCallbackRequest,
3636
)
3737
from airflow.configuration import conf
@@ -243,7 +243,7 @@ def _execute_callbacks(
243243
_execute_task_callbacks(dagbag, request, log)
244244
elif isinstance(request, DagCallbackRequest):
245245
_execute_dag_callbacks(dagbag, request, log)
246-
elif isinstance(request, EmailNotificationRequest):
246+
elif isinstance(request, EmailRequest):
247247
_execute_email_callbacks(dagbag, request, log)
248248

249249

@@ -354,9 +354,7 @@ def get_callback_representation(callback):
354354
log.exception("Error in callback at index %d: %s", idx, callback_repr)
355355

356356

357-
def _execute_email_callbacks(
358-
dagbag: DagBag, request: EmailNotificationRequest, log: FilteringBoundLogger
359-
) -> None:
357+
def _execute_email_callbacks(dagbag: DagBag, request: EmailRequest, log: FilteringBoundLogger) -> None:
360358
"""Execute email notification for task failure/retry."""
361359
dag = dagbag.dags[request.ti.dag_id]
362360
task = dag.get_task(request.ti.task_id)

airflow-core/src/airflow/jobs/scheduler_job_runner.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
from airflow.callbacks.callback_requests import (
4444
DagCallbackRequest,
4545
DagRunContext,
46-
EmailNotificationRequest,
46+
EmailRequest,
4747
TaskCallbackRequest,
4848
)
4949
from airflow.configuration import conf
@@ -959,7 +959,7 @@ def process_executor_events(
959959
"Sending email request for task %s to DAG Processor",
960960
ti,
961961
)
962-
email_request = EmailNotificationRequest(
962+
email_request = EmailRequest(
963963
filepath=ti.dag_model.relative_fileloc,
964964
bundle_name=ti.dag_version.bundle_name,
965965
bundle_version=ti.dag_version.bundle_version,

airflow-core/tests/unit/callbacks/test_callback_requests.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
CallbackRequest,
3333
DagCallbackRequest,
3434
DagRunContext,
35-
EmailNotificationRequest,
35+
EmailRequest,
3636
TaskCallbackRequest,
3737
)
3838
from airflow.models.dag import DAG
@@ -314,9 +314,9 @@ def test_dag_callback_request_serialization_with_context(self):
314314
assert result.context_from_server.last_ti.task_id == "test_task"
315315

316316

317-
class TestEmailNotificationRequest:
317+
class TestEmailRequest:
318318
def test_email_notification_request_serialization(self):
319-
"""Test EmailNotificationRequest can be serialized and used in CallbackRequest union."""
319+
"""Test EmailRequest can be serialized and used in CallbackRequest union."""
320320
ti_data = TIDataModel(
321321
id=str(uuid.uuid4()),
322322
task_id="test_task",
@@ -331,8 +331,8 @@ def test_email_notification_request_serialization(self):
331331

332332
current_time = timezone.utcnow()
333333

334-
# Create EmailNotificationRequest
335-
email_request = EmailNotificationRequest(
334+
# Create EmailRequest
335+
email_request = EmailRequest(
336336
filepath="/path/to/dag.py",
337337
bundle_name="test_bundle",
338338
bundle_version="1.0.0",
@@ -359,17 +359,17 @@ def test_email_notification_request_serialization(self):
359359

360360
# Test serialization
361361
json_str = email_request.to_json()
362-
assert "EmailNotificationRequest" in json_str
362+
assert "EmailRequest" in json_str
363363
assert "failure" in json_str
364364

365365
# Test deserialization
366-
result = EmailNotificationRequest.from_json(json_str)
366+
result = EmailRequest.from_json(json_str)
367367
assert result == email_request
368368
assert result.email_type == "failure"
369369
assert result.ti.task_id == "test_task"
370370

371371
def test_callback_request_union_with_email_notification(self):
372-
"""Test EmailNotificationRequest works in CallbackRequest union type."""
372+
"""Test EmailRequest works in CallbackRequest union type."""
373373
ti_data = TIDataModel(
374374
id=str(uuid.uuid4()),
375375
task_id="test_task",
@@ -402,7 +402,7 @@ def test_callback_request_union_with_email_notification(self):
402402
)
403403

404404
email_data = {
405-
"type": "EmailNotificationRequest",
405+
"type": "EmailRequest",
406406
"filepath": "/path/to/dag.py",
407407
"bundle_name": "test_bundle",
408408
"bundle_version": "1.0.0",
@@ -416,7 +416,7 @@ def test_callback_request_union_with_email_notification(self):
416416
adapter = TypeAdapter(CallbackRequest)
417417
callback_request = adapter.validate_python(email_data)
418418

419-
# Verify it's correctly identified as EmailNotificationRequest
420-
assert isinstance(callback_request, EmailNotificationRequest)
419+
# Verify it's correctly identified as EmailRequest
420+
assert isinstance(callback_request, EmailRequest)
421421
assert callback_request.email_type == "retry"
422422
assert callback_request.ti.task_id == "test_task"

airflow-core/tests/unit/dag_processing/test_processor.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
CallbackRequest,
4646
DagCallbackRequest,
4747
DagRunContext,
48-
EmailNotificationRequest,
48+
EmailRequest,
4949
TaskCallbackRequest,
5050
)
5151
from airflow.dag_processing.dagbag import DagBag
@@ -1303,7 +1303,7 @@ def test_execute_email_callbacks_failure(self, mock_send_email):
13031303
)
13041304

13051305
current_time = timezone.utcnow()
1306-
request = EmailNotificationRequest(
1306+
request = EmailRequest(
13071307
filepath="/path/to/dag.py",
13081308
bundle_name="test_bundle",
13091309
bundle_version="1.0.0",
@@ -1363,7 +1363,7 @@ def test_execute_email_callbacks_retry(self, mock_send_email):
13631363

13641364
current_time = timezone.utcnow()
13651365

1366-
request = EmailNotificationRequest(
1366+
request = EmailRequest(
13671367
filepath="/path/to/dag.py",
13681368
bundle_name="test_bundle",
13691369
bundle_version="1.0.0",
@@ -1422,7 +1422,7 @@ def test_execute_email_callbacks_no_email_configured(self, mock_send_email):
14221422
)
14231423

14241424
current_time = timezone.utcnow()
1425-
request = EmailNotificationRequest(
1425+
request = EmailRequest(
14261426
filepath="/path/to/dag.py",
14271427
bundle_name="test_bundle",
14281428
bundle_version="1.0.0",
@@ -1480,7 +1480,7 @@ def test_execute_email_callbacks_email_disabled_for_type(self, mock_send_email):
14801480
current_time = timezone.utcnow()
14811481

14821482
# Create request for failure (but email_on_failure is False)
1483-
request = EmailNotificationRequest(
1483+
request = EmailRequest(
14841484
filepath="/path/to/dag.py",
14851485
bundle_name="test_bundle",
14861486
bundle_version="1.0.0",

0 commit comments

Comments
 (0)