Skip to content

Commit 2496a96

Browse files
authored
fix: bulk migration stuck in processing (#1155)
* fix: additional error handling for email verification when bulk migrating * fix: proper checking of batchupdateexception * chore: changelog and build version update * fix: change test data generation * fix: clear up debug messages * fix: fix tests
1 parent e76fbbc commit 2496a96

File tree

8 files changed

+237
-41
lines changed

8 files changed

+237
-41
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

88
## [Unreleased]
99

10+
## [11.0.3]
11+
12+
- Fixes BatchUpdateException checks and error handling to prevent bulk import users stuck in `PROCESSING` state
13+
- Adds more DEBUG logging to the bulk import users process
14+
1015
## [11.0.2]
1116

1217
- Fixes `AuthRecipe#getUserByAccountInfo` to consider the tenantId instead of the appId when fetching the webauthn user

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ compileTestJava { options.encoding = "UTF-8" }
2020
// }
2121
//}
2222

23-
version = "11.0.2"
23+
version = "11.0.3"
2424

2525
repositories {
2626
mavenCentral()

src/main/java/io/supertokens/bulkimport/BulkImport.java

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.supertokens.multitenancy.exception.AnotherPrimaryUserWithEmailAlreadyExistsException;
3333
import io.supertokens.multitenancy.exception.AnotherPrimaryUserWithPhoneNumberAlreadyExistsException;
3434
import io.supertokens.multitenancy.exception.AnotherPrimaryUserWithThirdPartyInfoAlreadyExistsException;
35+
import io.supertokens.output.Logging;
3536
import io.supertokens.passwordless.Passwordless;
3637
import io.supertokens.pluginInterface.Storage;
3738
import io.supertokens.pluginInterface.StorageUtils;
@@ -209,13 +210,28 @@ public static void processUsersImportSteps(Main main, AppIdentifier appIdentifie
209210
Storage bulkImportProxyStorage, List<BulkImportUser> users, Storage[] allStoragesForApp)
210211
throws StorageTransactionLogicException {
211212
try {
213+
Logging.debug(main, TenantIdentifier.BASE_TENANT, "Processing login methods..");
212214
processUsersLoginMethods(main, appIdentifier, bulkImportProxyStorage, users);
215+
Logging.debug(main, TenantIdentifier.BASE_TENANT, "Processing login methods DONE");
216+
Logging.debug(main, TenantIdentifier.BASE_TENANT, "Creating Primary users and linking accounts..");
213217
createPrimaryUsersAndLinkAccounts(main, appIdentifier, bulkImportProxyStorage, users);
218+
Logging.debug(main, TenantIdentifier.BASE_TENANT, "Creating Primary users and linking accounts DONE");
219+
Logging.debug(main, TenantIdentifier.BASE_TENANT, "Creating user id mappings..");
214220
createMultipleUserIdMapping(appIdentifier, users, allStoragesForApp);
221+
Logging.debug(main, TenantIdentifier.BASE_TENANT, "Creating user id mappings DONE");
222+
Logging.debug(main, TenantIdentifier.BASE_TENANT, "Verifying email addresses..");
215223
verifyMultipleEmailForAllLoginMethods(appIdentifier, bulkImportProxyStorage, users);
224+
Logging.debug(main, TenantIdentifier.BASE_TENANT, "Verifying email addresses DONE");
225+
Logging.debug(main, TenantIdentifier.BASE_TENANT, "Creating TOTP devices..");
216226
createMultipleTotpDevices(main, appIdentifier, bulkImportProxyStorage, users);
227+
Logging.debug(main, TenantIdentifier.BASE_TENANT, "Creating TOTP devices DONE");
228+
Logging.debug(main, TenantIdentifier.BASE_TENANT, "Creating user metadata..");
217229
createMultipleUserMetadata(appIdentifier, bulkImportProxyStorage, users);
230+
Logging.debug(main, TenantIdentifier.BASE_TENANT, "Creating user metadata DONE");
231+
Logging.debug(main, TenantIdentifier.BASE_TENANT, "Creating user roles..");
218232
createMultipleUserRoles(main, appIdentifier, bulkImportProxyStorage, users);
233+
Logging.debug(main, TenantIdentifier.BASE_TENANT, "Creating user roles DONE");
234+
Logging.debug(main, TenantIdentifier.BASE_TENANT, "Effective processUsersImportSteps DONE");
219235
} catch ( StorageQueryException | FeatureNotEnabledException |
220236
TenantOrAppNotFoundException e) {
221237
throw new StorageTransactionLogicException(e);
@@ -225,6 +241,7 @@ public static void processUsersImportSteps(Main main, AppIdentifier appIdentifie
225241
public static void processUsersLoginMethods(Main main, AppIdentifier appIdentifier, Storage storage,
226242
List<BulkImportUser> users) throws StorageTransactionLogicException {
227243
//sort login methods together
244+
Logging.debug(main, TenantIdentifier.BASE_TENANT, "Sorting login methods by recipeId..");
228245
Map<String, List<LoginMethod>> sortedLoginMethods = new HashMap<>();
229246
for (BulkImportUser user: users) {
230247
for(LoginMethod loginMethod : user.loginMethods){
@@ -236,19 +253,25 @@ public static void processUsersLoginMethods(Main main, AppIdentifier appIdentifi
236253
}
237254

238255
List<ImportUserBase> importedUsers = new ArrayList<>();
239-
if (sortedLoginMethods.containsKey("emailpassword")) {
240-
importedUsers.addAll(
241-
processEmailPasswordLoginMethods(main, storage, sortedLoginMethods.get("emailpassword"),
256+
if (sortedLoginMethods.containsKey("emailpassword")) {
257+
Logging.debug(main, TenantIdentifier.BASE_TENANT, "Processing emailpassword login methods..");
258+
importedUsers.addAll(
259+
processEmailPasswordLoginMethods(main, storage, sortedLoginMethods.get("emailpassword"),
242260
appIdentifier));
261+
Logging.debug(main, TenantIdentifier.BASE_TENANT, "Processing emailpassword login methods DONE");
243262
}
244263
if (sortedLoginMethods.containsKey("thirdparty")) {
264+
Logging.debug(main, TenantIdentifier.BASE_TENANT, "Processing thirdparty login methods..");
245265
importedUsers.addAll(
246266
processThirdpartyLoginMethods(main, storage, sortedLoginMethods.get("thirdparty"),
247267
appIdentifier));
268+
Logging.debug(main, TenantIdentifier.BASE_TENANT, "Processing thirdparty login methods DONE");
248269
}
249270
if (sortedLoginMethods.containsKey("passwordless")) {
271+
Logging.debug(main, TenantIdentifier.BASE_TENANT, "Processing passwordless login methods..");
250272
importedUsers.addAll(processPasswordlessLoginMethods(main, appIdentifier, storage,
251273
sortedLoginMethods.get("passwordless")));
274+
Logging.debug(main, TenantIdentifier.BASE_TENANT, "Processing passwordless login methods DONE");
252275
}
253276
Set<String> actualKeys = new HashSet<>(sortedLoginMethods.keySet());
254277
List.of("emailpassword", "thirdparty", "passwordless").forEach(actualKeys::remove);
@@ -288,9 +311,9 @@ private static List<? extends ImportUserBase> processPasswordlessLoginMethods(Ma
288311
}
289312

290313
Passwordless.createPasswordlessUsers(storage, usersToImport);
291-
292314
return usersToImport;
293315
} catch (StorageQueryException | StorageTransactionLogicException e) {
316+
Logging.debug(main, TenantIdentifier.BASE_TENANT, "exception: " + e.getMessage());
294317
if (e.getCause() instanceof BulkImportBatchInsertException) {
295318
Map<String, Exception> errorsByPosition = ((BulkImportBatchInsertException) e.getCause()).exceptionByUserId;
296319
for (String userid : errorsByPosition.keySet()) {
@@ -683,12 +706,31 @@ public static void verifyMultipleEmailForAllLoginMethods(AppIdentifier appIdenti
683706
Map<String, String> emailToUserId = collectVerifiedEmailAddressesByUserIds(users);
684707
try {
685708
verifyCollectedEmailAddressesForUsers(appIdentifier, storage, emailToUserId);
686-
} catch (StorageQueryException e) {
709+
} catch (StorageQueryException | StorageTransactionLogicException e) {
710+
if (e.getCause() instanceof BulkImportBatchInsertException) {
711+
Map<String, Exception> errorsByPosition =
712+
((BulkImportBatchInsertException) e.getCause()).exceptionByUserId;
713+
for (String userid : errorsByPosition.keySet()) {
714+
Exception exception = errorsByPosition.get(userid);
715+
if (exception instanceof DuplicateEmailException) {
716+
String message =
717+
"E043: Email " + errorsByPosition.get(userid) + " is already verified for the user";
718+
errorsByPosition.put(userid, new Exception(message));
719+
} else if (exception instanceof NullPointerException) {
720+
String message = "E044: null email address was found for the userId " + userid +
721+
" while verifying the email";
722+
errorsByPosition.put(userid, new Exception(message));
723+
}
724+
}
725+
throw new StorageTransactionLogicException(
726+
new BulkImportBatchInsertException("translated", errorsByPosition));
727+
}
687728
throw new StorageTransactionLogicException(e);
688729
}
689730
}
690731

691-
private static void verifyCollectedEmailAddressesForUsers(AppIdentifier appIdentifier, Storage storage, Map<String, String> emailToUserId)
732+
private static void verifyCollectedEmailAddressesForUsers(AppIdentifier appIdentifier, Storage storage,
733+
Map<String, String> emailToUserId)
692734
throws StorageQueryException, StorageTransactionLogicException {
693735
if(!emailToUserId.isEmpty()) {
694736
EmailVerificationSQLStorage emailVerificationSQLStorage = StorageUtils
@@ -706,10 +748,11 @@ private static void verifyCollectedEmailAddressesForUsers(AppIdentifier appIdent
706748

707749
@NotNull
708750
private static Map<String, String> collectVerifiedEmailAddressesByUserIds(List<BulkImportUser> users) {
709-
Map<String, String> emailToUserId = new HashMap<>();
751+
Map<String, String> emailToUserId = new LinkedHashMap<>();
710752
for (BulkImportUser user : users) {
711753
for (LoginMethod lm : user.loginMethods) {
712-
if(lm.isVerified) {
754+
//we skip passwordless` 'null' email addresses
755+
if (lm.isVerified && !(lm.recipeId.equals("passwordless") && lm.email == null)) {
713756
//collect the verified email addresses for the userId
714757
emailToUserId.put(lm.getSuperTokenOrExternalUserId(), lm.email);
715758
}

src/main/java/io/supertokens/cronjobs/bulkimport/ProcessBulkImportUsers.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,9 @@ protected void doTaskPerApp(AppIdentifier app)
109109
}
110110

111111
List<List<BulkImportUser>> loadedUsersChunks = makeChunksOf(users, numberOfBatchChunks);
112+
for (List<BulkImportUser> chunk : loadedUsersChunks) {
113+
Logging.debug(main, app.getAsPublicTenantIdentifier(), "Chunk size: " + chunk.size());
114+
}
112115

113116
try {
114117
List<Future<?>> tasks = new ArrayList<>();
@@ -124,14 +127,26 @@ protected void doTaskPerApp(AppIdentifier app)
124127
Thread.sleep(1000);
125128
}
126129
Logging.debug(main, app.getAsPublicTenantIdentifier(), "Task " + task + " finished");
127-
Void result = (Void) task.get(); //to know if there were any errors while executing and for waiting in this thread for all the other threads to finish up
130+
try {
131+
Void result = (Void) task.get(); //to know if there were any errors while executing and for
132+
// waiting in this thread for all the other threads to finish up
133+
Logging.debug(main, app.getAsPublicTenantIdentifier(),
134+
"Task " + task + " finished with result: " + result);
135+
} catch (ExecutionException executionException) {
136+
Logging.error(main, app.getAsPublicTenantIdentifier(),
137+
"Error while processing bulk import users", true,
138+
executionException);
139+
throw new RuntimeException(executionException);
140+
}
128141
usersProcessed += loadedUsersChunks.get(tasks.indexOf(task)).size();
129142
failedUsers = bulkImportSQLStorage.getBulkImportUsersCount(app, BulkImportStorage.BULK_IMPORT_USER_STATUS.FAILED);
130143
Logging.debug(main, app.getAsPublicTenantIdentifier(), "Chunk " + tasks.indexOf(task) + " finished processing, all chunks processed: "
131144
+ usersProcessed + " users (" + failedUsers + " failed)");
132145
}
133-
134-
} catch (ExecutionException | InterruptedException e) {
146+
Logging.debug(main, app.getAsPublicTenantIdentifier(), "Processing round finished");
147+
} catch (InterruptedException e) {
148+
Logging.error(main, app.getAsPublicTenantIdentifier(), "Error while processing bulk import users", true,
149+
e);
135150
throw new RuntimeException(e);
136151
}
137152
}

0 commit comments

Comments
 (0)