@@ -96,21 +96,10 @@ private async Task<TaskResult> ImportRelationships(EntityMetadata entity, Import
96
96
}
97
97
private async Task < TaskResult > ImportRecords ( EntityMetadata entity , ImportDataTask task , EntityImport entityImport )
98
98
{
99
- logger . LogInformation ( "Importing {entityname} records" , entityImport . Name ) ;
100
- var recordsWithNoSelfDependancies = entityImport . Records . Record . Where ( r =>
101
- ! r . Field . Any ( f => f . Lookupentity == entityImport . Name &&
102
- entityImport . Records . Record . Any ( r2 => r2 . Id != r . Id && r2 . Id . ToString ( ) == f . Value ) ) ) . Select ( r => BuildUpsertRequest ( entity , entityImport , r ) ) . ToList ( ) ;
103
- var recordsWithSelfDependancies = entityImport . Records . Record . Where ( r =>
104
- r . Field . Any ( f => f . Lookupentity == entityImport . Name &&
105
- entityImport . Records . Record . Any ( r2 => r2 . Id != r . Id && r2 . Id . ToString ( ) == f . Value ) ) ) . ToList ( ) ;
99
+ logger . LogInformation ( "Importing {entityname} records ({count})" , entityImport . Name , entityImport . Records . Record . Count ) ;
106
100
107
- logger . LogInformation ( "records with no self dependancies: {count}" , recordsWithNoSelfDependancies . Count ) ;
108
- logger . LogInformation ( "records with self dependancies: {count}" , recordsWithSelfDependancies . Count ) ;
109
- //See if upsert request keep ids
110
-
111
- //implement parallelism and batching
112
- var responses = await bulkOrganizationService . UpsertBulk ( recordsWithNoSelfDependancies ) ;
113
101
102
+ var responses = await ProcessDependantRecords ( entityImport . Records . Record , entity , entityImport ) ;
114
103
foreach ( var response in responses )
115
104
{
116
105
@@ -122,66 +111,31 @@ private async Task<TaskResult> ImportRecords(EntityMetadata entity, ImportDataTa
122
111
123
112
}
124
113
var resultTask = responses . Any ( ) ? TaskResult . Failed : TaskResult . Completed ;
125
-
126
- var singleResponses = await ProcessDependantRecords ( recordsWithSelfDependancies , entity , entityImport ) ;
127
- foreach ( var response in singleResponses )
128
- {
129
-
130
- var targetRequest = ( response . OriginalRequest as UpsertRequest ) . Target ;
131
-
132
-
133
- logger . LogError ( "{logicalname}({id}) upsert failed because: {fault}" , targetRequest . LogicalName , targetRequest . Id , response . Fault . Message ) ;
134
-
135
-
136
- }
137
- resultTask = singleResponses . Any ( ) ? TaskResult . Failed : resultTask ;
138
114
logger . LogInformation ( "Import Task of {entityname} records terminated in a {State} state" , entityImport . Name , resultTask ) ;
139
115
return resultTask ;
140
116
141
117
}
142
118
private async Task < IEnumerable < OrganizationResponseFaultedResult > > ProcessDependantRecords ( IEnumerable < Record > records , EntityMetadata entity , EntityImport entityImport )
143
119
{
144
120
145
- var retries = new Dictionary < Guid , int > ( ) ;
146
- var queue = new Queue < Record > ( records ) ;
147
121
var results = new List < OrganizationResponseFaultedResult > ( ) ;
148
- while ( queue . Count > 0 )
122
+ var recordsCanBeProcessed = records . Where ( r => ! r . Field . Any ( f => f . Lookupentity == entityImport . Name &&
123
+ records . Any ( r2 => r2 . Id != r . Id && r2 . Id . ToString ( ) == f . Value ) ) ) . Select ( r => BuildUpsertRequest ( entity , entityImport , r ) ) . ToList ( ) ;
124
+ logger . LogInformation ( "Processing {count} records" , recordsCanBeProcessed . Count ) ;
125
+ if ( recordsCanBeProcessed . Count == 0 )
149
126
{
150
- var record = queue . Dequeue ( ) ;
151
-
152
- if ( record . Field . Any ( f => f . Lookupentity == entityImport . Name && ( queue . Any ( r => r . Id . ToString ( ) == f . Value ) ||
153
- retries . Any ( kv => kv . Key . ToString ( ) == f . Value && kv . Value >= MAX_RETRIES ) ) ) )
127
+ if ( records . Any ( ) )
154
128
{
155
-
156
- if ( retries . ContainsKey ( record . Id ) && retries [ record . Id ] >= MAX_RETRIES )
157
- {
158
- logger . LogWarning ( "{entityType}({id}) was skipped because his parent was not proccessed." , entityImport . Name , record . Id ) ;
159
- continue ;
160
- }
161
-
162
-
163
- //Enqueue record again until his parent is processed.
164
- queue . Enqueue ( record ) ;
165
- retries [ record . Id ] = retries . ContainsKey ( record . Id ) ? retries [ record . Id ] + 1 : 1 ;
166
- continue ;
129
+ logger . LogWarning ( "{count} records skipped because of circular dependancies." , records . Count ( ) ) ;
167
130
}
168
- var request = BuildUpsertRequest ( entity , entityImport , record ) ;
169
-
170
- var result = await bulkOrganizationService . Upsert ( request ) ;
171
- if ( result . IsFailure )
172
- {
173
- results . Add ( result . Failure ) ;
174
- }
175
-
176
- }
177
- var maxretries = retries . Where ( kv => kv . Value >= MAX_RETRIES ) . Select ( kv => kv . Key ) . ToList ( ) ;
178
- if ( maxretries . Any ( ) )
179
- {
180
- logger . LogWarning ( "The following records ({count}) were not processed due to circular dependencies: {ids}" , maxretries . Count , string . Join ( ", " , maxretries ) ) ;
131
+ return results ;
181
132
}
133
+ var responses = await bulkOrganizationService . UpsertBulk ( recordsCanBeProcessed ) ;
134
+ results . AddRange ( responses ) ;
182
135
136
+ responses = await ProcessDependantRecords ( records . Where ( r => ! recordsCanBeProcessed . Any ( r2 => r . Id == r2 . Target . Id ) ) , entity , entityImport ) ;
137
+ results . AddRange ( responses ) ;
183
138
return results ;
184
-
185
139
}
186
140
private UpsertRequest BuildUpsertRequest ( EntityMetadata entityMD , EntityImport entityImport , Record record )
187
141
{
0 commit comments