diff --git a/src/Connector.DataLake.Common/Connector/DataLakeClient.cs b/src/Connector.DataLake.Common/Connector/DataLakeClient.cs
index 6bca5f7..0442f37 100644
--- a/src/Connector.DataLake.Common/Connector/DataLakeClient.cs
+++ b/src/Connector.DataLake.Common/Connector/DataLakeClient.cs
@@ -1,10 +1,12 @@
-using Azure.Storage.Files.DataLake;
-using Azure.Storage.Files.DataLake.Models;
-using CluedIn.Core.Connectors;
 using System;
 using System.Collections.Generic;
 using System.IO;
 using System.Threading.Tasks;
+using Azure;
+using Azure.Storage.Files.DataLake;
+using Azure.Storage.Files.DataLake.Models;
+using CluedIn.Core.Connectors;
+using Serilog;
 
 namespace CluedIn.Connector.DataLake.Common.Connector
 {
@@ -150,20 +152,102 @@ private async Task GetDirectoryClientAsync(
             string subDirectory,
             bool ensureExists)
         {
             var directory = configuration.RootDirectoryPath;
             var directoryClient = fileSystemClient.GetDirectoryClient(directory);
-            if (string.IsNullOrWhiteSpace(subDirectory))
+
+            Log.Logger.Information("Requested directory path relative to file system: {RootDirectory} with subdirectory {SubDirectory}.",
+                directory, subDirectory);
+
+            if (!string.IsNullOrWhiteSpace(subDirectory))
             {
-                return directoryClient;
+                directoryClient = directoryClient.GetSubDirectoryClient(subDirectory);
             }
 
-            directoryClient = directoryClient.GetSubDirectoryClient(subDirectory);
-
-            if (ensureExists && !await directoryClient.ExistsAsync())
+            if (ensureExists)
             {
-                directoryClient = await fileSystemClient.CreateDirectoryAsync(directoryClient.Path);
+                // Create the path one segment at a time, root first, so that a segment the
+                // service refuses to create can be skipped without abandoning its children.
+                var fullRelativePath = directoryClient.Path;
+                Log.Logger.Information("Starting recursive path creation for full path: {FullPath}", fullRelativePath);
+
+                if (!string.IsNullOrWhiteSpace(fullRelativePath))
+                {
+                    var pathSegments = fullRelativePath.Split('/');
+                    var currentPath = "";
+
+                    foreach (var segment in pathSegments)
+                    {
+                        if (string.IsNullOrWhiteSpace(segment))
+                        {
+                            continue;
+                        }
+
+                        currentPath = string.IsNullOrEmpty(currentPath) ? segment : $"{currentPath}/{segment}";
+                        var segmentClient = fileSystemClient.GetDirectoryClient(currentPath);
+
+                        Log.Logger.Information("Checking/creating directory segment: {CurrentPath}", currentPath);
+
+                        Response<PathInfo> response;
+                        try
+                        {
+                            response = await segmentClient.CreateIfNotExistsAsync();
+                        }
+                        catch (RequestFailedException ex) when (ex.ErrorCode == "OperationNotAllowedOnThePath")
+                        {
+                            // Deliberate best-effort: this segment cannot be created here, so log it and move on.
+                            Log.Logger.Information("Skipping directory segment {CurrentPath}: service returned {ErrorCode}", currentPath, ex.ErrorCode);
+                            continue;
+                        }
+
+                        if (response == null)
+                        {
+                            // No response object came back; observed for paths such as
+                            // - XXXXXX.MountedRelationalDatabase/Files
+                            // - XXXXXX.MountedRelationalDatabase/Files/LandingZone
+                            // NOTE(review): presumably the SDK's "already exists" result - confirm.
+                            Log.Logger.Information("Null response received for: {CurrentPath}", currentPath);
+                            continue;
+                        }
+
+                        var rawResponse = response.GetRawResponse();
+                        var statusCode = rawResponse.Status;
+
+                        if (statusCode == 201)
+                        {
+                            Log.Logger.Information("Successfully CREATED directory segment: {CurrentPath}", currentPath);
+                        }
+                        else if (statusCode == 409) // "already exists" reported as a conflict
+                        {
+                            Log.Logger.Information("Directory segment already exists (409 Conflict): {CurrentPath}", currentPath);
+                        }
+                        else if (statusCode == 400 &&
+                                 rawResponse.Headers.TryGetValue("x-ms-error-code", out var errorCode) &&
+                                 errorCode == "OperationNotAllowedOnThePath")
+                        {
+                            // Same error code as the exception filter above, reported through the
+                            // response headers instead of an exception; treated as non-fatal.
+                            Log.Logger.Information("OperationNotAllowedOnThePath: Path exists or is immutable: {CurrentPath}", currentPath);
+                        }
+                        else if (statusCode == 200) // "already exists" reported as success
+                        {
+                            Log.Logger.Information("Directory segment already exists (200 OK): {CurrentPath}", currentPath);
+                        }
+                        else
+                        {
+                            // Any other status is unexpected: log it and fail the whole operation.
+                            Log.Logger.Error("Failed to ensure directory segment exists. Path: {CurrentPath}, Status Code: {StatusCode}",
+                                currentPath, statusCode);
+
+                            // Build the exception from the raw response so the service error details are kept.
+                            throw new RequestFailedException(rawResponse);
+                        }
+                    }
+                }
+
+                Log.Logger.Information("Recursive path creation complete for full path: {FullPath}", fullRelativePath);
             }
 
+            Log.Logger.Information("Returning directory client for URI: {DirectoryUri}", directoryClient.Uri);
             return directoryClient;
         }
 
diff --git a/src/Connector.DataLake.Common/Connector/DataLakeExportEntitiesJobBase.cs b/src/Connector.DataLake.Common/Connector/DataLakeExportEntitiesJobBase.cs
index 6b5f0e7..6901a0f 100644
--- a/src/Connector.DataLake.Common/Connector/DataLakeExportEntitiesJobBase.cs
+++ b/src/Connector.DataLake.Common/Connector/DataLakeExportEntitiesJobBase.cs
@@ -6,7 +6,7 @@
 using System.Text.RegularExpressions;
 using System.Threading.Tasks;
 using System.Transactions;
-
+using Azure;
 using Azure.Storage.Files.DataLake;
 using CluedIn.Connector.DataLake.Common.Connector.SqlDataWriter;
 
@@ -178,11 +178,12 @@ public override async Task DoRunAsync(ExecutionContext context, IDataLakeJobArgs
             }
 
             context.Log.LogInformation(
-                "Begin writing to file '{OutputFileName}' using data at {DataTime} and {TemporaryOutputFileName} ({TemporaryFileClientUri}).",
+                "Begin writing to file '{OutputFileName}' using data at {DataTime} and {TemporaryOutputFileName} ({TemporaryFileClientUri}) IsOverwriteEnabled={IsOverwriteEnabled}.",
                 outputFileName,
                 asOfTime,
                 temporaryOutputFileName,
-                temporaryFileClient.Uri);
+                temporaryFileClient.Uri,
+                configuration.IsOverwriteEnabled);
 
             var totalRows = await writeFileContentsAsync();
             if (configuration.IsDeltaMode && totalRows == 0 && !GetIsEmptyFileAllowed(exportJobData))
