Skip to content

Commit 433859b

Browse files
committed
Merge branch 'release/4.4.1'
2 parents 1a3e3fa + 0abcf3f commit 433859b

File tree

5 files changed

+114
-35
lines changed

5 files changed

+114
-35
lines changed

docs/4.4.1-release-notes.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
### Fixes
2+
- Prevent unnecessary warning messages when using sync mode with stream cache disabled
3+
- Add option to enable or disable escaping of non a-z,A-Z,0-9,_ characters
4+
- Add option to enable or disable writing of guids as strings

src/Connector.AzureDataLake/AzureDataLakeConnectorJobData.cs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,6 @@ public AzureDataLakeConnectorJobData(
1919
public string DirectoryName => GetConfigurationValue(AzureDataLakeConstants.DirectoryName) as string;
2020
public string FileSystemName => GetConfigurationValue(AzureDataLakeConstants.FileSystemName) as string;
2121

22-
public override bool ShouldEscapeVocabularyKeys => IsStreamCacheEnabled && DataLakeConstants.OutputFormats.Parquet.Equals(OutputFormat, StringComparison.OrdinalIgnoreCase);
23-
24-
public override bool ShouldWriteGuidAsString => ShouldEscapeVocabularyKeys;
25-
2622
protected override void AddToHashCode(HashCode hash)
2723
{
2824
hash.Add(AccountName);

src/Connector.AzureDataLake/AzureDataLakeConstants.cs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,52 @@ private static AuthMethods GetAzureDataLakeAuthMethods(ApplicationContext applic
6666
};
6767

6868
controls.AddRange(GetAuthMethods(applicationContext));
69+
controls.Add(
70+
new()
71+
{
72+
Name = ShouldEscapeVocabularyKeys,
73+
DisplayName = "Replace Non-Alphanumeric Characters in Column Names",
74+
Type = "checkbox",
75+
IsRequired = false,
76+
Help = """
77+
Replaces characters in the column names that are not in this list ('a-z', 'A-Z', '0-9' and '_') with the character '_'.
78+
Enable this if you plan to access the output file in Microsoft Purview.
79+
""",
80+
DisplayDependencies = new[]
81+
{
82+
new ControlDisplayDependency
83+
{
84+
Name = OutputFormat,
85+
Operator = ControlDependencyOperator.Equals,
86+
Value = OutputFormats.Parquet.ToLowerInvariant(),
87+
UnfulfilledAction = ControlDependencyUnfulfilledAction.Hidden,
88+
},
89+
},
90+
}
91+
);
92+
controls.Add(
93+
new()
94+
{
95+
Name = ShouldWriteGuidAsString,
96+
DisplayName = "Write Guid as string",
97+
Type = "checkbox",
98+
IsRequired = false,
99+
Help = """
100+
Write Guid values as string instead of byte array.
101+
Enable this if you plan to access the output file in Microsoft Purview.
102+
""",
103+
DisplayDependencies = new[]
104+
{
105+
new ControlDisplayDependency
106+
{
107+
Name = OutputFormat,
108+
Operator = ControlDependencyOperator.Equals,
109+
Value = OutputFormats.Parquet.ToLowerInvariant(),
110+
UnfulfilledAction = ControlDependencyUnfulfilledAction.Hidden,
111+
},
112+
},
113+
}
114+
);
69115

70116
return new AuthMethods
71117
{

src/Connector.DataLake.Common/DataLakeScheduler.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,15 @@ async Task addStreamExportJobs(ExecutionContext executionContext, ProviderDefini
4848
if (stream.Mode != StreamMode.Sync)
4949
{
5050
_logger.LogDebug("Stream {StreamId} is not in {Mode} mode. Removing it from scheduler '{SchedulerName}'.", stream.Id, StreamMode.Sync, _schedulerName);
51-
removeExportJob(jobKey);
51+
removeExportJob(jobKey, warnIfNotRemoved: false);
5252
return;
5353
}
5454

5555
var jobSchedule = await _dataLakeJobDataFactory.GetScheduleAsync(executionContext, stream);
5656
if (IsExportJobDisabled(jobSchedule))
5757
{
58-
_logger.LogDebug("Stream {StreamId} is disabled. Removing it from scheduler '{SchedulerName}'.", stream.Id, _schedulerName);
59-
removeExportJob(jobKey);
58+
_logger.LogDebug("Stream {StreamId} export is disabled. Removing it from scheduler '{SchedulerName}'.", stream.Id, _schedulerName);
59+
removeExportJob(jobKey, warnIfNotRemoved: false);
6060
return;
6161
}
6262

@@ -72,7 +72,7 @@ async Task addStreamExportJobs(ExecutionContext executionContext, ProviderDefini
7272
var removed = 0;
7373
foreach (var jobKey in jobKeysToRemove)
7474
{
75-
removeExportJob(jobKey);
75+
removeExportJob(jobKey, warnIfNotRemoved: true);
7676
removed++;
7777
}
7878

@@ -81,15 +81,15 @@ async Task addStreamExportJobs(ExecutionContext executionContext, ProviderDefini
8181
_logger.LogInformation("Total of obsolete {RemovedCount} stream jobs removed from scheduler '{SchedulerName}'.", removed, _schedulerName);
8282
}
8383

84-
void removeExportJob(string jobKey)
84+
void removeExportJob(string jobKey, bool warnIfNotRemoved)
8585
{
8686
if (jobQueue.TryRemove(jobKey, out var _))
8787
{
8888
_logger.LogDebug("Stream export for stream {StreamId} removed from scheduler '{SchedulerName}'.", jobKey, _schedulerName);
8989
}
90-
else
90+
else if (warnIfNotRemoved)
9191
{
92-
_logger.LogWarning("Failed to disable stream export for stream {StreamId} from scheduler '{SchedulerName}'..", jobKey, _schedulerName);
92+
_logger.LogWarning("Failed to disable stream export for stream {StreamId} from scheduler '{SchedulerName}'.", jobKey, _schedulerName);
9393
}
9494
}
9595
}

test/integration/Connector.AzureDataLake.Tests.Integration/AzureDataLakeConnectorTests.cs

Lines changed: 57 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
using Castle.MicroKernel.Registration;
1616
using Castle.Windsor;
1717

18+
using CluedIn.Connector.AzureDataLake;
1819
using CluedIn.Connector.AzureDataLake.Connector;
1920
using CluedIn.Connector.DataLake.Common;
2021
using CluedIn.Core;
@@ -32,6 +33,7 @@
3233
using CsvHelper.Configuration;
3334

3435
using Microsoft.Data.SqlClient;
36+
using Microsoft.Data.SqlClient.Server;
3537
using Microsoft.Extensions.Internal;
3638
using Microsoft.Extensions.Logging;
3739

@@ -626,9 +628,22 @@ public async Task VerifyStoreData_Sync_WithStreamCacheAndCsvFormat()
626628
}
627629

628630
[Fact]
629-
public async Task VerifyStoreData_Sync_WithStreamCacheAndParquetFormat()
631+
public async Task VerifyStoreData_Sync_WithStreamCacheAndParquetFormatUnescaped()
630632
{
631-
await VerifyStoreData_Sync_WithStreamCache("pArQuet", AssertParquetResult);
633+
await VerifyStoreData_Sync_WithStreamCache("pArQuet", AssertParquetResultUnescaped);
634+
}
635+
636+
[Fact]
637+
public async Task VerifyStoreData_Sync_WithStreamCacheAndParquetFormatWithEscaped()
638+
{
639+
await VerifyStoreData_Sync_WithStreamCache(
640+
"pArQuet",
641+
AssertParquetResultEscaped,
642+
configureAuthentication: (values) =>
643+
{
644+
values.Add(nameof(DataLakeConstants.ShouldEscapeVocabularyKeys), true);
645+
values.Add(nameof(DataLakeConstants.ShouldWriteGuidAsString), true);
646+
});
632647
}
633648

634649
[Fact]
@@ -806,7 +821,8 @@ private async Task VerifyStoreData_Sync_WithStreamCache(
806821
string format,
807822
Func<DataLakeFileClient, Task> assertMethod,
808823
Func<ExecuteExportArg, Task<PathItem>> executeExport = null,
809-
Action<Mock<IDateTimeOffsetProvider>> configureTimeProvider = null)
824+
Action<Mock<IDateTimeOffsetProvider>> configureTimeProvider = null,
825+
Action<Dictionary<string, object>> configureAuthentication = null)
810826
{
811827
var organizationId = Guid.NewGuid();
812828
var providerDefinitionId = Guid.Parse("c444cda8-d9b5-45cc-a82d-fef28e08d55c");
@@ -890,7 +906,7 @@ private async Task VerifyStoreData_Sync_WithStreamCache(
890906
var directoryName = $"xunit-{DateTime.Now.Ticks}";
891907

892908
var connectorConnectionMock = new Mock<IConnectorConnectionV2>();
893-
connectorConnectionMock.Setup(x => x.Authentication).Returns(new Dictionary<string, object>()
909+
var authenticationValues = new Dictionary<string, object>()
894910
{
895911
{ nameof(AzureDataLakeConstants.AccountName), accountName },
896912
{ nameof(AzureDataLakeConstants.AccountKey), accountKey },
@@ -900,7 +916,10 @@ private async Task VerifyStoreData_Sync_WithStreamCache(
900916
{ nameof(DataLakeConstants.StreamCacheConnectionString), streamCacheConnectionString },
901917
{ nameof(DataLakeConstants.OutputFormat), format },
902918
{ nameof(DataLakeConstants.UseCurrentTimeForExport), true },
903-
});
919+
};
920+
configureAuthentication?.Invoke(authenticationValues);
921+
922+
connectorConnectionMock.Setup(x => x.Authentication).Returns(authenticationValues);
904923

905924
var azureDataLakeClient = new AzureDataLakeClient();
906925
var jobDataFactory = new Mock<AzureDataLakeJobDataFactory>();
@@ -1264,7 +1283,17 @@ private async Task AssertCsvResult(DataLakeFileClient fileClient)
12641283
Assert.Equal(sb.ToString(), content);
12651284
}
12661285

1267-
private async Task AssertParquetResult(DataLakeFileClient fileClient)
1286+
private Task AssertParquetResultUnescaped(DataLakeFileClient fileClient)
1287+
{
1288+
return AssertParquetResult(fileClient, ".");
1289+
}
1290+
1291+
private Task AssertParquetResultEscaped(DataLakeFileClient fileClient)
1292+
{
1293+
return AssertParquetResult(fileClient, "_");
1294+
}
1295+
1296+
private async Task AssertParquetResult(DataLakeFileClient fileClient, string separator)
12681297
{
12691298
using var memoryStream = new MemoryStream();
12701299
await fileClient.ReadToAsync(memoryStream);
@@ -1300,32 +1329,30 @@ Name Jean Luc Picard
13001329
PersistVersion 1
13011330
ProviderDefinitionId c444cda8-d9b5-45cc-a82d-fef28e08d55c
13021331
Timestamp 2024-08-21T03:16:00.0000000+05:00
1303-
user_age 123
1304-
user_dobInDateTime 2000-01-02T03:04:05
1305-
user_dobInDateTimeOffset 2000-01-02T03:04:05+12:34
1306-
user_lastName Picard
1332+
user{{{separator}}}age 123
1333+
user{{{separator}}}dobInDateTime 2000-01-02T03:04:05
1334+
user{{{separator}}}dobInDateTimeOffset 2000-01-02T03:04:05+12:34
1335+
user{{{separator}}}lastName Picard
13071336
13081337
""", sb.ToString());
13091338

13101339
object getValue(Parquet.Data.DataColumn dataColumn)
13111340
{
1312-
if (dataColumn.Field.ClrType == typeof(Guid))
1313-
return ((Guid[])dataColumn.Data)[0];
1314-
else if (dataColumn.Field.ClrType == typeof(int))
1315-
{
1316-
if (dataColumn.Field.IsNullable)
1317-
return ((int?[])dataColumn.Data)[0];
1341+
var type = dataColumn.Field.ClrType;
13181342

1319-
return ((int[])dataColumn.Data)[0];
1343+
if (type == typeof(Guid))
1344+
{
1345+
return getValueFromArray<Guid?, Guid>(dataColumn);
13201346
}
1321-
else if (dataColumn.Field.ClrType == typeof(long))
1347+
else if (type == typeof(int))
13221348
{
1323-
if (dataColumn.Field.IsNullable)
1324-
return ((long?[])dataColumn.Data)[0];
1325-
1326-
return ((long[])dataColumn.Data)[0];
1349+
return getValueFromArray<int?, int>(dataColumn);
13271350
}
1328-
else if (dataColumn.Field.ClrType == typeof(string))
1351+
else if (type == typeof(long))
1352+
{
1353+
return getValueFromArray<long?, long>(dataColumn);
1354+
}
1355+
else if (type == typeof(string))
13291356
{
13301357
var value = ((string[])dataColumn.Data);
13311358

@@ -1337,9 +1364,15 @@ object getValue(Parquet.Data.DataColumn dataColumn)
13371364

13381365
throw new NotSupportedException($"Type {dataColumn.Field.ClrType} not supported.");
13391366
}
1340-
}
13411367

1368+
object getValueFromArray<TNullable, TNonNullable>(Parquet.Data.DataColumn dataColumn)
1369+
{
1370+
if (dataColumn.Field.IsNullable)
1371+
return ((TNullable[])dataColumn.Data)[0];
13421372

1373+
return ((TNonNullable[])dataColumn.Data)[0];
1374+
}
1375+
}
13431376

13441377
private static async Task WaitForFileToBeDeleted(string fileSystemName, string directoryName, DataLakeServiceClient client, PathItem path)
13451378
{

0 commit comments

Comments
 (0)