-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy patho365_container_puller.py
More file actions
executable file
·225 lines (181 loc) · 8.24 KB
/
o365_container_puller.py
File metadata and controls
executable file
·225 lines (181 loc) · 8.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
#!/usr/bin/env python3
"""
Office 365 Audit Log Collector for Azure Container Instances
This script collects Office 365 audit logs and forwards them to Azure Event Hub.
Designed to run as a scheduled container without storage dependencies.
Environment Variables Required:
- TENANT_ID: Azure AD Tenant ID
- CLIENT_ID: Azure AD Application ID
- CLIENT_SECRET: Azure AD Application Secret
- EVENT_HUB_CONNECTION_STR: Event Hub connection string
- EVENT_HUB_NAME: Event Hub name
"""
import datetime
import logging
import os
import requests
import json
import time
import sys
from azure.eventhub import EventHubProducerClient, EventData
# Configure logging
# Stream to stdout so Azure Container Instances captures output in container logs.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)
# --- Configuration ---
# Environment variables for security
# All five are required; main() validates their presence before doing any work.
TENANT_ID = os.environ.get("TENANT_ID")                              # Azure AD tenant ID
CLIENT_ID = os.environ.get("CLIENT_ID")                              # Azure AD app registration ID
CLIENT_SECRET = os.environ.get("CLIENT_SECRET")                      # Azure AD app client secret
EVENT_HUB_CONNECTION_STR = os.environ.get("EVENT_HUB_CONNECTION_STR")  # Event Hub namespace connection string
EVENT_HUB_NAME = os.environ.get("EVENT_HUB_NAME")                    # Target Event Hub name
# Office 365 Management API details
# Commercial is the default. Uncomment the lines for your specific cloud if needed.
API_BASE_URL = "https://manage.office.com"
TOKEN_RESOURCE = "https://manage.office.com"
# Content types to collect
# Each entry is an Office 365 Management Activity API audit content type.
CONTENT_TYPES = [
    "Audit.AzureActiveDirectory",
    "Audit.Exchange",
    "Audit.SharePoint",
    "Audit.General",
    "DLP.All"
]
def get_access_token():
    """Authenticate with Azure AD using the OAuth2 client-credentials flow.

    Returns:
        str: A bearer token for the Office 365 Management API.

    Raises:
        requests.HTTPError: If the token endpoint returns an error status.
        requests.Timeout: If the token endpoint does not respond in time.
        ValueError: If the response body contains no access token.
    """
    logging.info("Getting Access Token")
    # v1.0 endpoint takes a "resource" parameter (the v2.0 endpoint uses "scope").
    url = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/token"
    payload = {
        "grant_type": "client_credentials",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "resource": TOKEN_RESOURCE,
    }
    # requests has NO default timeout; without one a network stall would hang
    # the scheduled container forever.
    response = requests.post(url, data=payload, timeout=30)
    response.raise_for_status()
    access_token = response.json().get("access_token")
    if not access_token:
        raise ValueError("Failed to get access token. Response did not contain one.")
    logging.info("Successfully retrieved access token")
    return access_token
def ensure_subscriptions(access_token):
    """Ensure a Management API subscription is active for every content type.

    Args:
        access_token: Bearer token from get_access_token().

    Failures are logged as warnings rather than raised, so one bad content
    type does not block collection of the others.
    """
    api_root = f"{API_BASE_URL}/api/v1.0/{TENANT_ID}/activity/feed"
    headers = {"Authorization": f"Bearer {access_token}"}
    for content_type in CONTENT_TYPES:
        logging.info(f"Ensuring subscription for content type: {content_type}")
        url = f"{api_root}/subscriptions/start?contentType={content_type}&PublisherIdentifier={TENANT_ID}"
        # Timeout prevents a stalled request from hanging the container.
        response = requests.post(url, headers=headers, timeout=30)
        if response.status_code in [200, 409]:  # 409 is "already exists" which is fine
            logging.info(f"Subscription active for {content_type}")
        else:
            logging.warning(f"Could not ensure subscription for {content_type}. Status: {response.status_code}")
def list_available_content(access_token, content_type):
    """List all available content blob descriptors for a content type.

    Follows the API's ``NextPageUri`` response header so paginated listings
    are fully collected (previously only the first page was returned, which
    silently dropped older content on busy tenants).

    Args:
        access_token: Bearer token from get_access_token().
        content_type: One of the CONTENT_TYPES entries.

    Returns:
        list: Content blob descriptor dicts; empty if no content is available.

    Raises:
        requests.HTTPError: On unexpected non-404 error responses.
    """
    api_root = f"{API_BASE_URL}/api/v1.0/{TENANT_ID}/activity/feed"
    url = f"{api_root}/subscriptions/content?contentType={content_type}&PublisherIdentifier={TENANT_ID}"
    headers = {"Authorization": f"Bearer {access_token}"}
    content_list = []
    while url:
        # Timeout prevents a stalled request from hanging the container.
        response = requests.get(url, headers=headers, timeout=30)
        if response.status_code == 404:
            logging.info(f"No content available for {content_type}")
            return content_list
        response.raise_for_status()
        content_list.extend(response.json())
        # The Management Activity API paginates via the NextPageUri header;
        # absent header -> no more pages, loop ends.
        url = response.headers.get("NextPageUri")
    logging.info(f"Found {len(content_list)} content blobs for {content_type}")
    return content_list
