Skip to content

Commit d0410b6

Browse files
committed
Publish Function execution activity in Linux consumption
1 parent a09891a commit d0410b6

19 files changed

+1033
-180
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using Microsoft.Azure.WebJobs.Script.WebHost.Models;
5+
6+
namespace Microsoft.Azure.WebJobs.Script.WebHost.ContainerManagement
7+
{
8+
public interface ILinuxContainerActivityPublisher
9+
{
10+
void PublishFunctionExecutionActivity(ContainerFunctionExecutionActivity activity);
11+
}
12+
}
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Linq;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
using Microsoft.Azure.WebJobs.Script.WebHost.Management;
10+
using Microsoft.Azure.WebJobs.Script.WebHost.Models;
11+
using Microsoft.Extensions.Hosting;
12+
using Microsoft.Extensions.Logging;
13+
using Microsoft.Extensions.Options;
14+
15+
namespace Microsoft.Azure.WebJobs.Script.WebHost.ContainerManagement
16+
{
17+
public class LinuxContainerActivityPublisher : IHostedService, IDisposable, ILinuxContainerActivityPublisher
18+
{
19+
private const int FlushIntervalMs = 20 * 1000; // 20 seconds
20+
private const int LockTimeOutMs = 1 * 1000; // 1 second
21+
22+
private readonly ReaderWriterLockSlim _activitiesLock = new ReaderWriterLockSlim();
23+
private readonly IMeshServiceClient _meshServiceClient;
24+
private readonly ILogger<LinuxContainerActivityPublisher> _logger;
25+
private readonly int _flushIntervalMs;
26+
private readonly IOptionsMonitor<StandbyOptions> _standbyOptions;
27+
private readonly HashSet<ContainerFunctionExecutionActivity> _uniqueActivities;
28+
private IDisposable _standbyOptionsOnChangeSubscription;
29+
private DateTime _lastHeartBeatTime = DateTime.MinValue;
30+
private Timer _timer;
31+
private int _flushInProgress;
32+
33+
public LinuxContainerActivityPublisher(IOptionsMonitor<StandbyOptions> standbyOptions,
34+
IMeshServiceClient meshServiceClient, IEnvironment environment,
35+
ILogger<LinuxContainerActivityPublisher> logger, int flushIntervalMs = FlushIntervalMs)
36+
{
37+
if (!environment.IsLinuxConsumption())
38+
{
39+
throw new NotSupportedException(
40+
$"{nameof(LinuxContainerActivityPublisher)} is available in Linux consumption environment only");
41+
}
42+
43+
_standbyOptions = standbyOptions ?? throw new ArgumentNullException(nameof(standbyOptions));
44+
_meshServiceClient = meshServiceClient;
45+
_logger = logger;
46+
_flushIntervalMs = flushIntervalMs;
47+
_timer = new Timer(OnTimer, null, Timeout.Infinite, Timeout.Infinite);
48+
_uniqueActivities = new HashSet<ContainerFunctionExecutionActivity>();
49+
_flushInProgress = 0;
50+
}
51+
52+
private void Start()
53+
{
54+
_logger.LogInformation($"Starting {nameof(LinuxContainerActivityPublisher)}");
55+
56+
// start the timer by setting the due time
57+
SetTimerInterval(_flushIntervalMs);
58+
}
59+
60+
private void OnStandbyOptionsChange()
61+
{
62+
_logger.LogInformation($"Triggering {nameof(OnStandbyOptionsChange)}");
63+
64+
if (!_standbyOptions.CurrentValue.InStandbyMode)
65+
{
66+
Start();
67+
}
68+
}
69+
70+
public Task StartAsync(CancellationToken cancellationToken)
71+
{
72+
_logger.LogInformation($"Initializing {nameof(LinuxContainerActivityPublisher)}");
73+
74+
if (_standbyOptions.CurrentValue.InStandbyMode)
75+
{
76+
_logger.LogInformation($"Registering {nameof(_standbyOptionsOnChangeSubscription)}");
77+
_standbyOptionsOnChangeSubscription = _standbyOptions.OnChange(o => OnStandbyOptionsChange());
78+
}
79+
else
80+
{
81+
Start();
82+
}
83+
84+
return Task.CompletedTask;
85+
}
86+
87+
public Task StopAsync(CancellationToken cancellationToken)
88+
{
89+
_logger.LogInformation($"Stopping {nameof(LinuxContainerActivityPublisher)}");
90+
91+
// stop the timer if it has been started
92+
_timer?.Change(Timeout.Infinite, Timeout.Infinite);
93+
94+
return Task.CompletedTask;
95+
}
96+
97+
private async void OnTimer(object state)
98+
{
99+
await FlushFunctionExecutionActivities();
100+
SetTimerInterval(_flushIntervalMs);
101+
}
102+
103+
private async Task FlushFunctionExecutionActivities()
104+
{
105+
try
106+
{
107+
if (Interlocked.CompareExchange(ref _flushInProgress, 1, 0) == 0)
108+
{
109+
try
110+
{
111+
var currentActivities = new List<ContainerFunctionExecutionActivity>();
112+
if (TryGetCurrentActivities(currentActivities))
113+
{
114+
if (_lastHeartBeatTime.AddMinutes(5) < DateTime.UtcNow)
115+
{
116+
_logger.LogDebug($"Current activities count = {currentActivities.Count}");
117+
_lastHeartBeatTime = DateTime.UtcNow;
118+
}
119+
120+
if (currentActivities.Any())
121+
{
122+
_logger.LogDebug($"Flushing {currentActivities.Count} function activities");
123+
await _meshServiceClient.PublishContainerActivity(currentActivities);
124+
}
125+
}
126+
else
127+
{
128+
_logger.LogWarning($"Failed to fetch {nameof(ContainerFunctionExecutionActivity)}");
129+
}
130+
}
131+
finally
132+
{
133+
_flushInProgress = 0;
134+
}
135+
}
136+
}
137+
catch (Exception exc) when (!exc.IsFatal())
138+
{
139+
_logger.LogError(exc, $"{nameof(FlushFunctionExecutionActivities)}");
140+
}
141+
}
142+
143+
private void SetTimerInterval(int dueTime)
144+
{
145+
var timer = _timer;
146+
try
147+
{
148+
timer?.Change(dueTime, Timeout.Infinite);
149+
}
150+
catch (ObjectDisposedException)
151+
{
152+
// might race with dispose
153+
}
154+
catch (Exception e)
155+
{
156+
_logger.LogError(e, $"{nameof(SetTimerInterval)}");
157+
}
158+
}
159+
160+
public void Dispose()
161+
{
162+
if (_timer != null)
163+
{
164+
_timer?.Dispose();
165+
_timer = null;
166+
}
167+
}
168+
169+
private bool PublishActivity(ContainerFunctionExecutionActivity activity)
170+
{
171+
if (_activitiesLock.TryEnterWriteLock(LockTimeOutMs))
172+
{
173+
try
174+
{
175+
_uniqueActivities.Add(activity);
176+
}
177+
finally
178+
{
179+
_activitiesLock.ExitWriteLock();
180+
}
181+
return true;
182+
}
183+
184+
return false;
185+
}
186+
187+
private bool TryGetCurrentActivities(IList<ContainerFunctionExecutionActivity> currentActivities)
188+
{
189+
if (_activitiesLock.TryEnterWriteLock(LockTimeOutMs))
190+
{
191+
try
192+
{
193+
foreach (var activity in _uniqueActivities)
194+
{
195+
currentActivities.Add(activity);
196+
}
197+
_uniqueActivities.Clear();
198+
}
199+
finally
200+
{
201+
_activitiesLock.ExitWriteLock();
202+
}
203+
return true;
204+
}
205+
206+
return false;
207+
}
208+
209+
public void PublishFunctionExecutionActivity(ContainerFunctionExecutionActivity activity)
210+
{
211+
if (!_standbyOptions.CurrentValue.InStandbyMode)
212+
{
213+
if (!PublishActivity(activity))
214+
{
215+
_logger.LogWarning($"Failed to add activity {activity}");
216+
}
217+
}
218+
}
219+
}
220+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using Microsoft.Azure.WebJobs.Script.WebHost.Models;
8+
using Microsoft.Extensions.Hosting;
9+
using Microsoft.Extensions.Logging;
10+
11+
namespace Microsoft.Azure.WebJobs.Script.WebHost.ContainerManagement
12+
{
13+
public class NullLinuxContainerActivityPublisher : ILinuxContainerActivityPublisher, IHostedService
14+
{
15+
public NullLinuxContainerActivityPublisher(ILogger<NullLinuxContainerActivityPublisher> logger)
16+
{
17+
var nullLogger = logger ?? throw new ArgumentNullException(nameof(logger));
18+
nullLogger.LogDebug($"Initializing {nameof(NullLinuxContainerActivityPublisher)}");
19+
}
20+
21+
public void PublishFunctionExecutionActivity(ContainerFunctionExecutionActivity activity)
22+
{
23+
//do nothing
24+
}
25+
26+
public Task StartAsync(CancellationToken cancellationToken)
27+
{
28+
return Task.CompletedTask;
29+
}
30+
31+
public Task StopAsync(CancellationToken cancellationToken)
32+
{
33+
return Task.CompletedTask;
34+
}
35+
}
36+
}

src/WebJobs.Script.WebHost/Diagnostics/MetricsEventManager.cs

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,11 @@
1111
using Microsoft.Azure.WebJobs.Script.Configuration;
1212
using Microsoft.Azure.WebJobs.Script.Description;
1313
using Microsoft.Azure.WebJobs.Script.Diagnostics;
14+
using Microsoft.Azure.WebJobs.Script.WebHost.ContainerManagement;
15+
using Microsoft.Azure.WebJobs.Script.WebHost.Management;
1416
using Microsoft.Azure.WebJobs.Script.WebHost.Metrics;
1517
using Microsoft.Azure.WebJobs.Script.WebHost.Models;
18+
using Microsoft.Extensions.Logging;
1619
using Microsoft.Extensions.Options;
1720

1821
namespace Microsoft.Azure.WebJobs.Script.WebHost.Diagnostics
@@ -27,11 +30,12 @@ public class MetricsEventManager : IDisposable
2730
private readonly int _functionActivityFlushIntervalSeconds;
2831
private readonly Timer _metricsFlushTimer;
2932
private readonly object _functionActivityTrackerLockObject = new object();
33+
private readonly IMetricsPublisher _metricsPublisher;
34+
private readonly ILinuxContainerActivityPublisher _linuxContainerActivityPublisher;
3035
private bool _disposed;
31-
private IMetricsPublisher _metricsPublisher;
3236
private IOptionsMonitor<AppServiceOptions> _appServiceOptions;
3337

34-
public MetricsEventManager(IOptionsMonitor<AppServiceOptions> appServiceOptions, IEventGenerator generator, int functionActivityFlushIntervalSeconds, IMetricsPublisher metricsPublisher, int metricsFlushIntervalMS = DefaultFlushIntervalMS)
38+
public MetricsEventManager(IOptionsMonitor<AppServiceOptions> appServiceOptions, IEventGenerator generator, int functionActivityFlushIntervalSeconds, IMetricsPublisher metricsPublisher, ILinuxContainerActivityPublisher linuxContainerActivityPublisher, int metricsFlushIntervalMS = DefaultFlushIntervalMS)
3539
{
3640
// we read these in the ctor (not static ctor) since it can change on the fly
3741
_appServiceOptions = appServiceOptions;
@@ -43,6 +47,7 @@ public MetricsEventManager(IOptionsMonitor<AppServiceOptions> appServiceOptions,
4347
_metricsFlushTimer = new Timer(TimerFlush, null, metricsFlushIntervalMS, metricsFlushIntervalMS);
4448

4549
_metricsPublisher = metricsPublisher;
50+
_linuxContainerActivityPublisher = linuxContainerActivityPublisher;
4651
}
4752

4853
/// <summary>
@@ -162,7 +167,8 @@ internal void FunctionStarted(FunctionStartedEvent startedEvent)
162167
{
163168
if (instance == null)
164169
{
165-
instance = new FunctionActivityTracker(_appServiceOptions, _eventGenerator, _metricsPublisher, _functionActivityFlushIntervalSeconds);
170+
instance = new FunctionActivityTracker(_appServiceOptions, _eventGenerator, _metricsPublisher,
171+
_linuxContainerActivityPublisher, _functionActivityFlushIntervalSeconds);
166172
}
167173
instance.FunctionStarted(startedEvent);
168174
}
@@ -325,21 +331,23 @@ private class FunctionActivityTracker : IDisposable
325331
{
326332
private readonly string _executionId = Guid.NewGuid().ToString();
327333
private readonly object _functionMetricEventLockObject = new object();
334+
private readonly IMetricsPublisher _metricsPublisher;
335+
private readonly ILinuxContainerActivityPublisher _linuxContainerActivityPublisher;
328336
private ulong _totalExecutionCount = 0;
329337
private int _functionActivityFlushInterval;
330338
private CancellationTokenSource _etwTaskCancellationSource = new CancellationTokenSource();
331339
private ConcurrentQueue<FunctionMetrics> _functionMetricsQueue = new ConcurrentQueue<FunctionMetrics>();
332340
private Dictionary<string, RunningFunctionInfo> _runningFunctions = new Dictionary<string, RunningFunctionInfo>();
333341
private bool _disposed = false;
334342
private IOptionsMonitor<AppServiceOptions> _appServiceOptions;
335-
private IMetricsPublisher _metricsPublisher;
336343

337-
internal FunctionActivityTracker(IOptionsMonitor<AppServiceOptions> appServiceOptions, IEventGenerator generator, IMetricsPublisher metricsPublisher, int functionActivityFlushInterval)
344+
internal FunctionActivityTracker(IOptionsMonitor<AppServiceOptions> appServiceOptions, IEventGenerator generator, IMetricsPublisher metricsPublisher, ILinuxContainerActivityPublisher linuxContainerActivityPublisher, int functionActivityFlushInterval)
338345
{
339346
MetricsEventGenerator = generator;
340347
_appServiceOptions = appServiceOptions;
341348
_functionActivityFlushInterval = functionActivityFlushInterval;
342349
_metricsPublisher = metricsPublisher;
350+
_linuxContainerActivityPublisher = linuxContainerActivityPublisher;
343351
Task.Run(
344352
async () =>
345353
{
@@ -413,7 +421,10 @@ internal void FunctionStarted(FunctionStartedEvent startedEvent)
413421
{
414422
if (!_runningFunctions.ContainsKey(key))
415423
{
416-
_runningFunctions.Add(key, new RunningFunctionInfo(startedEvent.FunctionMetadata.Name, startedEvent.InvocationId, startedEvent.Timestamp, startedEvent.Success));
424+
_runningFunctions.Add(key,
425+
new RunningFunctionInfo(startedEvent.FunctionMetadata.Name, startedEvent.InvocationId,
426+
startedEvent.Timestamp, startedEvent.Success,
427+
startedEvent.FunctionMetadata.Trigger?.Type));
417428
}
418429
}
419430
}
@@ -501,6 +512,11 @@ private void RaiseFunctionMetricEvent(RunningFunctionInfo runningFunctionInfo, i
501512
currentTime,
502513
runningFunctionInfo.StartTime);
503514
}
515+
516+
_linuxContainerActivityPublisher.PublishFunctionExecutionActivity(
517+
new ContainerFunctionExecutionActivity(DateTime.UtcNow, runningFunctionInfo.Name,
518+
runningFunctionInfo.ExecutionStage, runningFunctionInfo.TriggerType,
519+
runningFunctionInfo.Success));
504520
}
505521

506522
private static string GetDictionaryKey(string name, Guid invocationId)
@@ -547,13 +563,14 @@ private List<FunctionMetrics> GetMetricsQueueSnapshot()
547563

548564
private class RunningFunctionInfo
549565
{
550-
public RunningFunctionInfo(string name, Guid invocationId, DateTime startTime, bool success, ExecutionStage executionStage = ExecutionStage.InProgress)
566+
public RunningFunctionInfo(string name, Guid invocationId, DateTime startTime, bool success, string triggerType, ExecutionStage executionStage = ExecutionStage.InProgress)
551567
{
552568
Name = name;
553569
InvocationId = invocationId;
554570
StartTime = startTime;
555571
Success = success;
556572
ExecutionStage = executionStage;
573+
TriggerType = triggerType;
557574
}
558575

559576
public string Name { get; private set; }
@@ -567,6 +584,8 @@ public RunningFunctionInfo(string name, Guid invocationId, DateTime startTime, b
567584
public DateTime EndTime { get; set; }
568585

569586
public bool Success { get; set; }
587+
588+
public string TriggerType { get; private set; }
570589
}
571590
}
572591
}

0 commit comments

Comments
 (0)