Skip to content

Commit f158e14

Browse files
committed
fix: Delete output directory correctly when table name is used
AB#55494
1 parent 8fa1624 commit f158e14

File tree

6 files changed

+102
-67
lines changed

6 files changed

+102
-67
lines changed

src/Connector.DataLake.Common/Connector/DataLakeExportEntitiesJobBase.cs

Lines changed: 1 addition & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
using System.Data;
44
using System.Linq;
55
using System.Net;
6-
using System.Text.RegularExpressions;
76
using System.Threading.Tasks;
87
using System.Transactions;
98

@@ -490,7 +489,7 @@ protected virtual Task<string> GetOutputFileNameAsync(ExecutionContext context,
490489
{
491490
if (HasCustomFileNamePattern(configuration))
492491
{
493-
return ReplaceNameUsingPatternAsync(context, configuration.FileNamePattern, streamId, containerName, asOfTime, outputFormat);
492+
return PatternHelper.ReplaceNameUsingPatternAsync(context, configuration.FileNamePattern, streamId, containerName, asOfTime, outputFormat);
494493
}
495494

496495
return GetDefaultOutputFileNameAsync(context, configuration, streamId, containerName, asOfTime, outputFormat);
@@ -509,53 +508,6 @@ protected virtual Task<string> GetDefaultOutputFileNameAsync(ExecutionContext co
509508
return Task.FromResult($"{streamIdFormatted}_{asOfTime:yyyyMMddHHmmss}.{fileExtension}");
510509
}
511510

512-
/// <summary>
/// Expands the <c>{DataTime}</c>, <c>{StreamId}</c>, <c>{ContainerName}</c> and <c>{OutputFormat}</c>
/// placeholders found in <paramref name="pattern"/>. Each placeholder may carry an optional
/// <c>:format</c> suffix made of letters, digits, '-', '.' and '_'.
/// </summary>
/// <param name="context">Not read by this method.</param>
/// <param name="pattern">The name pattern containing zero or more placeholders.</param>
/// <param name="streamId">Value substituted for <c>{StreamId}</c> (format defaults to "D").</param>
/// <param name="containerName">Value substituted for <c>{ContainerName}</c>; any format suffix is ignored.</param>
/// <param name="asOfTime">Value substituted for <c>{DataTime}</c> (format defaults to "o").</param>
/// <param name="outputFormat">Value substituted for <c>{OutputFormat}</c>, optionally upper/lower-cased.</param>
/// <returns>An already-completed task holding the expanded name.</returns>
/// <exception cref="NotSupportedException">An <c>{OutputFormat:...}</c> suffix is not one of the recognised casing options.</exception>
protected static Task<string> ReplaceNameUsingPatternAsync(ExecutionContext context, string pattern, Guid streamId, string containerName, DateTimeOffset asOfTime, string outputFormat)
513-
{
514-
// Group 1 captures the placeholder name, optional group 2 captures ":format" (including the colon).
var timeRegexPattern = @"\{(DataTime)(\:[a-zA-Z0-9\-\._]+)?\}";
515-
var streamIdRegexPattern = @"\{(StreamId)(\:[a-zA-Z0-9\-\._]+)?\}";
516-
var containerNameRegexPattern = @"\{(ContainerName)(\:[a-zA-Z0-9\-\._]+)?\}";
517-
var outputFormatRegexPattern = @"\{(OutputFormat)(\:[a-zA-Z0-9\-\._]+)?\}";
518-
519-
// The substitutions are chained: each pass runs over the output of the previous pass.
var timeReplaced = Replace(timeRegexPattern, pattern, (match, format) => asOfTime.ToString(format ?? "o"));
520-
var streamIdReplaced = Replace(streamIdRegexPattern, timeReplaced, (match, format) => streamId.ToString(format ?? "D"));
521-
var containerNameReplaced = Replace(containerNameRegexPattern, streamIdReplaced, (match, format) => containerName);
522-
var outputFormatReplaced = Replace(outputFormatRegexPattern, containerNameReplaced, (match, format) =>
523-
{
524-
// The format suffix is matched case-insensitively by lower-casing it first.
return format?.ToLowerInvariant() switch
525-
{
526-
"toupper" => outputFormat.ToUpperInvariant(),
527-
"toupperinvariant" => outputFormat.ToUpperInvariant(),
528-
"tolower" => outputFormat.ToLowerInvariant(),
529-
"tolowerinvariant" => outputFormat.ToLowerInvariant(),
530-
null => outputFormat,
531-
_ => throw new NotSupportedException($"Format '{format}' is not supported"),
532-
};
533-
});
534-
535-
// No asynchronous work is performed; the result is wrapped to satisfy the Task-returning signature.
return Task.FromResult(outputFormatReplaced);
536-
}
537-
538-
/// <summary>
/// Replaces every match of <paramref name="pattern"/> in <paramref name="input"/> with the text
/// produced by <paramref name="formatter"/>.
/// </summary>
/// <param name="pattern">Regular expression expected to define exactly two capture groups (name and optional ":format").</param>
/// <param name="input">The text to search.</param>
/// <param name="formatter">Receives the match and the format suffix without its leading colon (or null when absent).</param>
/// <returns>The input with all qualifying matches substituted.</returns>
private static string Replace(string pattern, string input, Func<Match, string, string> formatter)
539-
{
540-
var regex = new Regex(pattern);
541-
var matches = regex.Matches(input);
542-
var result = input;
543-
// Walk the matches last-to-first so that match.Index of the remaining (earlier) matches
// stays valid while result is being spliced.
foreach (var match in matches.Reverse())
544-
{
545-
// Three groups = whole match + name + optional format; anything else is skipped untouched.
if (match.Groups.Count != 3 || !match.Groups[1].Success)
546-
{
547-
continue;
548-
}
549-
// [1..] drops the leading ':' captured together with the format text.
var format = match.Groups[2].Success
550-
? match.Groups[2].Captures.Single().Value[1..]
551-
: null;
552-
var formatted = formatter(match, format);
553-
result = $"{result[0..match.Index]}{formatted}{result[(match.Index + match.Length)..]}";
554-
}
555-
556-
return result;
557-
}
558-
559511
protected virtual string GetFileExtension(string outputFormat)
560512
{
561513
return outputFormat;
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
using System;
2+
using System.Linq;
3+
using System.Text.RegularExpressions;
4+
using System.Threading.Tasks;
5+
6+
using CluedIn.Core;
7+
8+
namespace CluedIn.Connector.DataLake.Common.Connector;
9+
10+
/// <summary>
/// Expands <c>{Placeholder}</c> / <c>{Placeholder:format}</c> tokens inside name patterns
/// (file names, table names, directory names).
/// </summary>
/// <remarks>
/// Supported placeholders are <c>DataTime</c>, <c>StreamId</c>, <c>ContainerName</c> and
/// <c>OutputFormat</c>. The optional format suffix may contain letters, digits, '-', '.' and '_'.
/// </remarks>
internal class PatternHelper
{
    // Group 1 captures the placeholder name, optional group 2 captures ":format" (including the colon).
    // The expressions are constant, so they are built once instead of on every call.
    private static readonly Regex DataTimeRegex = new Regex(@"\{(DataTime)(\:[a-zA-Z0-9\-\._]+)?\}", RegexOptions.Compiled);
    private static readonly Regex StreamIdRegex = new Regex(@"\{(StreamId)(\:[a-zA-Z0-9\-\._]+)?\}", RegexOptions.Compiled);
    private static readonly Regex ContainerNameRegex = new Regex(@"\{(ContainerName)(\:[a-zA-Z0-9\-\._]+)?\}", RegexOptions.Compiled);
    private static readonly Regex OutputFormatRegex = new Regex(@"\{(OutputFormat)(\:[a-zA-Z0-9\-\._]+)?\}", RegexOptions.Compiled);

    /// <summary>
    /// Replaces every supported placeholder in <paramref name="pattern"/> with its value.
    /// </summary>
    /// <param name="context">Not read by this method; kept for signature compatibility.</param>
    /// <param name="pattern">The name pattern containing zero or more placeholders.</param>
    /// <param name="streamId">Value for <c>{StreamId}</c>; the format suffix is a <see cref="Guid"/> format specifier, default "D".</param>
    /// <param name="containerName">Value for <c>{ContainerName}</c>; any format suffix is ignored.</param>
    /// <param name="dataTime">Value for <c>{DataTime}</c>; the format suffix is a date/time format string, default "o".</param>
    /// <param name="outputFormat">Value for <c>{OutputFormat}</c>; the suffix may be ToUpper, ToUpperInvariant, ToLower or ToLowerInvariant (case-insensitive).</param>
    /// <returns>An already-completed task holding the expanded name.</returns>
    /// <exception cref="NotSupportedException">An <c>{OutputFormat:...}</c> suffix is not one of the recognised casing options.</exception>
    public static Task<string> ReplaceNameUsingPatternAsync(ExecutionContext context, string pattern, Guid streamId, string containerName, DateTimeOffset dataTime, string outputFormat)
    {
        // The generated text ends up in file/directory names, so it must not vary with the
        // culture (calendar, digits) of whichever thread happens to run the export.
        var invariant = System.Globalization.CultureInfo.InvariantCulture;

        // The passes are chained in this order: each one runs over the output of the previous one.
        var timeReplaced = Replace(DataTimeRegex, pattern, (match, format) => dataTime.ToString(format ?? "o", invariant));
        var streamIdReplaced = Replace(StreamIdRegex, timeReplaced, (match, format) => streamId.ToString(format ?? "D"));
        var containerNameReplaced = Replace(ContainerNameRegex, streamIdReplaced, (match, format) => containerName);
        var outputFormatReplaced = Replace(OutputFormatRegex, containerNameReplaced, (match, format) =>
        {
            // Lower-casing the suffix makes the option names case-insensitive.
            return format?.ToLowerInvariant() switch
            {
                "toupper" => outputFormat.ToUpperInvariant(),
                "toupperinvariant" => outputFormat.ToUpperInvariant(),
                "tolower" => outputFormat.ToLowerInvariant(),
                "tolowerinvariant" => outputFormat.ToLowerInvariant(),
                null => outputFormat,
                _ => throw new NotSupportedException($"Format '{format}' is not supported"),
            };
        });

        // No asynchronous work is performed; the result is wrapped to satisfy the Task-returning signature.
        return Task.FromResult(outputFormatReplaced);
    }

    /// <summary>
    /// Substitutes every match of <paramref name="regex"/> in <paramref name="input"/> with the
    /// text produced by <paramref name="formatter"/>.
    /// </summary>
    /// <param name="regex">Expression defining exactly two capture groups (name and optional ":format").</param>
    /// <param name="input">The text to search.</param>
    /// <param name="formatter">Receives the match and the format suffix without its leading colon (null when absent).</param>
    /// <returns>The input with all qualifying matches substituted.</returns>
    private static string Replace(Regex regex, string input, Func<Match, string, string> formatter)
    {
        return regex.Replace(input, match =>
        {
            // Three groups = whole match + name + optional format; anything else is left untouched.
            if (match.Groups.Count != 3 || !match.Groups[1].Success)
            {
                return match.Value;
            }

            // [1..] drops the leading ':' captured together with the format text.
            var format = match.Groups[2].Success
                ? match.Groups[2].Value[1..]
                : null;

            // A null replacement behaves like an empty string, as it did with string interpolation.
            return formatter(match, format) ?? string.Empty;
        });
    }
}

src/Connector.FabricOpenMirroring/Connector/OpenMirroringConnector.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System;
1+
using System;
22
using System.Collections.Generic;
33
using System.Threading.Tasks;
44

@@ -25,6 +25,7 @@ public class OpenMirroringConnector : DataLakeConnector
2525

2626
private readonly ILogger<OpenMirroringConnector> _logger;
2727
private readonly OpenMirroringClient _client;
28+
private readonly IDateTimeOffsetProvider _dateTimeOffsetProvider;
2829

2930
public OpenMirroringConnector(
3031
ILogger<OpenMirroringConnector> logger,
@@ -36,6 +37,7 @@ public OpenMirroringConnector(
3637
{
3738
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
3839
_client = client ?? throw new ArgumentNullException(nameof(client));
40+
_dateTimeOffsetProvider = dateTimeOffsetProvider;
3941
}
4042

4143
protected override async Task<ConnectionVerificationResult> VerifyDataLakeConnection(IDataLakeJobData jobData)
@@ -106,8 +108,9 @@ public override async Task ArchiveContainer(ExecutionContext executionContext, I
106108
var providerDefinitionId = streamModel.ConnectorProviderDefinitionId!.Value;
107109
var containerName = streamModel.ContainerName;
108110

109-
var jobData = await DataLakeJobDataFactory.GetConfiguration(executionContext, providerDefinitionId, containerName) as OpenMirroringConnectorJobData;
110-
await Client.DeleteDirectory(jobData, streamModel.Id.ToString("N"));
111+
var jobData = await DataLakeJobDataFactory.GetConfiguration(executionContext, providerDefinitionId, containerName);
112+
var subDirectory = await OutputDirectoryHelper.GetSubDirectory(executionContext, jobData, streamModel.Id, containerName, _dateTimeOffsetProvider.GetCurrentUtcTime(), jobData.OutputFormat);
113+
await Client.DeleteDirectory(jobData, subDirectory);
111114
await base.ArchiveContainer(executionContext, streamModel);
112115
}
113116

src/Connector.FabricOpenMirroring/Connector/OpenMirroringExportEntitiesJob.cs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -202,19 +202,13 @@ private protected override async Task<ExportHistory> GetLastExport(ExecutionCont
202202

203203
private protected override Task<string> GetSubDirectory(ExecutionContext executionContext, IDataLakeJobData configuration, ExportJobDataBase exportJobData)
204204
{
205-
if (configuration is OpenMirroringConnectorJobData casted &&
206-
!string.IsNullOrWhiteSpace(casted.TableName))
207-
{
208-
return ReplaceNameUsingPatternAsync(
209-
executionContext,
210-
casted.TableName,
211-
exportJobData.StreamId,
212-
exportJobData.StreamModel.ContainerName,
213-
exportJobData.AsOfTime,
214-
exportJobData.OutputFormat);
215-
}
216-
217-
return Task.FromResult(exportJobData.StreamId.ToString("N"));
205+
return OutputDirectoryHelper.GetSubDirectory(
206+
executionContext,
207+
configuration,
208+
exportJobData.StreamModel.Id,
209+
exportJobData.StreamModel.ContainerName,
210+
exportJobData.AsOfTime,
211+
exportJobData.OutputFormat);
218212
}
219213

220214
protected override ISqlDataWriter GetSqlDataWriter(string outputFormat)
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
4+
using CluedIn.Connector.DataLake.Common;
5+
using CluedIn.Connector.DataLake.Common.Connector;
6+
using CluedIn.Core;
7+
8+
namespace CluedIn.Connector.FabricOpenMirroring.Connector;
9+
10+
/// <summary>
/// Resolves the sub directory that holds a stream's exported output.
/// </summary>
internal class OutputDirectoryHelper
{
    /// <summary>
    /// Returns the sub directory name for a stream.
    /// </summary>
    /// <remarks>
    /// When <paramref name="configuration"/> is an <see cref="OpenMirroringConnectorJobData"/> with a
    /// non-blank <c>TableName</c>, that table name is used as a pattern and expanded through
    /// <see cref="PatternHelper.ReplaceNameUsingPatternAsync"/>. Otherwise the stream id formatted
    /// with the "N" specifier is returned.
    /// </remarks>
    /// <param name="executionContext">Passed through to the pattern expansion.</param>
    /// <param name="configuration">The job configuration to inspect for a table name.</param>
    /// <param name="streamId">The stream id used by the pattern and by the fallback name.</param>
    /// <param name="containerName">The container name made available to the pattern.</param>
    /// <param name="dataTime">The timestamp made available to the pattern.</param>
    /// <param name="outputFormat">The output format made available to the pattern.</param>
    /// <returns>A task producing the sub directory name.</returns>
    public static Task<string> GetSubDirectory(ExecutionContext executionContext, IDataLakeJobData configuration, Guid streamId, string containerName, DateTimeOffset dataTime, string outputFormat)
    {
        // Guard clause: no Open Mirroring configuration or no usable table name => fall back to the stream id.
        if (configuration is not OpenMirroringConnectorJobData openMirroringJobData ||
            string.IsNullOrWhiteSpace(openMirroringJobData.TableName))
        {
            var defaultDirectory = streamId.ToString("N");
            return Task.FromResult(defaultDirectory);
        }

        return PatternHelper.ReplaceNameUsingPatternAsync(
            executionContext,
            openMirroringJobData.TableName,
            streamId,
            containerName,
            dataTime,
            outputFormat);
    }
}

src/Connector.OneLake/Connector/OneLakeExportEntitiesJob.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System;
1+
using System;
22
using System.Threading.Tasks;
33

44
using CluedIn.Connector.DataLake.Common.Connector;
@@ -40,7 +40,7 @@ private protected override async Task PostExportAsync(ExecutionContext context,
4040
return;
4141
}
4242

43-
var replacedTableName = await ReplaceNameUsingPatternAsync(
43+
var replacedTableName = await PatternHelper.ReplaceNameUsingPatternAsync(
4444
context,
4545
jobData.TableName,
4646
exportJobData.StreamId,

0 commit comments

Comments
 (0)