Skip to content

Commit c83b0b9

Browse files
authored
feat: added support for azure event hub (#2078)
* feat: added support for azure event hub This PR adds support for azure_event_hub with the Microsoft Entra OAuth credentials. You would need to provide the following * azure_tenant_id * azure_client_id * azure_client_secret * azure_namespace * azure_event_hub_name * chore: removed unwanted packages in test
1 parent fd7b3b6 commit c83b0b9

File tree

4 files changed

+444
-0
lines changed

4 files changed

+444
-0
lines changed
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
import asyncio
2+
import json
3+
import logging
4+
from typing import Any
5+
6+
from azure.eventhub.aio import EventHubConsumerClient
7+
from azure.identity.aio import ClientSecretCredential
8+
9+
DOCUMENTATION = r"""
10+
---
11+
short_description: Receive events via a Azure Event Hub
12+
description:
13+
- An ansible-rulebook event source plugin for receiving events
14+
via Azure Event Hub
15+
options:
16+
azure_tenant_id:
17+
description:
18+
- The azure tenant id
19+
type: str
20+
required: true
21+
azure_client_id:
22+
description:
23+
- The azure client id
24+
type: str
25+
required: true
26+
azure_client_secret:
27+
description:
28+
- The azure client secret
29+
type: str
30+
required: true
31+
azure_namespace:
32+
description:
33+
- The azure event hub namespace which includes the host name
34+
type: str
35+
example: "test.servicebus.windows.net",
36+
required: true
37+
azure_event_hub_name:
38+
description:
39+
- The azure event hub name
40+
type: str
41+
required: true
42+
azure_starting_position:
43+
description:
44+
- The starting position
45+
type: str
46+
default: "-1"
47+
azure_consumer_group:
48+
description:
49+
- The name of the consumer group
50+
type: str
51+
default: "$Default"
52+
"""
53+
54+
EXAMPLES = r"""
55+
- azure.azcollection.azure_event_hub:
56+
"azure_tenant_id": "your_tenant_id"
57+
"azure_client_id": "your_client_id"
58+
"azure_client_secret": "your_client_secret"
59+
"azure_namespace": "example.servicebus.windows.net"
60+
"azure_event_hub_name": "your_hub_name"
61+
"azure_starting_position": "-1"
62+
"""
63+
64+
logger = logging.getLogger()
65+
66+
67+
REQUIRED_ARGS = [
68+
"azure_tenant_id",
69+
"azure_client_id",
70+
"azure_client_secret",
71+
"azure_namespace",
72+
"azure_event_hub_name",
73+
]
74+
75+
76+
class AzureHubConsumer:
77+
def __init__(self, queue: asyncio.Queue[Any], args: dict[str, Any]) -> None:
78+
self.queue = queue
79+
tenant_id = args.get("azure_tenant_id")
80+
client_id = args.get("azure_client_id")
81+
client_secret = args.get("azure_client_secret")
82+
83+
self.event_hub_namespace = args.get("azure_namespace")
84+
self.event_hub_name = args.get("azure_event_hub_name")
85+
86+
self.consumer_group = args.get("azure_consumer_group", "$Default")
87+
self.starting_position = int(args.get("azure_starting_position", "-1"))
88+
89+
self.credential = ClientSecretCredential(
90+
tenant_id=tenant_id,
91+
client_id=client_id,
92+
client_secret=client_secret,
93+
)
94+
95+
async def on_event(self, partition_context, event):
96+
if event:
97+
await partition_context.update_checkpoint(event)
98+
meta = {}
99+
# Process message body
100+
try:
101+
value = event.body_as_str()
102+
logger.debug(
103+
"Received event from partition %s: %s",
104+
str(partition_context.partition_id),
105+
value,
106+
)
107+
except UnicodeError:
108+
logger.exception("Unicode error while decoding message body")
109+
data = None
110+
else:
111+
try:
112+
data = json.loads(value)
113+
except json.decoder.JSONDecodeError:
114+
logger.info("JSON decode error, storing raw value")
115+
data = value
116+
117+
# Add data to the event and put it into the queue
118+
if data:
119+
await self.queue.put({"body": data, "meta": meta})
120+
121+
await asyncio.sleep(0)
122+
123+
async def start_receiving(self):
124+
client = EventHubConsumerClient(
125+
fully_qualified_namespace=self.event_hub_namespace,
126+
eventhub_name=self.event_hub_name,
127+
consumer_group=self.consumer_group,
128+
credential=self.credential,
129+
)
130+
async with client:
131+
await client.receive(self.on_event)
132+
133+
134+
# Usage
135+
async def main( # pylint: disable=R0914
136+
queue: asyncio.Queue[Any],
137+
args: dict[str, Any],
138+
) -> None:
139+
140+
for key in REQUIRED_ARGS:
141+
if key not in args:
142+
msg = f"Please provide {key} it is a required argument."
143+
raise ValueError(msg)
144+
145+
consumer = AzureHubConsumer(queue, args)
146+
await consumer.start_receiving()
147+
148+
149+
if __name__ == "__main__":
150+
151+
class MockQueue(asyncio.Queue[Any]):
152+
"""A fake queue."""
153+
154+
async def put(self: "MockQueue", event: dict[str, Any]) -> None:
155+
"""Print the event."""
156+
print(event) # noqa: T201
157+
158+
test_args = {
159+
"azure_tenant_id": "your_tenant_id",
160+
"azure_client_id": "your_client_id",
161+
"azure_client_secret": "your_client_secret",
162+
"azure_namespace": "example.servicebus.windows.net",
163+
"azure_event_hub_name": "your_hub_name",
164+
"azure_starting_position": "-1",
165+
}
166+
167+
asyncio.run(
168+
main(
169+
MockQueue(),
170+
test_args,
171+
),
172+
)
173+
asyncio.run(main())

requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,5 @@ pandas
5858
oras
5959
netaddr
6060
azure-servicebus
61+
azure-eventhub
62+
aiohttp

0 commit comments

Comments
 (0)