Skip to content

Commit 8128ede

Browse files
authored
point trs integration jobs to sftp storage container (#2643)
https://trello.com/c/CMuoKrz8/1719-replace-globalscape-sftp-with-sftp-enabled-storage-container SFTP has been enabled on a new storage account. Previously, the storage account received its files via uploads to a DQT SFTP server, and moved them to a new folder once they had been processed by TRS. We are moving away from a self-hosted SFTP server, which means we need to add SFTP users to an SFTP-enabled storage account. Because of this we can no longer simply move processed files to a new folder in the same storage account, as SFTP users could delete or modify them. - Re-wire the EWC/Capita jobs so that they read from and write to the new SFTP-enabled storage container. - Move processed files to a separate storage container that SFTP users cannot modify. - SFTP-enabled storage accounts cannot be accessed using the standard **BlobServiceClient**; it has been replaced with **DataLakeServiceClient**.
1 parent 2f52e76 commit 8128ede

File tree

31 files changed

+686
-436
lines changed

31 files changed

+686
-436
lines changed

TeachingRecordSystem/Directory.Packages.props

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,5 +115,6 @@
115115
<PackageVersion Include="xunit.v3" Version="3.1.0" />
116116
<PackageVersion Include="xunit.v3.assert" Version="3.1.0" />
117117
<PackageVersion Include="xunit.v3.extensibility.core" Version="3.1.0" />
118+
<PackageVersion Include="Azure.Storage.Files.DataLake" Version="12.16.0" />
118119
</ItemGroup>
119-
</Project>
120+
</Project>

TeachingRecordSystem/src/TeachingRecordSystem.Api/packages.lock.json

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2225,6 +2225,7 @@
22252225
"dependencies": {
22262226
"AngleSharp": "[1.1.2, )",
22272227
"Azure.Storage.Blobs": "[12.25.0, )",
2228+
"Azure.Storage.Files.DataLake": "[12.16.0, )",
22282229
"CloudNative.CloudEvents": "[2.8.0, )",
22292230
"CloudNative.CloudEvents.SystemTextJson": "[2.8.0, )",
22302231
"CsvHelper": "[30.1.0, )",
@@ -2340,6 +2341,17 @@
23402341
"Azure.Storage.Common": "12.24.0"
23412342
}
23422343
},
2344+
"Azure.Storage.Files.DataLake": {
2345+
"type": "CentralTransitive",
2346+
"requested": "[12.16.0, )",
2347+
"resolved": "12.16.0",
2348+
"contentHash": "D+mkKzL5diWWqGX4pDNeHOIYfv91H15BJSfSLeBCjV3mHFAhykxYDWXOTVJ0ThobJ3yI2R4SahjbVmdgd0vlyg==",
2349+
"dependencies": {
2350+
"Azure.Storage.Blobs": "12.18.0",
2351+
"Azure.Storage.Common": "12.17.0",
2352+
"System.Text.Json": "4.7.2"
2353+
}
2354+
},
23432355
"Castle.Core": {
23442356
"type": "CentralTransitive",
23452357
"requested": "[5.1.1, )",

TeachingRecordSystem/src/TeachingRecordSystem.AuthorizeAccess/packages.lock.json

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2510,6 +2510,7 @@
25102510
"dependencies": {
25112511
"AngleSharp": "[1.1.2, )",
25122512
"Azure.Storage.Blobs": "[12.25.0, )",
2513+
"Azure.Storage.Files.DataLake": "[12.16.0, )",
25132514
"CloudNative.CloudEvents": "[2.8.0, )",
25142515
"CloudNative.CloudEvents.SystemTextJson": "[2.8.0, )",
25152516
"CsvHelper": "[30.1.0, )",
@@ -2625,6 +2626,17 @@
26252626
"Azure.Storage.Common": "12.24.0"
26262627
}
26272628
},
2629+
"Azure.Storage.Files.DataLake": {
2630+
"type": "CentralTransitive",
2631+
"requested": "[12.16.0, )",
2632+
"resolved": "12.16.0",
2633+
"contentHash": "D+mkKzL5diWWqGX4pDNeHOIYfv91H15BJSfSLeBCjV3mHFAhykxYDWXOTVJ0ThobJ3yI2R4SahjbVmdgd0vlyg==",
2634+
"dependencies": {
2635+
"Azure.Storage.Blobs": "12.18.0",
2636+
"Azure.Storage.Common": "12.17.0",
2637+
"System.Text.Json": "4.7.2"
2638+
}
2639+
},
26282640
"CloudNative.CloudEvents": {
26292641
"type": "CentralTransitive",
26302642
"requested": "[2.8.0, )",

TeachingRecordSystem/src/TeachingRecordSystem.Cli/packages.lock.json

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1995,6 +1995,7 @@
19951995
"dependencies": {
19961996
"AngleSharp": "[1.1.2, )",
19971997
"Azure.Storage.Blobs": "[12.25.0, )",
1998+
"Azure.Storage.Files.DataLake": "[12.16.0, )",
19981999
"CloudNative.CloudEvents": "[2.8.0, )",
19992000
"CloudNative.CloudEvents.SystemTextJson": "[2.8.0, )",
20002001
"CsvHelper": "[30.1.0, )",
@@ -2063,6 +2064,17 @@
20632064
"Azure.Storage.Common": "12.24.0"
20642065
}
20652066
},
2067+
"Azure.Storage.Files.DataLake": {
2068+
"type": "CentralTransitive",
2069+
"requested": "[12.16.0, )",
2070+
"resolved": "12.16.0",
2071+
"contentHash": "D+mkKzL5diWWqGX4pDNeHOIYfv91H15BJSfSLeBCjV3mHFAhykxYDWXOTVJ0ThobJ3yI2R4SahjbVmdgd0vlyg==",
2072+
"dependencies": {
2073+
"Azure.Storage.Blobs": "12.18.0",
2074+
"Azure.Storage.Common": "12.17.0",
2075+
"System.Text.Json": "4.7.2"
2076+
}
2077+
},
20662078
"CloudNative.CloudEvents": {
20672079
"type": "CentralTransitive",
20682080
"requested": "[2.8.0, )",

TeachingRecordSystem/src/TeachingRecordSystem.Core/Extensions.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
using Azure.Storage;
2+
using Azure.Storage.Files.DataLake;
13
using Hangfire;
24
using Hangfire.PostgreSql;
35
using Microsoft.Extensions.Azure;
@@ -50,6 +52,22 @@ public static IServiceCollection AddBlobStorage(this IServiceCollection services
5052
{
5153
clientBuilder.AddBlobServiceClient(configuration.GetRequiredValue("StorageConnectionString"));
5254
});
55+
56+
services.AddKeyedSingleton<DataLakeServiceClient>("sftpstorage", (sp, key) =>
57+
{
58+
var sftpAccountName = configuration.GetValue<string>("SftpStorageName");
59+
var sftpAccessKey = configuration.GetValue<string>("SftpStorageAccessKey");
60+
61+
if (string.IsNullOrEmpty(sftpAccountName) || string.IsNullOrEmpty(sftpAccessKey))
62+
{
63+
throw new InvalidOperationException("Invalid SFTP Storage connection string configuration.");
64+
}
65+
66+
var dfsUri = new Uri($"https://{sftpAccountName}.dfs.core.windows.net");
67+
var credential = new StorageSharedKeyCredential(sftpAccountName, sftpAccessKey);
68+
69+
return new DataLakeServiceClient(dfsUri, credential);
70+
});
5371
}
5472

