1
1
using System ;
2
- using System . IO ;
3
2
using System . Threading . Tasks ;
4
3
using Microsoft . AspNetCore . Mvc ;
5
4
using Microsoft . Azure . WebJobs ;
6
5
using Microsoft . Azure . WebJobs . Extensions . Http ;
7
6
using Microsoft . AspNetCore . Http ;
8
7
using Microsoft . Extensions . Logging ;
9
- using Newtonsoft . Json ;
10
- using Azure . Identity ;
11
- using Azure . Storage . Files . DataLake ;
12
- using System . Collections . Generic ;
13
- using Newtonsoft . Json . Linq ;
14
- using Flurl ;
15
8
using System . Linq ;
16
- using System . Reflection ;
17
- using System . Linq . Dynamic . Core ;
18
- using Azure . Storage . Files . DataLake . Models ;
19
9
using Azure . Datafactory . Extensions . DataLake ;
20
- using Azure . Datafactory . Extensions . DataLake . Model ;
21
10
22
11
namespace Azure . Datafactory . Extensions . Functions
23
12
{
24
- public partial class DataLakeFunctions
13
+ public partial class DataLakeFunctions : FunctionsBase
25
14
{
26
- private readonly ILogger < DataLakeFunctions > _logger ;
27
15
private readonly DataLakeConfigFactory _configFactory ;
28
16
private readonly IDataLakeClientFactory _clientFactory ;
29
- public DataLakeFunctions ( ILogger < DataLakeFunctions > logger , DataLakeConfigFactory configFactory , IDataLakeClientFactory clientFactory )
17
+ private readonly DataLakeControllerFactory _controllerFactory ;
18
+ public DataLakeFunctions ( ILogger < DataLakeFunctions > logger , DataLakeConfigFactory configFactory , IDataLakeClientFactory clientFactory , DataLakeControllerFactory controllerFactory ) :
19
+ base ( logger )
30
20
{
31
- _logger = logger ;
32
21
_configFactory = configFactory ;
33
22
_clientFactory = clientFactory ;
23
+ _controllerFactory = controllerFactory ;
34
24
}
35
25
36
26
37
27
28
+
38
29
[ FunctionName ( "DataLakeGetItems" ) ]
39
30
public async Task < IActionResult > DataLakeGetItems (
40
31
[ HttpTrigger ( AuthorizationLevel . Function , "get" /*, "post"*/ , Route = null ) ] HttpRequest req )
@@ -53,7 +44,14 @@ public async Task<IActionResult> DataLakeGetItems(
53
44
throw new ArgumentException ( $ "Account Uri '{ dataLakeConfig . AccountUri } ' not found. Check the URI is correct.") ;
54
45
55
46
var client = _clientFactory . GetDataLakeClient ( dataLakeConfig ) ;
56
- return await GetItemsAsync ( client , dataLakeConfig , getItemsConfig ) ;
47
+ var controller = _controllerFactory . CreateDataLakeController ( client ) ;
48
+
49
+ var responseJson = GetBaseResponse ( dataLakeConfig , getItemsConfig ) ;
50
+ var items = await controller . GetItemsAsync ( dataLakeConfig , getItemsConfig ) ;
51
+ foreach ( var item in items )
52
+ responseJson . Add ( item . Key , item . Value ) ;
53
+
54
+ return ( IActionResult ) new OkObjectResult ( responseJson ) ;
57
55
}
58
56
catch ( ArgumentException ex )
59
57
{
@@ -85,21 +83,20 @@ public async Task<IActionResult> DataLakeCheckPathCase(
85
83
throw new ArgumentException ( $ "Account Uri '{ dataLakeConfig . AccountUri } ' not found. Check the URI is correct.") ;
86
84
87
85
var client = _clientFactory . GetDataLakeClient ( dataLakeConfig ) ;
86
+ var controller = _controllerFactory . CreateDataLakeController ( client ) ;
88
87
89
- var paramsJsonFragment = GetParamsJsonFragment ( dataLakeConfig , getItemsConfig ) ;
90
- var validatedPath = await CheckPathAsync ( client , getItemsConfig . Path , true ) ;
88
+ var validatedPath = await controller . CheckPathAsync ( getItemsConfig . Path , true ) ;
91
89
92
90
// If multiple files match, the function will throw and the catch block will return a BadRequestObjectResult
93
91
// If the path could not be found as a directory, try for a file...
94
- validatedPath ??= await CheckPathAsync ( client , getItemsConfig . Path , false ) ;
92
+ validatedPath ??= await controller . CheckPathAsync ( getItemsConfig . Path , false ) ;
95
93
96
- var resultJson = "{" +
97
- $ "{ paramsJsonFragment } , \" validatedPath\" :\" { validatedPath } \" " +
98
- "}" ;
94
+ var responseJson = GetBaseResponse ( dataLakeConfig , getItemsConfig ) ;
95
+ responseJson . Add ( "validatedPath" , validatedPath ) ;
99
96
100
97
return validatedPath != null ?
101
- ( IActionResult ) new OkObjectResult ( JObject . Parse ( resultJson ) ) :
102
- ( IActionResult ) new NotFoundObjectResult ( JObject . Parse ( resultJson ) ) ;
98
+ ( IActionResult ) new OkObjectResult ( responseJson ) :
99
+ ( IActionResult ) new NotFoundObjectResult ( responseJson ) ;
103
100
}
104
101
catch ( ArgumentException ex )
105
102
{
@@ -112,173 +109,6 @@ public async Task<IActionResult> DataLakeCheckPathCase(
112
109
return new BadRequestObjectResult ( "An error occurred, see the Azure Function logs for more details" ) ;
113
110
}
114
111
}
115
-
116
-
117
-
118
-
119
-
120
-
121
-
122
- private string GetParamsJsonFragment ( DataLakeConfig dataLakeConfig , object parameters )
123
- {
124
- return $ "\" debugInfo\" : { AssemblyHelpers . GetAssemblyVersionInfoJson ( ) } ," +
125
- $ "\" storageContainerUrl\" : { dataLakeConfig . BaseUrl } ," +
126
- parameters == null ?
127
- string . Empty :
128
- $ "\" parameters\" : { JsonConvert . SerializeObject ( parameters , Formatting . Indented , new JsonSerializerSettings { NullValueHandling = Newtonsoft . Json . NullValueHandling . Ignore } ) } ";
129
- }
130
-
131
-
132
-
133
-
134
-
135
-
136
-
137
-
138
-
139
-
140
-
141
-
142
-
143
-
144
-
145
-
146
-
147
-
148
-
149
-
150
-
151
- private async Task < string > CheckPathAsync ( DataLakeFileSystemClient client , string path , bool isDirectory )
152
- {
153
- if ( path == null || path . Trim ( ) == "/" )
154
- return null ;
155
-
156
- // Check if the path exists with the casing as is...
157
- var pathExists = isDirectory ?
158
- client . GetDirectoryClient ( path ) . Exists ( ) :
159
- client . GetFileClient ( path ) . Exists ( ) ;
160
- if ( pathExists )
161
- return path ;
162
-
163
- _logger . LogInformation ( $ "${ ( isDirectory ? "Directory" : "File" ) } '${ path } ' not found, checking paths case using case insensitive compare...") ;
164
-
165
- // Split the paths so we can test them seperately
166
- var directoryPath = isDirectory ? path : Path . GetDirectoryName ( path ) . Replace ( Path . DirectorySeparatorChar , '/' ) ;
167
- var filename = isDirectory ? null : Path . GetFileName ( path ) ;
168
-
169
- // If the directory does not exist, we find it
170
- string validDirectory = null ;
171
- if ( ! await client . GetDirectoryClient ( path ) . ExistsAsync ( ) )
172
- {
173
- var directoryParts = directoryPath . Split ( '/' ) ;
174
- foreach ( var directoryPart in directoryParts )
175
- {
176
- var searchItem = directoryPart ;
177
- var validPaths = MatchPathItemsCaseInsensitive ( client , validDirectory , searchItem , true ) ;
178
-
179
- if ( validPaths . Count == 0 )
180
- return null ;
181
- else if ( validPaths . Count > 1 )
182
- throw new Exception ( "Multiple paths matched with case insensitive compare." ) ;
183
-
184
- validDirectory = validPaths [ 0 ] ;
185
- }
186
- }
187
-
188
- if ( isDirectory )
189
- return validDirectory ;
190
-
191
- // Now check if the file exists using the corrected directory, and if not find a match...
192
- var testFilePath = $ "{ validDirectory ?? "" } /{ filename } ". TrimStart ( '/' ) ;
193
- if ( client . GetFileClient ( testFilePath ) . Exists ( ) )
194
- return testFilePath ;
195
-
196
- var files = MatchPathItemsCaseInsensitive ( client , validDirectory , filename , false ) ;
197
- if ( files . Count > 1 )
198
- throw new Exception ( "Multiple paths matched with case insensitive compare." ) ;
199
- return files . FirstOrDefault ( ) ;
200
- }
201
-
202
- private IList < string > MatchPathItemsCaseInsensitive ( DataLakeFileSystemClient client , string basePath , string searchItem , bool isDirectory )
203
- {
204
- var paths = client . GetPaths ( basePath ) . ToList ( ) ;
205
- return paths . Where ( p => p . IsDirectory == isDirectory && Path . GetFileName ( p . Name ) . Equals ( searchItem , StringComparison . CurrentCultureIgnoreCase ) )
206
- . Select ( p => p . Name )
207
- . ToList ( ) ;
208
-
209
- }
210
-
211
-
212
- private async Task < IActionResult > GetItemsAsync ( DataLakeFileSystemClient client , DataLakeConfig dataLakeConfig , DataLakeGetItemsConfig getItemsConfig )
213
- {
214
- var directory = getItemsConfig . IgnoreDirectoryCase ?
215
- await CheckPathAsync ( client , getItemsConfig . Directory , true ) :
216
- getItemsConfig . Directory ;
217
-
218
- var paramsJsonFragment = GetParamsJsonFragment ( dataLakeConfig , getItemsConfig ) ;
219
-
220
- if ( ! client . GetDirectoryClient ( directory ) . Exists ( ) )
221
- return new BadRequestObjectResult ( JObject . Parse ( $ "{{ { paramsJsonFragment } , \" error\" : \" Directory '{ directory } could not be found'\" }}") ) ;
222
-
223
- var paths = client
224
- . GetPaths ( path : directory ?? string . Empty , recursive : getItemsConfig . Recursive )
225
- . Select ( p => new DataLakeFile
226
- {
227
- Name = Path . GetFileName ( p . Name ) ,
228
- Directory = p . IsDirectory . GetValueOrDefault ( false ) ?
229
- p . Name :
230
- Path . GetDirectoryName ( p . Name ) . Replace ( Path . DirectorySeparatorChar , '/' ) ,
231
- FullPath = p . Name ,
232
- Url = Url . Combine ( dataLakeConfig . BaseUrl , p . Name ) ,
233
- IsDirectory = p . IsDirectory . GetValueOrDefault ( false ) ,
234
- ContentLength = p . ContentLength . GetValueOrDefault ( 0 ) ,
235
- LastModified = p . LastModified . ToUniversalTime ( ) . ToString ( "yyyy-MM-ddTHH:mm:ss.fffZ" )
236
- } )
237
- . ToList ( ) ;
238
-
239
- // 1: Filter the results using dynamic LINQ
240
- foreach ( var filter in getItemsConfig . Filters . Where ( f => f . IsValid ) )
241
- {
242
- var dynamicLinqQuery = filter . GetDynamicLinqString ( ) ;
243
- string dynamicLinqQueryValue = filter . GetDynamicLinqValue ( ) ;
244
- _logger . LogInformation ( $ "Applying filter: paths.AsQueryable().Where(\" { dynamicLinqQuery } \" , \" { filter . Value } \" ).ToList()") ;
245
- paths = paths . AsQueryable ( ) . Where ( dynamicLinqQuery , dynamicLinqQueryValue ) . ToList ( ) ;
246
- }
247
-
248
- // 2: Sort the results
249
- if ( ! string . IsNullOrWhiteSpace ( getItemsConfig . OrderByColumn ) )
250
- {
251
- paths = paths . AsQueryable ( )
252
- . OrderBy ( getItemsConfig . OrderByColumn + ( getItemsConfig . OrderByDescending ? " descending" : string . Empty ) )
253
- . ToList ( ) ;
254
- }
255
-
256
- // 3: Do a top N if required
257
- if ( getItemsConfig . Limit > 0 && getItemsConfig . Limit < paths . Count )
258
- paths = paths . Take ( getItemsConfig . Limit ) . ToList ( ) ;
259
-
260
-
261
-
262
- // Output the results
263
- var versionAttribute = Attribute . GetCustomAttribute ( Assembly . GetExecutingAssembly ( ) , typeof ( AssemblyInformationalVersionAttribute ) ) as AssemblyInformationalVersionAttribute ;
264
-
265
- var IsEveryFilterValid = getItemsConfig . Filters . All ( f => f . IsValid ) ;
266
- var filesListJson = IsEveryFilterValid ?
267
- $ "\" fileCount\" : { paths . Count } ," +
268
- $ "\" files\" : { JsonConvert . SerializeObject ( paths , Formatting . Indented ) } " :
269
- string . Empty ;
270
-
271
- var resultJson = $ "{{ { paramsJsonFragment } , { ( getItemsConfig . IgnoreDirectoryCase && directory != getItemsConfig . Directory ? $ "\" correctedFilePath\" : \" { directory } \" ," : string . Empty ) } { filesListJson } }}";
272
-
273
- return IsEveryFilterValid ?
274
- ( IActionResult ) new OkObjectResult ( JObject . Parse ( resultJson ) ) :
275
- ( IActionResult ) new BadRequestObjectResult ( JObject . Parse ( resultJson ) ) ;
276
- }
277
-
278
112
279
113
}
280
-
281
-
282
-
283
-
284
114
}
0 commit comments