@@ -10,19 +10,27 @@ namespace CluedIn.Connector.DataLake.Common.Connector
1010{
1111 public abstract class DataLakeClient : IDataLakeClient
1212 {
13- public async Task < DataLakeDirectoryClient > EnsureDataLakeDirectoryExist ( IDataLakeJobData configuration )
13+ public Task < DataLakeDirectoryClient > EnsureDataLakeDirectoryExist ( IDataLakeJobData configuration )
1414 {
15- var fileSystemClient = await GetFileSystemClientAsync ( configuration ) ;
16- var directory = GetDirectory ( configuration ) ;
17- var directoryClient = fileSystemClient . GetDirectoryClient ( directory ) ;
18- if ( ! await directoryClient . ExistsAsync ( ) )
19- {
20- directoryClient = await fileSystemClient . CreateDirectoryAsync ( directory ) ;
21- }
15+ return EnsureDataLakeDirectoryExist ( configuration , string . Empty ) ;
16+ }
17+
18+ public async Task < DataLakeDirectoryClient > EnsureDataLakeDirectoryExist ( IDataLakeJobData configuration , string subDirectory )
19+ {
20+ var fileSystemClient = await GetFileSystemClientAsync ( configuration , ensureExists : true ) ;
21+ var directoryClient = await GetDirectoryClientAsync ( configuration , fileSystemClient , subDirectory , ensureExists : true ) ;
2222
2323 return directoryClient ;
2424 }
2525
26+ public async Task DeleteDirectory ( IDataLakeJobData configuration , string subDirectory )
27+ {
28+ var fileSystemClient = await GetFileSystemClientAsync ( configuration , ensureExists : true ) ;
29+ var directoryClient = await GetDirectoryClientAsync ( configuration , fileSystemClient , subDirectory , ensureExists : true ) ;
30+
31+ await directoryClient . DeleteIfExistsAsync ( ) ;
32+ }
33+
2634 public async Task SaveData ( IDataLakeJobData configuration , string content , string fileName , string contentType )
2735 {
2836 using var stream = new MemoryStream ( System . Text . Encoding . UTF8 . GetBytes ( content ) ) ;
@@ -69,19 +77,20 @@ protected static TJobData CastJobData<TJobData>(IDataLakeJobData jobData) where
6977 return castedJobData ;
7078 }
7179
72- public async Task < bool > FileInPathExists ( IDataLakeJobData configuration , string fileName )
80+ public Task < bool > FileInPathExists ( IDataLakeJobData configuration , string fileName )
7381 {
74- var serviceClient = GetDataLakeServiceClient ( configuration ) ;
75- var fileSystemName = GetFileSystemName ( configuration ) ;
76- var fileSystemClient = serviceClient . GetFileSystemClient ( fileSystemName ) ;
82+ return FileInPathExists ( configuration , fileName , string . Empty ) ;
83+ }
7784
85+ public async Task < bool > FileInPathExists ( IDataLakeJobData configuration , string fileName , string subDirectory )
86+ {
87+ var fileSystemClient = await GetFileSystemClientAsync ( configuration , ensureExists : false ) ;
7888 if ( ! await fileSystemClient . ExistsAsync ( ) )
7989 {
8090 return false ;
8191 }
8292
83- var directory = GetDirectory ( configuration ) ;
84- var directoryClient = fileSystemClient . GetDirectoryClient ( directory ) ;
93+ var directoryClient = await GetDirectoryClientAsync ( configuration , fileSystemClient , subDirectory , ensureExists : false ) ;
8594 if ( ! await directoryClient . ExistsAsync ( ) )
8695 {
8796 return false ;
@@ -91,19 +100,38 @@ public async Task<bool> FileInPathExists(IDataLakeJobData configuration, string
91100 return await dataLakeFileClient . ExistsAsync ( ) ;
92101 }
93102
94- public async Task < PathProperties > GetFilePathProperties ( IDataLakeJobData configuration , string fileName )
103+ public Task < bool > DirectoryExists ( IDataLakeJobData configuration )
95104 {
96- var serviceClient = GetDataLakeServiceClient ( configuration ) ;
97- var fileSystemName = GetFileSystemName ( configuration ) ;
98- var fileSystemClient = serviceClient . GetFileSystemClient ( fileSystemName ) ;
105+ return DirectoryExists ( configuration , string . Empty ) ;
106+ }
107+
108+ public async Task < bool > DirectoryExists ( IDataLakeJobData configuration , string subDirectory )
109+ {
110+ var fileSystemClient = await GetFileSystemClientAsync ( configuration , ensureExists : false ) ;
111+ if ( ! await fileSystemClient . ExistsAsync ( ) )
112+ {
113+ return false ;
114+ }
115+
116+ var directoryClient = await GetDirectoryClientAsync ( configuration , fileSystemClient , subDirectory , ensureExists : false ) ;
117+ return await directoryClient . ExistsAsync ( ) ;
118+ }
119+
120+ public Task < PathProperties > GetFilePathProperties ( IDataLakeJobData configuration , string fileName )
121+ {
122+ return GetFilePathProperties ( configuration , fileName , string . Empty ) ;
123+ }
124+
125+ public async Task < PathProperties > GetFilePathProperties ( IDataLakeJobData configuration , string fileName , string subDirectory )
126+ {
127+ var fileSystemClient = await GetFileSystemClientAsync ( configuration , ensureExists : false ) ;
99128
100129 if ( ! await fileSystemClient . ExistsAsync ( ) )
101130 {
102131 return null ;
103132 }
104133
105- var directory = GetDirectory ( configuration ) ;
106- var directoryClient = fileSystemClient . GetDirectoryClient ( directory ) ;
134+ var directoryClient = await GetDirectoryClientAsync ( configuration , fileSystemClient , subDirectory , ensureExists : false ) ;
107135 if ( ! await directoryClient . ExistsAsync ( ) )
108136 {
109137 return null ;
@@ -118,13 +146,37 @@ public async Task<PathProperties> GetFilePathProperties(IDataLakeJobData configu
118146 return await dataLakeFileClient . GetPropertiesAsync ( ) ;
119147 }
120148
149+ private async Task < DataLakeDirectoryClient > GetDirectoryClientAsync (
150+ IDataLakeJobData configuration ,
151+ DataLakeFileSystemClient fileSystemClient ,
152+ string subDirectory ,
153+ bool ensureExists )
154+ {
155+ var directory = GetDirectory ( configuration ) ;
156+ var directoryClient = fileSystemClient . GetDirectoryClient ( directory ) ;
157+ if ( string . IsNullOrWhiteSpace ( subDirectory ) )
158+ {
159+ return directoryClient ;
160+ }
161+
162+ directoryClient = directoryClient . GetSubDirectoryClient ( subDirectory ) ;
163+
164+ if ( ensureExists && ! await directoryClient . ExistsAsync ( ) )
165+ {
166+ directoryClient = await fileSystemClient . CreateDirectoryAsync ( directoryClient . Path ) ;
167+ }
168+
169+ return directoryClient ;
170+ }
171+
121172 private async Task < DataLakeFileSystemClient > GetFileSystemClientAsync (
122- IDataLakeJobData configuration )
173+ IDataLakeJobData configuration ,
174+ bool ensureExists )
123175 {
124176 var dataLakeServiceClient = GetDataLakeServiceClient ( configuration ) ;
125177 var fileSystemName = GetFileSystemName ( configuration ) ;
126178 var dataLakeFileSystemClient = dataLakeServiceClient . GetFileSystemClient ( fileSystemName ) ;
127- if ( ! await dataLakeFileSystemClient . ExistsAsync ( ) )
179+ if ( ensureExists && ! await dataLakeFileSystemClient . ExistsAsync ( ) )
128180 {
129181 dataLakeFileSystemClient = await dataLakeServiceClient . CreateFileSystemAsync ( fileSystemName ) ;
130182 }
0 commit comments