Skip to content

Commit 1b8a5a5

Browse files
mathewcyojagad
authored and committed
Fixing MetricsEventManager memory leak (#6818)
1 parent cfcfbe2 commit 1b8a5a5

File tree

2 files changed

+138
-50
lines changed

2 files changed

+138
-50
lines changed

src/WebJobs.Script.WebHost/Diagnostics/MetricsEventManager.cs

Lines changed: 47 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
using System.Diagnostics;
88
using System.Linq;
99
using System.Threading;
10-
using System.Threading.Tasks;
1110
using Microsoft.Azure.WebJobs.Script.Configuration;
1211
using Microsoft.Azure.WebJobs.Script.Description;
1312
using Microsoft.Azure.WebJobs.Script.Diagnostics;
@@ -45,7 +44,7 @@ public MetricsEventManager(IOptionsMonitor<AppServiceOptions> appServiceOptions,
4544
// Initialize the periodic log flush timer
4645
_metricsFlushTimer = new Timer(TimerFlush, null, metricsFlushIntervalMS, metricsFlushIntervalMS);
4746

48-
_functionActivityTracker = new FunctionActivityTracker(_appServiceOptions, _eventGenerator, metricsPublisher, linuxContainerActivityPublisher, _functionActivityFlushIntervalSeconds);
47+
_functionActivityTracker = new FunctionActivityTracker(_appServiceOptions, _eventGenerator, metricsPublisher, linuxContainerActivityPublisher, _functionActivityFlushIntervalSeconds, _logger);
4948
}
5049

5150
/// <summary>
@@ -316,7 +315,7 @@ protected virtual void Dispose(bool disposing)
316315

317316
if (_functionActivityTracker != null)
318317
{
319-
_functionActivityTracker.StopEtwTaskAndRaiseFinishedEvent();
318+
_functionActivityTracker.StopTimerAndRaiseFinishedEvent();
320319
_functionActivityTracker.Dispose();
321320
}
322321
}
@@ -334,24 +333,30 @@ public void Dispose()
334333

335334
private class FunctionActivityTracker : IDisposable
336335
{
336+
// this interval should stay at 1 second because the timer is also
337+
// used to emit events every Nth second
338+
private const int _activityTimerIntervalMS = 1000;
339+
337340
private readonly IMetricsPublisher _metricsPublisher;
338341
private readonly ILinuxContainerActivityPublisher _linuxContainerActivityPublisher;
339342
private readonly object _runningFunctionsSyncLock = new object();
343+
private readonly Timer _activityTimer;
344+
private readonly ILogger<MetricsEventManager> _logger;
340345

341346
private ulong _totalExecutionCount = 0;
342347
private int _activeFunctionCount = 0;
343348
private int _functionActivityFlushInterval;
344-
private CancellationTokenSource _etwTaskCancellationSource = new CancellationTokenSource();
345349
private ConcurrentQueue<FunctionMetrics> _functionMetricsQueue = new ConcurrentQueue<FunctionMetrics>();
346350
private List<FunctionStartedEvent> _runningFunctions = new List<FunctionStartedEvent>();
347351
private bool _disposed = false;
348352
private IOptionsMonitor<AppServiceOptions> _appServiceOptions;
353+
private int _activityFlushCounter;
349354

350355
// This ID is just an event grouping mechanism that can be used by event consumers
351356
// to group events coming from the same app host.
352357
private string _executionId = Guid.NewGuid().ToString();
353358

354-
internal FunctionActivityTracker(IOptionsMonitor<AppServiceOptions> appServiceOptions, IEventGenerator generator, IMetricsPublisher metricsPublisher, ILinuxContainerActivityPublisher linuxContainerActivityPublisher, int functionActivityFlushInterval)
359+
internal FunctionActivityTracker(IOptionsMonitor<AppServiceOptions> appServiceOptions, IEventGenerator generator, IMetricsPublisher metricsPublisher, ILinuxContainerActivityPublisher linuxContainerActivityPublisher, int functionActivityFlushInterval, ILogger<MetricsEventManager> logger)
355360
{
356361
MetricsEventGenerator = generator;
357362
_appServiceOptions = appServiceOptions;
@@ -367,43 +372,37 @@ internal FunctionActivityTracker(IOptionsMonitor<AppServiceOptions> appServiceOp
367372
_metricsPublisher = metricsPublisher;
368373
}
369374

370-
StartActivityTimer();
375+
_activityFlushCounter = _functionActivityFlushInterval;
376+
_activityTimer = new Timer(TimerFlush, null, _activityTimerIntervalMS, _activityTimerIntervalMS);
377+
378+
_logger = logger;
371379
}
372380

373381
internal IEventGenerator MetricsEventGenerator { get; private set; }
374382

375-
private void StartActivityTimer()
383+
private void TimerFlush(object state)
376384
{
377-
Task.Run(
378-
async () =>
385+
try
386+
{
387+
// we raise these events every interval as needed
388+
RaiseMetricsPerFunctionEvent();
389+
390+
// only raise these events every Nth interval
391+
if (_activityFlushCounter >= _functionActivityFlushInterval)
379392
{
380-
try
381-
{
382-
int currentSecond = _functionActivityFlushInterval;
383-
while (!_etwTaskCancellationSource.Token.IsCancellationRequested)
384-
{
385-
RaiseMetricsPerFunctionEvent();
386-
387-
if (currentSecond >= _functionActivityFlushInterval)
388-
{
389-
RaiseFunctionMetricEvents();
390-
currentSecond = 0;
391-
}
392-
else
393-
{
394-
currentSecond = currentSecond + 1;
395-
}
396-
397-
await Task.Delay(TimeSpan.FromSeconds(1), _etwTaskCancellationSource.Token);
398-
}
399-
}
400-
catch (TaskCanceledException)
401-
{
402-
// This exception gets thrown when a cancellation request is raised via the cancellation token.
403-
// Let's eat this exception and continue
404-
}
405-
},
406-
_etwTaskCancellationSource.Token);
393+
RaiseFunctionMetricEvents();
394+
_activityFlushCounter = 0;
395+
}
396+
else
397+
{
398+
_activityFlushCounter += 1;
399+
}
400+
}
401+
catch (Exception ex)
402+
{
403+
// log error and continue
404+
_logger.LogError(ex, "Error occurred when logging function activity");
405+
}
407406
}
408407

