Skip to content

Commit 7a78f0a

Browse files
committed
Merge branch 'dev' into v3.x
2 parents 675c5bf + e94fb49 commit 7a78f0a

File tree

93 files changed

+1837
-705
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

93 files changed

+1837
-705
lines changed

src/WebJobs.Script.Grpc/Server/GrpcServer.cs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System;
55
using System.Linq;
6+
using System.Threading;
67
using System.Threading.Tasks;
78
using Grpc.Core;
89
using Microsoft.Azure.WebJobs.Script.Abstractions;
@@ -12,6 +13,7 @@ namespace Microsoft.Azure.WebJobs.Script.Grpc
1213
{
1314
public class GrpcServer : IRpcServer, IDisposable
1415
{
16+
private int _shutdown = 0;
1517
private Server _server;
1618
private bool _disposed = false;
1719
public const int MaxMessageLengthBytes = 128 * 1024 * 1024;
@@ -36,7 +38,16 @@ public Task StartAsync()
3638
return Task.CompletedTask;
3739
}
3840

39-
public Task ShutdownAsync() => _server.ShutdownAsync();
41+
public Task ShutdownAsync()
42+
{
43+
// The Grpc server will throw if it is shutdown multiple times.
44+
if (Interlocked.CompareExchange(ref _shutdown, 1, 0) == 0)
45+
{
46+
return _server.ShutdownAsync();
47+
}
48+
49+
return Task.CompletedTask;
50+
}
4051

4152
public Task KillAsync() => _server.KillAsync();
4253

@@ -46,7 +57,7 @@ protected virtual void Dispose(bool disposing)
4657
{
4758
if (disposing)
4859
{
49-
_server.ShutdownAsync();
60+
ShutdownAsync();
5061
}
5162
_disposed = true;
5263
}

src/WebJobs.Script.Grpc/generate_protos.sh

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,14 @@
3333

3434
# enter Script.Rpc directory
3535

36-
echo "OS: $OSTYPE"
36+
ARCH=$(uname -m)
37+
echo "OS: $OSTYPE $ARCH"
3738
if [[ $OSTYPE == "darwin"* ]]; then
38-
PLATFORM="macosx_x86"
39+
if [[ $ARCH == "x86_64" ]]; then
40+
PLATFORM="macosx_x64"
41+
else
42+
PLATFORM="macosx_x86"
43+
fi
3944
elif [[ $OSTYPE == "linux"* ]];then
4045
PLATFORM="linux_x64"
4146
else
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using Microsoft.Azure.WebJobs.Script.WebHost.Models;
5+
6+
namespace Microsoft.Azure.WebJobs.Script.WebHost.ContainerManagement
7+
{
8+
public interface ILinuxContainerActivityPublisher
9+
{
10+
void PublishFunctionExecutionActivity(ContainerFunctionExecutionActivity activity);
11+
}
12+
}
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Linq;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
using Microsoft.Azure.WebJobs.Script.WebHost.Management;
10+
using Microsoft.Azure.WebJobs.Script.WebHost.Models;
11+
using Microsoft.Extensions.Hosting;
12+
using Microsoft.Extensions.Logging;
13+
using Microsoft.Extensions.Options;
14+
15+
namespace Microsoft.Azure.WebJobs.Script.WebHost.ContainerManagement
16+
{
17+
public class LinuxContainerActivityPublisher : IHostedService, IDisposable, ILinuxContainerActivityPublisher
18+
{
19+
private const int FlushIntervalMs = 20 * 1000; // 20 seconds
20+
private const int LockTimeOutMs = 1 * 1000; // 1 second
21+
22+
private readonly ReaderWriterLockSlim _activitiesLock = new ReaderWriterLockSlim();
23+
private readonly IMeshServiceClient _meshServiceClient;
24+
private readonly ILogger<LinuxContainerActivityPublisher> _logger;
25+
private readonly int _flushIntervalMs;
26+
private readonly IOptionsMonitor<StandbyOptions> _standbyOptions;
27+
private readonly HashSet<ContainerFunctionExecutionActivity> _uniqueActivities;
28+
private IDisposable _standbyOptionsOnChangeSubscription;
29+
private DateTime _lastHeartBeatTime = DateTime.MinValue;
30+
private Timer _timer;
31+
private int _flushInProgress;
32+
33+
public LinuxContainerActivityPublisher(IOptionsMonitor<StandbyOptions> standbyOptions,
34+
IMeshServiceClient meshServiceClient, IEnvironment environment,
35+
ILogger<LinuxContainerActivityPublisher> logger, int flushIntervalMs = FlushIntervalMs)
36+
{
37+
if (!environment.IsLinuxConsumption())
38+
{
39+
throw new NotSupportedException(
40+
$"{nameof(LinuxContainerActivityPublisher)} is available in Linux consumption environment only");
41+
}
42+
43+
_standbyOptions = standbyOptions ?? throw new ArgumentNullException(nameof(standbyOptions));
44+
_meshServiceClient = meshServiceClient;
45+
_logger = logger;
46+
_flushIntervalMs = flushIntervalMs;
47+
_timer = new Timer(OnTimer, null, Timeout.Infinite, Timeout.Infinite);
48+
_uniqueActivities = new HashSet<ContainerFunctionExecutionActivity>();
49+
_flushInProgress = 0;
50+
}
51+
52+
private void Start()
53+
{
54+
_logger.LogInformation($"Starting {nameof(LinuxContainerActivityPublisher)}");
55+
56+
// start the timer by setting the due time
57+
SetTimerInterval(_flushIntervalMs);
58+
}
59+
60+
private void OnStandbyOptionsChange()
61+
{
62+
_logger.LogInformation($"Triggering {nameof(OnStandbyOptionsChange)}");
63+
64+
if (!_standbyOptions.CurrentValue.InStandbyMode)
65+
{
66+
Start();
67+
}
68+
}
69+
70+
public Task StartAsync(CancellationToken cancellationToken)
71+
{
72+
_logger.LogInformation($"Initializing {nameof(LinuxContainerActivityPublisher)}");
73+
74+
if (_standbyOptions.CurrentValue.InStandbyMode)
75+
{
76+
_logger.LogInformation($"Registering {nameof(_standbyOptionsOnChangeSubscription)}");
77+
_standbyOptionsOnChangeSubscription = _standbyOptions.OnChange(o => OnStandbyOptionsChange());
78+
}
79+
else
80+
{
81+
Start();
82+
}
83+
84+
return Task.CompletedTask;
85+
}
86+
87+
public Task StopAsync(CancellationToken cancellationToken)
88+
{
89+
_logger.LogInformation($"Stopping {nameof(LinuxContainerActivityPublisher)}");
90+
91+
// stop the timer if it has been started
92+
_timer?.Change(Timeout.Infinite, Timeout.Infinite);
93+
94+
return Task.CompletedTask;
95+
}
96+
97+
private async void OnTimer(object state)
98+
{
99+
await FlushFunctionExecutionActivities();
100+
SetTimerInterval(_flushIntervalMs);
101+
}
102+
103+
private async Task FlushFunctionExecutionActivities()
104+
{
105+
try
106+
{
107+
if (Interlocked.CompareExchange(ref _flushInProgress, 1, 0) == 0)
108+
{
109+
try
110+
{
111+
var currentActivities = new List<ContainerFunctionExecutionActivity>();
112+
if (TryGetCurrentActivities(currentActivities))
113+
{
114+
if (_lastHeartBeatTime.AddMinutes(5) < DateTime.UtcNow)
115+
{
116+
_logger.LogDebug($"Current activities count = {currentActivities.Count}");
117+
_lastHeartBeatTime = DateTime.UtcNow;
118+
}
119+
120+
if (currentActivities.Any())
121+
{
122+
_logger.LogDebug($"Flushing {currentActivities.Count} function activities");
123+
await _meshServiceClient.PublishContainerActivity(currentActivities);
124+
}
125+
}
126+
else
127+
{
128+
_logger.LogWarning($"Failed to fetch {nameof(ContainerFunctionExecutionActivity)}");
129+
}
130+
}
131+
finally
132+
{
133+
_flushInProgress = 0;
134+
}
135+
}
136+
}
137+
catch (Exception exc) when (!exc.IsFatal())
138+
{
139+
_logger.LogError(exc, $"{nameof(FlushFunctionExecutionActivities)}");
140+
}
141+
}
142+
143+
private void SetTimerInterval(int dueTime)
144+
{
145+
var timer = _timer;
146+
try
147+
{
148+
timer?.Change(dueTime, Timeout.Infinite);
149+
}
150+
catch (ObjectDisposedException)
151+
{
152+
// might race with dispose
153+
}
154+
catch (Exception e)
155+
{
156+
_logger.LogError(e, $"{nameof(SetTimerInterval)}");
157+
}
158+
}
159+
160+
public void Dispose()
161+
{
162+
if (_timer != null)
163+
{
164+
_timer?.Dispose();
165+
_timer = null;
166+
}
167+
}
168+
169+
private bool PublishActivity(ContainerFunctionExecutionActivity activity)
170+
{
171+
if (_activitiesLock.TryEnterWriteLock(LockTimeOutMs))
172+
{
173+
try
174+
{
175+
_uniqueActivities.Add(activity);
176+
}
177+
finally
178+
{
179+
_activitiesLock.ExitWriteLock();
180+
}
181+
return true;
182+
}
183+
184+
return false;
185+
}
186+
187+
private bool TryGetCurrentActivities(IList<ContainerFunctionExecutionActivity> currentActivities)
188+
{
189+
if (_activitiesLock.TryEnterWriteLock(LockTimeOutMs))
190+
{
191+
try
192+
{
193+
foreach (var activity in _uniqueActivities)
194+
{
195+
currentActivities.Add(activity);
196+
}
197+
_uniqueActivities.Clear();
198+
}
199+
finally
200+
{
201+
_activitiesLock.ExitWriteLock();
202+
}
203+
return true;
204+
}
205+
206+
return false;
207+
}
208+
209+
public void PublishFunctionExecutionActivity(ContainerFunctionExecutionActivity activity)
210+
{
211+
if (!_standbyOptions.CurrentValue.InStandbyMode)
212+
{
213+
if (!PublishActivity(activity))
214+
{
215+
_logger.LogWarning($"Failed to add activity {activity}");
216+
}
217+
}
218+
}
219+
}
220+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using Microsoft.Azure.WebJobs.Script.WebHost.Models;
8+
using Microsoft.Extensions.Hosting;
9+
using Microsoft.Extensions.Logging;
10+
11+
namespace Microsoft.Azure.WebJobs.Script.WebHost.ContainerManagement
12+
{
13+
public class NullLinuxContainerActivityPublisher : ILinuxContainerActivityPublisher, IHostedService
14+
{
15+
public NullLinuxContainerActivityPublisher(ILogger<NullLinuxContainerActivityPublisher> logger)
16+
{
17+
var nullLogger = logger ?? throw new ArgumentNullException(nameof(logger));
18+
nullLogger.LogDebug($"Initializing {nameof(NullLinuxContainerActivityPublisher)}");
19+
}
20+
21+
public void PublishFunctionExecutionActivity(ContainerFunctionExecutionActivity activity)
22+
{
23+
//do nothing
24+
}
25+
26+
public Task StartAsync(CancellationToken cancellationToken)
27+
{
28+
return Task.CompletedTask;
29+
}
30+
31+
public Task StopAsync(CancellationToken cancellationToken)
32+
{
33+
return Task.CompletedTask;
34+
}
35+
}
36+
}

src/WebJobs.Script.WebHost/DependencyInjection/WebHostServiceProvider.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
namespace Microsoft.Azure.WebJobs.Script.WebHost.DependencyInjection
1010
{
11-
public class WebHostServiceProvider : IServiceProvider, IServiceScopeFactory
11+
public class WebHostServiceProvider : IServiceProvider, IServiceScopeFactory, IDisposable
1212
{
1313
private static readonly Rules _defaultContainerRules;
1414
private readonly Container _container;
@@ -50,5 +50,10 @@ public IServiceScope CreateScope()
5050
{
5151
return new JobHostServiceScope(_container.OpenScope(preferInterpretation: _container.PreferInterpretation));
5252
}
53+
54+
public void Dispose()
55+
{
56+
_container?.Dispose();
57+
}
5358
}
5459
}

0 commit comments

Comments
 (0)