Skip to content

Commit cc4209e

Browse files
author
dotnetprog
committed
add single row depandent processing
1 parent f519d44 commit cc4209e

File tree

3 files changed

+69
-7
lines changed

3 files changed

+69
-7
lines changed

src/Dataverse.ConfigurationMigrationTool.Console/Dataverse.ConfigurationMigrationTool.Console/Features/Import/ImportTaskProcessorService.cs

Lines changed: 66 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33
using Dataverse.ConfigurationMigrationTool.Console.Features.Shared;
44
using Dataverse.ConfigurationMigrationTool.Console.Services.Dataverse;
55
using Microsoft.Extensions.Logging;
6+
using Microsoft.PowerPlatform.Dataverse.Client;
67
using Microsoft.Xrm.Sdk;
78
using Microsoft.Xrm.Sdk.Messages;
89
using Microsoft.Xrm.Sdk.Metadata;
10+
using System.ServiceModel;
911

1012
namespace Dataverse.ConfigurationMigrationTool.Console.Features.Import
1113
{
@@ -25,13 +27,15 @@ public class ImportTaskProcessorService : IImportTaskProcessorService
2527
private readonly ILogger<ImportTaskProcessorService> logger;
2628
private readonly IDataverseValueConverter _dataverseValueConverter;
2729
private readonly IBulkOrganizationService bulkOrganizationService;
30+
private IOrganizationServiceAsync2 _organizationService;
2831

29-
public ImportTaskProcessorService(IMetadataService metadataService, ILogger<ImportTaskProcessorService> logger, IDataverseValueConverter dataverseValueConverter, IBulkOrganizationService bulkOrganizationService)
32+
public ImportTaskProcessorService(IMetadataService metadataService, ILogger<ImportTaskProcessorService> logger, IDataverseValueConverter dataverseValueConverter, IBulkOrganizationService bulkOrganizationService, IOrganizationServiceAsync2 organizationService)
3033
{
3134
this.metadataService = metadataService;
3235
this.logger = logger;
3336
_dataverseValueConverter = dataverseValueConverter;
3437
this.bulkOrganizationService = bulkOrganizationService;
38+
_organizationService = organizationService;
3539
}
3640

3741
public async Task<TaskResult> Execute(ImportDataTask task, Entities dataImport)
@@ -65,17 +69,17 @@ private async Task<TaskResult> ImportRecords(EntityMetadata entity, ImportDataTa
6569
{
6670

6771
var recordsWithNoSelfDependancies = entityImport.Records.Record.Where(r =>
68-
!entityImport.Records.Record.Any(r2 => r2.Id != r.Id &&
69-
r2.Field.Any(f => f.Lookupentity != null && f.Value == r.Id.ToString())));
72+
!r.Field.Any(f => f.Lookupentity == entityImport.Name &&
73+
entityImport.Records.Record.Any(r2 => r2.Id != r.Id && r2.Id.ToString() == f.Value))).Select(r => BuildUpsertRequest(entity, entityImport, r)).ToList();
7074
var recordsWithSelfDependancies = entityImport.Records.Record.Where(r =>
71-
entityImport.Records.Record.Any(r2 => r2.Id != r.Id &&
72-
r2.Field.Any(f => f.Lookupentity != null && f.Value == r.Id.ToString())));
73-
var requests = recordsWithNoSelfDependancies.Select(r => BuildUpsertRequest(entity, entityImport, r)).ToList();
75+
r.Field.Any(f => f.Lookupentity == entityImport.Name &&
76+
entityImport.Records.Record.Any(r2 => r2.Id != r.Id && r2.Id.ToString() == f.Value))).ToList();
77+
7478

7579
//See if upsert request keep ids
7680

7781
//implement parallelism and batching
78-
var responses = await bulkOrganizationService.ExecuteBulk(requests);
82+
var responses = await bulkOrganizationService.ExecuteBulk(recordsWithNoSelfDependancies);
7983

8084
foreach (var response in responses)
8185
{
@@ -88,11 +92,66 @@ private async Task<TaskResult> ImportRecords(EntityMetadata entity, ImportDataTa
8892

8993
}
9094
var resultTask = responses.Any() ? TaskResult.Failed : TaskResult.Completed;
95+
96+
var singleResponses = await ProcessDependantRecords(recordsWithSelfDependancies, entity, entityImport);
97+
foreach (var response in singleResponses)
98+
{
99+
100+
var targetRequest = (response.OriginalRequest as UpsertRequest).Target;
101+
102+
103+
logger.LogError("{logicalname}({id}) upsert failed because: {fault}", targetRequest.LogicalName, targetRequest.Id, response.Fault.Message);
104+
105+
106+
}
107+
resultTask = singleResponses.Any() ? TaskResult.Failed : resultTask;
91108
logger.LogInformation("Import Task of {entityname} records terminated in a {State} state", entityImport.Name, resultTask);
92109
return resultTask;
93110

94111
}
112+
private async Task<IEnumerable<OrganizationResponseFaultedResult>> ProcessDependantRecords(IEnumerable<Record> records, EntityMetadata entity, EntityImport entityImport)
113+
{
114+
var retries = new Dictionary<Guid, int>();
115+
var queue = new Queue<Record>(records);
116+
var results = new List<OrganizationResponseFaultedResult>();
117+
while (queue.Count > 0)
118+
{
119+
var record = queue.Dequeue();
120+
121+
if (record.Field.Any(f => f.Lookupentity == entityImport.Name && queue.Any(r => r.Id.ToString() == f.Value)))
122+
{
123+
124+
if (retries.ContainsKey(record.Id) && retries[record.Id] >= 3)
125+
{
126+
logger.LogWarning("{entityType}({id}) was skipped because his parent was not proccessed.", entityImport.Name, record.Id)
127+
continue;
128+
}
129+
//Enqueue record again until his parent is processed.
130+
queue.Enqueue(record);
131+
retries[record.Id] = retries.ContainsKey(record.Id) ? retries[record.Id] + 1 : 1;
132+
continue;
133+
}
134+
var request = BuildUpsertRequest(entity, entityImport, record);
135+
try
136+
{
137+
138+
var response = (await _organizationService.ExecuteAsync(request)) as UpsertResponse;
139+
140+
141+
}
142+
catch (FaultException<OrganizationServiceFault> faultEx)
143+
{
144+
results.Add(new OrganizationResponseFaultedResult
145+
{
146+
Fault = faultEx.Detail,
147+
OriginalRequest = request,
148+
});
149+
}
150+
151+
}
152+
return results;
95153

154+
}
96155
private UpsertRequest BuildUpsertRequest(EntityMetadata entityMD, EntityImport entityImport, Record record)
97156
{
98157
var entity = new Entity(entityImport.Name, record.Id);

src/Dataverse.ConfigurationMigrationTool.Console/Dataverse.ConfigurationMigrationTool.Console/Services/Dataverse/IBulkOrganizationService.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,6 @@ namespace Dataverse.ConfigurationMigrationTool.Console.Services.Dataverse
55
public interface IBulkOrganizationService
66
{
77
Task<IEnumerable<OrganizationResponseFaultedResult>> ExecuteBulk(IEnumerable<OrganizationRequest> request);
8+
89
}
910
}

src/Dataverse.ConfigurationMigrationTool.Console/Dataverse.ConfigurationMigrationTool.Console/Services/Dataverse/ParallelismBulkOrganizationService.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,5 +76,7 @@ await Parallel.ForEachAsync(
7676
return result;
7777

7878
}
79+
80+
7981
}
8082
}

0 commit comments

Comments
 (0)