2 changes: 2 additions & 0 deletions docs/4.6.3-release-notes.md
@@ -0,0 +1,2 @@
### Fixes
- Correctly remove the LandingZone directory when the TableName option is used to configure the OpenMirroring Connector
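
In effect, ArchiveContainer previously always deleted the directory named after the stream id, so a LandingZone directory created from a TableName pattern was left behind; it now resolves the configured subdirectory first. A simplified sketch of the change (shown in full in the diff below):

// Before (simplified): the directory name was assumed to be the compact stream id.
await Client.DeleteDirectory(jobData, streamModel.Id.ToString("N"));

// After (simplified): resolve the configured subdirectory (TableName pattern, or stream id fallback) first.
var subDirectory = await OutputDirectoryHelper.GetSubDirectory(
    executionContext, jobData, streamModel.Id, streamModel.ContainerName,
    _dateTimeOffsetProvider.GetCurrentUtcTime(), jobData.OutputFormat);
await Client.DeleteDirectory(jobData, subDirectory);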
@@ -3,7 +3,6 @@
using System.Data;
using System.Linq;
using System.Net;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using System.Transactions;

@@ -490,7 +489,7 @@ protected virtual Task<string> GetOutputFileNameAsync(ExecutionContext context,
{
if (HasCustomFileNamePattern(configuration))
{
return ReplaceNameUsingPatternAsync(context, configuration.FileNamePattern, streamId, containerName, asOfTime, outputFormat);
return PatternHelper.ReplaceNameUsingPatternAsync(context, configuration.FileNamePattern, streamId, containerName, asOfTime, outputFormat);
}

return GetDefaultOutputFileNameAsync(context, configuration, streamId, containerName, asOfTime, outputFormat);
@@ -509,53 +508,6 @@ protected virtual Task<string> GetDefaultOutputFileNameAsync(ExecutionContext co
return Task.FromResult($"{streamIdFormatted}_{asOfTime:yyyyMMddHHmmss}.{fileExtension}");
}

protected static Task<string> ReplaceNameUsingPatternAsync(ExecutionContext context, string pattern, Guid streamId, string containerName, DateTimeOffset asOfTime, string outputFormat)
{
var timeRegexPattern = @"\{(DataTime)(\:[a-zA-Z0-9\-\._]+)?\}";
var streamIdRegexPattern = @"\{(StreamId)(\:[a-zA-Z0-9\-\._]+)?\}";
var containerNameRegexPattern = @"\{(ContainerName)(\:[a-zA-Z0-9\-\._]+)?\}";
var outputFormatRegexPattern = @"\{(OutputFormat)(\:[a-zA-Z0-9\-\._]+)?\}";

var timeReplaced = Replace(timeRegexPattern, pattern, (match, format) => asOfTime.ToString(format ?? "o"));
var streamIdReplaced = Replace(streamIdRegexPattern, timeReplaced, (match, format) => streamId.ToString(format ?? "D"));
var containerNameReplaced = Replace(containerNameRegexPattern, streamIdReplaced, (match, format) => containerName);
var outputFormatReplaced = Replace(outputFormatRegexPattern, containerNameReplaced, (match, format) =>
{
return format?.ToLowerInvariant() switch
{
"toupper" => outputFormat.ToUpperInvariant(),
"toupperinvariant" => outputFormat.ToUpperInvariant(),
"tolower" => outputFormat.ToLowerInvariant(),
"tolowerinvariant" => outputFormat.ToLowerInvariant(),
null => outputFormat,
_ => throw new NotSupportedException($"Format '{format}' is not supported"),
};
});

return Task.FromResult(outputFormatReplaced);
}

private static string Replace(string pattern, string input, Func<Match, string, string> formatter)
{
var regex = new Regex(pattern);
var matches = regex.Matches(input);
var result = input;
foreach (var match in matches.Reverse())
{
if (match.Groups.Count != 3 || !match.Groups[1].Success)
{
continue;
}
var format = match.Groups[2].Success
? match.Groups[2].Captures.Single().Value[1..]
: null;
var formatted = formatter(match, format);
result = $"{result[0..match.Index]}{formatted}{result[(match.Index + match.Length)..]}";
}

return result;
}

protected virtual string GetFileExtension(string outputFormat)
{
return outputFormat;
58 changes: 58 additions & 0 deletions src/Connector.DataLake.Common/Connector/PatternHelper.cs
@@ -0,0 +1,58 @@
using System;
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading.Tasks;

using CluedIn.Core;

namespace CluedIn.Connector.DataLake.Common.Connector;

internal class PatternHelper
{
public static Task<string> ReplaceNameUsingPatternAsync(ExecutionContext context, string pattern, Guid streamId, string containerName, DateTimeOffset dataTime, string outputFormat)
{
var timeRegexPattern = @"\{(DataTime)(\:[a-zA-Z0-9\-\._]+)?\}";
var streamIdRegexPattern = @"\{(StreamId)(\:[a-zA-Z0-9\-\._]+)?\}";
var containerNameRegexPattern = @"\{(ContainerName)(\:[a-zA-Z0-9\-\._]+)?\}";
var outputFormatRegexPattern = @"\{(OutputFormat)(\:[a-zA-Z0-9\-\._]+)?\}";

var timeReplaced = Replace(timeRegexPattern, pattern, (match, format) => dataTime.ToString(format ?? "o"));
var streamIdReplaced = Replace(streamIdRegexPattern, timeReplaced, (match, format) => streamId.ToString(format ?? "D"));
var containerNameReplaced = Replace(containerNameRegexPattern, streamIdReplaced, (match, format) => containerName);
var outputFormatReplaced = Replace(outputFormatRegexPattern, containerNameReplaced, (match, format) =>
{
return format?.ToLowerInvariant() switch
{
"toupper" => outputFormat.ToUpperInvariant(),
"toupperinvariant" => outputFormat.ToUpperInvariant(),
"tolower" => outputFormat.ToLowerInvariant(),
"tolowerinvariant" => outputFormat.ToLowerInvariant(),
null => outputFormat,
_ => throw new NotSupportedException($"Format '{format}' is not supported"),
};
});

return Task.FromResult(outputFormatReplaced);
}

private static string Replace(string pattern, string input, Func<Match, string, string> formatter)
{
var regex = new Regex(pattern);
var matches = regex.Matches(input);
var result = input;
foreach (var match in matches.Reverse())
{
if (match.Groups.Count != 3 || !match.Groups[1].Success)
{
continue;
}
var format = match.Groups[2].Success
? match.Groups[2].Captures.Single().Value[1..]
: null;
var formatted = formatter(match, format);
result = $"{result[0..match.Index]}{formatted}{result[(match.Index + match.Length)..]}";
}

return result;
}
}
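
For reference, a minimal usage sketch of the extracted helper (the class is internal, so this assumes a caller inside the connector assembly; the pattern string and values are illustrative, not taken from this PR):

// Expands the {DataTime}, {StreamId}, {ContainerName} and {OutputFormat} tokens, with an optional format after ':'.
var name = await PatternHelper.ReplaceNameUsingPatternAsync(
    context,                                      // ExecutionContext supplied by the caller
    "{ContainerName}_{DataTime:yyyyMMdd}.{OutputFormat:ToLower}",
    streamId: Guid.NewGuid(),                     // only used if the pattern contains {StreamId}
    containerName: "Customers",
    dataTime: new DateTimeOffset(2024, 1, 31, 0, 0, 0, TimeSpan.Zero),
    outputFormat: "Parquet");
// name == "Customers_20240131.parquet"; an unrecognized {OutputFormat:...} format throws NotSupportedException.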
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

@@ -25,6 +25,7 @@ public class OpenMirroringConnector : DataLakeConnector

private readonly ILogger<OpenMirroringConnector> _logger;
private readonly OpenMirroringClient _client;
private readonly IDateTimeOffsetProvider _dateTimeOffsetProvider;

public OpenMirroringConnector(
ILogger<OpenMirroringConnector> logger,
@@ -36,6 +37,7 @@ public OpenMirroringConnector(
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_client = client ?? throw new ArgumentNullException(nameof(client));
_dateTimeOffsetProvider = dateTimeOffsetProvider;
}

protected override async Task<ConnectionVerificationResult> VerifyDataLakeConnection(IDataLakeJobData jobData)
@@ -106,8 +108,9 @@ public override async Task ArchiveContainer(ExecutionContext executionContext, I
var providerDefinitionId = streamModel.ConnectorProviderDefinitionId!.Value;
var containerName = streamModel.ContainerName;

var jobData = await DataLakeJobDataFactory.GetConfiguration(executionContext, providerDefinitionId, containerName) as OpenMirroringConnectorJobData;
await Client.DeleteDirectory(jobData, streamModel.Id.ToString("N"));
var jobData = await DataLakeJobDataFactory.GetConfiguration(executionContext, providerDefinitionId, containerName);
var subDirectory = await OutputDirectoryHelper.GetSubDirectory(executionContext, jobData, streamModel.Id, containerName, _dateTimeOffsetProvider.GetCurrentUtcTime(), jobData.OutputFormat);
await Client.DeleteDirectory(jobData, subDirectory);
await base.ArchiveContainer(executionContext, streamModel);
}

@@ -202,19 +202,13 @@ private protected override async Task<ExportHistory> GetLastExport(ExecutionCont

private protected override Task<string> GetSubDirectory(ExecutionContext executionContext, IDataLakeJobData configuration, ExportJobDataBase exportJobData)
{
if (configuration is OpenMirroringConnectorJobData casted &&
!string.IsNullOrWhiteSpace(casted.TableName))
{
return ReplaceNameUsingPatternAsync(
executionContext,
casted.TableName,
exportJobData.StreamId,
exportJobData.StreamModel.ContainerName,
exportJobData.AsOfTime,
exportJobData.OutputFormat);
}

return Task.FromResult(exportJobData.StreamId.ToString("N"));
return OutputDirectoryHelper.GetSubDirectory(
executionContext,
configuration,
exportJobData.StreamModel.Id,
exportJobData.StreamModel.ContainerName,
exportJobData.AsOfTime,
exportJobData.OutputFormat);
}

protected override ISqlDataWriter GetSqlDataWriter(string outputFormat)
@@ -0,0 +1,28 @@
using System;
using System.Threading.Tasks;

using CluedIn.Connector.DataLake.Common;
using CluedIn.Connector.DataLake.Common.Connector;
using CluedIn.Core;

namespace CluedIn.Connector.FabricOpenMirroring.Connector;

internal class OutputDirectoryHelper
{
public static Task<string> GetSubDirectory(ExecutionContext executionContext, IDataLakeJobData configuration, Guid streamId, string containerName, DateTimeOffset dataTime, string outputFormat)
{
if (configuration is OpenMirroringConnectorJobData casted &&
!string.IsNullOrWhiteSpace(casted.TableName))
{
return PatternHelper.ReplaceNameUsingPatternAsync(
executionContext,
casted.TableName,
streamId,
containerName,
dataTime,
outputFormat);
}

return Task.FromResult(streamId.ToString("N"));
}
}
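
A short sketch of the two resolution paths (jobDataWithTableName and jobDataWithoutTableName are hypothetical OpenMirroringConnectorJobData configurations, one with TableName = "MyTable" and one with TableName left empty):

// With TableName configured, export and archive both resolve to the same pattern-based directory,
// which is what lets ArchiveContainer delete the correct LandingZone directory.
var withTableName = await OutputDirectoryHelper.GetSubDirectory(
    executionContext, jobDataWithTableName, streamId, containerName, asOfTime, "parquet");
// withTableName == "MyTable"

// Without TableName, the directory name falls back to the compact ("N") stream id, as before.
var withoutTableName = await OutputDirectoryHelper.GetSubDirectory(
    executionContext, jobDataWithoutTableName, streamId, containerName, asOfTime, "parquet");
// withoutTableName == streamId.ToString("N")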
4 changes: 2 additions & 2 deletions src/Connector.OneLake/Connector/OneLakeExportEntitiesJob.cs
@@ -1,4 +1,4 @@
using System;
using System;
using System.Threading.Tasks;

using CluedIn.Connector.DataLake.Common.Connector;
@@ -40,7 +40,7 @@ private protected override async Task PostExportAsync(ExecutionContext context,
return;
}

var replacedTableName = await ReplaceNameUsingPatternAsync(
var replacedTableName = await PatternHelper.ReplaceNameUsingPatternAsync(
context,
jobData.TableName,
exportJobData.StreamId,
@@ -6,6 +6,7 @@
using System.Linq;
using System.Threading.Tasks;

using Azure;
using Azure.Storage.Files.DataLake;
using Azure.Storage.Files.DataLake.Models;

@@ -342,7 +343,7 @@ private protected async Task AssertExportJobOutputFileContents(

await assertMethod(fileClient, fsClient, setupContainerResult);

await fsClient.GetDirectoryClient(directoryName).DeleteAsync();
await fsClient.GetDirectoryClient(directoryName).DeleteIfExistsAsync();
}
finally
{
@@ -756,6 +757,7 @@ protected static async Task DeleteDirectory(DataLakeServiceClient client, string
{
var fsClient = client.GetFileSystemClient(fileSystemName);
var directoryClient = fsClient.GetDirectoryClient(directoryName);

await directoryClient.DeleteIfExistsAsync();
}

@@ -4,12 +4,14 @@
using System.Linq;
using System.Threading.Tasks;

using Azure.Identity;
using Azure.Storage.Files.DataLake;
using Azure.Storage.Files.DataLake.Models;

using CluedIn.Connector.FabricOpenMirroring.Connector;
using CluedIn.Connector.DataLake.Common;
using CluedIn.Connector.DataLake.Common.Connector;
using CluedIn.Connector.DataLake.Common.Tests.Integration;
using CluedIn.Connector.FabricOpenMirroring.Connector;
using CluedIn.Core;
using CluedIn.Core.Data.Parts;
using CluedIn.Core.Streams.Models;
@@ -22,8 +24,6 @@
using Xunit.Abstractions;

using Encoding = System.Text.Encoding;
using Azure.Identity;
using CluedIn.Connector.DataLake.Common.Connector;

namespace CluedIn.Connector.FabricOpenMirroring.Tests.Integration;

@@ -384,6 +384,48 @@ await executeExportArg.ExportJob.DoRunAsync(
});
}

[Fact]
public async Task VerifyStoreData_Sync_WithStreamCacheCanUseTableName()
{
await VerifyStoreData_Sync_WithStreamCache(
"parquet",
AssertParquetResultEscaped,
configureDirectoryName: (jobData, setupResult) =>
{
return $"{jobData.RootDirectoryPath}/MyTable";
},
configureAuthentication: (dictionary) =>
{
dictionary[nameof(OpenMirroringConstants.TableName)] = "MyTable";
});
}

[Fact]
public async Task Archive_Sync_CanDeleteDirectoryWhenUseTableName()
{
await VerifyStoreData_Sync_WithStreamCache(
"parquet",
async (fileClient, dataLakeFileSystemClient, setupContainerResult) =>
{
await AssertParquetResultEscaped(fileClient, dataLakeFileSystemClient, setupContainerResult);
var connector = setupContainerResult.ConnectorMock.Object;
await connector.ArchiveContainer(setupContainerResult.Context, setupContainerResult.StreamModel);

var fsClient = dataLakeFileSystemClient.GetDirectoryClient($"{setupContainerResult.DataLakeJobData.RootDirectoryPath}/ToBeArchived");
var exists = await fsClient.ExistsAsync();
Assert.False(exists);
},
configureDirectoryName: (jobData, setupResult) =>
{
return $"{jobData.RootDirectoryPath}/ToBeArchived";
},
configureAuthentication: (dictionary) =>
{
dictionary[nameof(OpenMirroringConstants.TableName)] = "ToBeArchived";
});
}

private async Task AssertParquetResultEscapedWithRowMarker(
DataLakeFileClient fileClient,
DataLakeFileSystemClient fileSystemClient,
@@ -402,7 +444,8 @@ private async Task VerifyStoreData_Sync_WithStreamCache(
Func<DataLakeFileClient, DataLakeFileSystemClient, SetupContainerResult, Task> assertMethod,
Func<ExecuteExportArg, Task<PathItem>> executeExport = null,
Action<Mock<IDateTimeOffsetProvider>> configureTimeProvider = null,
Action<Dictionary<string, object>> configureAuthentication = null)
Action<Dictionary<string, object>> configureAuthentication = null,
Func<OpenMirroringConnectorJobData, SetupContainerResult, string> configureDirectoryName = null)
{
var configuration = CreateConfigurationWithStreamCache(format);
configureAuthentication?.Invoke(configuration);
@@ -415,9 +458,12 @@ private async Task VerifyStoreData_Sync_WithStreamCache(
await connector.StoreData(setupResult.Context, setupResult.StreamModel, data);
var exportJob = CreateExportJob(setupResult);

var directoryName = configureDirectoryName == null
? $"{jobData.RootDirectoryPath}/{setupResult.StreamModel.Id:N}"
: configureDirectoryName(jobData, setupResult);
await AssertExportJobOutputFileContents(
jobData.FileSystemName,
$"{jobData.RootDirectoryPath}/{setupResult.StreamModel.Id:N}",
directoryName,
setupResult,
GetDataLakeClient(jobData),
exportJob,