Skip to content

Commit 2766531

Browse files
committed
Refactor audit log ingestion to use orchestrated activities
Refactored Push-AuditLogIngestion to split content listing, downloading, and result aggregation into orchestrated activity functions for improved scalability and maintainability. Added Push-AuditLogIngestionDownload and Push-AuditLogIngestionResults activity functions. Updated orchestration logic in CippEntrypoints.psm1 to support a 'NoScaling' mode. Improved null handling for cached group and device lookups in Test-CIPPAuditLogRules.
1 parent d241d67 commit 2766531

File tree

5 files changed

+265
-168
lines changed

5 files changed

+265
-168
lines changed

Modules/CIPPCore/Public/Entrypoints/Activity Triggers/Webhooks/Push-AuditLogIngestion.ps1

Lines changed: 46 additions & 159 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ function Push-AuditLogIngestion {
2323
$SwInit = [System.Diagnostics.Stopwatch]::StartNew()
2424
$AuditLogStateTable = Get-CippTable -TableName 'AuditLogState'
2525
$CacheWebhooksTable = Get-CippTable -TableName 'CacheWebhooks'
26+
$CacheWebhooks = Get-CIPPAzDataTableEntity @CacheWebhooksTable -Filter "PartitionKey eq '$TenantFilter'"
27+
2628
$SwInit.Stop()
2729
$Timings['Initialization'] = $SwInit.Elapsed.TotalMilliseconds
2830

@@ -138,191 +140,76 @@ function Push-AuditLogIngestion {
138140
return $true
139141
}
140142

141-
$TotalProcessedRecords = 0
142143
$Now = Get-Date
143144

144-
$SwContentList = 0
145-
$SwContentFilter = 0
146-
$SwBlobDownload = 0
147-
$SwRecordCache = 0
145+
# Step 1: List content for each enabled content type (sequential) WITHOUT invoking activities
146+
$AllContentItems = @()
148147

149148
foreach ($ContentType in $EnabledContentTypes) {
150149
try {
151-
Write-LogMessage -API 'AuditLogIngestion' -tenant $TenantFilter -message "Processing content type: $ContentType" -sev Debug
152-
153-
$StateRowKey = "$TenantFilter-$ContentType"
154-
$StateEntity = $StateCache[$ContentType]
155-
156-
if ($StateEntity -and $StateEntity.LastContentCreatedUtc) { $StartTime = ([DateTime]$StateEntity.LastContentCreatedUtc).AddMinutes(-5).ToUniversalTime() } else { $StartTime = $Now.AddHours(-1).ToUniversalTime() }
157-
$EndTime = $Now.AddMinutes(-5).ToUniversalTime()
158-
$StartTimeStr = $StartTime.ToString('yyyy-MM-ddTHH:mm:ss')
159-
$EndTimeStr = $EndTime.ToString('yyyy-MM-ddTHH:mm:ss')
160-
Write-LogMessage -API 'AuditLogIngestion' -tenant $TenantFilter -message "Polling $ContentType from $StartTimeStr to $EndTimeStr" -sev Debug
161-
$ContentUri = "https://manage.office.com/api/v1.0/$TenantId/activity/feed/subscriptions/content?contentType=$ContentType&startTime=$StartTimeStr&endTime=$EndTimeStr"
162-
$ContentParams = @{
163-
Uri = $ContentUri
150+
$listUri = "https://manage.office.com/api/v1.0/$TenantId/activity/feed/subscriptions/content?contentType=$ContentType"
151+
$params = @{
164152
scope = 'https://manage.office.com/.default'
153+
Uri = $listUri
165154
TenantId = $TenantFilter
166155
}
167156

168-
$SwList = [System.Diagnostics.Stopwatch]::StartNew()
169-
try {
170-
$ContentList = New-GraphGetRequest @ContentParams -ErrorAction Stop
171-
} catch {
172-
Write-LogMessage -API 'AuditLogIngestion' -tenant $TenantFilter -message "Failed to list content for $ContentType : $($_.Exception.Message)" -sev Error
173-
continue
174-
}
175-
$SwList.Stop()
176-
$SwContentList += $SwList.Elapsed.TotalMilliseconds
177-
178-
if (!$ContentList -or ($ContentList | Measure-Object).Count -eq 0) {
179-
Write-LogMessage -API 'AuditLogIngestion' -tenant $TenantFilter -message "No new content available for $ContentType" -sev Debug
180-
continue
181-
}
182-
Write-LogMessage -API 'AuditLogIngestion' -tenant $TenantFilter -message "Found $($ContentList.Count) content blobs for $ContentType" -sev Info
183-
184-
$SwFilter = [System.Diagnostics.Stopwatch]::StartNew()
185-
$NewContentItems = if ($StateEntity -and $StateEntity.LastContentId) {
186-
$LastContentCreated = [DateTime]$StateEntity.LastContentCreatedUtc
187-
$LastContentId = $StateEntity.LastContentId
188-
189-
foreach ($Content in $ContentList) {
190-
$ContentCreated = [DateTime]$Content.contentCreated
191-
if ($ContentCreated -gt $LastContentCreated -or
192-
($ContentCreated -eq $LastContentCreated -and $Content.contentId -ne $LastContentId)) {
193-
$Content
194-
}
195-
}
196-
} else {
197-
$ContentList
198-
}
199-
$SwFilter.Stop()
200-
$SwContentFilter += $SwFilter.Elapsed.TotalMilliseconds
201-
202-
if (($NewContentItems | Measure-Object).Count -eq 0) {
203-
Write-LogMessage -API 'AuditLogIngestion' -tenant $TenantFilter -message "No new content items for $ContentType (all already processed)" -sev Debug
204-
continue
205-
}
206-
207-
Write-LogMessage -API 'AuditLogIngestion' -tenant $TenantFilter -message "Processing $($NewContentItems.Count) new content items for $ContentType" -sev Info
208-
209-
$LatestContentCreated = $null
210-
$LatestContentId = $null
211-
$ProcessedRecords = 0
157+
$contentPage = New-GraphGetRequest @params -ErrorAction Stop
158+
if ($contentPage -and $contentPage.Count -gt 0) {
212159

213-
foreach ($ContentItem in $NewContentItems) {
214-
try {
215-
Write-LogMessage -API 'AuditLogIngestion' -tenant $TenantFilter -message "Downloading content blob for $ContentType" -sev Debug
216-
217-
$SwBlob = [System.Diagnostics.Stopwatch]::StartNew()
218-
$BlobParams = @{
219-
scope = 'https://manage.office.com/.default'
220-
Uri = $ContentItem.contentUri
221-
TenantId = $TenantFilter
222-
}
223-
224-
$BlobResponse = New-GraphGetRequest @BlobParams -ErrorAction Stop
225-
226-
if ($BlobResponse -is [string]) {
227-
$AuditRecords = $BlobResponse | ConvertFrom-Json -Depth 5
228-
} else {
229-
$AuditRecords = $BlobResponse
230-
}
231-
$SwBlob.Stop()
232-
$SwBlobDownload += $SwBlob.Elapsed.TotalMilliseconds
233160

234-
if (!$AuditRecords) {
235-
Write-LogMessage -API 'AuditLogIngestion' -tenant $TenantFilter -message "No records in blob for $ContentType" -sev Warn
161+
$AllContentItems = foreach ($ci in $contentPage) {
162+
if ($CacheWebhooks.ContentId -contains $ci.contentId) {
163+
Write-LogMessage -API 'AuditLogIngestion' -tenant $TenantFilter -message "Content item $($ci.contentId) for $ContentType already cached, skipping" -sev Debug
236164
continue
237165
}
238-
239-
Write-LogMessage -API 'AuditLogIngestion' -tenant $TenantFilter -message "Caching $($AuditRecords.Count) audit records for $ContentType" -sev Debug
240-
241-
$SwCache = [System.Diagnostics.Stopwatch]::StartNew()
242-
$CacheEntities = [System.Collections.Generic.List[hashtable]]::new()
243-
foreach ($Record in $AuditRecords) {
244-
$CacheEntities.Add(@{
245-
RowKey = $Record.Id
246-
PartitionKey = $TenantFilter
247-
JSON = [string]($Record | ConvertTo-Json -Depth 10 -Compress)
248-
ContentId = $ContentItem.contentId
249-
ContentType = $ContentType
250-
})
251-
}
252-
253-
if ($CacheEntities.Count -gt 0) {
254-
try {
255-
Add-CIPPAzDataTableEntity @CacheWebhooksTable -Entity $CacheEntities -Force
256-
$ProcessedRecords += $CacheEntities.Count
257-
} catch {
258-
Write-LogMessage -API 'AuditLogIngestion' -tenant $TenantFilter -message "Failed to batch cache records for $ContentType : $($_.Exception.Message)" -sev Error
259-
}
260-
}
261-
$SwCache.Stop()
262-
$SwRecordCache += $SwCache.Elapsed.TotalMilliseconds
263-
264-
$ContentCreated = [DateTime]$ContentItem.contentCreated
265-
if (!$LatestContentCreated -or $ContentCreated -gt $LatestContentCreated) {
266-
$LatestContentCreated = $ContentCreated
267-
$LatestContentId = $ContentItem.contentId
268-
}
269-
270-
} catch {
271-
Write-LogMessage -API 'AuditLogIngestion' -tenant $TenantFilter -message "Failed to download/process content blob for $ContentType : $($_.Exception.Message)" -sev Error
272-
continue
273-
}
274-
}
275-
276-
Write-LogMessage -API 'AuditLogIngestion' -tenant $TenantFilter -message "Cached $ProcessedRecords audit records for $ContentType" -sev Info
277-
$TotalProcessedRecords += $ProcessedRecords
278-
279-
if ($LatestContentCreated) {
280-
if (!$StateUpdates[$ContentType]) {
281-
$StateUpdates[$ContentType] = @{
282-
PartitionKey = 'AuditLogState'
283-
RowKey = $StateRowKey
166+
@{
167+
FunctionName = 'AuditLogIngestionDownload'
168+
TenantFilter = $TenantFilter
169+
TenantId = $TenantId
284170
ContentType = $ContentType
171+
ContentItem = $ci
285172
}
286173
}
287-
$StateUpdates[$ContentType].SubscriptionEnabled = $true
288-
$StateUpdates[$ContentType].LastContentCreatedUtc = $LatestContentCreated.ToString('yyyy-MM-ddTHH:mm:ss')
289-
$StateUpdates[$ContentType].LastContentId = $LatestContentId
290-
$StateUpdates[$ContentType].LastProcessedUtc = $Now.ToUniversalTime().ToString('yyyy-MM-ddTHH:mm:ss')
291-
292-
Write-LogMessage -API 'AuditLogIngestion' -tenant $TenantFilter -message "Updated watermark for $ContentType to $($LatestContentCreated.ToString('yyyy-MM-ddTHH:mm:ss'))" -sev Debug
293174
}
294-
295175
} catch {
296-
Write-LogMessage -API 'AuditLogIngestion' -tenant $TenantFilter -message "Error processing content type $ContentType : $($_.Exception.Message)" -sev Error -LogData (Get-CippException -Exception $_)
176+
Write-LogMessage -API 'AuditLogIngestion' -tenant $TenantFilter -message "Error listing content for $ContentType : $($_.Exception.Message)" -sev Error -LogData (Get-CippException -Exception $_)
297177
continue
298178
}
299179
}
300180

301-
$Timings['ContentList'] = $SwContentList
302-
$Timings['ContentFilter'] = $SwContentFilter
303-
$Timings['BlobDownload'] = $SwBlobDownload
304-
$Timings['RecordCache'] = $SwRecordCache
305-
306-
$SwStateWrite = [System.Diagnostics.Stopwatch]::StartNew()
307-
if ($StateUpdates.Count -gt 0) {
308-
$UpdateEntities = @($StateUpdates.Values)
309-
Add-CIPPAzDataTableEntity @AuditLogStateTable -Entity $UpdateEntities -Force
181+
if ($AllContentItems.Count -eq 0) {
182+
Write-LogMessage -API 'AuditLogIngestion' -tenant $TenantFilter -message 'No content items to download' -sev Info
183+
if ($StateUpdates.Count -gt 0) {
184+
$UpdateEntities = @($StateUpdates.Values)
185+
Add-CIPPAzDataTableEntity @AuditLogStateTable -Entity $UpdateEntities -Force
186+
}
187+
return $true
310188
}
311-
$SwStateWrite.Stop()
312-
$Timings['StateWrite'] = $SwStateWrite.Elapsed.TotalMilliseconds
313189

314-
$TotalStopwatch.Stop()
315-
$TotalMs = $TotalStopwatch.Elapsed.TotalMilliseconds
190+
Write-LogMessage -API 'AuditLogIngestion' -tenant $TenantFilter -message "Found $($AllContentItems.Count) total content items to process across all types" -sev Info
316191

317-
$TimingReport = "AUDITLOG: Total: $([math]::Round($TotalMs, 2))ms"
318-
foreach ($Key in ($Timings.Keys | Sort-Object)) {
319-
$Ms = [math]::Round($Timings[$Key], 2)
320-
$Pct = [math]::Round(($Timings[$Key] / $TotalMs) * 100, 1)
321-
$TimingReport += " | $Key : $Ms ms ($Pct %)"
322-
}
323-
Write-Host $TimingReport
324192

325-
Write-LogMessage -API 'AuditLogIngestion' -tenant $TenantFilter -message "Completed ingestion: $TotalProcessedRecords total records cached" -sev Info
193+
$TotalStopwatch.Stop()
194+
# Step 2: Start NoScaling orchestrator to process items sequentially and run post-exec aggregation
195+
try {
196+
$InputObject = [PSCustomObject]@{
197+
OrchestratorName = 'AuditLogDownload'
198+
DurableMode = 'NoScaling'
199+
Batch = @($AllContentItems)
200+
PostExecution = @{
201+
FunctionName = 'AuditLogIngestionResults'
202+
Parameters = @{
203+
TenantFilter = $TenantFilter
204+
TotalStopwatch = $TotalStopwatch.Elapsed.TotalMilliseconds
205+
}
206+
}
207+
SkipLog = $true
208+
}
209+
Start-NewOrchestration -FunctionName 'CIPPOrchestrator' -InputObject ($InputObject | ConvertTo-Json -Depth 5 -Compress)
210+
} catch {
211+
Write-LogMessage -API 'AuditLogIngestion' -tenant $TenantFilter -message "Error starting orchestrator: $($_.Exception.Message)" -sev Error -LogData (Get-CippException -Exception $_)
212+
}
326213

327214
return $true
328215

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
function Push-AuditLogIngestionDownload {
2+
<#
3+
.SYNOPSIS
4+
Download one audit log content blob and cache its records
5+
.DESCRIPTION
6+
Activity function that downloads the blob at $Item.ContentItem.contentUri via New-GraphGetRequest and writes one entity per audit record to the 'CacheWebhooks' table. Returns a hashtable: Success = $true with ProcessedRecords and Timings (plus TenantFilter, ContentType, ContentCreated and ContentId when records were cached), or Success = $false with Error when anything in the body throws.
7+
.FUNCTIONALITY
8+
Entrypoint
9+
#>
10+
param($Item) # Reads TenantFilter, ContentType and ContentItem (contentUri, contentId, contentCreated) from $Item
11+
12+
try {
13+
$TenantFilter = $Item.TenantFilter
14+
$ContentType = $Item.ContentType
15+
$ContentItem = $Item.ContentItem
16+
$CacheWebhooksTable = Get-CippTable -tablename 'CacheWebhooks'
17+
18+
$DownloadStopwatch = [System.Diagnostics.Stopwatch]::StartNew()
19+
20+
Write-LogMessage -API 'AuditLogIngestion' -tenant $TenantFilter -message "Downloading content blob for $ContentType" -sev Debug
21+
22+
$BlobParams = @{
23+
scope = 'https://manage.office.com/.default'
24+
Uri = $ContentItem.contentUri
25+
TenantId = $TenantFilter
26+
}
27+
28+
$BlobResponse = New-GraphGetRequest @BlobParams -ErrorAction Stop
29+
30+
if ($BlobResponse -is [string]) { # A string response is parsed as JSON; any other response is used as-is
31+
$AuditRecords = $BlobResponse | ConvertFrom-Json -Depth 5
32+
} else {
33+
$AuditRecords = $BlobResponse
34+
}
35+
36+
if (!$AuditRecords) { # Empty blob: report success with zero records and only the Download timing
37+
Write-LogMessage -API 'AuditLogIngestion' -tenant $TenantFilter -message "No records in blob for $ContentType" -sev Warn
38+
$DownloadStopwatch.Stop()
39+
return @{
40+
Success = $true
41+
ProcessedRecords = 0
42+
Timings = @{ Download = $DownloadStopwatch.Elapsed.TotalMilliseconds }
43+
}
44+
}
45+
46+
Write-LogMessage -API 'AuditLogIngestion' -tenant $TenantFilter -message "Caching $($AuditRecords.Count) audit records for $ContentType" -sev Debug
47+
48+
$CacheStopwatch = [System.Diagnostics.Stopwatch]::StartNew()
49+
$CacheEntities = [System.Collections.Generic.List[hashtable]]::new()
50+
foreach ($Record in $AuditRecords) {
51+
$CacheEntities.Add(@{
52+
RowKey = $Record.Id # NOTE(review): assumes every record carries a non-empty Id - confirm
53+
PartitionKey = $TenantFilter
54+
JSON = [string]($Record | ConvertTo-Json -Depth 10 -Compress)
55+
ContentId = $ContentItem.contentId
56+
ContentType = $ContentType
57+
})
58+
}
59+
60+
if ($CacheEntities.Count -gt 0) {
61+
try {
62+
Add-CIPPAzDataTableEntity @CacheWebhooksTable -Entity $CacheEntities -Force
63+
} catch {
64+
Write-LogMessage -API 'AuditLogIngestion' -tenant $TenantFilter -message "Failed to batch cache records for $ContentType : $($_.Exception.Message)" -sev Error
65+
throw # Rethrow so the outer catch reports Success = $false for this blob
66+
}
67+
}
68+
$CacheStopwatch.Stop()
69+
70+
$DownloadStopwatch.Stop() # Download timing spans the whole activity, including the cache write
71+
72+
return @{
73+
Success = $true
74+
TenantFilter = $TenantFilter
75+
ContentType = $ContentType
76+
ProcessedRecords = $CacheEntities.Count
77+
ContentCreated = [DateTime]$ContentItem.contentCreated
78+
ContentId = $ContentItem.contentId
79+
Timings = @{
80+
Download = $DownloadStopwatch.Elapsed.TotalMilliseconds
81+
Cache = $CacheStopwatch.Elapsed.TotalMilliseconds
82+
}
83+
}
84+
85+
} catch {
86+
Write-LogMessage -API 'AuditLogIngestion' -tenant $Item.TenantFilter -message "Error downloading content blob: $($_.Exception.Message)" -sev Error -LogData (Get-CippException -Exception $_)
87+
return @{
88+
Success = $false
89+
Error = $_.Exception.Message
90+
}
91+
}
92+
}

0 commit comments

Comments
 (0)