409408
protected virtual void Dispose(bool disposing)
@@ -412,7 +411,7 @@ protected virtual void Dispose(bool disposing)
412411
{
413412
if (disposing)
414413
{
415-
_etwTaskCancellationSource.Dispose();
414+
_activityTimer?.Dispose();
416415
}
417416
_disposed = true;
418417
}
@@ -450,9 +449,11 @@ internal void FunctionCompleted(FunctionStartedEvent startedEvent)
450449
RaiseFunctionMetricEvent(startedEvent, _activeFunctionCount, DateTime.UtcNow);
451450
}
452451

453-
internal void StopEtwTaskAndRaiseFinishedEvent()
452+
internal void StopTimerAndRaiseFinishedEvent()
454453
{
455-
_etwTaskCancellationSource.Cancel();
454+
// stop the timer if it has been started
455+
_activityTimer?.Change(Timeout.Infinite, Timeout.Infinite);
456+
456457
RaiseMetricsPerFunctionEvent();
457458
}
458459

@@ -474,20 +475,23 @@ private void RaiseFunctionMetricEvents()
474475

475476
// We only need to raise events here for functions that aren't completed.
476477
// Events are raised immediately for completed functions elsewhere.
477-
var runningFunctions = new List<FunctionStartedEvent>();
478+
FunctionStartedEvent[] runningFunctionsSnapshot = null;
478479
lock (_runningFunctionsSyncLock)
479480
{
480481
// effectively we're pruning all the completed invocations here
481-
runningFunctions = _runningFunctions = _runningFunctions.Where(p => !p.Completed).ToList();
482+
_runningFunctions = _runningFunctions.Where(p => !p.Completed).ToList();
483+
484+
// create a snapshot within the lock so we can enumerate below
485+
runningFunctionsSnapshot = _runningFunctions.ToArray();
482486
}
483487

484488
// we calculate concurrency here based on count, since these events are raised
485489
// on a background thread, so we want the actual count for this interval, not
486490
// the current count.
487491
var currentTime = DateTime.UtcNow;
488-
foreach (var runningFunction in runningFunctions)
492+
foreach (var runningFunction in runningFunctionsSnapshot)
489493
{
490-
RaiseFunctionMetricEvent(runningFunction, runningFunctions.Count, currentTime);
494+
RaiseFunctionMetricEvent(runningFunction, runningFunctionsSnapshot.Length, currentTime);
491495
}
492496
}
493497

test/WebJobs.Script.Tests.Integration/Diagnostics/MetricsEventManagerTests.cs

