Skip to content

Commit c3daef9

Browse files
authored
HTTP Notifier implementation (apache#56160)
1 parent a8f2d97 commit c3daef9

File tree

8 files changed

+261
-9
lines changed

8 files changed

+261
-9
lines changed

dev/breeze/tests/test_selective_checks.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -670,7 +670,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
670670
"providers/http/tests/file.py",
671671
),
672672
{
673-
"selected-providers-list-as-string": "amazon apache.livy atlassian.jira dbt.cloud dingding discord google http pagerduty",
673+
"selected-providers-list-as-string": "amazon apache.livy atlassian.jira common.compat dbt.cloud dingding discord google http pagerduty",
674674
"all-python-versions": f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
675675
"all-python-versions-list-as-string": DEFAULT_PYTHON_MAJOR_MINOR_VERSION,
676676
"python-versions": f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
@@ -691,7 +691,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
691691
[
692692
{
693693
"description": "amazon...google",
694-
"test_types": "Providers[amazon] Providers[apache.livy,atlassian.jira,dbt.cloud,dingding,discord,http,pagerduty] Providers[google]",
694+
"test_types": "Providers[amazon] Providers[apache.livy,atlassian.jira,common.compat,dbt.cloud,dingding,discord,http,pagerduty] Providers[google]",
695695
}
696696
]
697697
),
@@ -702,18 +702,21 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
702702
"test_types": "Providers[amazon] Providers[apache.livy]",
703703
},
704704
{
705-
"description": "atlassian.jir...dbt.cloud",
706-
"test_types": "Providers[atlassian.jira] Providers[dbt.cloud]",
705+
"description": "atlassian.jir...common.compat",
706+
"test_types": "Providers[atlassian.jira] Providers[common.compat]",
707707
},
708708
{
709-
"description": "dingding...discord",
710-
"test_types": "Providers[dingding] Providers[discord]",
709+
"description": "dbt.cloud...dingding",
710+
"test_types": "Providers[dbt.cloud] Providers[dingding]",
711711
},
712712
{
713-
"description": "google...http",
714-
"test_types": "Providers[google] Providers[http]",
713+
"description": "discord...google",
714+
"test_types": "Providers[discord] Providers[google]",
715+
},
716+
{
717+
"description": "http...pagerduty",
718+
"test_types": "Providers[http] Providers[pagerduty]",
715719
},
716-
{"description": "pagerduty", "test_types": "Providers[pagerduty]"},
717720
]
718721
),
719722
"run-mypy": "true",

providers/http/provider.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,9 @@ hooks:
9797
python-modules:
9898
- airflow.providers.http.hooks.http
9999

100+
notifications:
101+
- airflow.providers.http.notifications.HttpNotifier
102+
100103
triggers:
101104
- integration-name: Hypertext Transfer Protocol (HTTP)
102105
python-modules:

providers/http/pyproject.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,19 @@ dependencies = [
6666
"asgiref>=2.3.0",
6767
]
6868

69+
# The optional dependencies should be modified in place in the generated file
70+
# Any change in the dependencies is preserved when the file is regenerated
71+
[project.optional-dependencies]
72+
"common.compat" = [
73+
"apache-airflow-providers-common-compat"
74+
]
75+
6976
[dependency-groups]
7077
dev = [
7178
"apache-airflow",
7279
"apache-airflow-task-sdk",
7380
"apache-airflow-devel-common",
81+
"apache-airflow-providers-common-compat",
7482
# Additional devel dependencies (do not remove this line and add extra development dependencies)
7583
]
7684

