Skip to content

Commit 6f87c8f

Browse files
authored
Refactoring ZeroToOneTargetScaler (Azure#53263)
1 parent 419e61c commit 6f87c8f

File tree

6 files changed

+336
-187
lines changed

6 files changed

+336
-187
lines changed

sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobLogListener.cs

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@
44
using System;
55
using System.Collections.Generic;
66
using System.Globalization;
7+
using System.IO;
78
using System.Linq;
89
using System.Text;
910
using System.Threading;
1011
using System.Threading.Tasks;
1112
using Azure.Storage.Blobs;
1213
using Azure.Storage.Blobs.Models;
1314
using Azure.Storage.Blobs.Specialized;
14-
using Microsoft.Azure.WebJobs.Host.Timers;
1515
using Microsoft.Extensions.Logging;
1616

1717
namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Listeners
@@ -77,6 +77,72 @@ public async Task<IEnumerable<BlobWithContainer<BlobBaseClient>>> GetRecentBlobW
7777
return blobs;
7878
}
7979

80+
public async Task<StorageAnalyticsLogEntry> GetFirstLogEntryWithWritesAsync(string containerName, CancellationToken cancellationToken, int hoursWindow = DefaultScanHoursWindow)
81+
{
82+
if (hoursWindow <= 0)
83+
{
84+
return null;
85+
}
86+
87+
DateTime hourCursor = DateTime.UtcNow;
88+
BlobContainerClient containerClient = _blobClient.GetBlobContainerClient(LogContainer);
89+
90+
for (int hourIndex = 0; hourIndex < hoursWindow; hourIndex++)
91+
{
92+
cancellationToken.ThrowIfCancellationRequested();
93+
94+
string prefix = GetSearchPrefix("blob", hourCursor, hourCursor);
95+
96+
await foreach (BlobItem blob in containerClient
97+
.GetBlobsAsync(traits: BlobTraits.Metadata, prefix: prefix, states: BlobStates.None, cancellationToken: cancellationToken)
98+
.ConfigureAwait(false))
99+
{
100+
cancellationToken.ThrowIfCancellationRequested();
101+
102+
if (blob.Metadata is not null &&
103+
blob.Metadata.TryGetValue(LogType, out string logType) &&
104+
!string.IsNullOrEmpty(logType) &&
105+
logType.IndexOf("write", StringComparison.OrdinalIgnoreCase) >= 0)
106+
{
107+
BlobClient logBlobClient = containerClient.GetBlobClient(blob.Name);
108+
109+
using (Stream stream = await logBlobClient.OpenReadAsync(options: default, cancellationToken: cancellationToken).ConfigureAwait(false))
110+
{
111+
using (StreamReader reader = new StreamReader(stream))
112+
{
113+
int lineNumber = 0;
114+
while (!reader.EndOfStream)
115+
{
116+
cancellationToken.ThrowIfCancellationRequested();
117+
string line = await reader.ReadLineAsync().ConfigureAwait(false);
118+
lineNumber++;
119+
120+
if (line != null)
121+
{
122+
var entry = _parser.ParseLine(line, logBlobClient.Name, lineNumber.ToString(CultureInfo.InvariantCulture));
123+
if (entry != null && entry.IsBlobWrite)
124+
{
125+
var path = entry.ToBlobPath();
126+
if (path != null &&
127+
string.Equals(path.ContainerName, containerName, StringComparison.OrdinalIgnoreCase))
128+
{
129+
// If we found a valid write entry, we can stop searching.
130+
return entry;
131+
}
132+
}
133+
}
134+
}
135+
}
136+
}
137+
}
138+
}
139+
140+
hourCursor = hourCursor.AddHours(-1);
141+
}
142+
143+
return null;
144+
}
145+
80146
internal static IEnumerable<BlobPath> GetPathsForValidBlobWrites(IEnumerable<StorageAnalyticsLogEntry> entries)
81147
{
82148
IEnumerable<BlobPath> parsedBlobPaths = from entry in entries

sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobScalerMonitorProvider.cs

Lines changed: 0 additions & 163 deletions
This file was deleted.

sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/StorageAnalyticsLogParser.cs

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -93,28 +93,9 @@ public async Task<IEnumerable<StorageAnalyticsLogEntry>> ParseLogAsync(BlobBaseC
9393
break;
9494
}
9595

96-
Version version = TryParseVersion(line);
97-
if (version == null)
96+
var entry = ParseLine(line, blob.Name, lineNumber.ToString(CultureInfo.CurrentCulture));
97+
if (entry != null)
9898
{
99-
string message = String.Format(CultureInfo.CurrentCulture,
100-
"Unable to detect a version of log entry on line {1} of Storage Analytics log file '{0}'.",
101-
blob.Name, lineNumber);
102-
103-
_logger.LogWarning(message);
104-
}
105-
106-
if (version == supportedVersion)
107-
{
108-
StorageAnalyticsLogEntry entry = TryParseLogEntry(line);
109-
if (entry == null)
110-
{
111-
string message = String.Format(CultureInfo.CurrentCulture,
112-
"Unable to parse the log entry on line {1} of Storage Analytics log file '{0}'.",
113-
blob.Name, lineNumber);
114-
115-
_logger.LogWarning(message);
116-
}
117-
11899
entries.Add(entry);
119100
}
120101
}
@@ -174,5 +155,33 @@ from Match m in _compiledRegex.Matches(line)
174155

175156
return StorageAnalyticsLogEntry.TryParse(fieldsArray);
176157
}
158+
159+
internal StorageAnalyticsLogEntry ParseLine(string line, string blobName, string lineNumber)
160+
{
161+
Version version = TryParseVersion(line);
162+
if (version == null)
163+
{
164+
string message = String.Format(CultureInfo.CurrentCulture,
165+
"Unable to detect a version of log entry on line {1} of Storage Analytics log file '{0}'.",
166+
blobName, lineNumber);
167+
168+
_logger.LogWarning(message);
169+
}
170+
171+
if (version == supportedVersion)
172+
{
173+
StorageAnalyticsLogEntry entry = TryParseLogEntry(line);
174+
if (entry == null)
175+
{
176+
string message = String.Format(CultureInfo.CurrentCulture,
177+
"Unable to parse the log entry on line {1} of Storage Analytics log file '{0}'.",
178+
blobName, lineNumber);
179+
180+
_logger.LogWarning(message);
181+
}
182+
return entry;
183+
}
184+
return null;
185+
}
177186
}
178187
}

0 commit comments

Comments
 (0)