Lines changed: 91 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515
using Microsoft.Azure.WebJobs.Script.WebHost.Diagnostics;
1616
using Microsoft.Azure.WebJobs.Script.WebHost.Metrics;
1717
using Microsoft.Azure.WebJobs.Script.WebHost.Models;
18+
using Microsoft.Extensions.Logging;
1819
using Microsoft.Extensions.Logging.Abstractions;
1920
using Microsoft.Extensions.Options;
21+
using Microsoft.WebJobs.Script.Tests;
2022
using Moq;
2123
using Moq.Protected;
2224
using Xunit;
25+
using Xunit.Sdk;
2326

2427
namespace Microsoft.Azure.WebJobs.Script.Tests
2528
{
@@ -33,14 +36,25 @@ public class MetricsEventManagerTests
3336
private readonly List<FunctionExecutionEventArguments> _functionExecutionEventArguments;
3437
private readonly List<SystemMetricEvent> _events;
3538
private readonly Mock<ILinuxContainerActivityPublisher> _linuxFunctionExecutionActivityPublisher;
39+
private readonly Mock<IMetricsPublisher> _mockEventPublisher;
40+
private readonly Mock<IEventGenerator> _mockEventGenerator;
41+
private readonly Mock<IOptionsMonitor<AppServiceOptions>> _mockAppServiceOptions;
42+
private readonly TestLoggerProvider _testLoggerProvider;
43+
private readonly LoggerFactory _loggerFactory;
44+
45+
private bool _throwOnExecutionEvent;
3646
private readonly object _syncLock = new object();
3747

3848
public MetricsEventManagerTests()
3949
{
50+
_loggerFactory = new LoggerFactory();
51+
_testLoggerProvider = new TestLoggerProvider();
52+
_loggerFactory.AddProvider(_testLoggerProvider);
53+
4054
_functionExecutionEventArguments = new List<FunctionExecutionEventArguments>();
4155

42-
var mockEventGenerator = new Mock<IEventGenerator>();
43-
mockEventGenerator.Setup(e => e.LogFunctionExecutionEvent(
56+
_mockEventGenerator = new Mock<IEventGenerator>();
57+
_mockEventGenerator.Setup(e => e.LogFunctionExecutionEvent(
4458
It.IsAny<string>(),
4559
It.IsAny<string>(),
4660
It.IsAny<int>(),
@@ -51,14 +65,20 @@ public MetricsEventManagerTests()
5165
It.IsAny<bool>()))
5266
.Callback((string executionId, string siteName, int concurrency, string functionName, string invocationId, string executionStage, long executionTimeSpan, bool success) =>
5367
{
68+
if (_throwOnExecutionEvent && executionStage == ExecutionStage.InProgress.ToString())
69+
{
70+
_throwOnExecutionEvent = false;
71+
throw new Exception("Kaboom!");
72+
}
73+
5474
lock (_syncLock)
5575
{
5676
_functionExecutionEventArguments.Add(new FunctionExecutionEventArguments(executionId, siteName, concurrency, functionName, invocationId, executionStage, executionTimeSpan, success));
5777
}
5878
});
5979

6080
_events = new List<SystemMetricEvent>();
61-
mockEventGenerator.Setup(p => p.LogFunctionMetricEvent(
81+
_mockEventGenerator.Setup(p => p.LogFunctionMetricEvent(
6282
It.IsAny<string>(),
6383
It.IsAny<string>(),
6484
It.IsAny<string>(),
@@ -91,11 +111,12 @@ public MetricsEventManagerTests()
91111
}
92112
});
93113

94-
var mockMetricsPublisher = new Mock<IMetricsPublisher>();
95-
var testAppServiceOptions = new Mock<IOptionsMonitor<AppServiceOptions>>();
96-
testAppServiceOptions.Setup(a => a.CurrentValue).Returns(new AppServiceOptions { AppName = "RandomAppName", SubscriptionId = Guid.NewGuid().ToString() });
114+
_mockEventPublisher = new Mock<IMetricsPublisher>();
115+
_mockAppServiceOptions = new Mock<IOptionsMonitor<AppServiceOptions>>();
116+
_mockAppServiceOptions.Setup(a => a.CurrentValue).Returns(new AppServiceOptions { AppName = "RandomAppName", SubscriptionId = Guid.NewGuid().ToString() });
97117
_linuxFunctionExecutionActivityPublisher = new Mock<ILinuxContainerActivityPublisher>();
98-
_metricsEventManager = new MetricsEventManager(testAppServiceOptions.Object, mockEventGenerator.Object, MinimumLongRunningDurationInMs / 1000, mockMetricsPublisher.Object, _linuxFunctionExecutionActivityPublisher.Object, NullLogger<MetricsEventManager>.Instance);
118+
var logger = _loggerFactory.CreateLogger<MetricsEventManager>();
119+
_metricsEventManager = new MetricsEventManager(_mockAppServiceOptions.Object, _mockEventGenerator.Object, MinimumLongRunningDurationInMs / 1000, _mockEventPublisher.Object, _linuxFunctionExecutionActivityPublisher.Object, logger);
99120
_metricsLogger = new WebHostMetricsLogger(_metricsEventManager);
100121
}
101122

@@ -554,6 +575,69 @@ public async Task MetricsEventManager_MultipleConcurrentLongFunctionExecutions()
554575
SerializeFunctionExecutionEventArguments(_functionExecutionEventArguments)));
555576
}
556577

578+
/// <summary>
/// Verifies that an exception thrown while the background activity timer is
/// logging an in-progress function is logged as an error, and that the timer
/// keeps raising events afterwards.
/// </summary>
[Fact]
public async Task MetricsEventManager_ActivityTimer_HandlesExceptions()
{
    // create a local event manager for this test, so we can override the flush interval
    var logger = _loggerFactory.CreateLogger<MetricsEventManager>();
    var metricsEventManager = new MetricsEventManager(_mockAppServiceOptions.Object, _mockEventGenerator.Object, 1, _mockEventPublisher.Object, _linuxFunctionExecutionActivityPublisher.Object, logger);

    try
    {
        var metricsLogger = new WebHostMetricsLogger(metricsEventManager);

        // execute some functions
        var taskList = new List<Task>();
        for (int currentIndex = 0; currentIndex < 10; currentIndex++)
        {
            taskList.Add(ShortTestFunction(metricsLogger));
        }
        await Task.WhenAll(taskList);

        // wait for a flush to occur
        await Task.Delay(1000);

        // The mocked event generator appends to the shared list under _syncLock
        // from the timer thread, so take a snapshot under the same lock before
        // enumerating; otherwise enumeration can race with a concurrent Add.
        FunctionExecutionEventArguments[] executionEvents;
        lock (_syncLock)
        {
            executionEvents = _functionExecutionEventArguments.ToArray();
        }

        // verify events
        Assert.Equal(10, executionEvents.Length);
        Assert.True(executionEvents.All(p => p.ExecutionStage == ExecutionStage.Finished.ToString()));

        // now force a logging error for an in progress function
        // on the background timer (the mock throws once for the first
        // InProgress event it sees while this flag is set)
        _throwOnExecutionEvent = true;
        var id = Guid.NewGuid();
        var functionMetadata = new FunctionMetadata
        {
            Name = "Test"
        };
        var functionEvent = new FunctionStartedEvent(id, functionMetadata);
        metricsLogger.BeginEvent(functionEvent);

        // wait for the error to be logged
        LogMessage errorLog = null;
        await TestHelpers.Await(() =>
        {
            errorLog = _testLoggerProvider.GetAllLogMessages().SingleOrDefault();
            return errorLog != null;
        }, timeout: 5000);

        // verify error was logged
        Assert.Equal(LogLevel.Error, errorLog.Level);
        Assert.Equal("Error occurred when logging function activity", errorLog.FormattedMessage);

        // execute some more functions, verifying that the timer is
        // still running
        taskList = new List<Task>();
        for (int currentIndex = 0; currentIndex < 10; currentIndex++)
        {
            taskList.Add(ShortTestFunction(metricsLogger));
        }
        await Task.WhenAll(taskList);

        await Task.Delay(1000);

        // The function started above is never ended, so the timer is still
        // adding InProgress events at this point; snapshot under the lock again.
        lock (_syncLock)
        {
            executionEvents = _functionExecutionEventArguments.ToArray();
        }

        // verify events
        Assert.Equal(20, executionEvents.Count(p => p.ExecutionStage == ExecutionStage.Finished.ToString()));
        int inProgressCount = executionEvents.Count(p => p.InvocationId == id.ToString() && p.ExecutionStage == ExecutionStage.InProgress.ToString());
        Assert.True(inProgressCount > 0);
    }
    finally
    {
        // Stop the timers owned by the local manager so they do not keep
        // firing against this test's mocks after the test has completed.
        metricsEventManager.Dispose();
    }
}
640+
557641
[Fact]
558642
public async Task MetricsEventManager_MultipleConcurrentFunctions()
559643
{

0 commit comments

Comments
 (0)