Skip to content

Commit d93273e

Browse files
authored
Refactor/move UserMerger utility into admin/tool (#8154)
1 parent d1a95c9 commit d93273e

File tree

3 files changed

+251
-260
lines changed

3 files changed

+251
-260
lines changed

app/lib/admin/tools/user_merger.dart

Lines changed: 249 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,17 @@
33
// BSD-style license that can be found in the LICENSE file.
44

55
import 'package:args/args.dart';
6-
6+
import 'package:logging/logging.dart';
7+
import 'package:pool/pool.dart';
8+
import 'package:pub_dev/account/backend.dart';
9+
import 'package:pub_dev/account/models.dart';
10+
import 'package:pub_dev/audit/models.dart';
11+
import 'package:pub_dev/package/models.dart';
12+
import 'package:pub_dev/publisher/models.dart';
713
import 'package:pub_dev/shared/datastore.dart';
814
import 'package:pub_dev/shared/exceptions.dart';
9-
import 'package:pub_dev/shared/user_merger.dart';
15+
16+
final _logger = Logger('user_merger');
1017

1118
final _argParser = ArgParser()
1219
..addOption('concurrency',
@@ -49,3 +56,243 @@ Future<String> executeUserMergerTool(List<String> args) async {
4956
return 'Fixed $count `User` entities.';
5057
}
5158
}
59+
60+
/// Utility class to merge user data.
61+
/// Specifically for the case where a two [User] entities exists with the same [User.oauthUserId].
62+
class UserMerger {
63+
final DatastoreDB _db;
64+
final int? _concurrency;
65+
final bool _omitEmailCheck;
66+
67+
UserMerger({
68+
required DatastoreDB db,
69+
int? concurrency = 1,
70+
bool? omitEmailCheck,
71+
}) : _db = db,
72+
_concurrency = concurrency,
73+
_omitEmailCheck = omitEmailCheck ?? false;
74+
75+
/// Fixes all OAuthUserID issues.
76+
Future<int> fixAll() async {
77+
final ids = await scanOauthUserIdsWithProblems();
78+
for (final id in ids) {
79+
await fixOAuthUserID(id);
80+
}
81+
return ids.length;
82+
}
83+
84+
/// Returns the OAuth userIds that have more than one User.
85+
Future<List<String>> scanOauthUserIdsWithProblems() async {
86+
_logger.info('Scanning Users...');
87+
final query = _db.query<User>();
88+
final counts = <String, int>{};
89+
await for (final user in query.run()) {
90+
if (user.oauthUserId == null) continue;
91+
counts[user.oauthUserId!] = (counts[user.oauthUserId!] ?? 0) + 1;
92+
}
93+
final result = counts.keys.where((k) => counts[k]! > 1).toList();
94+
_logger.info('$result OAuthUserID with more than one User.');
95+
return result;
96+
}
97+
98+
/// Runs user merging on the [oauthUserId] for each non-primary [User].
99+
Future<void> fixOAuthUserID(String oauthUserId) async {
100+
_logger.info('Fixing OAuthUserID=$oauthUserId');
101+
102+
final query = _db.query<User>()..filter('oauthUserId =', oauthUserId);
103+
final users = await query.run().toList();
104+
_logger.info('Users: ${users.map((u) => u.userId).join(', ')}');
105+
106+
final mapping = await _db.lookupValue<OAuthUserID>(
107+
_db.emptyKey.append(OAuthUserID, id: oauthUserId));
108+
_logger.info('Primary User: ${mapping.userId}');
109+
if (!users.any((u) => u.userId == mapping.userId)) {
110+
throw StateError('Primary User is missing!');
111+
}
112+
113+
// WARNING
114+
//
115+
// We only update user ids, we do not change e-mails.
116+
// The tool will NOT merge Users with non-matching e-mail addresses.
117+
if (!_omitEmailCheck) {
118+
for (int i = 1; i < users.length; i++) {
119+
if (users[0].email != users[i].email) {
120+
throw StateError(
121+
'User e-mail does not match: ${users[0].email} != ${users[i].email}');
122+
}
123+
}
124+
}
125+
126+
for (final user in users) {
127+
if (user.userId == mapping.userId) continue;
128+
await mergeUser(user.userId, mapping.userId);
129+
}
130+
}
131+
132+
/// Migrates data for User merge.
133+
Future<void> mergeUser(String fromUserId, String toUserId) async {
134+
_logger.info('Merging User: $fromUserId -> $toUserId');
135+
final fromUserKey = _db.emptyKey.append(User, id: fromUserId);
136+
final toUserKey = _db.emptyKey.append(User, id: toUserId);
137+
final fromUser = await _db.lookupOrNull<User>(fromUserKey);
138+
InvalidInputException.checkNotNull(fromUser, 'fromUser');
139+
final toUser = await _db.lookupOrNull<User>(toUserKey);
140+
InvalidInputException.checkNotNull(toUser, 'toUser');
141+
final fromUserMapping = fromUser!.oauthUserId == null
142+
? null
143+
: await _db.lookupOrNull<OAuthUserID>(
144+
_db.emptyKey.append(OAuthUserID, id: fromUser.oauthUserId));
145+
final toUserMapping = toUser!.oauthUserId == null
146+
? null
147+
: await _db.lookupOrNull<OAuthUserID>(
148+
_db.emptyKey.append(OAuthUserID, id: toUser.oauthUserId));
149+
150+
// Package
151+
await _processConcurrently(
152+
_db.query<Package>()..filter('uploaders =', fromUserId),
153+
(Package m) async {
154+
await withRetryTransaction(_db, (tx) async {
155+
final p = await tx.lookupValue<Package>(m.key);
156+
if (p.containsUploader(fromUserId)) {
157+
p.removeUploader(fromUserId);
158+
p.addUploader(toUserId);
159+
tx.insert(p);
160+
}
161+
});
162+
},
163+
);
164+
165+
// PackageVersion
166+
await _processConcurrently(
167+
_db.query<PackageVersion>()..filter('uploader =', fromUserId),
168+
(PackageVersion m) async {
169+
await withRetryTransaction(_db, (tx) async {
170+
final pv = await tx.lookupValue<PackageVersion>(m.key);
171+
if (pv.uploader == fromUserId) {
172+
pv.uploader = toUserId;
173+
tx.insert(pv);
174+
}
175+
});
176+
},
177+
);
178+
179+
// Like
180+
await _processConcurrently(
181+
_db.query<Like>(ancestorKey: fromUserKey),
182+
(Like like) async {
183+
await withRetryTransaction(_db, (tx) async {
184+
tx.queueMutations(
185+
inserts: [like.changeParentUser(toUserKey)],
186+
deletes: [like.key],
187+
);
188+
});
189+
},
190+
);
191+
192+
// UserSession
193+
await _processConcurrently(
194+
_db.query<UserSession>()..filter('userId =', fromUserId),
195+
(UserSession m) async {
196+
await withRetryTransaction(_db, (tx) async {
197+
final session = await tx.lookupValue<UserSession>(m.key);
198+
if (session.userId == fromUserId) {
199+
session.userId = toUserId;
200+
tx.insert(session);
201+
}
202+
});
203+
},
204+
);
205+
206+
// Consent's fromUserId attribute
207+
await _processConcurrently(
208+
_db.query<Consent>()..filter('fromAgent =', fromUserId),
209+
(Consent m) async {
210+
if (m.parentKey?.id != null) {
211+
throw StateError('Old Consent entity: ${m.consentId}.');
212+
}
213+
await withRetryTransaction(_db, (tx) async {
214+
final consent = await tx.lookupValue<Consent>(m.key);
215+
if (consent.fromAgent == fromUserId) {
216+
consent.fromAgent = toUserId;
217+
tx.insert(consent);
218+
}
219+
});
220+
},
221+
);
222+
223+
// PublisherMember
224+
await _processConcurrently(
225+
_db.query<PublisherMember>()..filter('userId =', fromUserId),
226+
(PublisherMember m) async {
227+
await withRetryTransaction(_db, (tx) async {
228+
tx.queueMutations(
229+
inserts: [m.changeParentUserId(toUserId)],
230+
deletes: [m.key],
231+
);
232+
});
233+
},
234+
);
235+
236+
// AuditLogRecord: agent
237+
await _processConcurrently(
238+
_db.query<AuditLogRecord>()..filter('agent =', fromUserId),
239+
(AuditLogRecord alr) async {
240+
await withRetryTransaction(_db, (tx) async {
241+
final r = await _db.lookupValue<AuditLogRecord>(alr.key);
242+
r.agent = toUserId;
243+
r.data = r.data?.map((key, value) => MapEntry<String, dynamic>(
244+
key, value == fromUserId ? toUserId : value));
245+
tx.insert(r);
246+
});
247+
});
248+
249+
// AuditLogRecord: users
250+
await _processConcurrently(
251+
_db.query<AuditLogRecord>()..filter('users =', fromUserId),
252+
(AuditLogRecord alr) async {
253+
await withRetryTransaction(_db, (tx) async {
254+
final r = await _db.lookupValue<AuditLogRecord>(alr.key);
255+
r.users!.remove(fromUserId);
256+
r.users!.add(toUserId);
257+
r.data = r.data?.map(
258+
(key, value) => MapEntry<String, dynamic>(
259+
key, value == fromUserId ? toUserId : value),
260+
);
261+
tx.insert(r);
262+
});
263+
});
264+
265+
await withRetryTransaction(_db, (tx) async {
266+
final u = await _db.lookupValue<User>(toUserKey);
267+
if (toUser.created!.isAfter(fromUser.created!)) {
268+
u.created = fromUser.created;
269+
}
270+
if (toUserMapping == null) {
271+
u.oauthUserId = null;
272+
}
273+
if (fromUserMapping?.userId == toUserId) {
274+
u.oauthUserId = fromUserMapping!.oauthUserId;
275+
}
276+
tx.insert(u);
277+
tx.delete(fromUserKey);
278+
if (fromUserMapping?.userId == fromUserId) {
279+
tx.delete(fromUserMapping!.key);
280+
}
281+
});
282+
283+
await purgeAccountCache(userId: fromUserId);
284+
await purgeAccountCache(userId: toUserId);
285+
}
286+
287+
Future<void> _processConcurrently<T extends Model>(
288+
Query<T> query, Future<void> Function(T) fn) async {
289+
final pool = Pool(_concurrency!);
290+
final futures = <Future>[];
291+
await for (final m in query.run()) {
292+
final f = pool.withResource(() => fn(m));
293+
futures.add(f);
294+
}
295+
await Future.wait(futures);
296+
await pool.close();
297+
}
298+
}

0 commit comments

Comments
 (0)