Skip to content

Commit a8cd72f

Browse files
committed
Add Azure Log Analytics Ingestion API plugin
The Ingestion API allows sending data to a Log Analytics workspace in Azure Monitor.
1 parent bc99432 commit a8cd72f

File tree

3 files changed

+383
-0
lines changed

3 files changed

+383
-0
lines changed

.github/BOTMETA.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ files:
6565
$callbacks/log_plays.py: {}
6666
$callbacks/loganalytics.py:
6767
maintainers: zhcli
68+
$callbacks/loganalytics_ingestion.py:
69+
maintainers: pboushy vsh47 wtcline-intc
6870
$callbacks/logdna.py: {}
6971
$callbacks/logentries.py: {}
7072
$callbacks/logstash.py:
Lines changed: 325 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,325 @@
1+
#!/usr/bin/python
2+
# -*- coding: utf-8 -*-
3+
4+
# Copyright: Contributors to the Ansible project
5+
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
6+
# SPDX-License-Identifier: GPL-3.0-or-later
7+
8+
DOCUMENTATION = '''
9+
name: loganalytics_ingestion
10+
type: notification
11+
short_description: Posts task results to an Azure Log Analytics workspace using the new Logs Ingestion API
12+
author:
13+
- Wade Cline (@wtcline-intc) <[email protected]>
14+
- Sriramoju Vishal Bharath (@sriramoj) <[email protected]>
15+
- Cyrus Li (@zhcli) <[email protected]>
16+
description:
17+
- This callback plugin will post task results in JSON format to an Azure Log Analytics workspace using the new Logs Ingestion API.
18+
version_added: "10.7.0"
19+
requirements:
20+
- The callback plugin has been enabled
21+
- an Azure Log Analytics workspace has been established
22+
- a Data Collection Rule (DCR) and custom table are created
23+
options:
24+
dce_url:
25+
description: URL of the Data Collection Endpoint (DCE) for Azure Logs Ingestion API.
26+
type: str
27+
required: true
28+
env:
29+
- name: LOGANALYTICS_DCE_URL
30+
ini:
31+
- section: callback_loganalytics
32+
key: dce_url
33+
dcr_id:
34+
description: Data Collection Rule (DCR) ID for the Azure Log Ingestion API.
35+
type: str
36+
required: true
37+
env:
38+
- name: LOGANALYTICS_DCR_ID
39+
ini:
40+
- section: callback_loganalytics
41+
key: dcr_id
42+
disable_attempts:
43+
description:
44+
- When O(disable_on_failure=true), number of module failures that must occur before the module is disabled.
45+
- This helps prevent outright module failure from a single, transient network issue.
46+
type: int
47+
required: false
48+
default: 3
49+
env:
50+
- name: LOGANALYTICS_DISABLE_ATTEMPTS
51+
ini:
52+
- section: callback_loganalytics
53+
key: disable_attempts
54+
disable_on_failure:
55+
description: Stop trying to send data on module failure.
56+
type: bool
57+
required: false
58+
default: true
59+
env:
60+
- name: LOGANALYTICS_DISABLE_ON_FAILURE
61+
ini:
62+
- section: callback_loganalytics
63+
key: disable_on_failure
64+
client_id:
65+
description: Client ID of the Azure App registration for OAuth2 authentication ("Modern Authentication").
66+
type: str
67+
required: true
68+
env:
69+
- name: LOGANALYTICS_CLIENT_ID
70+
ini:
71+
- section: callback_loganalytics
72+
key: client_id
73+
client_secret:
74+
description: Client Secret of the Azure App registration.
75+
type: str
76+
required: true
77+
env:
78+
- name: LOGANALYTICS_CLIENT_SECRET
79+
ini:
80+
- section: callback_loganalytics
81+
key: client_secret
82+
include_content:
83+
description: Send the content to the Azure Log Analytics workspace.
84+
type: bool
85+
required: false
86+
default: false
87+
env:
88+
- name: LOGANALYTICS_INCLUDE_CONTENT
89+
ini:
90+
- section: callback_loganalytics
91+
key: include_content
92+
include_task_args:
93+
description: Send the task args to the Azure Log Analytics workspace.
94+
type: bool
95+
required: false
96+
default: false
97+
env:
98+
- name: LOGANALYTICS_INCLUDE_TASK_ARGS
99+
ini:
100+
- section: callback_loganalytics
101+
key: include_task_args
102+
stream_name:
103+
description: The name of the stream used to send the logs to the Azure Log Analytics workspace.
104+
type: str
105+
required: true
106+
env:
107+
- name: LOGANALYTICS_STREAM_NAME
108+
ini:
109+
- section: callback_loganalytics
110+
key: stream_name
111+
tenant_id:
112+
description: Tenant ID for the Azure Active Directory.
113+
type: str
114+
required: true
115+
env:
116+
- name: LOGANALYTICS_TENANT_ID
117+
ini:
118+
- section: callback_loganalytics
119+
key: tenant_id
120+
seealso:
121+
- name: Logs Ingestion API
122+
description: Overview of Logs Ingestion API in Azure Monitor
123+
link: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview
124+
'''
125+
126+
EXAMPLES = '''
127+
examples: |
128+
Enable the plugin in ansible.cfg:
129+
[defaults]
130+
callback_enabled = community.general.loganalytics_ingestion
131+
Set the environment variables:
132+
export LOGANALYTICS_DCE_URL=https://my-dce.ingest.monitor.azure.com
133+
export LOGANALYTICS_DCR_ID=dcr-xxxxxx
134+
export LOGANALYTICS_CLIENT_ID=xxxxxxxx
135+
export LOGANALYTICS_CLIENT_SECRET=xxxxxxxx
136+
export LOGANALYTICS_TENANT_ID=xxxxxxxx
137+
export LOGANALYTICS_STREAM_NAME=Custom-MyTable
138+
'''
139+
140+
import base64
141+
import getpass
142+
import hashlib
143+
import json
144+
import requests
145+
import socket
146+
import time
147+
import uuid
148+
from datetime import datetime, timezone, timedelta
149+
from os.path import basename
150+
from requests.adapters import HTTPAdapter
151+
from urllib3.util.retry import Retry
152+
153+
from ansible.plugins.callback import CallbackBase
154+
from ansible.utils.display import Display
155+
156+
display = Display()
157+
158+
class AzureLogAnalyticsIngestionSource(object):
159+
160+
def __init__(self, dce_url, dcr_id, disable_attempts, disable_on_failure, client_id, client_secret, tenant_id, stream_name, include_task_args, include_content):
161+
self.dce_url = dce_url
162+
self.dcr_id = dcr_id
163+
self.disabled = False
164+
self.disable_attempts = disable_attempts
165+
self.disable_on_failure = disable_on_failure
166+
self.client_id = client_id
167+
self.client_secret = client_secret
168+
self.failures = 0
169+
self.tenant_id = tenant_id
170+
self.stream_name = stream_name
171+
self.include_task_args = include_task_args
172+
self.include_content = include_content
173+
self.token_expiration_time = None
174+
self.requests_session = requests.Session()
175+
self.bearer_token = self.get_bearer_token()
176+
self.session = str(uuid.uuid4())
177+
self.host = socket.gethostname()
178+
self.user = getpass.getuser()
179+
180+
# OAuth2 authentication method to get a Bearer token
181+
# This replaces the shared_key authentication mechanism
182+
def get_bearer_token(self):
183+
url = f"https://login.microsoftonline.com/{self.tenant_id}/oauth2/v2.0/token"
184+
payload = {
185+
'grant_type': 'client_credentials',
186+
'client_id': self.client_id,
187+
'client_secret': self.client_secret,
188+
# The scope value comes from https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview#headers and https://learn.microsoft.com/en-us/entra/identity-platform/scopes-oidc#the-default-scope
189+
'scope': 'https://monitor.azure.com/.default'
190+
}
191+
response = self.requests_session.post(url, data=payload)
192+
response.raise_for_status()
193+
self.token_expiration_time=datetime.now()+timedelta(seconds=response.json().get("expires_in"))
194+
return response.json().get('access_token')
195+
196+
def is_token_valid(self):
197+
return True if (datetime.now() + timedelta(seconds=10))< self.token_expiration_time else False
198+
199+
# Method to send event data to the Azure Logs Ingestion API
200+
# This replaces the legacy API call and now uses the Logs Ingestion API endpoint
201+
def send_event(self, event_data):
202+
if not(self.is_token_valid()):
203+
self.bearer_token = self.get_bearer_token()
204+
ingestion_url = f"{self.dce_url}/dataCollectionRules/{self.dcr_id}/streams/{self.stream_name}?api-version=2023-01-01"
205+
headers = {
206+
'Authorization': f"Bearer {self.bearer_token}",
207+
'Content-Type': 'application/json'
208+
}
209+
response = self.requests_session.post(ingestion_url, headers=headers, json=event_data, timeout=2)
210+
response.raise_for_status()
211+
212+
def _rfc1123date(self):
213+
return datetime.now(timezone.utc).strftime('%a, %d %b %Y %H:%M:%S GMT')
214+
215+
# This method wraps the private method with the appropriate error handling.
216+
def send_to_loganalytics(self, playbook_name, result, state):
217+
if self.disabled:
218+
return
219+
try:
220+
self._send_to_loganalytics(playbook_name, result, state)
221+
except Exception as e:
222+
display.warning(f"LogAnalytics Ingestion callback module failure: {e}.")
223+
if self.disable_on_failure:
224+
self.failures += 1
225+
if self.failures >= self.disable_attempts:
226+
display.warning(f"LogAnalytics module failures exceed maximum of '{self.disable_attempts}'! Disabling module!")
227+
self.disabled = True
228+
else:
229+
display.v(f"LogAnalytics module failure {self.failures}/{self.disable_attempts}")
230+
231+
def _send_to_loganalytics(self, playbook_name, result, state):
232+
ansible_check_mode = None
233+
if result._task_fields['args'].get('_ansible_check_mode') is True:
234+
ansible_check_mode = True
235+
236+
ansible_version = None
237+
if result._task_fields['args'].get('_ansible_version'):
238+
ansible_version = result._task_fields['args'].get('_ansible_version')
239+
240+
ansible_role = None
241+
if result._task._role:
242+
ansible_role = str(result._task._role)
243+
244+
#Include/Exclude task args
245+
if not(self.include_task_args):
246+
result._task_fields.pop('args', None)
247+
248+
#Include/Exclude content
249+
if not(self.include_content):
250+
result._result.pop('content', None)
251+
252+
# Build the event data
253+
event_data = [{
254+
"TimeGenerated": self._rfc1123date(),
255+
"Host": result._host.name,
256+
"User": self.user,
257+
"Playbook": playbook_name,
258+
"Role": ansible_role,
259+
"TaskName": result._task.get_name(),
260+
"Task": result._task_fields,
261+
"Action": result._task_fields['action'],
262+
"State": state,
263+
"Result": result._result,
264+
"Session": self.session
265+
}]
266+
#Display event data
267+
display.vvv(f"Event Data :{str(event_data)}")
268+
269+
# Send the event data using the new Logs Ingestion API method
270+
self.send_event(event_data)
271+
272+
273+
class CallbackModule(CallbackBase):
274+
CALLBACK_VERSION = 2.0
275+
CALLBACK_TYPE = 'notification'
276+
CALLBACK_NAME = 'loganalytics_ingestion'
277+
CALLBACK_NEEDS_ENABLED = True
278+
279+
def __init__(self, display=None):
280+
super(CallbackModule, self).__init__(display=display)
281+
self.start_datetimes = {}
282+
self.playbook_name = None
283+
self.azure_loganalytics = None
284+
285+
def set_options(self, task_keys=None, var_options=None, direct=None):
286+
super(CallbackModule, self).set_options(task_keys=task_keys, var_options=var_options, direct=direct)
287+
288+
# Set options for the new Azure Logs Ingestion API configuration
289+
self.client_id = self.get_option('client_id')
290+
self.client_secret = self.get_option('client_secret')
291+
self.dce_url = self.get_option('dce_url')
292+
self.dcr_id = self.get_option('dcr_id')
293+
self.disable_attempts = self.get_option('disable_attempts')
294+
self.disable_on_failure = self.get_option('disable_on_failure')
295+
self.include_content = self.get_option('include_content')
296+
self.include_task_args = self.get_option('include_task_args')
297+
self.stream_name = self.get_option('stream_name')
298+
self.tenant_id = self.get_option('tenant_id')
299+
self.validate_inputs()
300+
301+
# Initialize the AzureLogAnalyticsIngestionSource with the new settings
302+
self.azure_loganalytics = AzureLogAnalyticsIngestionSource(
303+
self.dce_url, self.dcr_id, self.disable_attempts, self.disable_on_failure, self.client_id, self.client_secret, self.tenant_id, self.stream_name, self.include_task_args, self.include_content
304+
)
305+
306+
# Input checks
307+
def validate_inputs(self):
308+
# Type checks
309+
boolean_checks = [
310+
(self.include_task_args, "INCLUDE_TASK_ARGS must be true or false"),
311+
(self.include_content, "INCLUDE_CONTENT must be true or false")
312+
]
313+
for attribute, message in boolean_checks:
314+
if not isinstance(attribute, bool):
315+
Display().warning(message)
316+
317+
def v2_playbook_on_start(self, playbook):
318+
self.playbook_name = basename(playbook._file_name)
319+
320+
# Build event data and send it to the Logs Ingestion API
321+
def v2_runner_on_failed(self, result, **kwargs):
322+
self.azure_loganalytics.send_to_loganalytics(self.playbook_name, result, "FAILED")
323+
324+
def v2_runner_on_ok(self, result, **kwargs):
325+
self.azure_loganalytics.send_to_loganalytics(self.playbook_name, result, "OK")
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# Copyright (c) Ansible project
2+
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
3+
# SPDX-License-Identifier: GPL-3.0-or-later
4+
5+
from datetime import datetime
6+
import json
7+
import time
8+
import unittest
9+
import unittest.mock
10+
import urllib
11+
12+
from ansible.executor.task_result import TaskResult
13+
from ansible_collections.community.general.plugins.callback.loganalytics_ingestion import AzureLogAnalyticsIngestionSource
14+
15+
class TestAzureLogAnalyticsIngestion(unittest.TestCase):
16+
dce_url = "https://fake.dce_url.ansible.com"
17+
dcr_id = "fake-dcr-id"
18+
client_id = "fake-client_id"
19+
client_secret = "fake-client-secret"
20+
tenant_id = "fake-tenant-id"
21+
stream_name = "fake-stream-name"
22+
23+
@unittest.mock.patch("ansible_collections.community.general.plugins.callback.loganalytics_ingestion.requests.Session", autospec=True)
24+
@unittest.mock.patch("ansible_collections.community.general.plugins.callback.loganalytics_ingestion.requests.Response", autospec=True)
25+
def setUp(self, ResponseMock, SessionMock):
26+
# Generate a fake access token.
27+
response_mock = ResponseMock()
28+
response_mock.json.return_value = {"expires_in": time.time() + 3600, "access_token": "fake_access_token"}
29+
SessionMock.return_value.post.return_value = response_mock
30+
31+
# TODO: How to set module default arguments?
32+
# I tried instantiating the 'CallbackModule' but all it ever did was complain that 'client_id' wasn't defined.
33+
self.loganalytics = AzureLogAnalyticsIngestionSource(
34+
self.dce_url, self.dcr_id, 3, True,
35+
self.client_id, self.client_secret, self.tenant_id,
36+
self.stream_name, False, False
37+
)
38+
39+
@unittest.mock.patch("ansible.executor.task_result.TaskResult")
40+
def test_sending_data(self, MockTaskResult):
41+
"""
42+
Tests sending data by verifying that the expected POST requests are submitted to the expected hosts.
43+
"""
44+
self.loganalytics.send_to_loganalytics("fake_playbook", MockTaskResult(), "OK")
45+
self.loganalytics.send_to_loganalytics("fake_playbook", MockTaskResult(), "FAILED")
46+
self.loganalytics.send_to_loganalytics("fake_playbook", MockTaskResult(), "OK")
47+
48+
# One POST request for authentication, three POST requests for sending data (one for each task).
49+
assert self.loganalytics.requests_session.post.call_count == 4
50+
# First POST request for login.
51+
url = urllib.parse.urlparse(self.loganalytics.requests_session.post.call_args_list[0][0][0])
52+
assert url.netloc == "login.microsoftonline.com"
53+
# Three POST requests to the DCE.
54+
for i in range(1, 3):
55+
url = urllib.parse.urlparse(self.loganalytics.requests_session.post.call_args_list[i][0][0])
56+
assert url.scheme + "://" + url.netloc == self.dce_url

0 commit comments

Comments
 (0)