Skip to content

Commit 5d68864

Browse files
author
dotnetprog
committed
Adding upsert logics to handle activation/deactivation
Optimising some multithreading features and logging. Fixing Schema validation rules
1 parent cc4209e commit 5d68864

File tree

7 files changed

+196
-43
lines changed

7 files changed

+196
-43
lines changed
Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1-
namespace Dataverse.ConfigurationMigrationTool.Console.Services.Dataverse
1+
namespace Dataverse.ConfigurationMigrationTool.Console.Common
22
{
3-
public static class IOrganizationServiceExtensions
3+
public static class LinqExtensions
44
{
5+
public static bool AreEnumerablesEqualIgnoreOrder<T>(this IEnumerable<T> list1, IEnumerable<T> list2)
6+
{
7+
return new HashSet<T>(list1).SetEquals(list2);
8+
}
59
public static IEnumerable<IEnumerable<T>> Batch<T>(
610
this IEnumerable<T> source, int size)
711
{
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
namespace Dataverse.ConfigurationMigrationTool.Console.Common
2+
{
3+
public class Result<TValue, TFailure>
4+
{
5+
public TValue? Value { get; set; }
6+
public TFailure? Failure { get; set; }
7+
public bool IsSuccess => Value != null;
8+
public bool IsFailure => Failure != null;
9+
public Result(TValue value)
10+
{
11+
this.Value = value;
12+
}
13+
public Result(TFailure failure)
14+
{
15+
Failure = failure;
16+
}
17+
}
18+
19+
}

src/Dataverse.ConfigurationMigrationTool.Console/Dataverse.ConfigurationMigrationTool.Console/Features/Import/ImportTaskProcessorService.cs

Lines changed: 48 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
using Microsoft.Xrm.Sdk;
88
using Microsoft.Xrm.Sdk.Messages;
99
using Microsoft.Xrm.Sdk.Metadata;
10-
using System.ServiceModel;
1110

1211
namespace Dataverse.ConfigurationMigrationTool.Console.Features.Import
1312
{
@@ -45,25 +44,54 @@ public async Task<TaskResult> Execute(ImportDataTask task, Entities dataImport)
4544
{
4645
return TaskResult.Completed;
4746
}
48-
if (task.RelationshipSchema != null)
47+
var entityMD = await metadataService.GetEntity(entityToImport.Name);
48+
var result = task.RelationshipSchema != null ?
49+
await ImportRelationships(entityMD, task, entityToImport) :
50+
await ImportRecords(entityMD, task, entityToImport);
51+
return result;
52+
}
53+
54+
private async Task<TaskResult> ImportRelationships(EntityMetadata entity, ImportDataTask task, EntityImport entityImport)
55+
{
56+
var relMD = entity.ManyToManyRelationships.FirstOrDefault(m => m.IntersectEntityName == task.RelationshipSchema.Name);
57+
if (relMD == null) return TaskResult.Failed;
58+
59+
var rows = entityImport.M2mrelationships.M2mrelationship.Where(m2m => m2m.M2mrelationshipname == task.RelationshipSchema.Name).ToList();
60+
61+
var requests = rows.SelectMany(r => r.Targetids.Targetid.Select(t =>
62+
new AssociateRequest
4963
{
50-
//Import relationships
64+
Target = new EntityReference
65+
{
66+
Id = r.Sourceid,
67+
LogicalName = entityImport.Name
68+
},
69+
Relationship = new Relationship(relMD.SchemaName),
70+
RelatedEntities = new EntityReferenceCollection() {
71+
new EntityReference
72+
{
73+
Id = t,
74+
LogicalName = r.Targetentityname
75+
}
76+
}
5177

52-
}
53-
else
78+
}));
79+
var responses = await bulkOrganizationService.ExecuteBulk(requests, ["Cannot insert duplicate key"]);
80+
var resultTask = responses.Any() ? TaskResult.Failed : TaskResult.Completed;
81+
82+
foreach (var response in responses)
5483
{
55-
var entityMD = await metadataService.GetEntity(entityToImport.Name);
56-
//Import Entities
57-
return await ImportRecords(entityMD, task, entityToImport);
58-
}
5984

60-
return TaskResult.Completed;
85+
var targetRequest = (response.OriginalRequest as AssociateRequest).Target;
6186

62-
}
6387

64-
private async Task<TaskResult> ImportRelationships(ImportDataTask task, EntityImport entityImport)
65-
{
66-
return TaskResult.Completed;
88+
logger.LogError("{logicalname}({id}) Adding relationship ({relationship}) failed because: {fault}", targetRequest.LogicalName, targetRequest.Id, task.RelationshipSchema.Name, response.Fault.Message);
89+
90+
91+
}
92+
93+
94+
return resultTask;
6795
}
6896
private async Task<TaskResult> ImportRecords(EntityMetadata entity, ImportDataTask task, EntityImport entityImport)
6997
{
@@ -79,7 +107,7 @@ private async Task<TaskResult> ImportRecords(EntityMetadata entity, ImportDataTa
79107
//See if upsert request keep ids
80108

81109
//implement parallelism and batching
82-
var responses = await bulkOrganizationService.ExecuteBulk(recordsWithNoSelfDependancies);
110+
var responses = await bulkOrganizationService.UpsertBulk(recordsWithNoSelfDependancies);
83111

84112
foreach (var response in responses)
85113
{
@@ -123,7 +151,7 @@ private async Task<IEnumerable<OrganizationResponseFaultedResult>> ProcessDepend
123151

124152
if (retries.ContainsKey(record.Id) && retries[record.Id] >= 3)
125153
{
126-
logger.LogWarning("{entityType}({id}) was skipped because his parent was not proccessed.", entityImport.Name, record.Id)
154+
logger.LogWarning("{entityType}({id}) was skipped because his parent was not proccessed.", entityImport.Name, record.Id);
127155
continue;
128156
}
129157
//Enqueue record again until his parent is processed.
@@ -132,20 +160,11 @@ private async Task<IEnumerable<OrganizationResponseFaultedResult>> ProcessDepend
132160
continue;
133161
}
134162
var request = BuildUpsertRequest(entity, entityImport, record);
135-
try
136-
{
137-
138-
var response = (await _organizationService.ExecuteAsync(request)) as UpsertResponse;
139163

140-
141-
}
142-
catch (FaultException<OrganizationServiceFault> faultEx)
164+
var result = await bulkOrganizationService.Upsert(request);
165+
if (result.IsFailure)
143166
{
144-
results.Add(new OrganizationResponseFaultedResult
145-
{
146-
Fault = faultEx.Detail,
147-
OriginalRequest = request,
148-
});
167+
results.Add(result.Failure);
149168
}
150169

151170
}
@@ -159,6 +178,7 @@ private UpsertRequest BuildUpsertRequest(EntityMetadata entityMD, EntityImport e
159178
foreach (var field in record.Field)
160179
{
161180
var attrMD = entityMD.Attributes.FirstOrDefault(a => a.LogicalName == field.Name);
181+
if ((attrMD.IsValidForCreate != true && attrMD.LogicalName != "statecode") || attrMD.IsValidForUpdate != true) continue;
162182
entity[field.Name] = _dataverseValueConverter.Convert(attrMD, field);
163183
}
164184
return new UpsertRequest

src/Dataverse.ConfigurationMigrationTool.Console/Dataverse.ConfigurationMigrationTool.Console/Features/Import/Validators/SchemaValidator.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using Dataverse.ConfigurationMigrationTool.Console.Features.Import.Mappers;
1+
using Dataverse.ConfigurationMigrationTool.Console.Common;
2+
using Dataverse.ConfigurationMigrationTool.Console.Features.Import.Mappers;
23
using Dataverse.ConfigurationMigrationTool.Console.Features.Import.Model;
34
using Dataverse.ConfigurationMigrationTool.Console.Features.Shared;
45
using Microsoft.Xrm.Sdk.Metadata;
@@ -57,7 +58,7 @@ public async Task<ValidationResult> Validate(ImportSchema value)
5758
});
5859
continue;
5960
}
60-
if (attributeMetadata is LookupAttributeMetadata lookupMD && lookupMD.Targets.First().ToLower() != fieldSchema.LookupType)
61+
if (attributeMetadata is LookupAttributeMetadata lookupMD && !lookupMD.Targets.AreEnumerablesEqualIgnoreOrder(fieldSchema.LookupType?.Split('|') ?? Array.Empty<string>()))
6162
{
6263
failures.Add(new ValidationFailure
6364
{
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
1-
using Microsoft.Xrm.Sdk;
1+
using Dataverse.ConfigurationMigrationTool.Console.Common;
2+
using Microsoft.Xrm.Sdk;
3+
using Microsoft.Xrm.Sdk.Messages;
24

35
namespace Dataverse.ConfigurationMigrationTool.Console.Services.Dataverse
46
{
57
public interface IBulkOrganizationService
68
{
7-
Task<IEnumerable<OrganizationResponseFaultedResult>> ExecuteBulk(IEnumerable<OrganizationRequest> request);
9+
Task<IEnumerable<OrganizationResponseFaultedResult>> ExecuteBulk(IEnumerable<OrganizationRequest> request, IEnumerable<string> faultToSkips = null);
10+
Task<IEnumerable<OrganizationResponseFaultedResult>> UpsertBulk(IEnumerable<UpsertRequest> requests);
11+
Task<Result<UpsertResponse, OrganizationResponseFaultedResult>> Upsert(UpsertRequest request);
812

913
}
1014
}

src/Dataverse.ConfigurationMigrationTool.Console/Dataverse.ConfigurationMigrationTool.Console/Services/Dataverse/ParallelismBulkOrganizationService.cs

Lines changed: 112 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1-
using Dataverse.ConfigurationMigrationTool.Console.Services.Dataverse.Configuration;
1+
using Dataverse.ConfigurationMigrationTool.Console.Common;
2+
using Dataverse.ConfigurationMigrationTool.Console.Services.Dataverse.Configuration;
23
using Microsoft.Extensions.Logging;
34
using Microsoft.Extensions.Options;
45
using Microsoft.PowerPlatform.Dataverse.Client;
56
using Microsoft.Xrm.Sdk;
67
using Microsoft.Xrm.Sdk.Messages;
78
using System.Collections.Concurrent;
9+
using System.Diagnostics;
10+
using System.ServiceModel;
811

912
namespace Dataverse.ConfigurationMigrationTool.Console.Services.Dataverse
1013
{
@@ -28,7 +31,7 @@ public ParallelismBulkOrganizationService(ServiceClient serviceClient, IOptions<
2831
this.logger = logger;
2932
}
3033

31-
public async Task<IEnumerable<OrganizationResponseFaultedResult>> ExecuteBulk(IEnumerable<OrganizationRequest> request)
34+
public async Task<IEnumerable<OrganizationResponseFaultedResult>> ExecuteBulk(IEnumerable<OrganizationRequest> request, IEnumerable<string> faultToSkips = null)
3235
{
3336
var result = new ConcurrentBag<OrganizationResponseFaultedResult>();
3437
logger.LogInformation("{count} request(s) to execute.", request.Count());
@@ -44,7 +47,7 @@ await Parallel.ForEachAsync(
4447
parallelOptions: parallelOptions,
4548
async (batch, token) =>
4649
{
47-
50+
logger.LogInformation("Processing a batch of {count} requests, current threads({countThreads})", batch.Count(), Process.GetCurrentProcess().Threads.Count);
4851
var scopedService = _serviceClient.Clone();
4952
scopedService.EnableAffinityCookie = false;
5053
var requests = new OrganizationRequestCollection();
@@ -60,11 +63,12 @@ await Parallel.ForEachAsync(
6063
};
6164
logger.LogInformation("Starting a batch of {count} requests", requests.Count);
6265
var response = (await scopedService.ExecuteAsync(request)) as ExecuteMultipleResponse;
63-
if (response.IsFaulted)
66+
var faultedResponses = response.Responses
67+
.Where(r => r.Fault != null && (faultToSkips?.All(f => !r.Fault.Message.Contains(f)) ?? true))
68+
.Select(fr => new OrganizationResponseFaultedResult { Fault = fr.Fault, OriginalRequest = requests[fr.RequestIndex] }).ToArray();
69+
if (faultedResponses.Any())
6470
{
65-
var faultedResponses = response.Responses
66-
.Where(r => r.Fault != null)
67-
.Select(fr => new OrganizationResponseFaultedResult { Fault = fr.Fault, OriginalRequest = requests[fr.RequestIndex] }).ToArray();
71+
6872
logger.LogInformation("A Batch finished with {count} failures", faultedResponses.Length);
6973
foreach (var faultedResponse in faultedResponses)
7074
{
@@ -77,6 +81,107 @@ await Parallel.ForEachAsync(
7781

7882
}
7983

84+
public async Task<Result<UpsertResponse, OrganizationResponseFaultedResult>> Upsert(UpsertRequest request)
85+
{
86+
var setStateRquest = new UpdateRequest();
87+
if (request.Target.Attributes.ContainsKey("statecode") ||
88+
request.Target.Attributes.ContainsKey("statuscode"))
89+
{
90+
setStateRquest.Target = new Entity();
91+
if (request.Target.Attributes.ContainsKey("statecode"))
92+
{
93+
setStateRquest.Target.Attributes["statecode"] = request.Target.Attributes["statecode"];
94+
}
95+
if (request.Target.Attributes.ContainsKey("statuscode"))
96+
{
97+
setStateRquest.Target.Attributes["statuscode"] = request.Target.Attributes["statuscode"];
98+
}
99+
100+
request.Target.Attributes.Remove("statecode");
101+
request.Target.Attributes.Remove("statuscode");
102+
}
103+
try
104+
{
105+
var response = (await _serviceClient.ExecuteAsync(request)) as UpsertResponse;
106+
if (setStateRquest.Target != null)
107+
{
108+
setStateRquest.Target.LogicalName = response.Target.LogicalName;
109+
setStateRquest.Target.Id = response.Target.Id;
110+
await _serviceClient.ExecuteAsync(setStateRquest);
111+
}
112+
return new Result<UpsertResponse, OrganizationResponseFaultedResult>(response);
113+
}
114+
catch (FaultException<OrganizationServiceFault> faultEx)
115+
{
116+
return new Result<UpsertResponse, OrganizationResponseFaultedResult>(new OrganizationResponseFaultedResult { Fault = faultEx.Detail, OriginalRequest = request });
117+
}
118+
119+
}
80120

121+
public async Task<IEnumerable<OrganizationResponseFaultedResult>> UpsertBulk(IEnumerable<UpsertRequest> requests)
122+
{
123+
var result = new ConcurrentBag<OrganizationResponseFaultedResult>();
124+
var requestsCount = requests.Count();
125+
int processCount = 0;
126+
logger.LogInformation("{count} rows(s) to execute.", requestsCount);
127+
var parallelOptions = new ParallelOptions()
128+
{
129+
MaxDegreeOfParallelism = _options.MaxDegreeOfParallism < _serviceClient.RecommendedDegreesOfParallelism ?
130+
_options.MaxDegreeOfParallism : _serviceClient.RecommendedDegreesOfParallelism
131+
};
132+
var batches = requests.Batch(_options.BatchSize);
133+
await Parallel.ForEachAsync(source: batches, parallelOptions: parallelOptions,
134+
async (batch, token) =>
135+
{
136+
var scopedService = _serviceClient.Clone();
137+
138+
scopedService.EnableAffinityCookie = false;
139+
foreach (var request in batch)
140+
{
141+
var setStateRquest = new UpdateRequest();
142+
if (request.Target.Attributes.ContainsKey("statecode") ||
143+
request.Target.Attributes.ContainsKey("statuscode"))
144+
{
145+
setStateRquest.Target = new Entity();
146+
if (request.Target.Attributes.ContainsKey("statecode"))
147+
{
148+
setStateRquest.Target.Attributes["statecode"] = request.Target.Attributes["statecode"];
149+
}
150+
if (request.Target.Attributes.ContainsKey("statuscode"))
151+
{
152+
setStateRquest.Target.Attributes["statuscode"] = request.Target.Attributes["statuscode"];
153+
}
154+
155+
request.Target.Attributes.Remove("statecode");
156+
request.Target.Attributes.Remove("statuscode");
157+
}
158+
try
159+
{
160+
var response = (await scopedService.ExecuteAsync(request)) as UpsertResponse;
161+
if (setStateRquest.Target != null)
162+
{
163+
setStateRquest.Target.LogicalName = response.Target.LogicalName;
164+
setStateRquest.Target.Id = response.Target.Id;
165+
await scopedService.ExecuteAsync(setStateRquest);
166+
}
167+
168+
}
169+
catch (FaultException<OrganizationServiceFault> faultEx)
170+
{
171+
result.Add(new OrganizationResponseFaultedResult { Fault = faultEx.Detail, OriginalRequest = request });
172+
}
173+
finally
174+
{
175+
Interlocked.Increment(ref processCount);
176+
}
177+
}
178+
179+
180+
logger.LogInformation("Proccessed: {proccessedCount}/{requestCount}, Threads({threadcount})", processCount, requestsCount, Process.GetCurrentProcess().Threads.Count);
181+
182+
183+
});
184+
return result;
185+
}
81186
}
82187
}

src/Dataverse.ConfigurationMigrationTool.Console/Dataverse.ConfigurationMigrationTool.Console/appsettings.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
},
77
"Dataverse": {
88
"MaxThreadCount": 100,
9-
"MaxDegreeOfParallism": 20,
10-
"BatchSize": 20
9+
"MaxDegreeOfParallism": 10,
10+
"BatchSize": 10
1111
}
1212
}

0 commit comments

Comments
 (0)