
Commit 43a713e

Merge branch 'release/4.5.1'

2 parents 6f88997 + cc7260b

File tree: 16 files changed, +312 −46 lines


docs/4.4.5-release-notes.md

Lines changed: 2 additions & 0 deletions
```diff
@@ -0,0 +1,2 @@
+### Fix
+- Fixed issue where the buffer would not wait after a flush
```

docs/4.4.6-release-notes.md

Lines changed: 5 additions & 0 deletions
```diff
@@ -0,0 +1,5 @@
+### Features
+- Add _partnerEvents.json when creating _metadata.json in Open Mirroring
+
+### Fixes
+- Fix issue where rows are not updated in Open Mirroring when entities are updated
```

docs/4.5.1-release-notes.md

Lines changed: 5 additions & 0 deletions
```diff
@@ -0,0 +1,5 @@
+### Features
+- Add _partnerEvents.json when creating _metadata.json in Open Mirroring
+
+### Fixes
+- Fix issue where rows are not updated in Open Mirroring when entities are updated
```

src/Connector.DataLake.Common/Connector/DataLakeConnector.cs

Lines changed: 6 additions & 1 deletion
```diff
@@ -359,7 +359,9 @@ IF EXISTS (
     BEGIN
         UPDATE [{tableName}]
         SET
-            [{DataLakeConstants.ChangeTypeKey}] = {syncItem.ChangeType.ToString()}
+            [{DataLakeConstants.ChangeTypeKey}] = @ChangeType,
+            [Timestamp] = @Timestamp,
+            [Epoch] = @Epoch
         WHERE {DataLakeConstants.IdKey} = @{DataLakeConstants.IdKey};
     END
     """;
@@ -368,6 +370,9 @@ UPDATE [{tableName}]
         CommandType = CommandType.Text
     };
     command.Parameters.Add(new SqlParameter($"@{DataLakeConstants.IdKey}", syncItem.EntityId));
+    command.Parameters.Add(new SqlParameter($"@ChangeType", syncItem.ChangeType.ToString()));
+    command.Parameters.Add(new SqlParameter($"@Timestamp", syncItem.Data["Timestamp"]));
+    command.Parameters.Add(new SqlParameter($"@Epoch", syncItem.Data["Epoch"]));

     var rowsAffected = await command.ExecuteNonQueryAsync();
     if (rowsAffected != 1)
```
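
The hunks above switch the sync-table update from interpolating `syncItem.ChangeType` directly into the SQL text to binding it as a parameter, and additionally refresh the `Timestamp` and `Epoch` columns on update. A minimal sketch of the same parameter-binding pattern with Microsoft.Data.SqlClient; the connection handling, table name, and column names here are placeholders for illustration, not taken from the commit:

```csharp
using System;
using System.Data;
using System.Threading.Tasks;
using Microsoft.Data.SqlClient;

internal static class ParameterizedUpdateSketch
{
    // Illustrative only: update one sync-table row via bound parameters rather
    // than string-interpolated values. Table and column names are placeholders.
    public static async Task<int> UpdateRowAsync(
        SqlConnection connection, string tableName, string id,
        string changeType, DateTimeOffset timestamp, long epoch)
    {
        var sql = $"""
            UPDATE [{tableName}]
            SET [ChangeType] = @ChangeType,
                [Timestamp] = @Timestamp,
                [Epoch] = @Epoch
            WHERE [Id] = @Id;
            """;

        using var command = new SqlCommand(sql, connection) { CommandType = CommandType.Text };
        command.Parameters.Add(new SqlParameter("@Id", id));
        command.Parameters.Add(new SqlParameter("@ChangeType", changeType));
        command.Parameters.Add(new SqlParameter("@Timestamp", timestamp));
        command.Parameters.Add(new SqlParameter("@Epoch", epoch));

        return await command.ExecuteNonQueryAsync();
    }
}
```

Binding the values also lets the provider send `Timestamp` and `Epoch` with their native types instead of relying on string formatting inside the SQL text.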

src/Connector.DataLake.Common/Connector/DataLakeExportEntitiesJobBase.cs

Lines changed: 18 additions & 11 deletions
```diff
@@ -7,8 +7,6 @@
 using System.Threading.Tasks;
 using System.Transactions;

-using Apache.Arrow;
-
 using Azure.Storage.Files.DataLake;

 using CluedIn.Connector.DataLake.Common.Connector.SqlDataWriter;
@@ -20,8 +18,6 @@
 using Microsoft.Data.SqlClient;
 using Microsoft.Extensions.Logging;

-using Parquet;
-
 namespace CluedIn.Connector.DataLake.Common.Connector;

 internal abstract class DataLakeExportEntitiesJobBase : DataLakeJobBase
@@ -117,7 +113,7 @@ public override async Task DoRunAsync(ExecutionContext context, IDataLakeJobArgs

         var subDirectory = await GetSubDirectory(configuration, streamId);
         var directoryClient = await _dataLakeClient.EnsureDataLakeDirectoryExist(configuration, subDirectory);
-        await InitializeDirectoryAsync(configuration, streamId, directoryClient);
+        await InitializeDirectoryAsync(context, connection, configuration, exportJobData, directoryClient);
         var startExportTime = _dateTimeOffsetProvider.GetCurrentUtcTime();
         var exportHistory = new ExportHistory(
             streamId,
@@ -152,10 +148,6 @@ public override async Task DoRunAsync(ExecutionContext context, IDataLakeJobArgs
             .Select(reader.GetName)
             .ToList();

-        var fieldNamesToUse = configuration.IsDeltaMode
-            ? fieldNames
-            : fieldNames.Where(fieldName => fieldName != DataLakeConstants.ChangeTypeKey).ToList();
-
         var temporaryOutputFileName = outputFileName + TemporaryFileSuffix;
         using var loggingScope = context.Log.BeginScope(new Dictionary<string, object>
         {
@@ -214,10 +206,11 @@ public override async Task DoRunAsync(ExecutionContext context, IDataLakeJobArgs

         async Task<long> writeFileContentsAsync()
         {
+            var fieldNamesToUse = await GetFieldNamesAsync(context, exportJobData, configuration, fieldNames);
             var sqlDataWriter = GetSqlDataWriter(outputFormat);
             await using var outputStream = await temporaryFileClient.OpenWriteAsync(configuration.IsOverwriteEnabled);
             using var bufferedStream = new DataLakeBufferedWriteStream(outputStream);
-            return await sqlDataWriter?.WriteAsync(context, configuration, bufferedStream, fieldNamesToUse, reader);
+            return await sqlDataWriter?.WriteAsync(context, configuration, bufferedStream, fieldNamesToUse, IsInitialExport, reader);
         }

         async Task setFilePropertiesAsync()
@@ -268,8 +261,22 @@ async Task deleteFileIfExistsAsync(string file)
             await targetFileClient.DeleteIfExistsAsync();
         }
     }
+
+    private protected virtual async Task<List<string>> GetFieldNamesAsync(
+        ExecutionContext context,
+        ExportJobData exportJobData,
+        IDataLakeJobData configuration,
+        List<string> fieldNames)
+    {
+        return configuration.IsDeltaMode
+            ? fieldNames
+            : fieldNames.Where(fieldName => fieldName != DataLakeConstants.ChangeTypeKey).ToList();
+    }
+
     private protected ExportHistory LastExport { get; private set; }

+    protected virtual bool IsInitialExport => LastExport == null;
+
     private protected virtual bool GetIsEmptyFileAllowed(ExportJobData exportJobData) => true;

     private protected virtual Task PostExportAsync(ExecutionContext context, ExportJobData exportJobData)
@@ -282,7 +289,7 @@ protected virtual Task<string> GetSubDirectory(IDataLakeJobData configuration, G
         return Task.FromResult(string.Empty);
     }

-    protected virtual Task InitializeDirectoryAsync(IDataLakeJobData configuration, Guid streamId, DataLakeDirectoryClient client)
+    private protected virtual Task InitializeDirectoryAsync(ExecutionContext context, SqlConnection connection, IDataLakeJobData configuration, ExportJobData exportJobData, DataLakeDirectoryClient client)
     {
         return Task.CompletedTask;
     }
```
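
The column filtering that previously lived inline in DoRunAsync now sits behind the virtual GetFieldNamesAsync, and IsInitialExport reports whether an ExportHistory from a previous run exists. For quick reference, the default selection behaves like this standalone sketch; "ChangeType" stands in for DataLakeConstants.ChangeTypeKey and the column list is invented for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustrative only: the default GetFieldNamesAsync selection, over plain values.
static List<string> SelectFieldNames(bool isDeltaMode, List<string> fieldNames) =>
    isDeltaMode
        ? fieldNames
        : fieldNames.Where(name => name != "ChangeType").ToList();

var columns = new List<string> { "Id", "Name", "ChangeType" };
Console.WriteLine(string.Join(", ", SelectFieldNames(isDeltaMode: true, columns)));  // Id, Name, ChangeType
Console.WriteLine(string.Join(", ", SelectFieldNames(isDeltaMode: false, columns))); // Id, Name
```

Delta-mode exports keep the change-type column so downstream consumers can apply updates and deletes; full exports drop it.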

src/Connector.DataLake.Common/Connector/SqlDataWriter/CsvSqlDataWriter.cs

Lines changed: 13 additions & 1 deletion
```diff
@@ -18,6 +18,7 @@ public override async Task<long> WriteOutputAsync(
         IDataLakeJobData configuration,
         Stream outputStream,
         ICollection<string> fieldNames,
+        bool isInitialExport,
         SqlDataReader reader)
     {
         context.Log.LogInformation("Begin writing output.");
@@ -27,14 +28,20 @@ public override async Task<long> WriteOutputAsync(
         await using var csv = new CsvWriter(writer, csvConfig);
         foreach (var fieldName in fieldNames)
         {
-            var fieldNameToUse = configuration.ShouldEscapeVocabularyKeys ? EscapeVocabularyKey(fieldName) : fieldName;
+            var fieldNameToUse = GetFieldName(configuration, fieldName);
             csv.WriteField(fieldNameToUse);
         }
+
         await csv.NextRecordAsync();

         var totalProcessed = 0L;
         while (await reader.ReadAsync())
         {
+            if (ShouldSkip(configuration, isInitialExport, reader))
+            {
+                continue;
+            }
+
             var fieldValues = fieldNames.Select(name => GetValue(name, reader, configuration));
             foreach (var field in fieldValues)
             {
@@ -52,4 +59,9 @@ public override async Task<long> WriteOutputAsync(

         return totalProcessed;
     }
+
+    protected virtual string GetFieldName(IDataLakeJobData configuration, string fieldName)
+    {
+        return configuration.ShouldEscapeVocabularyKeys ? EscapeVocabularyKey(fieldName) : fieldName;
+    }
 }
```
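
GetFieldName is now a virtual seam around the existing header logic: by default it still defers to EscapeVocabularyKey (defined in SqlDataWriterBase further down), which replaces any character outside [a-zA-Z0-9_] with an underscore when ShouldEscapeVocabularyKeys is enabled. A standalone illustration of that default escaping; the sample vocabulary keys are made up:

```csharp
using System;
using System.Text.RegularExpressions;

// Illustrative only: the default CSV header escaping applied when
// ShouldEscapeVocabularyKeys is enabled.
var nonAlphaNumeric = new Regex("[^a-zA-Z0-9_]");
string Escape(string fieldName) => nonAlphaNumeric.Replace(fieldName, "_");

Console.WriteLine(Escape("organization.address.city")); // organization_address_city
Console.WriteLine(Escape("Id"));                        // Id (unchanged)
```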

src/Connector.DataLake.Common/Connector/SqlDataWriter/ISqlDataWriter.cs

Lines changed: 1 addition & 0 deletions
```diff
@@ -13,5 +13,6 @@ Task<long> WriteAsync(
         IDataLakeJobData configuration,
         Stream outputStream,
         ICollection<string> fieldNames,
+        bool isInitialExport,
         SqlDataReader reader);
 }
```

src/Connector.DataLake.Common/Connector/SqlDataWriter/JsonSqlDataWriter.cs

Lines changed: 7 additions & 1 deletion
```diff
@@ -16,6 +16,7 @@ public override async Task<long> WriteOutputAsync(
         IDataLakeJobData configuration,
         Stream outputStream,
         ICollection<string> fieldNames,
+        bool isInitialExport,
         SqlDataReader reader)
     {
         await using var stringWriter = new StreamWriter(outputStream);
@@ -26,10 +27,15 @@ public override async Task<long> WriteOutputAsync(
         await writer.WriteStartArrayAsync();
         while (await reader.ReadAsync())
         {
+            if (ShouldSkip(configuration, isInitialExport, reader))
+            {
+                continue;
+            }
+
             await writer.WriteStartObjectAsync();

             foreach(var field in fieldNames)
-            {
+            {
                 await writer.WritePropertyNameAsync(field);
                 var value = GetValue(field, reader, configuration);
                 if (value is JArray jArray)
```

src/Connector.DataLake.Common/Connector/SqlDataWriter/ParquetSqlDataWriter.cs

Lines changed: 6 additions & 0 deletions
```diff
@@ -34,6 +34,7 @@ public override async Task<long> WriteOutputAsync(
         IDataLakeJobData configuration,
         Stream outputStream,
         ICollection<string> fieldNames,
+        bool isInitialExport,
         SqlDataReader reader)
     {
         var fields = new List<Field>();
@@ -51,6 +52,11 @@ public override async Task<long> WriteOutputAsync(

         while (await reader.ReadAsync())
         {
+            if (ShouldSkip(configuration, isInitialExport, reader))
+            {
+                continue;
+            }
+
             var fieldValues = fieldNames.Select(key => {
                 var value = GetValue(key, reader, configuration);
                 return value;
```

src/Connector.DataLake.Common/Connector/SqlDataWriter/SqlDataWriterBase.cs

Lines changed: 18 additions & 2 deletions
```diff
@@ -1,10 +1,12 @@
-using System;
+using System;
 using System.Collections.Generic;
 using System.Data;
 using System.IO;
 using System.Text.RegularExpressions;
 using System.Threading.Tasks;
 using CluedIn.Core;
+using CluedIn.Core.Data.Parts;
+
 using Microsoft.Data.SqlClient;
 using Microsoft.Extensions.Logging;

@@ -15,6 +17,11 @@ internal abstract class SqlDataWriterBase : ISqlDataWriter
     protected const int LoggingThreshold = 1000;
     private static readonly Regex NonAlphaNumericRegex = new("[^a-zA-Z0-9_]");
     protected virtual object GetValue(string key, SqlDataReader reader, IDataLakeJobData configuration)
+    {
+        return GetValueInternal(key, reader);
+    }
+
+    private static object GetValueInternal(string key, SqlDataReader reader)
     {
         var value = reader.GetValue(key);

@@ -31,11 +38,12 @@ public async Task<long> WriteAsync(
         IDataLakeJobData configuration,
         Stream outputStream,
         ICollection<string> fieldNames,
+        bool isInitialExport,
         SqlDataReader reader)
     {
         context.Log.LogInformation("Begin writing output.");

-        var totalProcessed = await WriteOutputAsync(context, configuration, outputStream, fieldNames, reader);
+        var totalProcessed = await WriteOutputAsync(context, configuration, outputStream, fieldNames, isInitialExport, reader);
         context.Log.LogInformation("End writing output. Total processed: {TotalProcessed}.", totalProcessed);
         return totalProcessed;
     }
@@ -45,10 +53,18 @@ public abstract Task<long> WriteOutputAsync(
         IDataLakeJobData configuration,
         Stream outputStream,
         ICollection<string> fieldNames,
+        bool isInitialExport,
         SqlDataReader reader);

     protected string EscapeVocabularyKey(string fieldName)
     {
         return NonAlphaNumericRegex.Replace(fieldName, "_");
     }
+
+    protected virtual bool ShouldSkip(IDataLakeJobData configuration, bool isInitialExport, SqlDataReader reader)
+    {
+        return configuration.IsDeltaMode
+            && isInitialExport
+            && GetValueInternal(DataLakeConstants.ChangeTypeKey, reader).Equals(VersionChangeType.Removed.ToString());
+    }
 }
```
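
Together with IsInitialExport on the export job, ShouldSkip drops rows whose change type is VersionChangeType.Removed during a delta-mode stream's first export, presumably because the target contains nothing to delete yet. The condition restated over plain values; "Removed" is the ToString() form of VersionChangeType.Removed, and the boolean flags are passed in directly rather than read from the configuration and export history:

```csharp
using System;

// Illustrative only: the skip rule from ShouldSkip above, over plain values.
static bool ShouldSkipRow(bool isDeltaMode, bool isInitialExport, string changeType) =>
    isDeltaMode && isInitialExport && changeType == "Removed";

Console.WriteLine(ShouldSkipRow(isDeltaMode: true,  isInitialExport: true,  changeType: "Removed"));  // True: never exported, nothing to remove
Console.WriteLine(ShouldSkipRow(isDeltaMode: true,  isInitialExport: false, changeType: "Removed"));  // False: later exports must emit the delete
Console.WriteLine(ShouldSkipRow(isDeltaMode: false, isInitialExport: true,  changeType: "Removed"));  // False: non-delta exports keep every row
```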
