Skip to content

Commit 7fb6f93

Browse files
committed
Merge branch 'release/4.4.3'
2 parents 33f6837 + fcaabaf commit 7fb6f93

File tree

7 files changed

+218
-5
lines changed

7 files changed

+218
-5
lines changed

docs/4.4.3-release-notes.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
### Fixes
2+
- Handle TaskCanceledException when stop/starting a stream

src/Connector.DataLake.Common/Buffer.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,14 @@ private async Task Idle()
7575
{
7676
while (true)
7777
{
78-
await Task.Delay(100, _idleCancellationTokenSource.Token);
78+
try
79+
{
80+
await Task.Delay(100, _idleCancellationTokenSource.Token);
81+
}
82+
catch (TaskCanceledException taskCanceledException)
83+
{
84+
// TODO add log
85+
}
7986

8087
if (!_idleCancellationTokenSource.IsCancellationRequested && DateTime.Now.Subtract(_lastAdded).TotalMilliseconds < _timeout)
8188
{
@@ -159,7 +166,9 @@ public async Task Flush()
159166
var t = _idleTask;
160167
if (t != null)
161168
{
169+
Console.WriteLine("Canceling");
162170
_idleCancellationTokenSource.Cancel();
171+
Console.WriteLine("Canceled");
163172

164173
await t;
165174
}

src/Connector.DataLake.Common/Connector/DataLakeClient.cs

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
using Azure.Storage.Files.DataLake;
22
using Azure.Storage.Files.DataLake.Models;
3-
3+
using CluedIn.Core.Connectors;
44
using System;
5+
using System.Collections.Generic;
56
using System.IO;
67
using System.Threading.Tasks;
78

@@ -130,5 +131,53 @@ private async Task<DataLakeFileSystemClient> GetFileSystemClientAsync(
130131

131132
return dataLakeFileSystemClient;
132133
}
134+
135+
public async Task<IEnumerable<IConnectorContainer>> GetFilesInDirectory(IDataLakeJobData configuration, string subDirectory = null)
136+
{
137+
var serviceClient = GetDataLakeServiceClient(configuration);
138+
var fileSystemName = GetFileSystemName(configuration);
139+
var fileSystemClient = serviceClient.GetFileSystemClient(fileSystemName);
140+
141+
if (!await fileSystemClient.ExistsAsync())
142+
{
143+
return null;
144+
}
145+
146+
var directory = GetDirectory(configuration);
147+
if (!string.IsNullOrEmpty(subDirectory))
148+
directory = Path.Combine(directory, subDirectory);
149+
150+
var directoryClient = fileSystemClient.GetDirectoryClient(directory);
151+
if (!await directoryClient.ExistsAsync())
152+
{
153+
return null;
154+
}
155+
156+
var files = directoryClient.GetPathsAsync().GetAsyncEnumerator();
157+
await files.MoveNextAsync();
158+
var item = files.Current;
159+
160+
var result = new List<IConnectorContainer>();
161+
while (item != null)
162+
{
163+
if (!item.IsDirectory.GetValueOrDefault())
164+
{
165+
result.Add(new DataLakeContainer
166+
{
167+
Name = Path.GetFileName(item.Name),
168+
FullyQualifiedName = directoryClient.Uri.ToString() + "/" + Path.GetFileName(item.Name)
169+
});
170+
}
171+
172+
if (!await files.MoveNextAsync())
173+
{
174+
break;
175+
}
176+
177+
item = files.Current;
178+
}
179+
180+
return result;
181+
}
133182
}
134183
}

src/Connector.DataLake.Common/Connector/DataLakeConnector.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -566,12 +566,14 @@ public override async Task ArchiveContainer(ExecutionContext executionContext, I
566566
await RenameCacheTableIfExists(executionContext, streamModel);
567567
}
568568

569-
public override Task<IEnumerable<IConnectorContainer>> GetContainers(ExecutionContext executionContext,
569+
public override async Task<IEnumerable<IConnectorContainer>> GetContainers(ExecutionContext executionContext,
570570
Guid providerDefinitionId)
571571
{
572572
_logger.LogInformation($"DataLakeConnector.GetContainers: entry");
573573

574-
throw new NotImplementedException(nameof(GetContainers));
574+
var jobData = await _dataLakeJobDataFactory.GetConfiguration(executionContext, providerDefinitionId, "");
575+
576+
return await _client.GetFilesInDirectory(jobData);
575577
}
576578

577579
public override Task EmptyContainer(ExecutionContext executionContext, IReadOnlyStreamModel streamModel)

src/Connector.DataLake.Common/Connector/IDataLakeClient.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1-
using System.Threading.Tasks;
1+
using System.Collections.Generic;
2+
using System.Threading.Tasks;
23

34
using Azure.Storage.Files.DataLake;
45
using Azure.Storage.Files.DataLake.Models;
6+
using CluedIn.Core.Connectors;
57

68
namespace CluedIn.Connector.DataLake.Common.Connector;
79

@@ -12,4 +14,5 @@ public interface IDataLakeClient
1214
Task DeleteFile(IDataLakeJobData configuration, string fileName);
1315
Task<bool> FileInPathExists(IDataLakeJobData configuration, string fileName);
1416
Task<PathProperties> GetFilePathProperties(IDataLakeJobData configuration, string fileName);
17+
Task<IEnumerable<IConnectorContainer>> GetFilesInDirectory(IDataLakeJobData configuration, string subDirectory = null);
1518
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using CluedIn.Core.Connectors;
2+
3+
namespace CluedIn.Connector.DataLake.Common;
4+
5+
internal class DataLakeContainer : IConnectorContainer
6+
{
7+
public string Name { get; set; }
8+
public string Id { get; set; }
9+
public string FullyQualifiedName { get; set; }
10+
}

test/integration/Connector.AzureDataLake.Tests.Integration/AzureDataLakeConnectorTests.cs

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -746,6 +746,144 @@ await executeExportArg.ExportJob.DoRunAsync(
746746
});
747747
}
748748