5573
return services;

TeachingRecordSystem/src/TeachingRecordSystem.Core/Jobs/CapitaExportAmendJob.cs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,22 @@
11
using System.ComponentModel.DataAnnotations;
22
using System.Globalization;
33
using System.Text;
4-
using Azure.Storage.Blobs;
4+
using Azure.Storage.Files.DataLake;
55
using CsvHelper;
66
using CsvHelper.Configuration;
7+
using Microsoft.Extensions.DependencyInjection;
78
using Microsoft.Extensions.Logging;
89
using Microsoft.Extensions.Options;
910
using TeachingRecordSystem.Core.DataStore.Postgres;
1011
using TeachingRecordSystem.Core.DataStore.Postgres.Models;
1112
using TeachingRecordSystem.Core.Events.Legacy;
1213

13-
public class CapitaExportAmendJob(BlobServiceClient blobServiceClient, ILogger<CapitaExportAmendJob> logger, TrsDbContext dbContext, IClock clock, IOptions<CapitaTpsUserOption> capitaUser)
14+
public class CapitaExportAmendJob([FromKeyedServices("sftpstorage")] DataLakeServiceClient dataLakeServiceClient, ILogger<CapitaExportAmendJob> logger, TrsDbContext dbContext, IClock clock, IOptions<CapitaTpsUserOption> capitaUser)
1415
{
1516
public const string JobSchedule = "0 3 * * *";
1617
public const string LastRunDateKey = "LastRunDate";
17-
public const string StorageContainer = "dqt-integrations";
18-
public const string EXPORTS_FOLDER = "capita/exports";
18+
public const string StorageContainer = "capita-integrations";
19+
public const string ExportsFolder = "exports";
1920

2021
public async Task<long> ExecuteAsync(CancellationToken cancellationToken)
2122
{
@@ -142,19 +143,19 @@ public async Task<long> ExecuteAsync(CancellationToken cancellationToken)
142143
return integrationJob.IntegrationTransactionId;
143144
}
144145

145-
public async Task UploadFileAsync(Stream fileContentStream, string fileName)
146+
public async Task UploadFileAsync(Stream fileContentStream, string fileName, CancellationToken cancellationToken = default)
146147
{
147-
// Get the container client
148-
var containerClient = blobServiceClient!.GetBlobContainerClient(StorageContainer);
149-
await containerClient.CreateIfNotExistsAsync();
150-
151-
var targetFileName = $"{EXPORTS_FOLDER}/{fileName}";
152-
153-
// Get the blob client for the target file
154-
var blobClient = containerClient.GetBlobClient(targetFileName);
155-
156-
// Upload the stream
157-
await blobClient.UploadAsync(fileContentStream, overwrite: true);
148+
var fileSystemClient = dataLakeServiceClient.GetFileSystemClient(StorageContainer);
149+
await fileSystemClient.CreateIfNotExistsAsync(cancellationToken: cancellationToken);
150+
var targetPath = $"{ExportsFolder}/{fileName}";
151+
var fileClient = fileSystemClient.GetFileClient(targetPath);
152+
await fileClient.CreateIfNotExistsAsync(cancellationToken: cancellationToken);
153+
154+
await using var memory = new MemoryStream();
155+
await fileContentStream.CopyToAsync(memory, cancellationToken);
156+
memory.Position = 0;
157+
await fileClient.AppendAsync(memory, offset: 0, cancellationToken: cancellationToken);
158+
await fileClient.FlushAsync(memory.Length, cancellationToken: cancellationToken);
158159
}
159160

160161
public string GetFileName(IClock now)
@@ -331,4 +332,3 @@ public class CapitaTpsUserOption
331332
[Required]
332333
public required Guid CapitaTpsUserId { get; set; }
333334
}
334-

