Skip to content

Commit c3333b6

Browse files
authored
chore: Merge release/4.6.1 to master (#138)
2 parents e20715d + d469e71 commit c3333b6

File tree

4 files changed

+68
-4
lines changed

4 files changed

+68
-4
lines changed

docs/4.6.1-release-notes.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
### Features
2+
- Additional diagnostic information is logged to help diagnose common Azure issues
3+
- Handle a change in Microsoft's libraries that resulted in a 404 when writing to a new file

src/Connector.DataLake.Common/Connector/DataLakeExportEntitiesJobBase.cs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
using Azure.Storage.Files.DataLake;
1111

1212
using CluedIn.Connector.DataLake.Common.Connector.SqlDataWriter;
13+
using CluedIn.Connector.DataLake.Common.Extensions;
1314
using CluedIn.Core;
1415
using CluedIn.Core.Data.Relational;
1516
using CluedIn.Core.Streams;
@@ -164,12 +165,26 @@ public override async Task DoRunAsync(ExecutionContext context, IDataLakeJobArgs
164165
[DataTimeKey] = asOfTime,
165166
});
166167

168+
DataLakeFileClient temporaryFileClient;
169+
try
170+
{
171+
temporaryFileClient = directoryClient.GetFileClient(temporaryOutputFileName);
172+
}
173+
catch
174+
{
175+
context.Log.LogInformation(
176+
"Error creating file client for {TemporaryOutputFileName}.",
177+
temporaryOutputFileName);
178+
throw;
179+
}
180+
167181
context.Log.LogInformation(
168-
"Begin writing to file '{OutputFileName}' using data at {DataTime} and {TemporaryOutputFileName}.",
182+
"Begin writing to file '{OutputFileName}' using data at {DataTime} and {TemporaryOutputFileName} ({TemporaryFileClientUri}).",
169183
outputFileName,
170184
asOfTime,
171-
temporaryOutputFileName);
172-
var temporaryFileClient = directoryClient.GetFileClient(temporaryOutputFileName);
185+
temporaryOutputFileName,
186+
temporaryFileClient.Uri);
187+
173188
var totalRows = await writeFileContentsAsync();
174189
if (configuration.IsDeltaMode && totalRows == 0 && !GetIsEmptyFileAllowed(exportJobData))
175190
{
@@ -213,7 +228,7 @@ async Task<long> writeFileContentsAsync()
213228
{
214229
var fieldNamesToUse = await GetFieldNamesAsync(context, exportJobData, configuration, fieldNames);
215230
var sqlDataWriter = GetSqlDataWriter(outputFormat);
216-
await using var outputStream = await temporaryFileClient.OpenWriteAsync(configuration.IsOverwriteEnabled);
231+
await using var outputStream = await temporaryFileClient.OpenWriteExAsync(configuration.IsOverwriteEnabled);
217232
using var bufferedStream = new DataLakeBufferedWriteStream(outputStream);
218233
return await sqlDataWriter?.WriteAsync(context, configuration, bufferedStream, fieldNamesToUse, IsInitialExport, reader);
219234
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
using Azure.Storage.Files.DataLake;
2+
using Azure;
3+
using System.IO;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Azure.Storage.Files.DataLake.Models;
7+
8+
namespace CluedIn.Connector.DataLake.Common.Extensions;
9+
10+
public static class DataLakeFileClientExtensions
{
    /// <summary>
    /// Opens a stream for writing to the Data Lake file, recovering from the case where
    /// <c>OpenWriteAsync</c> fails with a 404 because the file does not yet exist.
    /// </summary>
    /// <remarks>
    /// The first attempt is a plain <c>OpenWriteAsync</c>. If it fails with a
    /// <see cref="RequestFailedException"/> whose status is 404, the file is created
    /// (only if it still does not exist) and <c>OpenWriteAsync</c> is retried exactly once.
    /// Any failure from the create call or from the retry propagates to the caller.
    /// </remarks>
    /// <param name="fileClient">The DataLakeFileClient instance.</param>
    /// <param name="overwrite">Whether to overwrite the file if it exists.</param>
    /// <param name="options">Optional parameters for the write operation.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>A writable Stream for the file.</returns>
    /// <exception cref="System.ArgumentNullException"><paramref name="fileClient"/> is null.</exception>
    /// <exception cref="RequestFailedException">
    /// The service rejected the request with a status other than 404, or the create/retry failed.
    /// </exception>
    public static async Task<Stream> OpenWriteExAsync(
        this DataLakeFileClient fileClient,
        bool overwrite,
        DataLakeFileOpenWriteOptions options = null,
        CancellationToken cancellationToken = default)
    {
        // Fail with a descriptive exception instead of a NullReferenceException from the first call.
        if (fileClient is null)
        {
            throw new System.ArgumentNullException(nameof(fileClient));
        }

        try
        {
            // Attempt the standard OpenWriteAsync call first; this is the common path.
            return await fileClient.OpenWriteAsync(overwrite, options, cancellationToken);
        }
        catch (RequestFailedException ex) when (ex.Status == 404)
        {
            // The 404 is the error surfaced when the SDK's internal properties lookup
            // cannot find the file. Create it so that the retry below can succeed.
            //
            // CreateIfNotExistsAsync is used rather than CreateAsync: CreateAsync overwrites
            // an existing file, so if another writer created the file between the failed open
            // and this call, its content would be truncated even when 'overwrite' is false.
            await fileClient.CreateIfNotExistsAsync(cancellationToken: cancellationToken);

            // Retry with the caller's original 'overwrite' flag so the stream semantics
            // (truncate vs. append to existing content) are the ones that were requested.
            return await fileClient.OpenWriteAsync(overwrite, options, cancellationToken);
        }
    }
}

test/integration/Connector.DataLake.Common.Tests.Integration/DataLakeConnectorTestsBase.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ protected virtual StreamModel CreateStreamModel(Guid providerDefinitionId, Strea
218218
ExportIncomingEdges = true,
219219
ExportOutgoingEdges = true,
220220
Status = StreamStatus.Started,
221+
OrganizationId = Guid.NewGuid(), // can't be Guid.Empty
221222
};
222223
return streamModel;
223224
}

0 commit comments

Comments
 (0)