Skip to content

Commit b417e56

Browse files
author
MHD\nialan
committed
Added a factory to instantiate the data lake client using the parameters in the request. This should make the functions simpler to unit test.
1 parent 4d313cf commit b417e56

File tree

8 files changed

+233
-35
lines changed

8 files changed

+233
-35
lines changed

ADF.Functions/DataLakeConfigFactory.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public class DataLakeConfigFactory
2424
private const string LimitParam = "limit";
2525

2626
private readonly ILogger _logger;
27-
public DataLakeConfigFactory(ILogger logger)
27+
public DataLakeConfigFactory(ILogger<DataLakeConfigFactory> logger)
2828
{
2929
_logger = logger;
3030
}

ADF.Functions/DataLakeFunctions.cs

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@ public partial class DataLakeFunctions
2525
{
2626
private readonly ILogger<DataLakeFunctions> _logger;
2727
private readonly DataLakeConfigFactory _configFactory;
28-
public DataLakeFunctions(ILogger<DataLakeFunctions> logger, DataLakeConfigFactory configFactory)
28+
private readonly IDataLakeClientFactory _clientFactory;
29+
public DataLakeFunctions(ILogger<DataLakeFunctions> logger, DataLakeConfigFactory configFactory, IDataLakeClientFactory clientFactory)
2930
{
3031
_logger = logger;
3132
_configFactory = configFactory;
33+
_clientFactory = clientFactory;
3234
}
3335

3436

@@ -50,9 +52,8 @@ public async Task<IActionResult> DataLakeGetItems(
5052
if (string.IsNullOrWhiteSpace(dataLakeConfig.AccountUri))
5153
throw new ArgumentException($"Account Uri '{dataLakeConfig.AccountUri}' not found. Check the URI is correct.");
5254

53-
var clientFactory = new DataLakeClientFactory(_logger);
54-
var client = clientFactory.GetDataLakeClient(dataLakeConfig);
55-
return await GetItemsAsync(client, dataLakeConfig, getItemsConfig, _logger);
55+
var client = _clientFactory.GetDataLakeClient(dataLakeConfig);
56+
return await GetItemsAsync(client, dataLakeConfig, getItemsConfig);
5657
}
5758
catch (ArgumentException ex)
5859
{
@@ -83,15 +84,14 @@ public async Task<IActionResult> DataLakeCheckPathCase(
8384
if (string.IsNullOrWhiteSpace(dataLakeConfig.AccountUri))
8485
throw new ArgumentException($"Account Uri '{dataLakeConfig.AccountUri}' not found. Check the URI is correct.");
8586

86-
var clientFactory = new DataLakeClientFactory(_logger);
87-
var client = clientFactory.GetDataLakeClient(dataLakeConfig);
87+
var client = _clientFactory.GetDataLakeClient(dataLakeConfig);
8888

8989
var paramsJsonFragment = GetParamsJsonFragment(dataLakeConfig, getItemsConfig);
90-
var validatedPath = await CheckPathAsync(client, getItemsConfig.Path, true, _logger);
90+
var validatedPath = await CheckPathAsync(client, getItemsConfig.Path, true);
9191

9292
// If multiple files match, the function will throw and the catch block will return a BadRequestObjectResult
9393
// If the path could not be found as a directory, try for a file...
94-
validatedPath = validatedPath ?? await CheckPathAsync(client, getItemsConfig.Path, false, _logger);
94+
validatedPath ??= await CheckPathAsync(client, getItemsConfig.Path, false);
9595

9696
var resultJson = "{" +
9797
$"{paramsJsonFragment}, \"validatedPath\":\"{validatedPath}\" " +
@@ -119,6 +119,18 @@ public async Task<IActionResult> DataLakeCheckPathCase(
119119

120120

121121

122+
private string GetParamsJsonFragment(DataLakeConfig dataLakeConfig, object parameters)
123+
{
124+
return $"\"debugInfo\": {AssemblyHelpers.GetAssemblyVersionInfoJson()}," +
125+
$"\"storageContainerUrl\": {dataLakeConfig.BaseUrl}," +
126+
parameters == null ?
127+
string.Empty :
128+
$"\"parameters\": {JsonConvert.SerializeObject(parameters, Formatting.Indented, new JsonSerializerSettings { NullValueHandling = Newtonsoft.Json.NullValueHandling.Ignore })}";
129+
}
130+
131+
132+
133+
122134

123135

124136

@@ -134,17 +146,9 @@ public async Task<IActionResult> DataLakeCheckPathCase(
134146

135147

136148

137-
private string GetParamsJsonFragment(DataLakeConfig dataLakeConfig, object parameters)
138-
{
139-
return $"\"debugInfo\": {AssemblyHelpers.GetAssemblyVersionInfoJson()}," +
140-
$"\"storageContainerUrl\": {dataLakeConfig.BaseUrl}," +
141-
parameters == null ?
142-
string.Empty :
143-
$"\"parameters\": {JsonConvert.SerializeObject(parameters, Formatting.Indented, new JsonSerializerSettings { NullValueHandling = Newtonsoft.Json.NullValueHandling.Ignore })}";
144-
}
145149

146150

147-
private async Task<string> CheckPathAsync(DataLakeFileSystemClient client, string path, bool isDirectory, ILogger log)
151+
private async Task<string> CheckPathAsync(DataLakeFileSystemClient client, string path, bool isDirectory)
148152
{
149153
if (path == null || path.Trim() == "/")
150154
return null;
@@ -156,7 +160,7 @@ private async Task<string> CheckPathAsync(DataLakeFileSystemClient client, strin
156160
if (pathExists)
157161
return path;
158162

159-
log.LogInformation($"${(isDirectory ? "Directory" : "File")} '${path}' not found, checking paths case using case insensitive compare...");
163+
_logger.LogInformation($"${(isDirectory ? "Directory" : "File")} '${path}' not found, checking paths case using case insensitive compare...");
160164

161165
// Split the paths so we can test them seperately
162166
var directoryPath = isDirectory ? path : Path.GetDirectoryName(path).Replace(Path.DirectorySeparatorChar, '/');
@@ -170,7 +174,7 @@ private async Task<string> CheckPathAsync(DataLakeFileSystemClient client, strin
170174
foreach (var directoryPart in directoryParts)
171175
{
172176
var searchItem = directoryPart;
173-
var validPaths = MatchPathItemsCaseInsensitive(client, validDirectory, searchItem, true, log);
177+
var validPaths = MatchPathItemsCaseInsensitive(client, validDirectory, searchItem, true);
174178

175179
if (validPaths.Count == 0)
176180
return null;
@@ -189,13 +193,13 @@ private async Task<string> CheckPathAsync(DataLakeFileSystemClient client, strin
189193
if (client.GetFileClient(testFilePath).Exists())
190194
return testFilePath;
191195

192-
var files = MatchPathItemsCaseInsensitive(client, validDirectory, filename, false, log);
196+
var files = MatchPathItemsCaseInsensitive(client, validDirectory, filename, false);
193197
if (files.Count > 1)
194198
throw new Exception("Multiple paths matched with case insensitive compare.");
195199
return files.FirstOrDefault();
196200
}
197201

198-
private IList<string> MatchPathItemsCaseInsensitive(DataLakeFileSystemClient client, string basePath, string searchItem, bool isDirectory, ILogger log)
202+
private IList<string> MatchPathItemsCaseInsensitive(DataLakeFileSystemClient client, string basePath, string searchItem, bool isDirectory)
199203
{
200204
var paths = client.GetPaths(basePath).ToList();
201205
return paths.Where(p => p.IsDirectory == isDirectory && Path.GetFileName(p.Name).Equals(searchItem, StringComparison.CurrentCultureIgnoreCase))
@@ -205,10 +209,10 @@ private IList<string> MatchPathItemsCaseInsensitive(DataLakeFileSystemClient cli
205209
}
206210

207211

208-
private async Task<IActionResult> GetItemsAsync(DataLakeFileSystemClient client, DataLakeConfig dataLakeConfig, DataLakeGetItemsConfig getItemsConfig, ILogger log)
212+
private async Task<IActionResult> GetItemsAsync(DataLakeFileSystemClient client, DataLakeConfig dataLakeConfig, DataLakeGetItemsConfig getItemsConfig)
209213
{
210214
var directory = getItemsConfig.IgnoreDirectoryCase ?
211-
await CheckPathAsync(client, getItemsConfig.Directory, true, log) :
215+
await CheckPathAsync(client, getItemsConfig.Directory, true) :
212216
getItemsConfig.Directory;
213217

214218
var paramsJsonFragment = GetParamsJsonFragment(dataLakeConfig, getItemsConfig);
@@ -237,7 +241,7 @@ await CheckPathAsync(client, getItemsConfig.Directory, true, log) :
237241
{
238242
var dynamicLinqQuery = filter.GetDynamicLinqString();
239243
string dynamicLinqQueryValue = filter.GetDynamicLinqValue();
240-
log.LogInformation($"Applying filter: paths.AsQueryable().Where(\"{dynamicLinqQuery}\", \"{filter.Value}\").ToList()");
244+
_logger.LogInformation($"Applying filter: paths.AsQueryable().Where(\"{dynamicLinqQuery}\", \"{filter.Value}\").ToList()");
241245
paths = paths.AsQueryable().Where(dynamicLinqQuery, dynamicLinqQueryValue).ToList();
242246
}
243247

ADF.Functions/Startup.cs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using Microsoft.Azure.Functions.Extensions.DependencyInjection;
1+
using Azure.Datafactory.Extensions.DataLake;
2+
using Microsoft.Azure.Functions.Extensions.DependencyInjection;
23
using Microsoft.Extensions.DependencyInjection;
34

45
[assembly: FunctionsStartup(typeof(Azure.Datafactory.Extensions.Functions.Startup))]
@@ -11,12 +12,9 @@ public override void Configure(IFunctionsHostBuilder builder)
1112
// ** Registers the ILogger instance **
1213
builder.Services.AddLogging();
1314

14-
builder.Services.AddSingleton(typeof(DataLakeConfigFactory));
15-
16-
// Registers the application settings' class.
17-
//...
18-
19-
//...omitted for brevity
15+
builder.Services.AddTransient(typeof(DataLakeConfigFactory));
16+
//builder.Services.AddSingleton<DataLakeConfigFactory>();
17+
builder.Services.AddTransient<IDataLakeClientFactory, DataLakeClientFactory>();
2018
}
2119
}
2220
}

Azure.DataFactory.Extensions/Azure.DataFactory.Extensions.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
<ItemGroup>
2222
<PackageReference Include="Azure.Identity" Version="1.3.0" />
2323
<PackageReference Include="Azure.Storage.Files.DataLake" Version="12.6.0" />
24+
<PackageReference Include="Flurl" Version="3.0.1" />
2425
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.2.0" />
2526
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.13" />
2627
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />

Azure.DataFactory.Extensions/DataLake/DataLakeClientFactory.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@
44
using Microsoft.Extensions.Logging;
55
using System;
66

7-
namespace Azure.Datafactory.Extensions.Functions
7+
namespace Azure.Datafactory.Extensions.DataLake
88
{
9-
public class DataLakeClientFactory
9+
public class DataLakeClientFactory: IDataLakeClientFactory
1010
{
1111
private readonly ILogger _logger;
12-
public DataLakeClientFactory(ILogger logger)
12+
public DataLakeClientFactory(ILogger<DataLakeClientFactory> logger)
1313
{
1414
_logger = logger;
1515
}
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
using Azure.Datafactory.Extensions.DataLake.Model;
2+
using Azure.Datafactory.Extensions.Functions;
3+
using Azure.Storage.Files.DataLake;
4+
using Flurl;
5+
using Microsoft.Extensions.Logging;
6+
using Newtonsoft.Json;
7+
using Newtonsoft.Json.Linq;
8+
using System;
9+
using System.Collections.Generic;
10+
using System.IO;
11+
using System.Linq;
12+
using System.Linq.Dynamic.Core;
13+
using System.Reflection;
14+
using System.Text;
15+
using System.Threading.Tasks;
16+
17+
namespace Azure.Datafactory.Extensions.DataLake
18+
{
19+
public class DataLakeController
20+
{
21+
private readonly ILogger _logger;
22+
private readonly DataLakeFileSystemClient _client;
23+
public DataLakeController (ILogger logger, DataLakeFileSystemClient client)
24+
{
25+
_logger = logger;
26+
_client = client;
27+
}
28+
29+
public async Task<string> CheckPathAsync(string path, bool isDirectory)
30+
{
31+
if (path == null || path.Trim() == "/")
32+
return null;
33+
34+
// Check if the path exists with the casing as is...
35+
var pathExists = isDirectory ?
36+
_client.GetDirectoryClient(path).Exists() :
37+
_client.GetFileClient(path).Exists();
38+
if (pathExists)
39+
return path;
40+
41+
_logger.LogInformation($"${(isDirectory ? "Directory" : "File")} '${path}' not found, checking paths case using case insensitive compare...");
42+
43+
// Split the paths so we can test them seperately
44+
var directoryPath = isDirectory ? path : Path.GetDirectoryName(path).Replace(Path.DirectorySeparatorChar, '/');
45+
var filename = isDirectory ? null : Path.GetFileName(path);
46+
47+
// If the directory does not exist, we find it
48+
string validDirectory = null;
49+
if (!await _client.GetDirectoryClient(path).ExistsAsync())
50+
{
51+
var directoryParts = directoryPath.Split('/');
52+
foreach (var directoryPart in directoryParts)
53+
{
54+
var searchItem = directoryPart;
55+
var validPaths = MatchPathItemsCaseInsensitive(validDirectory, searchItem, true);
56+
57+
if (validPaths.Count == 0)
58+
return null;
59+
else if (validPaths.Count > 1)
60+
throw new Exception("Multiple paths matched with case insensitive compare.");
61+
62+
validDirectory = validPaths[0];
63+
}
64+
}
65+
66+
if (isDirectory)
67+
return validDirectory;
68+
69+
// Now check if the file exists using the corrected directory, and if not find a match...
70+
var testFilePath = $"{validDirectory ?? ""}/{filename}".TrimStart('/');
71+
if (_client.GetFileClient(testFilePath).Exists())
72+
return testFilePath;
73+
74+
var files = MatchPathItemsCaseInsensitive(validDirectory, filename, false);
75+
if (files.Count > 1)
76+
throw new Exception("Multiple paths matched with case insensitive compare.");
77+
return files.FirstOrDefault();
78+
}
79+
80+
private IList<string> MatchPathItemsCaseInsensitive(string basePath, string searchItem, bool isDirectory)
81+
{
82+
var paths = _client.GetPaths(basePath).ToList();
83+
return paths.Where(p => p.IsDirectory == isDirectory && Path.GetFileName(p.Name).Equals(searchItem, StringComparison.CurrentCultureIgnoreCase))
84+
.Select(p => p.Name)
85+
.ToList();
86+
87+
}
88+
89+
private async Task<JObject> GetItemsAsync(DataLakeConfig dataLakeConfig, DataLakeGetItemsConfig getItemsConfig)
90+
{
91+
var directory = getItemsConfig.IgnoreDirectoryCase ?
92+
await CheckPathAsync(getItemsConfig.Directory, true) :
93+
getItemsConfig.Directory;
94+
95+
var paramsJsonFragment = GetParamsJsonFragment(dataLakeConfig, getItemsConfig);
96+
97+
if (!_client.GetDirectoryClient(directory).Exists())
98+
throw new DirectoryNotFoundException("Directory '{directory} could not be found'");
99+
//return new BadRequestObjectResult(JObject.Parse($"{{ {paramsJsonFragment}, \"error\": \"Directory '{directory} could not be found'\" }}"));
100+
101+
var paths = _client
102+
.GetPaths(path: directory ?? string.Empty, recursive: getItemsConfig.Recursive)
103+
.Select(p => new DataLakeFile
104+
{
105+
Name = Path.GetFileName(p.Name),
106+
Directory = p.IsDirectory.GetValueOrDefault(false) ?
107+
p.Name :
108+
Path.GetDirectoryName(p.Name).Replace(Path.DirectorySeparatorChar, '/'),
109+
FullPath = p.Name,
110+
Url = Url.Combine(dataLakeConfig.BaseUrl, p.Name),
111+
IsDirectory = p.IsDirectory.GetValueOrDefault(false),
112+
ContentLength = p.ContentLength.GetValueOrDefault(0),
113+
LastModified = p.LastModified.ToUniversalTime().ToString("yyyy-MM-ddTHH:mm:ss.fffZ")
114+
})
115+
.ToList();
116+
117+
// 1: Filter the results using dynamic LINQ
118+
foreach (var filter in getItemsConfig.Filters.Where(f => f.IsValid))
119+
{
120+
var dynamicLinqQuery = filter.GetDynamicLinqString();
121+
string dynamicLinqQueryValue = filter.GetDynamicLinqValue();
122+
_logger.LogInformation($"Applying filter: paths.AsQueryable().Where(\"{dynamicLinqQuery}\", \"{filter.Value}\").ToList()");
123+
paths = paths.AsQueryable().Where(dynamicLinqQuery, dynamicLinqQueryValue).ToList();
124+
}
125+
126+
// 2: Sort the results
127+
if (!string.IsNullOrWhiteSpace(getItemsConfig.OrderByColumn))
128+
{
129+
paths = paths.AsQueryable()
130+
.OrderBy(getItemsConfig.OrderByColumn + (getItemsConfig.OrderByDescending ? " descending" : string.Empty))
131+
.ToList();
132+
}
133+
134+
// 3: Do a top N if required
135+
if (getItemsConfig.Limit > 0 && getItemsConfig.Limit < paths.Count)
136+
paths = paths.Take(getItemsConfig.Limit).ToList();
137+
138+
139+
140+
// Output the results
141+
var versionAttribute = Attribute.GetCustomAttribute(Assembly.GetExecutingAssembly(), typeof(AssemblyInformationalVersionAttribute)) as AssemblyInformationalVersionAttribute;
142+
143+
var isEveryFilterValid = getItemsConfig.Filters.All(f => f.IsValid);
144+
if (!isEveryFilterValid)
145+
//throw InvalidFilterException()
146+
throw new InvalidFilterCriteriaException("Som filters are not valid");
147+
148+
149+
var filesListJson = isEveryFilterValid ?
150+
$"\"fileCount\": {paths.Count}," +
151+
$"\"files\": {JsonConvert.SerializeObject(paths, Formatting.Indented)}" :
152+
string.Empty;
153+
154+
var resultJson = $"{{ {paramsJsonFragment}, {(getItemsConfig.IgnoreDirectoryCase && directory != getItemsConfig.Directory ? $"\"correctedFilePath\": \"{directory}\"," : string.Empty)} {filesListJson} }}";
155+
156+
//return isEveryFilterValid ?
157+
// (IActionResult)new OkObjectResult(JObject.Parse(resultJson)) :
158+
// (IActionResult)new BadRequestObjectResult(JObject.Parse(resultJson));
159+
return JObject.Parse(resultJson);
160+
}
161+
162+
163+
private string GetParamsJsonFragment(DataLakeConfig dataLakeConfig, object parameters)
164+
{
165+
return $"\"debugInfo\": {AssemblyHelpers.GetAssemblyVersionInfoJson()}," +
166+
$"\"storageContainerUrl\": {dataLakeConfig.BaseUrl}," +
167+
parameters == null ?
168+
string.Empty :
169+
$"\"parameters\": {JsonConvert.SerializeObject(parameters, Formatting.Indented, new JsonSerializerSettings { NullValueHandling = Newtonsoft.Json.NullValueHandling.Ignore })}";
170+
}
171+
}
172+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
5+
namespace Azure.Datafactory.Extensions.DataLake
6+
{
7+
class DataLakeControllerFactory
8+
{
9+
}
10+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using Azure.Datafactory.Extensions.DataLake.Model;
2+
using Azure.Identity;
3+
using Azure.Storage.Files.DataLake;
4+
using Microsoft.Extensions.Logging;
5+
using System;
6+
7+
namespace Azure.Datafactory.Extensions.DataLake
8+
{
9+
public interface IDataLakeClientFactory
10+
{
11+
public DataLakeFileSystemClient GetDataLakeClient(DataLakeConfig dataLakeConfig);
12+
}
13+
}

0 commit comments

Comments
 (0)