TeachingRecordSystem/src/TeachingRecordSystem.Core/Jobs/CapitaExportNewJob.cs

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
11
using System.Globalization;
22
using System.Text;
3-
using Azure.Storage.Blobs;
3+
using Azure.Storage.Files.DataLake;
44
using CsvHelper;
55
using CsvHelper.Configuration;
6+
using Microsoft.Extensions.DependencyInjection;
67
using Microsoft.Extensions.Logging;
78
using TeachingRecordSystem.Core.DataStore.Postgres;
89
using TeachingRecordSystem.Core.DataStore.Postgres.Models;
910
using TeachingRecordSystem.Core.Events.Legacy;
1011

11-
public class CapitaExportNewJob(BlobServiceClient blobServiceClient, ILogger<CapitaExportNewJob> logger, TrsDbContext dbContext, IClock clock)
12+
public class CapitaExportNewJob([FromKeyedServices("sftpstorage")] DataLakeServiceClient dataLakeServiceClient, ILogger<CapitaExportNewJob> logger, TrsDbContext dbContext, IClock clock)
1213
{
1314
public const string JobSchedule = "0 3 * * *";
1415
public const string LastRunDateKey = "LastRunDate";
15-
public const string StorageContainer = "dqt-integrations";
16-
public const string EXPORTS_FOLDER = "capita/exports";
16+
public const string StorageContainer = "capita-integrations";
17+
public const string ExportsFolder = "exports";
1718

1819
public async Task<long> ExecuteAsync(CancellationToken cancellationToken)
1920
{
@@ -165,19 +166,36 @@ public async Task<long> ExecuteAsync(CancellationToken cancellationToken)
165166
return integrationJob.IntegrationTransactionId;
166167
}
167168

168-
public async Task UploadFileAsync(Stream fileContentStream, string fileName)
169+
public async Task UploadFileAsync(Stream fileContentStream, string fileName, CancellationToken cancellationToken = default)
169170
{
170171
// Get the container client
171-
var containerClient = blobServiceClient!.GetBlobContainerClient(StorageContainer);
172-
await containerClient.CreateIfNotExistsAsync();
172+
var fileSystemClient = dataLakeServiceClient.GetFileSystemClient(StorageContainer);
173+
await fileSystemClient.CreateIfNotExistsAsync(cancellationToken: cancellationToken);
173174

174-
var targetFileName = $"{EXPORTS_FOLDER}/{fileName}";
175+
var targetPath = $"{ExportsFolder}/{fileName}";
176+
var fileClient = fileSystemClient.GetFileClient(targetPath);
175177

176-
// Get the blob client for the target file
177-
var blobClient = containerClient.GetBlobClient(targetFileName);
178+
await fileClient.DeleteIfExistsAsync(cancellationToken: cancellationToken);
178179

179-
// Upload the stream
180-
await blobClient.UploadAsync(fileContentStream, overwrite: true);
180+
await fileClient.CreateIfNotExistsAsync(cancellationToken: cancellationToken);
181+
182+
Stream uploadStream = fileContentStream;
183+
if (!fileContentStream.CanSeek)
184+
{
185+
var memory = new MemoryStream();
186+
await fileContentStream.CopyToAsync(memory, cancellationToken);
187+
memory.Position = 0;
188+
uploadStream = memory;
189+
}
190+
191+
await fileClient.AppendAsync(uploadStream, offset: 0, cancellationToken: cancellationToken);
192+
await fileClient.FlushAsync(uploadStream.Length, cancellationToken: cancellationToken);
193+
194+
// Dispose temporary memory stream if we created one
195+
if (uploadStream != fileContentStream)
196+
{
197+
await uploadStream.DisposeAsync();
198+
}
181199
}
182200

183201
public string GetFileName(IClock now)

TeachingRecordSystem/src/TeachingRecordSystem.Core/Jobs/CapitaImportJob.cs

Lines changed: 41 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,27 @@
11
using System.Globalization;
22
using System.Text;
33
using System.Text.RegularExpressions;
4-
using Azure.Storage.Blobs;
5-
using Azure.Storage.Blobs.Models;
6-
using Azure.Storage.Blobs.Specialized;
4+
using Azure.Storage.Files.DataLake;
75
using CsvHelper;
86
using CsvHelper.Configuration;
7+
using Microsoft.Extensions.DependencyInjection;
98
using Microsoft.Extensions.Logging;
109
using Microsoft.Extensions.Options;
1110
using TeachingRecordSystem.Core.DataStore.Postgres;
1211
using TeachingRecordSystem.Core.DataStore.Postgres.Models;
1312
using TeachingRecordSystem.Core.Dqt;
1413
using TeachingRecordSystem.Core.Services.PersonMatching;
1514

15+
1616
namespace TeachingRecordSystem.Core.Jobs;
1717

18-
public class CapitaImportJob(BlobServiceClient blobServiceClient, ILogger<CapitaImportJob> logger, TrsDbContext dbContext, IClock clock, IPersonMatchingService personMatchingService, IOptions<CapitaTpsUserOption> capitaUser)
18+
public class CapitaImportJob([FromKeyedServices("sftpstorage")] DataLakeServiceClient dataLakeServiceClient, ILogger<CapitaImportJob> logger, TrsDbContext dbContext, IClock clock, IPersonMatchingService personMatchingService, IOptions<CapitaTpsUserOption> capitaUser)
1919
{
2020
public const string JobSchedule = "0 4 * * *";
21-
public const string StorageContainer = "dqt-integrations";
22-
public const string PICKUP_FOLDER = "capita/pickup";
21+
public const string StorageContainer = "capita-integrations";
22+
public const string PickupFolder = "pickup";
2323
private const string ProcessedFolder = "capita/processed";
24+
public const string ArchivedContainer = "archived-integration-transactions";
2425

2526
public async Task ExecuteAsync(CancellationToken cancellationToken)
2627
{
@@ -37,29 +38,30 @@ public async Task ExecuteAsync(CancellationToken cancellationToken)
3738

3839
public async Task ArchiveFileAsync(string fileName, CancellationToken cancellationToken)
3940
{
40-
var blobContainerClient = blobServiceClient.GetBlobContainerClient(StorageContainer);
41-
var sourceBlobClient = blobContainerClient.GetBlobClient(fileName);
42-
var fileNameParts = fileName.Split("/");
43-
var fileNameWithoutFolder = $"{DateTime.Now.ToString("ddMMyyyyHHmm")}-{fileNameParts.Last()}";
44-
var targetFileName = $"{ProcessedFolder}/{fileNameWithoutFolder}";
45-
46-
// Acquire a lease to prevent another client modifying the source blob
47-
var lease = sourceBlobClient.GetBlobLeaseClient();
48-
await lease.AcquireAsync(TimeSpan.FromSeconds(60), cancellationToken: cancellationToken);
49-
50-
var targetBlobClient = blobContainerClient.GetBlobClient(targetFileName);
51-
var copyOperation = await targetBlobClient.StartCopyFromUriAsync(sourceBlobClient.Uri, cancellationToken: cancellationToken);
52-
await copyOperation.WaitForCompletionAsync();
53-
54-
// Release the lease
55-
var sourceProperties = await sourceBlobClient.GetPropertiesAsync(cancellationToken: cancellationToken);
56-
if (sourceProperties.Value.LeaseState == LeaseState.Leased)
57-
{
58-
await lease.ReleaseAsync(cancellationToken: cancellationToken);
59-
}
41+
var fileSystemClient = dataLakeServiceClient.GetFileSystemClient(StorageContainer);
42+
var arhivedFileSystemClient = dataLakeServiceClient.GetFileSystemClient(ArchivedContainer);
43+
var sourceFile = fileSystemClient.GetFileClient(fileName);
44+
45+
var fileNameParts = fileName.Split('/');
46+
var fileNameWithoutFolder = $"{DateTime.UtcNow:ddMMyyyyHHmm}-{fileNameParts.Last()}";
47+
var targetPath = $"{ProcessedFolder}/{fileNameWithoutFolder}";
48+
var targetFile = arhivedFileSystemClient.GetFileClient(targetPath);
49+
50+
await targetFile.CreateIfNotExistsAsync(cancellationToken: cancellationToken);
51+
52+
// Read the source file
53+
var readResponse = await sourceFile.ReadAsync(cancellationToken: cancellationToken);
54+
await using var sourceStream = readResponse.Value.Content;
6055

61-
// Now remove the original blob
62-
await sourceBlobClient.DeleteAsync(DeleteSnapshotsOption.IncludeSnapshots, cancellationToken: cancellationToken);
56+
await using var memory = new MemoryStream();
57+
await sourceStream.CopyToAsync(memory, cancellationToken);
58+
memory.Position = 0;
59+
60+
await targetFile.AppendAsync(memory, offset: 0, cancellationToken: cancellationToken);
61+
await targetFile.FlushAsync(memory.Length, cancellationToken: cancellationToken);
62+
63+
// Delete the original file
64+
await sourceFile.DeleteIfExistsAsync(cancellationToken: cancellationToken);
6365
}
6466

6567
public async Task<long> ImportAsync(StreamReader reader, string fileName)
@@ -407,27 +409,26 @@ public async Task<TrnRequestMatchResult> GetPotentialMatchingPersonsAsync(Capita
407409

408410
public async Task<Stream> GetDownloadStreamAsync(string fileName)
409411
{
410-
BlobContainerClient containerClient = blobServiceClient.GetBlobContainerClient(StorageContainer);
411-
BlobClient blobClient = containerClient.GetBlobClient($"{fileName}");
412-
var streamingResult = await blobClient.DownloadStreamingAsync();
413-
return streamingResult.Value.Content;
412+
var fileSystemClient = dataLakeServiceClient.GetFileSystemClient(StorageContainer);
413+
var fileClient = fileSystemClient.GetFileClient(fileName);
414+
var readResponse = await fileClient.ReadAsync();
415+
return readResponse.Value.Content; // Stream, must be disposed by caller
414416
}
415417

416418
private async Task<string[]> GetImportFilesAsync(CancellationToken cancellationToken)
417419
{
418-
var blobContainerClient = blobServiceClient.GetBlobContainerClient(StorageContainer);
420+
var fileSystemClient = dataLakeServiceClient.GetFileSystemClient(StorageContainer);
419421
var fileNames = new List<string>();
420-
var resultSegment = blobContainerClient.GetBlobsByHierarchyAsync(prefix: PICKUP_FOLDER, delimiter: "", cancellationToken: cancellationToken).AsPages();
421-
await foreach (Azure.Page<BlobHierarchyItem> blobPage in resultSegment)
422+
423+
await foreach (var pathItem in fileSystemClient.GetPathsAsync($"{PickupFolder}/", recursive: false, cancellationToken: cancellationToken))
422424
{
423-
foreach (BlobHierarchyItem blobhierarchyItem in blobPage.Values)
425+
// Only add files, skip directories
426+
if (pathItem.IsDirectory == false)
424427
{
425-
if (blobhierarchyItem.IsBlob)
426-
{
427-
fileNames.Add(blobhierarchyItem.Blob.Name);
428-
}
428+
fileNames.Add(pathItem.Name);
429429
}
430430
}
431+
431432
return fileNames.ToArray();
432433
}
433434
}

0 commit comments

Comments
 (0)