providers/http/src/airflow/providers/http/get_provider_info.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ def get_provider_info():
5353
"python-modules": ["airflow.providers.http.hooks.http"],
5454
}
5555
],
56+
"notifications": ["airflow.providers.http.notifications.HttpNotifier"],
5657
"triggers": [
5758
{
5859
"integration-name": "Hypertext Transfer Protocol (HTTP)",
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
19+
from airflow.providers.http.notifications.http import HttpNotifier
20+
21+
__all__ = ["HttpNotifier"]
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from __future__ import annotations
19+
20+
from functools import cached_property
21+
from typing import TYPE_CHECKING, Any
22+
23+
import aiohttp
24+
25+
from airflow.providers.common.compat.notifier import BaseNotifier
26+
from airflow.providers.http.hooks.http import HttpAsyncHook, HttpHook
27+
28+
if TYPE_CHECKING:
29+
from airflow.sdk.definitions.context import Context
30+
31+
32+
class HttpNotifier(BaseNotifier):
33+
"""
34+
HTTP Notifier.
35+
36+
Sends HTTP requests to notify external systems.
37+
38+
:param http_conn_id: HTTP connection id that has the base URL and optional authentication credentials.
39+
:param endpoint: The endpoint to be called i.e. resource/v1/query?
40+
:param method: The HTTP method to use. Defaults to POST.
41+
:param data: Payload to be uploaded or request parameters
42+
:param json: JSON payload to be uploaded
43+
:param headers: Additional headers to be passed through as a dictionary
44+
:param extra_options: Additional options to be used when executing the request
45+
"""
46+
47+
template_fields = ("http_conn_id", "endpoint", "data", "json", "headers", "extra_options")
48+
49+
def __init__(
50+
self,
51+
*,
52+
http_conn_id: str = HttpHook.default_conn_name,
53+
endpoint: str | None = None,
54+
method: str = "POST",
55+
data: dict[str, Any] | str | None = None,
56+
json: dict[str, Any] | str | None = None,
57+
headers: dict[str, Any] | None = None,
58+
extra_options: dict[str, Any] | None = None,
59+
**kwargs,
60+
):
61+
super().__init__(**kwargs)
62+
self.http_conn_id = http_conn_id
63+
self.endpoint = endpoint
64+
self.method = method
65+
self.data = data
66+
self.json = json
67+
self.headers = headers
68+
self.extra_options = extra_options or {}
69+
70+
@cached_property
71+
def hook(self) -> HttpHook:
72+
"""HTTP Hook."""
73+
return HttpHook(method=self.method, http_conn_id=self.http_conn_id)
74+
75+
@cached_property
76+
def async_hook(self) -> HttpAsyncHook:
77+
"""HTTP Async Hook."""
78+
return HttpAsyncHook(method=self.method, http_conn_id=self.http_conn_id)
79+
80+
def notify(self, context: Context) -> None:
81+
"""Send HTTP notification (sync)."""
82+
resp = self.hook.run(
83+
endpoint=self.endpoint,
84+
data=self.data,
85+
headers=self.headers,
86+
extra_options=self.extra_options,
87+
json=self.json,
88+
)
89+
self.log.debug("HTTP notification sent: %s %s", resp.status_code, resp.url)
90+
91+
async def async_notify(self, context: Context) -> None:
92+
"""Send HTTP notification (async)."""
93+
async with aiohttp.ClientSession() as session:
94+
resp = await self.async_hook.run(
95+
session=session,
96+
endpoint=self.endpoint,
97+
data=self.data,
98+
json=self.json,
99+
headers=self.headers,
100+
extra_options=self.extra_options,
101+
)
102+
self.log.debug("HTTP notification sent (async): %s %s", resp.status, resp.url)
103+
104+
105+
send_http_notification = HttpNotifier
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from __future__ import annotations
19+
20+
from unittest import mock
21+
22+
import pytest
23+
24+
from airflow.providers.http.notifications.http import HttpNotifier, send_http_notification
25+
26+
27+
class TestHttpNotifier:
28+
def test_class_and_notifier_are_same(self):
29+
assert send_http_notification is HttpNotifier
30+
31+
@mock.patch("airflow.providers.http.notifications.http.HttpHook")
32+
def test_http_notifier(self, mock_http_hook):
33+
notifier = HttpNotifier(
34+
http_conn_id="test_conn_id",
35+
endpoint="/testing",
36+
method="POST",
37+
json={"message": "testing"},
38+
headers={"Content-Type": "application/json"},
39+
)
40+
notifier.notify({})
41+
42+
mock_http_hook.return_value.run.assert_called_once_with(
43+
endpoint="/testing",
44+
data=None,
45+
headers={"Content-Type": "application/json"},
46+
extra_options={},
47+
json={"message": "testing"},
48+
)
49+
mock_http_hook.assert_called_once_with(method="POST", http_conn_id="test_conn_id")
50+
51+
@pytest.mark.asyncio
52+
@mock.patch("airflow.providers.http.notifications.http.HttpAsyncHook")
53+
@mock.patch("aiohttp.ClientSession")
54+
async def test_async_http_notifier(self, mock_session, mock_http_async_hook):
55+
mock_hook = mock_http_async_hook.return_value
56+
mock_hook.run = mock.AsyncMock()
57+
58+
notifier = HttpNotifier(
59+
http_conn_id="test_conn_id",
60+
endpoint="/test",
61+
method="POST",
62+
json={"message": "test"},
63+
)
64+
65+
await notifier.async_notify({})
66+
67+
mock_hook.run.assert_called_once_with(
68+
session=mock_session.return_value.__aenter__.return_value,
69+
endpoint="/test",
70+
data=None,
71+
json={"message": "test"},
72+
headers=None,
73+
extra_options={},
74+
)
75+
76+
@mock.patch("airflow.providers.http.notifications.http.HttpHook")
77+
def test_http_notifier_templated(self, mock_http_hook, create_dag_without_db):
78+
notifier = HttpNotifier(
79+
endpoint="/{{ dag.dag_id }}",
80+
json={"dag_id": "{{ dag.dag_id }}", "user": "{{ username }}"},
81+
)
82+
notifier(
83+
{
84+
"dag": create_dag_without_db("test_http_notification_templated"),
85+
"username": "test-user",
86+
}
87+
)
88+
89+
mock_http_hook.return_value.run.assert_called_once_with(
90+
endpoint="/test_http_notification_templated",
91+
data=None,
92+
headers=None,
93+
extra_options={},
94+
json={"dag_id": "test_http_notification_templated", "user": "test-user"},
95+
)

0 commit comments

Comments
 (0)