@@ -227,9 +228,32 @@ async Task writeFileContentsAsync()
             {
                 var fieldNamesToUse = await GetFieldNamesAsync(context, exportJobData, configuration, fieldNames);
                 var sqlDataWriter = GetSqlDataWriter(outputFormat);
-                await using var outputStream = await temporaryFileClient.OpenWriteAsync(configuration.IsOverwriteEnabled);
-                using var bufferedStream = new DataLakeBufferedWriteStream(outputStream);
-                return await sqlDataWriter?.WriteAsync(context, configuration, bufferedStream, fieldNamesToUse, IsInitialExport, reader);
+                await directoryClient.CreateIfNotExistsAsync();
+
+                // Make at most one recovery attempt: when the service answers 404, create the
+                // temporary file explicitly and run the write again.
+                // NOTE(review): the second pass reuses the same reader - confirm that no rows
+                // can have been consumed before the 404 surfaces, otherwise they are lost.
+                var hasCreatedFile = false;
+                while (true)
+                {
+                    try
+                    {
+                        await using var outputStream = await temporaryFileClient.OpenWriteAsync(configuration.IsOverwriteEnabled);
+                        using var bufferedStream = new DataLakeBufferedWriteStream(outputStream);
+                        return await sqlDataWriter?.WriteAsync(context, configuration, bufferedStream, fieldNamesToUse, IsInitialExport, reader);
+                    }
+                    catch (RequestFailedException e) when (e.Status == 404 && !hasCreatedFile)
+                    {
+                        context.Log.LogWarning(e, "Request failed 404 - creating file {TemporaryFileClientUri}", temporaryFileClient.Uri);
+                        await temporaryFileClient.CreateAsync();
+                        hasCreatedFile = true;
+                    }
+                    catch (RequestFailedException e)
+                    {
+                        throw new ApplicationException($"Failed to write to {temporaryFileClient.Uri}", e);
+                    }
+                }
             }
 
             async Task setFilePropertiesAsync()