749+
[Fact]
750+
public async Task GetContainers_InvalidParamsTest()
751+
{
752+
var azureDataLakeConstantsMock = CreateConstantsMock();
753+
754+
var providerDefinitionId = Guid.Parse("c444cda8-d9b5-45cc-a82d-fef28e08d55c");
755+
756+
var connectorConnectionMock = new Mock<IConnectorConnectionV2>();
757+
758+
var accountName = Environment.GetEnvironmentVariable("ADL2_ACCOUNTNAME");
759+
Assert.NotNull(accountName);
760+
var accountKey = Environment.GetEnvironmentVariable("ADL2_ACCOUNTKEY");
761+
Assert.NotNull(accountKey);
762+
763+
var fileSystemName = $"xunit-fs-{DateTime.Now.Ticks}";
764+
var directoryName = $"xunit-{DateTime.Now.Ticks}";
765+
766+
var streamCacheConnectionStringEncoded = Environment.GetEnvironmentVariable("ADL2_STREAMCACHE");
767+
var streamCacheConnectionString = Encoding.UTF8.GetString(Convert.FromBase64String(streamCacheConnectionStringEncoded));
768+
769+
var authenticationValues = new Dictionary<string, object>()
770+
{
771+
{ nameof(AzureDataLakeConstants.AccountName), accountName },
772+
{ nameof(AzureDataLakeConstants.AccountKey), accountKey },
773+
{ nameof(AzureDataLakeConstants.FileSystemName), fileSystemName },
774+
{ nameof(AzureDataLakeConstants.DirectoryName), directoryName },
775+
{ nameof(DataLakeConstants.IsStreamCacheEnabled), true },
776+
{ nameof(DataLakeConstants.StreamCacheConnectionString), streamCacheConnectionString },
777+
{ nameof(DataLakeConstants.OutputFormat), "JSON" },
778+
{ nameof(DataLakeConstants.UseCurrentTimeForExport), true },
779+
};
780+
connectorConnectionMock.Setup(x => x.Authentication).Returns(authenticationValues);
781+
782+
var jobDataFactory = new Mock<AzureDataLakeJobDataFactory>();
783+
jobDataFactory.Setup(x => x.GetConfiguration(It.IsAny<ExecutionContext>(), providerDefinitionId, It.IsAny<string>()))
784+
.ReturnsAsync(new AzureDataLakeConnectorJobData(connectorConnectionMock.Object.Authentication.ToDictionary(x => x.Key, x => x.Value)));
785+
786+
var connector = new AzureDataLakeConnector(
787+
Mock.Of<ILogger<AzureDataLakeConnector>>(),
788+
new AzureDataLakeClient(),
789+
azureDataLakeConstantsMock.Object,
790+
jobDataFactory.Object,
791+
Mock.Of<IDateTimeOffsetProvider>());
792+
793+
var container = new WindsorContainer();
794+
container.Register(Component.For<ILogger<OrganizationDataStores>>()
795+
.Instance(new Mock<ILogger<OrganizationDataStores>>().Object));
796+
var applicationContext = new ApplicationContext(container);
797+
var organizationId = Guid.NewGuid();
798+
var organization = new Organization(applicationContext, organizationId);
799+
var context = new ExecutionContext(applicationContext, organization, Mock.Of<ILogger>());
800+
801+
var containers = await connector.GetContainers(context, providerDefinitionId);
802+
Assert.Null(containers);
803+
804+
//This is an existing container in the Azure Data Lake account
805+
//This is an existing directory in the Azure Data Lake account
806+
//There are existing files in the directory
807+
//Changing this or removing the files will cause the test to fail
808+
authenticationValues[AzureDataLakeConstants.FileSystemName] = "apac-container";
809+
authenticationValues[AzureDataLakeConstants.DirectoryName] = "TestExport01";
810+
811+
connectorConnectionMock.Setup(x => x.Authentication).Returns(authenticationValues);
812+
jobDataFactory.Setup(x => x.GetConfiguration(It.IsAny<ExecutionContext>(), providerDefinitionId, It.IsAny<string>()))
813+
.ReturnsAsync(new AzureDataLakeConnectorJobData(connectorConnectionMock.Object.Authentication.ToDictionary(x => x.Key, x => x.Value)));
814+
815+
connector = new AzureDataLakeConnector(
816+
Mock.Of<ILogger<AzureDataLakeConnector>>(),
817+
new AzureDataLakeClient(),
818+
azureDataLakeConstantsMock.Object,
819+
jobDataFactory.Object,
820+
Mock.Of<IDateTimeOffsetProvider>());
821+
822+
containers = await connector.GetContainers(context, providerDefinitionId);
823+
Assert.NotNull(containers);
824+
}
825+
826+
[Fact]
827+
public async Task GetContainers_HasValuesTest()
828+
{
829+
var azureDataLakeConstantsMock = CreateConstantsMock();
830+
831+
var providerDefinitionId = Guid.Parse("c444cda8-d9b5-45cc-a82d-fef28e08d55c");
832+
833+
var connectorConnectionMock = new Mock<IConnectorConnectionV2>();
834+
835+
var accountName = Environment.GetEnvironmentVariable("ADL2_ACCOUNTNAME");
836+
Assert.NotNull(accountName);
837+
var accountKey = Environment.GetEnvironmentVariable("ADL2_ACCOUNTKEY");
838+
Assert.NotNull(accountKey);
839+
840+
//This is an existing container in the Azure Data Lake account
841+
//This is an existing directory in the Azure Data Lake account
842+
//There are existing files in the directory
843+
//Changing this or removing the files will cause the test to fail
844+
var fileSystemName = $"apac-container";
845+
var directoryName = $"TestExport01";
846+
847+
var streamCacheConnectionStringEncoded = Environment.GetEnvironmentVariable("ADL2_STREAMCACHE");
848+
var streamCacheConnectionString = Encoding.UTF8.GetString(Convert.FromBase64String(streamCacheConnectionStringEncoded));
849+
850+
var authenticationValues = new Dictionary<string, object>()
851+
{
852+
{ nameof(AzureDataLakeConstants.AccountName), accountName },
853+
{ nameof(AzureDataLakeConstants.AccountKey), accountKey },
854+
{ nameof(AzureDataLakeConstants.FileSystemName), fileSystemName },
855+
{ nameof(AzureDataLakeConstants.DirectoryName), directoryName },
856+
{ nameof(DataLakeConstants.IsStreamCacheEnabled), true },
857+
{ nameof(DataLakeConstants.StreamCacheConnectionString), streamCacheConnectionString },
858+
{ nameof(DataLakeConstants.OutputFormat), "JSON" },
859+
{ nameof(DataLakeConstants.UseCurrentTimeForExport), true },
860+
};
861+
connectorConnectionMock.Setup(x => x.Authentication).Returns(authenticationValues);
862+
863+
var jobDataFactory = new Mock<AzureDataLakeJobDataFactory>();
864+
jobDataFactory.Setup(x => x.GetConfiguration(It.IsAny<ExecutionContext>(), providerDefinitionId, It.IsAny<string>()))
865+
.ReturnsAsync(new AzureDataLakeConnectorJobData(connectorConnectionMock.Object.Authentication.ToDictionary(x => x.Key, x => x.Value)));
866+
867+
var connector = new AzureDataLakeConnector(
868+
Mock.Of<ILogger<AzureDataLakeConnector>>(),
869+
new AzureDataLakeClient(),
870+
azureDataLakeConstantsMock.Object,
871+
jobDataFactory.Object,
872+
Mock.Of<IDateTimeOffsetProvider>());
873+
874+
var container = new WindsorContainer();
875+
container.Register(Component.For<ILogger<OrganizationDataStores>>()
876+
.Instance(new Mock<ILogger<OrganizationDataStores>>().Object));
877+
var applicationContext = new ApplicationContext(container);
878+
var organizationId = Guid.NewGuid();
879+
var organization = new Organization(applicationContext, organizationId);
880+
var context = new ExecutionContext(applicationContext, organization, Mock.Of<ILogger>());
881+
882+
var containers = await connector.GetContainers(context, providerDefinitionId);
883+
Assert.NotNull(containers);
884+
Assert.NotEmpty(containers);
885+
}
886+
749887
private static async Task<DateTimeOffset> GetFileDataTime(ExecuteExportArg executeExportArg, PathItem path)
750888
{
751889
var fsClient = executeExportArg.Client.GetFileSystemClient(executeExportArg.FileSystemName);

0 commit comments

Comments
 (0)