Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 94 additions & 9 deletions src/Connector.DataLake.Common/Connector/DataLakeClient.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using Azure.Storage.Files.DataLake;
using Azure.Storage.Files.DataLake.Models;
using CluedIn.Core.Connectors;
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Azure;
using Azure.Storage.Files.DataLake;
using Azure.Storage.Files.DataLake.Models;
using CluedIn.Core.Connectors;
using Serilog;

namespace CluedIn.Connector.DataLake.Common.Connector
{
Expand Down Expand Up @@ -150,20 +152,103 @@ private async Task<DataLakeDirectoryClient> GetDirectoryClientAsync(
string subDirectory,
bool ensureExists)
{
// Start from the configured root directory; the sub-directory, if any, is appended below.
var directory = configuration.RootDirectoryPath;
var directoryClient = fileSystemClient.GetDirectoryClient(directory);
if (string.IsNullOrWhiteSpace(subDirectory))

// Log the initial directory calculation
Log.Logger.Information("Requested directory path relative to file system: {RootDirectory} with subdirectory {SubDirectory}.",
directory, subDirectory);

if (!string.IsNullOrWhiteSpace(subDirectory))
{
return directoryClient;
directoryClient = directoryClient.GetSubDirectoryClient(subDirectory);
}

directoryClient = directoryClient.GetSubDirectoryClient(subDirectory);

if (ensureExists && !await directoryClient.ExistsAsync())
// --- START OF REPLACEMENT LOGIC ---
if (ensureExists)
{
directoryClient = await fileSystemClient.CreateDirectoryAsync(directoryClient.Path);
var fullRelativePath = directoryClient.Path;
Log.Logger.Information("Starting recursive path creation for full path: {FullPath}", fullRelativePath);

if (!string.IsNullOrWhiteSpace(fullRelativePath))
{
var pathSegments = fullRelativePath.Split('/');
var currentPath = "";

foreach (var segment in pathSegments)
{
if (string.IsNullOrWhiteSpace(segment))
continue;

currentPath = string.IsNullOrEmpty(currentPath) ? segment : $"{currentPath}/{segment}";
var segmentClient = fileSystemClient.GetDirectoryClient(currentPath);

Log.Logger.Information("Checking/creating directory segment: {CurrentPath}", currentPath);

Response<PathInfo> response;
try
{
response = await segmentClient.CreateIfNotExistsAsync();
}
catch(RequestFailedException ex)
{
if (ex.ErrorCode == "OperationNotAllowedOnThePath")
{
continue;
}

throw;
}

if (response == null)
{
Log.Logger.Information("Null response received for: {CurrentPath}", currentPath);
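// CreateIfNotExistsAsync has been observed to return null instead of a response
// for paths such as:
// - XXXXXX.MountedRelationalDatabase/Files
// - XXXXXX.MountedRelationalDatabase/Files/LandingZone
// Presumably the path already exists in these cases (TODO confirm); skip the
// status checks and move on to the next segment.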
continue;
}

var rawResponse = response.GetRawResponse();
var statusCode = rawResponse.Status;

if (statusCode == 201)
{
Log.Logger.Information("Successfully CREATED directory segment: {CurrentPath}", currentPath);
}
else if (statusCode == 409) // Most common "already exists" for this operation
{
Log.Logger.Information("Directory segment already exists (409 Conflict): {CurrentPath}", currentPath);
}
else if (statusCode == 400 &&
rawResponse.Headers.TryGetValue("x-ms-error-code", out var errorCode) &&
errorCode == "OperationNotAllowedOnThePath")
{
// This specifically indicates the directory *is* there, but the operation
// (which includes setting default properties) failed because of an existing state.
Log.Logger.Information("OperationNotAllowedOnThePath: Path exists or is immutable: {CurrentPath}", currentPath);
}
else if (statusCode == 200) // Less common "already exists" (as discussed)
{
Log.Logger.Information("Directory segment already exists (200 OK): {CurrentPath}", currentPath);
}
else
{
// All other unexpected failures (Auth, Internal Server Error, etc.)
Log.Logger.Error("Failed to ensure directory segment exists. Path: {CurrentPath}, Status Code: {StatusCode}",
currentPath, statusCode);

// Use RequestFailedException(rawResponse) to capture the full error details
throw new RequestFailedException(rawResponse);
}
}
}
Log.Logger.Information("Recursive path creation complete. Full path is guaranteed to exist. {FullPath}", fullRelativePath);
}
// --- END OF REPLACEMENT LOGIC ---

Log.Logger.Information("Returning directory client for URI: {DirectoryUri}", directoryClient.Uri);
return directoryClient;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using System.Transactions;

using Azure;
using Azure.Storage.Files.DataLake;

using CluedIn.Connector.DataLake.Common.Connector.SqlDataWriter;
Expand Down Expand Up @@ -178,11 +178,12 @@ public override async Task DoRunAsync(ExecutionContext context, IDataLakeJobArgs
}

context.Log.LogInformation(
"Begin writing to file '{OutputFileName}' using data at {DataTime} and {TemporaryOutputFileName} ({TemporaryFileClientUri}).",
"Begin writing to file '{OutputFileName}' using data at {DataTime} and {TemporaryOutputFileName} ({TemporaryFileClientUri}) IsOverwriteEnabled={IsOverwriteEnabled}.",
outputFileName,
asOfTime,
temporaryOutputFileName,
temporaryFileClient.Uri);
temporaryFileClient.Uri,
configuration.IsOverwriteEnabled);

var totalRows = await writeFileContentsAsync();
if (configuration.IsDeltaMode && totalRows == 0 && !GetIsEmptyFileAllowed(exportJobData))
Expand Down Expand Up @@ -227,9 +228,29 @@ async Task<long> writeFileContentsAsync()
{
var fieldNamesToUse = await GetFieldNamesAsync(context, exportJobData, configuration, fieldNames);
var sqlDataWriter = GetSqlDataWriter(outputFormat);
await using var outputStream = await temporaryFileClient.OpenWriteAsync(configuration.IsOverwriteEnabled);
using var bufferedStream = new DataLakeBufferedWriteStream(outputStream);
return await sqlDataWriter?.WriteAsync(context, configuration, bufferedStream, fieldNamesToUse, IsInitialExport, reader);
await directoryClient.CreateIfNotExistsAsync();

try
{
await using var outputStream = await temporaryFileClient.OpenWriteAsync(configuration.IsOverwriteEnabled);
using var bufferedStream = new DataLakeBufferedWriteStream(outputStream);
return await sqlDataWriter?.WriteAsync(context, configuration, bufferedStream, fieldNamesToUse, IsInitialExport, reader);
}
catch (RequestFailedException e)
{
context.Log.LogInformation("Request failed");
if (e.Status == 404)
{
context.Log.LogInformation("Request failed 404 - creating file");

await temporaryFileClient.CreateAsync();

await using var outputStream = await temporaryFileClient.OpenWriteAsync(configuration.IsOverwriteEnabled);
using var bufferedStream = new DataLakeBufferedWriteStream(outputStream);
return await sqlDataWriter?.WriteAsync(context, configuration, bufferedStream, fieldNamesToUse, IsInitialExport, reader);
}
throw new ApplicationException($"Failed to write to {temporaryFileClient.Uri}", e);
}
}

async Task setFilePropertiesAsync()
Expand Down
Loading