def process_content_blobs(access_token, content_list, content_type):
    """Download content blobs and forward their audit records to Event Hub.

    Args:
        access_token: Bearer token from get_access_token().
        content_list: Blob descriptors from list_available_content().
        content_type: The content type being processed; stamped onto each
            record as ``_contentType`` metadata.

    Returns:
        int: Number of records successfully sent to Event Hub.
    """
    if not content_list:
        return 0
    total_events = 0
    producer = EventHubProducerClient.from_connection_string(
        conn_str=EVENT_HUB_CONNECTION_STR,
        eventhub_name=EVENT_HUB_NAME
    )
    with producer:
        for content_item in content_list:
            content_uri = content_item.get("contentUri")
            if not content_uri:
                continue
            logging.info(f"Processing content blob for {content_type}")
            headers = {"Authorization": f"Bearer {access_token}"}
            # Timeout prevents a stalled blob download from hanging the container.
            response = requests.get(content_uri, headers=headers, timeout=60)
            if not response.ok:
                logging.warning(f"Failed to download content blob. Status: {response.status_code}")
                continue
            try:
                log_records = response.json()
                if not isinstance(log_records, list):
                    logging.warning(f"Expected a list of records but got {type(log_records)}. Skipping.")
                    continue
                event_data_batch = producer.create_batch()
                batch_count = 0
                for record in log_records:
                    # Add metadata about origin and collection time.
                    # Timezone-aware "now": datetime.utcnow() is deprecated
                    # and produced a naive timestamp.
                    record['_contentType'] = content_type
                    record['_collectionTime'] = datetime.datetime.now(
                        datetime.timezone.utc).isoformat()
                    payload = json.dumps(record)
                    try:
                        event_data_batch.add(EventData(payload))
                        batch_count += 1
                    except ValueError:
                        # Batch is full: flush it and start a new one.
                        producer.send_batch(event_data_batch)
                        logging.info(f"Sent batch of {batch_count} records to Event Hub")
                        total_events += batch_count
                        event_data_batch = producer.create_batch()
                        try:
                            event_data_batch.add(EventData(payload))
                            batch_count = 1
                        except ValueError:
                            # A single record larger than the batch size limit
                            # previously raised out of this function and aborted
                            # the whole blob; skip just that record instead.
                            logging.warning("Record exceeds Event Hub batch size limit. Skipping record.")
                            batch_count = 0
                # Send any remaining events
                if batch_count > 0:
                    producer.send_batch(event_data_batch)
                    logging.info(f"Sent final batch of {batch_count} records to Event Hub")
                    total_events += batch_count
            except json.JSONDecodeError:
                logging.warning("Could not decode JSON from content blob. Skipping.")
                continue
            # Rate limiting - be nice to the API
            time.sleep(1)
    return total_events
def main():
    """Run one collection cycle: authenticate, subscribe, pull, forward.

    Returns:
        int: Process exit code — 0 on success, 1 on missing configuration
        or a fatal error.
    """
    # Timezone-aware "now"; datetime.utcnow() is deprecated (Python 3.12+).
    start_time = datetime.datetime.now(datetime.timezone.utc)
    logging.info(f"Starting Office 365 log collection at {start_time.isoformat()}")
    # Fail fast if the container is misconfigured.
    if not all([TENANT_ID, CLIENT_ID, CLIENT_SECRET, EVENT_HUB_CONNECTION_STR, EVENT_HUB_NAME]):
        logging.error("One or more required environment variables are not set:")
        logging.error("Required: TENANT_ID, CLIENT_ID, CLIENT_SECRET, EVENT_HUB_CONNECTION_STR, EVENT_HUB_NAME")
        return 1
    try:
        access_token = get_access_token()
        ensure_subscriptions(access_token)
        # Brief pause to let newly started subscriptions propagate.
        time.sleep(5)
        total_events_collected = 0
        for content_type in CONTENT_TYPES:
            try:
                content_list = list_available_content(access_token, content_type)
                events_sent = process_content_blobs(access_token, content_list, content_type)
                total_events_collected += events_sent
            except Exception as e:
                # One failing content type should not abort the others.
                logging.error(f"Error processing {content_type}: {e}")
                continue
        end_time = datetime.datetime.now(datetime.timezone.utc)
        duration = (end_time - start_time).total_seconds()
        # Plain string (original used an f-string with no placeholders).
        logging.info("Collection completed successfully")
        logging.info(f"Total events sent to Event Hub: {total_events_collected}")
        logging.info(f"Execution time: {duration:.2f} seconds")
        return 0
    except Exception as e:
        # Top-level boundary: log the traceback and signal failure to the scheduler.
        logging.error(f"Fatal error during execution: {e}", exc_info=True)
        return 1
if __name__ == "__main__":
    # Propagate main()'s return value directly as the process exit code.
    sys.exit(main())