Skip to content

Commit f886d8c

Browse files
author
Gohar Irfan Chaudhry
authored
Get shared memory directories (in Unix) from AppSettings (#7239)
* Checking env var for directories where MemoryMappedFiles can be created * Tests for AllowedDirectories and ValidDirectories * WIP - fixing tests * WIP - added missing comment * Cleanup of tests * Remove directories and subdirectories when cleaning up previously present directories * Sleep between creating directory and calling into MemoryMappedFileAccessor to ensure a significant measurable delay between the two creations of the directory * Addressing comments
1 parent 0f07743 commit f886d8c

File tree

10 files changed

+297
-21
lines changed

10 files changed

+297
-21
lines changed

src/WebJobs.Script.Grpc/Extensions/RpcSharedMemoryDataExtensions.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ internal static class RpcSharedMemoryDataExtensions
1313
{
1414
internal static async Task<RpcSharedMemory> ToRpcSharedMemoryAsync(this object value, ILogger logger, string invocationId, ISharedMemoryManager sharedMemoryManager)
1515
{
16+
if (value == null)
17+
{
18+
return new RpcSharedMemory();
19+
}
20+
1621
if (!sharedMemoryManager.IsSupported(value))
1722
{
1823
return null;

src/WebJobs.Script/Workers/Rpc/RpcWorkerConstants.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ public static class RpcWorkerConstants
1111
public const string FunctionWorkerRuntimeVersionSettingName = "FUNCTIONS_WORKER_RUNTIME_VERSION";
1212
public const string FunctionsWorkerProcessCountSettingName = "FUNCTIONS_WORKER_PROCESS_COUNT";
1313
public const string FunctionsWorkerSharedMemoryDataTransferEnabledSettingName = "FUNCTIONS_WORKER_SHARED_MEMORY_DATA_TRANSFER_ENABLED";
14+
// Comma-separated list of directories where shared memory maps can be created for data transfer between host and worker.
15+
// This will override the default directories.
16+
public const string FunctionsUnixSharedMemoryDirectories = "FUNCTIONS_UNIX_SHARED_MEMORY_DIRECTORIES";
1417
public const string DotNetLanguageWorkerName = "dotnet";
1518
public const string NodeLanguageWorkerName = "node";
1619
public const string JavaLanguageWorkerName = "java";

src/WebJobs.Script/Workers/SharedMemoryDataTransfer/MemoryMappedFileAccessorUnix.cs

Lines changed: 85 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
using System.Collections.Generic;
66
using System.IO;
77
using System.IO.MemoryMappedFiles;
8+
using System.Linq;
89
using System.Runtime.InteropServices;
10+
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
911
using Microsoft.Extensions.Logging;
1012

1113
namespace Microsoft.Azure.WebJobs.Script.Workers.SharedMemoryDataTransfer
@@ -15,15 +17,22 @@ namespace Microsoft.Azure.WebJobs.Script.Workers.SharedMemoryDataTransfer
1517
/// </summary>
1618
public class MemoryMappedFileAccessorUnix : MemoryMappedFileAccessor
1719
{
18-
public MemoryMappedFileAccessorUnix(ILogger<MemoryMappedFileAccessor> logger) : base(logger)
20+
private IEnvironment _environment;
21+
22+
public MemoryMappedFileAccessorUnix(ILogger<MemoryMappedFileAccessor> logger, IEnvironment environment) : base(logger)
1923
{
2024
ValidatePlatform(new List<OSPlatform>()
2125
{
2226
OSPlatform.Linux,
2327
OSPlatform.OSX
2428
});
29+
30+
_environment = environment;
31+
ValidDirectories = GetValidDirectories();
2532
}
2633

34+
internal List<string> ValidDirectories { get; private set; }
35+
2736
public override bool TryCreate(string mapName, long size, out MemoryMappedFile mmf)
2837
{
2938
mmf = null;
@@ -127,6 +136,74 @@ public override void Delete(string mapName, MemoryMappedFile mmf)
127136
}
128137
}
129138

139+
/// <summary>
140+
/// Checks if a list of directories is specified in AppSettings to create <see cref="MemoryMappedFile"/>.
141+
/// If one is specified, returns that list. Otherwise returns the default list.
142+
/// </summary>
143+
/// <returns>List of paths of directories where <see cref="MemoryMappedFile"/> are allowed to be created.</returns>
144+
internal List<string> GetAllowedDirectories()
145+
{
146+
string envVal = _environment.GetEnvironmentVariable(RpcWorkerConstants.FunctionsUnixSharedMemoryDirectories);
147+
if (string.IsNullOrEmpty(envVal))
148+
{
149+
return SharedMemoryConstants.TempDirs;
150+
}
151+
152+
return envVal.Split(',').ToList();
153+
}
154+
155+
/// <summary>
156+
/// From a list of allowed directories where <see cref="MemoryMappedFile"/> can be created, it will return
157+
/// a list of those that are valid (i.e. exist, or have been successfully created).
158+
/// </summary>
159+
/// <returns>List of paths of directories where <see cref="MemoryMappedFile"/> can be created.</returns>
160+
internal List<string> GetValidDirectories()
161+
{
162+
List<string> allowedDirectories = GetAllowedDirectories();
163+
List<string> validDirectories = new List<string>();
164+
165+
foreach (string directory in allowedDirectories)
166+
{
167+
string path = Path.Combine(directory, SharedMemoryConstants.TempDirSuffix);
168+
if (Directory.Exists(path))
169+
{
170+
Logger.LogTrace("Found directory for shared memory usage: {Directory}", path);
171+
try
172+
{
173+
// If the directory already exists (maybe from a previous run of the host) then clean it up and start afresh
174+
// The previously created memory maps in that directory are not needed and we need to should clean up the memory
175+
Directory.Delete(path, recursive: true);
176+
Logger.LogTrace("Cleaned up existing directory for shared memory usage: {Directory}", path);
177+
}
178+
catch (Exception exception)
179+
{
180+
Logger.LogWarning(exception, "Cannot delete existing directory for shared memory usage: {Directory}", path);
181+
}
182+
}
183+
184+
try
185+
{
186+
DirectoryInfo info = Directory.CreateDirectory(path);
187+
if (info.Exists)
188+
{
189+
validDirectories.Add(path);
190+
Logger.LogTrace("Created directory for shared memory usage: {Directory}", path);
191+
}
192+
else
193+
{
194+
Logger.LogWarning("Directory for shared memory usage does not exist: {Directory}", path);
195+
}
196+
}
197+
catch (Exception exception)
198+
{
199+
Logger.LogWarning(exception, "Cannot create directory for shared memory usage: {Directory}", path);
200+
}
201+
}
202+
203+
Logger.LogDebug("Valid directories for shared memory usage: {Directories}", string.Join(",", validDirectories));
204+
return validDirectories;
205+
}
206+
130207
/// <summary>
131208
/// Get the path of the file to store a <see cref="MemoryMappedFile"/>.
132209
/// We first try to mount it in memory-mounted directories (e.g. /dev/shm/).
@@ -136,15 +213,11 @@ public override void Delete(string mapName, MemoryMappedFile mmf)
136213
/// <see cref="null"/> otherwise.</returns>
137214
private string GetPath(string mapName)
138215
{
139-
// We escape the mapName to make it a valid file name
140-
// Python will use urllib.parse.quote_plus(mapName)
141-
string escapedMapName = Uri.EscapeDataString(mapName);
142-
143216
// Check if the file already exists
144217
string filePath;
145-
foreach (string tempDir in SharedMemoryConstants.TempDirs)
218+
foreach (string tempDir in ValidDirectories)
146219
{
147-
filePath = Path.Combine(tempDir, SharedMemoryConstants.TempDirSuffix, escapedMapName);
220+
filePath = Path.Combine(tempDir, mapName);
148221
if (File.Exists(filePath))
149222
{
150223
return filePath;
@@ -162,14 +235,11 @@ private string GetPath(string mapName)
162235
/// <returns>Created path.</returns>
163236
private string CreatePath(string mapName, long size)
164237
{
165-
string escapedMapName = Uri.EscapeDataString(mapName);
166-
167238
// Create a new file
168-
string newTempDir = GetDirectory(size);
169-
if (newTempDir != null)
239+
string tempDir = GetDirectory(size);
240+
if (tempDir != null)
170241
{
171-
DirectoryInfo newTempDirInfo = Directory.CreateDirectory(newTempDir);
172-
return Path.Combine(newTempDirInfo.FullName, escapedMapName);
242+
return Path.Combine(tempDir, mapName);
173243
}
174244
else
175245
{
@@ -187,7 +257,7 @@ private string CreatePath(string mapName, long size)
187257
/// <see cref="SharedMemoryConstants.TempDirSuffix"/> is used.</returns>
188258
private string GetDirectory(long size)
189259
{
190-
foreach (string tempDir in SharedMemoryConstants.TempDirs)
260+
foreach (string tempDir in ValidDirectories)
191261
{
192262
try
193263
{
@@ -197,7 +267,7 @@ private string GetDirectory(long size)
197267
long minSize = size + SharedMemoryConstants.TempDirMinSize;
198268
if (driveInfo.AvailableFreeSpace > minSize)
199269
{
200-
return Path.Combine(tempDir, SharedMemoryConstants.TempDirSuffix);
270+
return tempDir;
201271
}
202272
}
203273
}

src/WebJobs.Script/Workers/SharedMemoryDataTransfer/SharedMemoryConstants.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,17 @@ internal class SharedMemoryConstants
3333
/// <summary>
3434
/// Minimum size (in number of bytes) an object must be in order for it to be transferred over shared memory.
3535
/// If the object is smaller than this, gRPC is used.
36+
/// Note: This needs to be consistent among the host and workers.
37+
/// e.g. in the Python worker, it is defined in shared_memory_constants.py
3638
/// </summary>
3739
public const long MinObjectBytesForSharedMemoryTransfer = 1024 * 1024; // 1 MB
3840

3941
/// <summary>
4042
/// Maximum size (in number of bytes) an object can be in order for it to be transferred over shared memory.
4143
/// This limit is imposed because initializing objects like <see cref="byte[]"/> greater than 2GB is not allowed.
4244
/// Ref: https://stackoverflow.com/a/3944336/3132415
45+
/// Note: This needs to be consistent among the host and workers.
46+
/// e.g. in the Python worker, it is defined in shared_memory_constants.py
4347
/// </summary>
4448
public const long MaxObjectBytesForSharedMemoryTransfer = ((long)2 * 1024 * 1024 * 1024) - 1; // 2 GB
4549

src/WebJobs.Script/Workers/SharedMemoryDataTransfer/SharedMemoryMap.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,12 @@ public void Dispose()
195195
public async Task<byte[]> CopyStreamAsync(int offset, int count)
196196
{
197197
Stream contentStream = await GetStreamAsync();
198+
if (contentStream == null)
199+
{
200+
_logger.LogError("Cannot get stream for shared memory map: {MapName}", _mapName);
201+
return null;
202+
}
203+
198204
int contentLength = (int)contentStream.Length;
199205
if (contentLength == 0)
200206
{

0 commit comments

Comments
 (0)