Skip to content

Commit de7779b

Browse files
committed
Refactor user insert logic into UserInsertProcessor class
Move two-phase user insert (bulk + enrich) from UserMetadataUpdater to new UserInsertProcessor class for better separation of concerns. Update UserMetadataUpdater to delegate to the new class. Add UserInsertProcessor.cs to project. Improves maintainability, testability, and code organization.
1 parent 4243006 commit de7779b

File tree

3 files changed

+287
-222
lines changed

3 files changed

+287
-222
lines changed
Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
using Common.Entities;
2+
using DataUtils;
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Data;
6+
using System.Data.Entity;
7+
using System.Data.SqlClient;
8+
using System.Diagnostics;
9+
using System.Linq;
10+
using System.Threading.Tasks;
11+
12+
namespace WebJob.Office365ActivityImporter.Engine.Graph
13+
{
14+
/// <summary>
15+
/// Handles inserting new users into the database via two-phase approach:
16+
/// fast bulk insert (SqlBulkCopy), then metadata enrichment in batches.
17+
/// </summary>
18+
internal class UserInsertProcessor
19+
{
20+
// Sink for the progress and diagnostic messages written during the import.
private readonly AnalyticsLogger _telemetry;

// Supplies DetachAllEntitiesExceptLookups, which is called after every enrichment batch.
private readonly UserBatchProcessor _batchProcessor;

// Number of users written per SqlBulkCopy round-trip in phase 1.
private const int BULK_INSERT_BATCH_SIZE = 10000;

// Number of users loaded, enriched and saved per iteration in phase 2.
private const int METADATA_BATCH_SIZE = 500;

/// <summary>
/// Creates a processor that logs through <paramref name="telemetry"/> and uses
/// <paramref name="batchProcessor"/> to detach tracked entities between batches.
/// </summary>
/// <param name="telemetry">Logger for import progress; must not be null.</param>
/// <param name="batchProcessor">Batch helper; must not be null.</param>
/// <exception cref="ArgumentNullException">Either argument is null.</exception>
public UserInsertProcessor(AnalyticsLogger telemetry, UserBatchProcessor batchProcessor)
{
    if (telemetry is null)
    {
        throw new ArgumentNullException(nameof(telemetry));
    }

    if (batchProcessor is null)
    {
        throw new ArgumentNullException(nameof(batchProcessor));
    }

    _telemetry = telemetry;
    _batchProcessor = batchProcessor;
}
30+
31+
/// <summary>
/// Inserts Graph users that are not yet in the database using a two-phase approach:
/// a fast bulk insert of minimal rows, followed by batched metadata enrichment.
/// </summary>
/// <param name="db">Context whose connection string is used for the bulk insert and which loads/saves the users during enrichment.</param>
/// <param name="allGraphUsers">All users read from Graph; those whose UPN is not already known are inserted.</param>
/// <param name="graphMentionedDbUsers">Users already present in the database; their UPNs are excluded from the insert.</param>
/// <param name="readUserSkus">Passed through unchanged to <paramref name="updateAction"/>.</param>
/// <param name="userMetaCache">Cache that is pre-populated with the inserted users during enrichment.</param>
/// <param name="dataMapper">Not used by this method; kept for signature compatibility.</param>
/// <param name="licenseProcessor">Not used by this method; kept for signature compatibility.</param>
/// <param name="updateAction">Callback invoked once per inserted user to apply its metadata.</param>
/// <returns>The inserted users after enrichment; an empty list when there is nothing to insert.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="allGraphUsers"/> or <paramref name="graphMentionedDbUsers"/> is null, or there are
/// users to insert and <paramref name="db"/>, <paramref name="userMetaCache"/> or
/// <paramref name="updateAction"/> is null.
/// </exception>
public async Task<List<Common.Entities.User>> InsertMissingUsers(
    AnalyticsEntitiesContext db,
    List<GraphUser> allGraphUsers,
    List<Common.Entities.User> graphMentionedDbUsers,
    bool readUserSkus,
    UserMetadataCache userMetaCache,
    UserDataMapper dataMapper,
    UserLicenseProcessor licenseProcessor,
    Func<AnalyticsEntitiesContext, GraphUser, List<GraphUser>, List<Common.Entities.User>, Common.Entities.User, bool, Dictionary<string, Common.Entities.User>, Task> updateAction)
{
    if (allGraphUsers == null)
    {
        throw new ArgumentNullException(nameof(allGraphUsers));
    }

    if (graphMentionedDbUsers == null)
    {
        throw new ArgumentNullException(nameof(graphMentionedDbUsers));
    }

    _telemetry.LogInformation($"User import - Inserting missing users (two-phase: bulk insert + metadata enrichment)...");

    // The comparer already ignores case, so the UPNs are stored as-is. Lower-casing them with the
    // current culture first (as before) can break matching under cultures such as tr-TR.
    var knownUpns = new HashSet<string>(
        graphMentionedDbUsers
            .Select(u => u?.UserPrincipalName)
            .Where(upn => !string.IsNullOrEmpty(upn)),
        StringComparer.OrdinalIgnoreCase);

    var usersToInsert = new List<GraphUser>();
    foreach (var graphUser in allGraphUsers)
    {
        var upn = graphUser?.UserPrincipalName;

        // HashSet.Add returns false for a UPN that is already known, which both filters existing
        // DB users and drops duplicate UPNs coming from Graph with a single lookup.
        if (!string.IsNullOrEmpty(upn) && knownUpns.Add(upn))
        {
            usersToInsert.Add(graphUser);
        }
    }

    _telemetry.LogInformation($"User import - Found {usersToInsert.Count:N0} new users to insert");

    if (usersToInsert.Count == 0)
    {
        return new List<Common.Entities.User>();
    }

    // Fail before phase 1 writes anything: a null here would otherwise only surface after the
    // rows were bulk-inserted, leaving them without metadata.
    if (db == null)
    {
        throw new ArgumentNullException(nameof(db));
    }

    if (userMetaCache == null)
    {
        throw new ArgumentNullException(nameof(userMetaCache));
    }

    if (updateAction == null)
    {
        throw new ArgumentNullException(nameof(updateAction));
    }

    // PHASE 1: Fast bulk insert with minimal data
    _telemetry.LogInformation($"User import - Phase 1: Starting bulk insert of {usersToInsert.Count:N0} users...");
    await BulkInsertUsers(db, usersToInsert, BULK_INSERT_BATCH_SIZE);
    _telemetry.LogInformation($"User import - Phase 1: Bulk insert completed");

    // PHASE 2: Load inserted users and enrich with metadata.
    // Invariant lower-casing keeps the keys independent of the thread's culture.
    _telemetry.LogInformation($"User import - Phase 2: Starting metadata enrichment...");
    var insertedUserUpns = usersToInsert
        .Select(u => u.UserPrincipalName.ToLowerInvariant())
        .ToList();

    var insertedDbUsers = await EnrichInsertedUsersWithMetadata(
        db,
        allGraphUsers,
        graphMentionedDbUsers,
        insertedUserUpns,
        readUserSkus,
        METADATA_BATCH_SIZE,
        userMetaCache,
        updateAction);

    _telemetry.LogInformation($"User import - Phase 2: Metadata enrichment completed for {insertedDbUsers.Count:N0} users");
    _telemetry.LogInformation($"User import - Completed inserting and enriching {insertedDbUsers.Count:N0} new users");

    return insertedDbUsers;
}
99+
100+
/// <summary>
/// Phase 1: writes a minimal row per Graph user into <c>dbo.users</c> with SqlBulkCopy,
/// <paramref name="batchSize"/> users at a time, logging progress after each batch.
/// </summary>
/// <param name="db">Context whose connection string is used to open the bulk-copy connection.</param>
/// <param name="graphUsers">Users to insert.</param>
/// <param name="batchSize">Maximum number of users per bulk-copy call; must be positive.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="batchSize"/> is zero or negative.</exception>
private async Task BulkInsertUsers(AnalyticsEntitiesContext db, List<GraphUser> graphUsers, int batchSize)
{
    // A non-positive step would never advance the loop below.
    if (batchSize <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "Batch size must be greater than zero.");
    }

    // NOTE(review): SqlBulkCopy opens its own connection from this string, so the copy presumably
    // runs outside any transaction held by db. If the context's connection has already been opened
    // without "Persist Security Info", the password may be missing here - confirm.
    var connectionString = db.Database.Connection.ConnectionString;
    var totalInserted = 0;

    for (int batchStart = 0; batchStart < graphUsers.Count; batchStart += batchSize)
    {
        // GetRange copies only this batch's slice; Skip/Take would enumerate the list from its
        // start for every batch.
        var batchCount = Math.Min(batchSize, graphUsers.Count - batchStart);
        var batch = graphUsers.GetRange(batchStart, batchCount);

        // Both disposables are released even when the write throws.
        using (var dataTable = CreateUserDataTable(batch))
        using (var bulkCopy = new SqlBulkCopy(connectionString))
        {
            bulkCopy.DestinationTableName = "dbo.users";
            bulkCopy.BatchSize = batchSize;
            bulkCopy.BulkCopyTimeout = 600; // 10 minutes

            // Source DataTable column -> destination table column
            bulkCopy.ColumnMappings.Add("UserPrincipalName", "user_name");
            bulkCopy.ColumnMappings.Add("AzureAdId", "azure_ad_id");
            bulkCopy.ColumnMappings.Add("AccountEnabled", "account_enabled");
            bulkCopy.ColumnMappings.Add("Mail", "mail");
            bulkCopy.ColumnMappings.Add("PostalCode", "postalcode");

            await bulkCopy.WriteToServerAsync(dataTable);
        }

        totalInserted += batch.Count;
        _telemetry.LogInformation($"User import - Bulk inserted {totalInserted.ToString("N0")}/{graphUsers.Count.ToString("N0")} users to SQL");
    }
}
137+
138+
/// <summary>
/// Builds the in-memory table handed to SqlBulkCopy: one row per Graph user holding only the
/// UPN, Azure AD id, enabled flag, mail and postal code.
/// </summary>
/// <param name="graphUsers">Users to convert into rows.</param>
/// <returns>A new DataTable; the caller owns and disposes it.</returns>
private DataTable CreateUserDataTable(List<GraphUser> graphUsers)
{
    var table = new DataTable();

    // Column order here defines the value order used by Rows.Add below.
    table.Columns.AddRange(new[]
    {
        new DataColumn("UserPrincipalName", typeof(string)),
        new DataColumn("AzureAdId", typeof(string)),
        new DataColumn("AccountEnabled", typeof(bool)),
        new DataColumn("Mail", typeof(string)),
        new DataColumn("PostalCode", typeof(string)),
    });

    for (int i = 0; i < graphUsers.Count; i++)
    {
        var source = graphUsers[i];

        // Missing strings become DBNull; a missing enabled flag is stored as false.
        table.Rows.Add(
            source.UserPrincipalName ?? (object)DBNull.Value,
            source.Id ?? (object)DBNull.Value,
            source.AccountEnabled ?? false,
            source.Mail ?? (object)DBNull.Value,
            source.PostalCode ?? (object)DBNull.Value);
    }

    return table;
}
167+
168+
/// <summary>
/// Phase 2: reloads the newly inserted users in batches (tracked by <paramref name="db"/>),
/// runs <paramref name="updateAction"/> on each one, saves the batch and then detaches the
/// batch's entities to keep the change tracker small.
/// </summary>
/// <param name="db">Context used to load, track and save the users.</param>
/// <param name="allGraphUsers">All Graph users; used to find the Graph record for each loaded user and passed to <paramref name="updateAction"/>.</param>
/// <param name="graphMentionedDbUsers">Pre-existing DB users; seed the Azure-AD-id lookup given to <paramref name="updateAction"/>.</param>
/// <param name="insertedUserUpns">Lower-cased UPNs of the users inserted in phase 1.</param>
/// <param name="readUserSkus">Passed through unchanged to <paramref name="updateAction"/>.</param>
/// <param name="batchSize">Maximum number of users loaded and saved per iteration; must be positive.</param>
/// <param name="userMetaCache">Cache that receives every loaded user, keyed by lower-cased UPN, before updates run.</param>
/// <param name="updateAction">Callback that applies metadata to one user.</param>
/// <returns>All users that were loaded and enriched, in batch order.</returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="batchSize"/> is zero or negative.</exception>
private async Task<List<Common.Entities.User>> EnrichInsertedUsersWithMetadata(
    AnalyticsEntitiesContext db,
    List<GraphUser> allGraphUsers,
    List<Common.Entities.User> graphMentionedDbUsers,
    List<string> insertedUserUpns,
    bool readUserSkus,
    int batchSize,
    UserMetadataCache userMetaCache,
    Func<AnalyticsEntitiesContext, GraphUser, List<GraphUser>, List<Common.Entities.User>, Common.Entities.User, bool, Dictionary<string, Common.Entities.User>, Task> updateAction)
{
    // A non-positive step would never advance the batch loop below.
    if (batchSize <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "Batch size must be greater than zero.");
    }

    var enrichedUsers = new List<Common.Entities.User>(insertedUserUpns.Count);

    // Case-insensitive Graph user lookup by UPN; a later duplicate UPN overwrites an earlier one.
    var graphUsersByUpn = new Dictionary<string, GraphUser>(allGraphUsers.Count, StringComparer.OrdinalIgnoreCase);
    foreach (var graphUser in allGraphUsers)
    {
        var upn = graphUser?.UserPrincipalName;
        if (!string.IsNullOrEmpty(upn))
        {
            graphUsersByUpn[upn] = graphUser;
        }
    }

    // Lookup handed to updateAction, seeded with the pre-existing users (first entry per id wins).
    // Newly inserted users are added batch by batch below as tracked entities. According to the
    // original note, managers from other batches are resolved by a DB fallback in
    // UpdateUserManager - not visible here, confirm.
    var dbUsersByAadId = new Dictionary<string, Common.Entities.User>(
        graphMentionedDbUsers.Count, StringComparer.OrdinalIgnoreCase);

    foreach (var user in graphMentionedDbUsers)
    {
        if (!string.IsNullOrEmpty(user.AzureAdId) && !dbUsersByAadId.ContainsKey(user.AzureAdId))
        {
            dbUsersByAadId[user.AzureAdId] = user;
        }
    }

    var enrichSw = Stopwatch.StartNew();
    for (int batchStart = 0; batchStart < insertedUserUpns.Count; batchStart += batchSize)
    {
        // GetRange copies just this batch's slice of UPNs.
        var batchCount = Math.Min(batchSize, insertedUserUpns.Count - batchStart);
        var batchUpns = insertedUserUpns.GetRange(batchStart, batchCount);

        // Load batch of newly inserted users from database WITH TRACKING for updates
        var batchUsers = await db.users
            .Where(u => batchUpns.Contains(u.UserPrincipalName.ToLower()))
            .Include(u => u.LicenseLookups)
            .ToListAsync();

        // Tracked entities replace any earlier entry with the same Azure AD id.
        foreach (var trackedUser in batchUsers)
        {
            if (!string.IsNullOrEmpty(trackedUser.AzureAdId))
            {
                dbUsersByAadId[trackedUser.AzureAdId] = trackedUser;
            }
        }

        // Pre-populate cache with tracked entities from this batch to prevent duplicate inserts
        foreach (var trackedUser in batchUsers)
        {
            var upnLower = trackedUser.UserPrincipalName?.ToLower();
            if (!string.IsNullOrEmpty(upnLower))
            {
                await userMetaCache.UserCache.GetOrCreateNewResource(upnLower, trackedUser);
            }
        }

        // Update metadata for each user that has a matching Graph record
        foreach (var dbUser in batchUsers)
        {
            var upnLower = dbUser.UserPrincipalName?.ToLower();
            if (!string.IsNullOrEmpty(upnLower) && graphUsersByUpn.TryGetValue(upnLower, out var graphUser))
            {
                await updateAction(db, graphUser, allGraphUsers, new List<Common.Entities.User>(), dbUser, readUserSkus, dbUsersByAadId);
            }
        }

        // Save batch
        db.ChangeTracker.DetectChanges();
        await db.SaveChangesAsync();

        enrichedUsers.AddRange(batchUsers);

        // The ETA is based on how many UPNs have been processed, which is always positive here.
        // Dividing by the enriched percentage (as before) yields Infinity/NaN while no user has
        // been loaded yet, and TimeSpan.FromMilliseconds throws on those values.
        var processedUpns = batchStart + batchCount;
        var percentDone = (double)enrichedUsers.Count / insertedUserUpns.Count * 100;
        var elapsedMs = enrichSw.ElapsedMilliseconds;
        var remainingMs = (double)elapsedMs * (insertedUserUpns.Count - processedUpns) / processedUpns;
        var remaining = TimeSpan.FromMilliseconds(remainingMs);

        // TotalHours keeps estimates of a day or more from wrapping back to 0-23.
        _telemetry.LogInformation($"User import - Enriched metadata for {enrichedUsers.Count.ToString("N0")}/{insertedUserUpns.Count.ToString("N0")} users ({percentDone:F1}% done, estimated {(int)remaining.TotalHours}h {remaining.Minutes}m {remaining.Seconds}s remaining)");

        // Clear change tracker to free memory after each batch
        _batchProcessor.DetachAllEntitiesExceptLookups(db);
    }

    // The lookups are emptied explicitly; dbUsersByAadId was handed to updateAction, so this also
    // releases the entities for any holder of that reference.
    graphUsersByUpn.Clear();
    dbUsersByAadId.Clear();

    return enrichedUsers;
}
271+
}
272+
}

0 commit comments

Comments
 (0)