Skip to content

Commit f5b19f5

Browse files
Add IDistributedLockManager for Kubernetes environment (#7327)
Added IDistributedLockManager for Kubernetes environment.
1 parent a68bf2e commit f5b19f5

File tree

9 files changed

+399
-0
lines changed

9 files changed

+399
-0
lines changed

release_notes.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22
<!-- Please add your release notes in the following format:
33
- My change description (#PR)
44
-->
5+
56
- Added a feature flag to opt out of the default behavior where the host sets the environment name to `Development` when running in debug mode. To disable the behavior, set the app setting: `AzureWebJobsFeatureFlags` to `DisableDevModeInDebug`
67
- Reorder CORS and EasyAuth middleware to prevent EasyAuth from blocking CORS requests (#7315)
78
- Updated PowerShell Worker (PS7) to [3.0.833](https://github.com/Azure/azure-functions-powershell-worker/releases/tag/v3.0.833)
9+
- Add IDistributedLockManager for Kubernetes environment (#7327)
810
- Call EnableDrainModeAsync on ApplicationStopping (#7262)
911

1012
**Release sprint:** Sprint 100

src/WebJobs.Script/Environment/EnvironmentExtensions.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,11 @@ public static bool SupportsAzureFileShareMount(this IEnvironment environment)
448448
StringComparison.OrdinalIgnoreCase);
449449
}
450450

451+
public static string GetHttpLeaderEndpoint(this IEnvironment environment)
452+
{
453+
return environment.GetEnvironmentVariableOrDefault(HttpLeaderEndpoint, string.Empty);
454+
}
455+
451456
public static bool DrainOnApplicationStoppingEnabled(this IEnvironment environment)
452457
{
453458
return !string.IsNullOrEmpty(environment.GetEnvironmentVariable(KubernetesServiceHost)) ||

src/WebJobs.Script/Environment/EnvironmentSettingNames.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public static class EnvironmentSettingNames
6565
public const string PodNamespace = "POD_NAMESPACE";
6666
public const string PodName = "POD_NAME";
6767
public const string PodEncryptionKey = "POD_ENCRYPTION_KEY";
68+
public const string HttpLeaderEndpoint = "HTTP_LEADER_ENDPOINT";
6869

6970
/// <summary>
7071
/// Environment variable dynamically set by the platform when it is safe to
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Net.Http;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using Newtonsoft.Json;
9+
10+
namespace Microsoft.Azure.WebJobs.Script
11+
{
12+
internal class KubernetesClient
13+
{
14+
private const int LeaseRenewDeadline = 10;
15+
private readonly HttpClient _httpClient;
16+
private readonly string _httpLeaderEndpoint;
17+
18+
internal KubernetesClient(IEnvironment environment, HttpClient httpClient = null)
19+
{
20+
_httpClient = httpClient ?? new HttpClient();
21+
_httpLeaderEndpoint = environment.GetHttpLeaderEndpoint();
22+
}
23+
24+
internal async Task<KubernetesLockHandle> GetLock(string lockName, CancellationToken cancellationToken)
25+
{
26+
if (string.IsNullOrEmpty(lockName))
27+
{
28+
throw new ArgumentNullException(nameof(lockName));
29+
}
30+
31+
var request = new HttpRequestMessage()
32+
{
33+
Method = HttpMethod.Get,
34+
RequestUri = GetRequestUri($"?name={lockName}")
35+
};
36+
37+
var response = await _httpClient.SendAsync(request, cancellationToken);
38+
39+
response.EnsureSuccessStatusCode();
40+
41+
var responseString = await response.Content.ReadAsStringAsync();
42+
return JsonConvert.DeserializeObject<KubernetesLockHandle>(responseString);
43+
}
44+
45+
internal async Task<KubernetesLockHandle> TryAcquireLock (string lockId, string ownerId, TimeSpan lockPeriod, CancellationToken cancellationToken)
46+
{
47+
if (string.IsNullOrEmpty(lockId))
48+
{
49+
throw new ArgumentNullException(nameof(lockId));
50+
}
51+
52+
if (string.IsNullOrEmpty(ownerId))
53+
{
54+
throw new ArgumentNullException(nameof(ownerId));
55+
}
56+
57+
var lockHandle = new KubernetesLockHandle();
58+
var request = new HttpRequestMessage()
59+
{
60+
Method = HttpMethod.Post,
61+
RequestUri = GetRequestUri($"/acquire?name={lockId}&owner={ownerId}&duration={lockPeriod.TotalSeconds}&renewDeadline={LeaseRenewDeadline}"),
62+
};
63+
64+
var response = await _httpClient.SendAsync(request, cancellationToken);
65+
if (response.IsSuccessStatusCode)
66+
{
67+
lockHandle.LockId = lockId;
68+
lockHandle.Owner = ownerId;
69+
}
70+
return lockHandle;
71+
}
72+
73+
internal async Task<HttpResponseMessage> ReleaseLock(string lockId, string ownerId)
74+
{
75+
if (string.IsNullOrEmpty(lockId))
76+
{
77+
throw new ArgumentNullException(nameof(lockId));
78+
}
79+
80+
var request = new HttpRequestMessage()
81+
{
82+
Method = HttpMethod.Post,
83+
RequestUri = GetRequestUri($"/release?name={lockId}&owner={ownerId}")
84+
};
85+
86+
return await _httpClient.SendAsync(request);
87+
}
88+
89+
internal Uri GetRequestUri(string requestStem)
90+
{
91+
return new Uri($"{_httpLeaderEndpoint}/lock{requestStem}");
92+
}
93+
}
94+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using Microsoft.Azure.WebJobs.Host;
8+
using Microsoft.Azure.WebJobs.Script.Config;
9+
10+
namespace Microsoft.Azure.WebJobs.Script
11+
{
12+
// This is an implementation of IDistributedLockManager to be used when running
13+
// in Kubernetes environments.
14+
internal class KubernetesDistributedLockManager : IDistributedLockManager
15+
{
16+
private readonly KubernetesClient _kubernetesClient;
17+
private readonly string _websiteInstanceId;
18+
19+
public KubernetesDistributedLockManager(IEnvironment environment,
20+
ScriptSettingsManager settingsManager)
21+
: this(new KubernetesClient(environment), settingsManager)
22+
{
23+
}
24+
25+
internal KubernetesDistributedLockManager(KubernetesClient client, ScriptSettingsManager settingsManager)
26+
{
27+
_kubernetesClient = client;
28+
_websiteInstanceId = settingsManager.AzureWebsiteInstanceId;
29+
}
30+
31+
public async Task<string> GetLockOwnerAsync(string account, string lockId, CancellationToken cancellationToken)
32+
{
33+
var response = await _kubernetesClient.GetLock(lockId, cancellationToken);
34+
return response.Owner;
35+
}
36+
37+
public async Task ReleaseLockAsync(IDistributedLock lockHandle, CancellationToken cancellationToken)
38+
{
39+
var kubernetesLock = (KubernetesLockHandle)lockHandle;
40+
var response = await _kubernetesClient.ReleaseLock(kubernetesLock.LockId, kubernetesLock.Owner);
41+
response.EnsureSuccessStatusCode();
42+
}
43+
44+
public async Task<bool> RenewAsync(IDistributedLock lockHandle, CancellationToken cancellationToken)
45+
{
46+
var kubernetesLock = (KubernetesLockHandle)lockHandle;
47+
var renewedLockHandle = await _kubernetesClient.TryAcquireLock(kubernetesLock.LockId, kubernetesLock.Owner, TimeSpan.Parse(kubernetesLock.LockPeriod), cancellationToken);
48+
return !string.IsNullOrEmpty(renewedLockHandle.LockId);
49+
}
50+
51+
public async Task<IDistributedLock> TryLockAsync(string account, string lockId, string lockOwnerId, string proposedLeaseId, TimeSpan lockPeriod, CancellationToken cancellationToken)
52+
{
53+
var ownerId = string.IsNullOrEmpty(lockOwnerId) ? _websiteInstanceId : lockOwnerId;
54+
var kubernetesLock = await _kubernetesClient.TryAcquireLock(lockId, ownerId, lockPeriod, cancellationToken);
55+
if (string.IsNullOrEmpty(kubernetesLock.LockId))
56+
{
57+
return null;
58+
}
59+
return kubernetesLock;
60+
}
61+
}
62+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using Microsoft.Azure.WebJobs.Host;
5+
6+
namespace Microsoft.Azure.WebJobs.Script
7+
{
8+
internal class KubernetesLockHandle : IDistributedLock
9+
{
10+
public string LockId { get; set; }
11+
12+
public string Owner { get; set; }
13+
14+
public string LockPeriod { get; set; }
15+
}
16+
}

src/WebJobs.Script/ScriptHostBuilderExtensions.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Runtime.InteropServices;
77
using Microsoft.ApplicationInsights.DependencyCollector;
88
using Microsoft.ApplicationInsights.Extensibility;
9+
using Microsoft.Azure.WebJobs.Host;
910
using Microsoft.Azure.WebJobs.Host.Executors;
1011
using Microsoft.Azure.WebJobs.Hosting;
1112
using Microsoft.Azure.WebJobs.Logging;
@@ -288,6 +289,12 @@ public static IHostBuilder AddScriptHostCore(this IHostBuilder builder, ScriptAp
288289
}
289290

290291
services.AddSingleton<IHostedService, WorkerConsoleLogService>();
292+
293+
if (SystemEnvironment.Instance.IsKubernetesManagedHosting())
294+
{
295+
services.AddSingleton<IDistributedLockManager, KubernetesDistributedLockManager>();
296+
}
297+
291298
services.TryAddEnumerable(ServiceDescriptor.Singleton<IHostedService, PrimaryHostCoordinator>());
292299
services.TryAddEnumerable(ServiceDescriptor.Singleton<IHostedService, FunctionInvocationDispatcherShutdownManager>());
293300

0 commit comments

Comments
 (0)