diff --git a/docs/4.6.3-release-notes.md b/docs/4.6.3-release-notes.md new file mode 100644 index 0000000..d8c1ab2 --- /dev/null +++ b/docs/4.6.3-release-notes.md @@ -0,0 +1,2 @@ +### Fixes +- Correctly remove LandingZone directory when TableName option is used to configure the OpenMirroring Connector diff --git a/src/Connector.DataLake.Common/Connector/DataLakeExportEntitiesJobBase.cs b/src/Connector.DataLake.Common/Connector/DataLakeExportEntitiesJobBase.cs index 5e11f21..b7ba75e 100644 --- a/src/Connector.DataLake.Common/Connector/DataLakeExportEntitiesJobBase.cs +++ b/src/Connector.DataLake.Common/Connector/DataLakeExportEntitiesJobBase.cs @@ -3,7 +3,6 @@ using System.Data; using System.Linq; using System.Net; -using System.Text.RegularExpressions; using System.Threading.Tasks; using System.Transactions; @@ -490,7 +489,7 @@ protected virtual Task GetOutputFileNameAsync(ExecutionContext context, { if (HasCustomFileNamePattern(configuration)) { - return ReplaceNameUsingPatternAsync(context, configuration.FileNamePattern, streamId, containerName, asOfTime, outputFormat); + return PatternHelper.ReplaceNameUsingPatternAsync(context, configuration.FileNamePattern, streamId, containerName, asOfTime, outputFormat); } return GetDefaultOutputFileNameAsync(context, configuration, streamId, containerName, asOfTime, outputFormat); @@ -509,53 +508,6 @@ protected virtual Task GetDefaultOutputFileNameAsync(ExecutionContext co return Task.FromResult($"{streamIdFormatted}_{asOfTime:yyyyMMddHHmmss}.{fileExtension}"); } - protected static Task ReplaceNameUsingPatternAsync(ExecutionContext context, string pattern, Guid streamId, string containerName, DateTimeOffset asOfTime, string outputFormat) - { - var timeRegexPattern = @"\{(DataTime)(\:[a-zA-Z0-9\-\._]+)?\}"; - var streamIdRegexPattern = @"\{(StreamId)(\:[a-zA-Z0-9\-\._]+)?\}"; - var containerNameRegexPattern = @"\{(ContainerName)(\:[a-zA-Z0-9\-\._]+)?\}"; - var outputFormatRegexPattern = @"\{(OutputFormat)(\:[a-zA-Z0-9\-\._]+)?\}"; - - var timeReplaced = Replace(timeRegexPattern, pattern, (match, format) => asOfTime.ToString(format ?? "o")); - var streamIdReplaced = Replace(streamIdRegexPattern, timeReplaced, (match, format) => streamId.ToString(format ?? "D")); - var containerNameReplaced = Replace(containerNameRegexPattern, streamIdReplaced, (match, format) => containerName); - var outputFormatReplaced = Replace(outputFormatRegexPattern, containerNameReplaced, (match, format) => - { - return format?.ToLowerInvariant() switch - { - "toupper" => outputFormat.ToUpperInvariant(), - "toupperinvariant" => outputFormat.ToUpperInvariant(), - "tolower" => outputFormat.ToLowerInvariant(), - "tolowerinvariant" => outputFormat.ToLowerInvariant(), - null => outputFormat, - _ => throw new NotSupportedException($"Format '{format}' is not supported"), - }; - }); - - return Task.FromResult(outputFormatReplaced); - } - - private static string Replace(string pattern, string input, Func formatter) - { - var regex = new Regex(pattern); - var matches = regex.Matches(input); - var result = input; - foreach (var match in matches.Reverse()) - { - if (match.Groups.Count != 3 || !match.Groups[1].Success) - { - continue; - } - var format = match.Groups[2].Success - ? match.Groups[2].Captures.Single().Value[1..] - : null; - var formatted = formatter(match, format); - result = $"{result[0..match.Index]}{formatted}{result[(match.Index + match.Length)..]}"; - } - - return result; - } - protected virtual string GetFileExtension(string outputFormat) { return outputFormat; diff --git a/src/Connector.DataLake.Common/Connector/PatternHelper.cs b/src/Connector.DataLake.Common/Connector/PatternHelper.cs new file mode 100644 index 0000000..aafa14e --- /dev/null +++ b/src/Connector.DataLake.Common/Connector/PatternHelper.cs @@ -0,0 +1,58 @@ +using System; +using System.Linq; +using System.Text.RegularExpressions; +using System.Threading.Tasks; + +using CluedIn.Core; + +namespace CluedIn.Connector.DataLake.Common.Connector; + +internal class PatternHelper +{ + public static Task ReplaceNameUsingPatternAsync(ExecutionContext context, string pattern, Guid streamId, string containerName, DateTimeOffset dataTime, string outputFormat) + { + var timeRegexPattern = @"\{(DataTime)(\:[a-zA-Z0-9\-\._]+)?\}"; + var streamIdRegexPattern = @"\{(StreamId)(\:[a-zA-Z0-9\-\._]+)?\}"; + var containerNameRegexPattern = @"\{(ContainerName)(\:[a-zA-Z0-9\-\._]+)?\}"; + var outputFormatRegexPattern = @"\{(OutputFormat)(\:[a-zA-Z0-9\-\._]+)?\}"; + + var timeReplaced = Replace(timeRegexPattern, pattern, (match, format) => dataTime.ToString(format ?? "o")); + var streamIdReplaced = Replace(streamIdRegexPattern, timeReplaced, (match, format) => streamId.ToString(format ?? "D")); + var containerNameReplaced = Replace(containerNameRegexPattern, streamIdReplaced, (match, format) => containerName); + var outputFormatReplaced = Replace(outputFormatRegexPattern, containerNameReplaced, (match, format) => + { + return format?.ToLowerInvariant() switch + { + "toupper" => outputFormat.ToUpperInvariant(), + "toupperinvariant" => outputFormat.ToUpperInvariant(), + "tolower" => outputFormat.ToLowerInvariant(), + "tolowerinvariant" => outputFormat.ToLowerInvariant(), + null => outputFormat, + _ => throw new NotSupportedException($"Format '{format}' is not supported"), + }; + }); + + return Task.FromResult(outputFormatReplaced); + } + + private static string Replace(string pattern, string input, Func formatter) + { + var regex = new Regex(pattern); + var matches = regex.Matches(input); + var result = input; + foreach (var match in matches.Reverse()) + { + if (match.Groups.Count != 3 || !match.Groups[1].Success) + { + continue; + } + var format = match.Groups[2].Success + ? match.Groups[2].Captures.Single().Value[1..] + : null; + var formatted = formatter(match, format); + result = $"{result[0..match.Index]}{formatted}{result[(match.Index + match.Length)..]}"; + } + + return result; + } +} diff --git a/src/Connector.FabricOpenMirroring/Connector/OpenMirroringConnector.cs b/src/Connector.FabricOpenMirroring/Connector/OpenMirroringConnector.cs index 6abde32..72d54a7 100644 --- a/src/Connector.FabricOpenMirroring/Connector/OpenMirroringConnector.cs +++ b/src/Connector.FabricOpenMirroring/Connector/OpenMirroringConnector.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Threading.Tasks; @@ -25,6 +25,7 @@ public class OpenMirroringConnector : DataLakeConnector private readonly ILogger _logger; private readonly OpenMirroringClient _client; + private readonly IDateTimeOffsetProvider _dateTimeOffsetProvider; public OpenMirroringConnector( ILogger logger, @@ -36,6 +37,7 @@ public OpenMirroringConnector( { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _client = client ?? throw new ArgumentNullException(nameof(client)); + _dateTimeOffsetProvider = dateTimeOffsetProvider; } protected override async Task VerifyDataLakeConnection(IDataLakeJobData jobData) @@ -106,8 +108,9 @@ public override async Task ArchiveContainer(ExecutionContext executionContext, I var providerDefinitionId = streamModel.ConnectorProviderDefinitionId!.Value; var containerName = streamModel.ContainerName; - var jobData = await DataLakeJobDataFactory.GetConfiguration(executionContext, providerDefinitionId, containerName) as OpenMirroringConnectorJobData; - await Client.DeleteDirectory(jobData, streamModel.Id.ToString("N")); + var jobData = await DataLakeJobDataFactory.GetConfiguration(executionContext, providerDefinitionId, containerName); + var subDirectory = await OutputDirectoryHelper.GetSubDirectory(executionContext, jobData, streamModel.Id, containerName, _dateTimeOffsetProvider.GetCurrentUtcTime(), jobData.OutputFormat); + await Client.DeleteDirectory(jobData, subDirectory); await base.ArchiveContainer(executionContext, streamModel); } diff --git a/src/Connector.FabricOpenMirroring/Connector/OpenMirroringExportEntitiesJob.cs b/src/Connector.FabricOpenMirroring/Connector/OpenMirroringExportEntitiesJob.cs index b162d28..c9adbfa 100644 --- a/src/Connector.FabricOpenMirroring/Connector/OpenMirroringExportEntitiesJob.cs +++ b/src/Connector.FabricOpenMirroring/Connector/OpenMirroringExportEntitiesJob.cs @@ -202,19 +202,13 @@ private protected override async Task GetLastExport(ExecutionCont private protected override Task GetSubDirectory(ExecutionContext executionContext, IDataLakeJobData configuration, ExportJobDataBase exportJobData) { - if (configuration is OpenMirroringConnectorJobData casted && - !string.IsNullOrWhiteSpace(casted.TableName)) - { - return ReplaceNameUsingPatternAsync( - executionContext, - casted.TableName, - exportJobData.StreamId, - exportJobData.StreamModel.ContainerName, - exportJobData.AsOfTime, - exportJobData.OutputFormat); - } - - return Task.FromResult(exportJobData.StreamId.ToString("N")); + return OutputDirectoryHelper.GetSubDirectory( + executionContext, + configuration, + exportJobData.StreamModel.Id, + exportJobData.StreamModel.ContainerName, + exportJobData.AsOfTime, + exportJobData.OutputFormat); } protected override ISqlDataWriter GetSqlDataWriter(string outputFormat) diff --git a/src/Connector.FabricOpenMirroring/Connector/OutputDirectoryHelper.cs b/src/Connector.FabricOpenMirroring/Connector/OutputDirectoryHelper.cs new file mode 100644 index 0000000..763f66b --- /dev/null +++ b/src/Connector.FabricOpenMirroring/Connector/OutputDirectoryHelper.cs @@ -0,0 +1,28 @@ +using System; +using System.Threading.Tasks; + +using CluedIn.Connector.DataLake.Common; +using CluedIn.Connector.DataLake.Common.Connector; +using CluedIn.Core; + +namespace CluedIn.Connector.FabricOpenMirroring.Connector; + +internal class OutputDirectoryHelper +{ + public static Task GetSubDirectory(ExecutionContext executionContext, IDataLakeJobData configuration, Guid streamId, string containerName, DateTimeOffset dataTime, string outputFormat) + { + if (configuration is OpenMirroringConnectorJobData casted && + !string.IsNullOrWhiteSpace(casted.TableName)) + { + return PatternHelper.ReplaceNameUsingPatternAsync( + executionContext, + casted.TableName, + streamId, + containerName, + dataTime, + outputFormat); + } + + return Task.FromResult(streamId.ToString("N")); + } +} diff --git a/src/Connector.OneLake/Connector/OneLakeExportEntitiesJob.cs b/src/Connector.OneLake/Connector/OneLakeExportEntitiesJob.cs index 30faaa8..f444ea9 100644 --- a/src/Connector.OneLake/Connector/OneLakeExportEntitiesJob.cs +++ b/src/Connector.OneLake/Connector/OneLakeExportEntitiesJob.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Threading.Tasks; using CluedIn.Connector.DataLake.Common.Connector; @@ -40,7 +40,7 @@ private protected override async Task PostExportAsync(ExecutionContext context, return; } - var replacedTableName = await ReplaceNameUsingPatternAsync( + var replacedTableName = await PatternHelper.ReplaceNameUsingPatternAsync( context, jobData.TableName, exportJobData.StreamId, diff --git a/test/integration/Connector.DataLake.Common.Tests.Integration/DataLakeConnectorTestsBase.cs b/test/integration/Connector.DataLake.Common.Tests.Integration/DataLakeConnectorTestsBase.cs index cd031d7..0a51376 100644 --- a/test/integration/Connector.DataLake.Common.Tests.Integration/DataLakeConnectorTestsBase.cs +++ b/test/integration/Connector.DataLake.Common.Tests.Integration/DataLakeConnectorTestsBase.cs @@ -6,6 +6,7 @@ using System.Linq; using System.Threading.Tasks; +using Azure; using Azure.Storage.Files.DataLake; using Azure.Storage.Files.DataLake.Models; @@ -342,7 +343,7 @@ private protected async Task AssertExportJobOutputFileContents( await assertMethod(fileClient, fsClient, setupContainerResult); - await fsClient.GetDirectoryClient(directoryName).DeleteAsync(); + await fsClient.GetDirectoryClient(directoryName).DeleteIfExistsAsync(); } finally { @@ -756,6 +757,7 @@ protected static async Task DeleteDirectory(DataLakeServiceClient client, string { var fsClient = client.GetFileSystemClient(fileSystemName); var directoryClient = fsClient.GetDirectoryClient(directoryName); + await directoryClient.DeleteIfExistsAsync(); } diff --git a/test/integration/Connector.FabricOpenMirroring.Tests.Integration/FabricOpenMirroringConnectorTests.cs b/test/integration/Connector.FabricOpenMirroring.Tests.Integration/FabricOpenMirroringConnectorTests.cs index f193d9d..d2e9cb0 100644 --- a/test/integration/Connector.FabricOpenMirroring.Tests.Integration/FabricOpenMirroringConnectorTests.cs +++ b/test/integration/Connector.FabricOpenMirroring.Tests.Integration/FabricOpenMirroringConnectorTests.cs @@ -4,12 +4,14 @@ using System.Linq; using System.Threading.Tasks; +using Azure.Identity; using Azure.Storage.Files.DataLake; using Azure.Storage.Files.DataLake.Models; -using CluedIn.Connector.FabricOpenMirroring.Connector; using CluedIn.Connector.DataLake.Common; +using CluedIn.Connector.DataLake.Common.Connector; using CluedIn.Connector.DataLake.Common.Tests.Integration; +using CluedIn.Connector.FabricOpenMirroring.Connector; using CluedIn.Core; using CluedIn.Core.Data.Parts; using CluedIn.Core.Streams.Models; @@ -22,8 +24,6 @@ using Xunit.Abstractions; using Encoding = System.Text.Encoding; -using Azure.Identity; -using CluedIn.Connector.DataLake.Common.Connector; namespace CluedIn.Connector.FabricOpenMirroring.Tests.Integration; @@ -384,6 +384,48 @@ await executeExportArg.ExportJob.DoRunAsync( }); } + [Fact] + public async Task VerifyStoreData_Sync_WithStreamCacheCanUseTableName() + { + await VerifyStoreData_Sync_WithStreamCache( + "parquet", + AssertParquetResultEscaped, + configureDirectoryName: (jobData, setupResult) => + { + return $"{jobData.RootDirectoryPath}/MyTable"; + }, + configureAuthentication: (dictionary) => + { + dictionary[nameof(OpenMirroringConstants.TableName)] = "MyTable"; + }); + } + + [Fact] + public async Task Archive_Sync_CanDeleteDirectoryWhenUseTableName() + { + await VerifyStoreData_Sync_WithStreamCache( + "parquet", + async (fileClient, dataLakeFileSystemClient, setupContainerResult) => + { + await AssertParquetResultEscaped(fileClient, dataLakeFileSystemClient, setupContainerResult); + var connector = setupContainerResult.ConnectorMock.Object; + await connector.ArchiveContainer(setupContainerResult.Context, setupContainerResult.StreamModel); + + var fsClient = dataLakeFileSystemClient.GetDirectoryClient($"{setupContainerResult.DataLakeJobData.RootDirectoryPath}/ToBeArchived"); + var exists = await fsClient.ExistsAsync(); + Assert.False(exists); + }, + configureDirectoryName: (jobData, setupResult) => + { + return $"{jobData.RootDirectoryPath}/ToBeArchived"; + }, + configureAuthentication: (dictionary) => + { + dictionary[nameof(OpenMirroringConstants.TableName)] = "ToBeArchived"; + }); + + } + private async Task AssertParquetResultEscapedWithRowMarker( DataLakeFileClient fileClient, DataLakeFileSystemClient fileSystemClient, @@ -402,7 +444,8 @@ private async Task VerifyStoreData_Sync_WithStreamCache( Func assertMethod, Func> executeExport = null, Action> configureTimeProvider = null, - Action> configureAuthentication = null) + Action> configureAuthentication = null, + Func configureDirectoryName = null) { var configuration = CreateConfigurationWithStreamCache(format); configureAuthentication?.Invoke(configuration); @@ -415,9 +458,12 @@ private async Task VerifyStoreData_Sync_WithStreamCache( await connector.StoreData(setupResult.Context, setupResult.StreamModel, data); var exportJob = CreateExportJob(setupResult); + var directoryName = configureDirectoryName == null + ? $"{jobData.RootDirectoryPath}/{setupResult.StreamModel.Id:N}" + : configureDirectoryName(jobData, setupResult); await AssertExportJobOutputFileContents( jobData.FileSystemName, - $"{jobData.RootDirectoryPath}/{setupResult.StreamModel.Id:N}", + directoryName, setupResult, GetDataLakeClient(jobData), exportJob,