Skip to content

Commit d309722

Browse files
authored
fix: bulk import fixes (#238)
* fix: fixing the OneMillionUsersTest * fix: introduce batchExecute for QueryExecutorTemplate
1 parent c2ed495 commit d309722

File tree

12 files changed

+298
-349
lines changed

12 files changed

+298
-349
lines changed

src/main/java/io/supertokens/storage/postgresql/QueryExecutorTemplate.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.sql.PreparedStatement;
2323
import java.sql.ResultSet;
2424
import java.sql.SQLException;
25+
import java.util.List;
2526

2627
public interface QueryExecutorTemplate {
2728

@@ -44,6 +45,25 @@ static <T> T execute(Connection con, String QUERY, PreparedStatementValueSetter
4445
}
4546
}
4647

48+
static void executeBatch(Connection connection, String QUERY, List<PreparedStatementValueSetter> setters)
49+
throws SQLException, StorageQueryException {
50+
assert setters != null;
51+
assert !setters.isEmpty();
52+
try (PreparedStatement pst = connection.prepareStatement(QUERY)) {
53+
int counter = 0;
54+
for(PreparedStatementValueSetter setter: setters) {
55+
setter.setValues(pst);
56+
pst.addBatch();
57+
counter++;
58+
59+
if(counter % 100 == 0) {
60+
pst.executeBatch();
61+
}
62+
}
63+
pst.executeBatch(); //for the possible remaining ones
64+
}
65+
}
66+
4767
static int update(Start start, String QUERY, PreparedStatementValueSetter setter)
4868
throws SQLException, StorageQueryException {
4969
try (Connection con = ConnectionPool.getConnection(start)) {

src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,21 @@
2222
import io.supertokens.pluginInterface.exceptions.StorageQueryException;
2323
import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException;
2424
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
25+
import io.supertokens.storage.postgresql.PreparedStatementValueSetter;
2526
import io.supertokens.storage.postgresql.Start;
2627
import io.supertokens.storage.postgresql.config.Config;
2728
import io.supertokens.storage.postgresql.utils.Utils;
2829

2930
import javax.annotation.Nonnull;
3031
import javax.annotation.Nullable;
3132
import java.sql.Connection;
32-
import java.sql.PreparedStatement;
3333
import java.sql.ResultSet;
3434
import java.sql.SQLException;
3535
import java.util.ArrayList;
3636
import java.util.List;
3737
import java.util.Map;
3838

39-
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.execute;
40-
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.update;
39+
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.*;
4140

4241
public class BulkImportQueries {
4342
static String getQueryToCreateBulkImportUsersTable(Start start) {
@@ -125,28 +124,23 @@ public static void updateBulkImportUserStatus_Transaction(Start start, Connectio
125124

126125
public static void updateMultipleBulkImportUsersStatusToError_Transaction(Start start, Connection con, AppIdentifier appIdentifier,
127126
@Nonnull Map<String,String> bulkImportUserIdToErrorMessage)
128-
throws SQLException {
127+
throws SQLException, StorageQueryException {
129128
BULK_IMPORT_USER_STATUS errorStatus = BULK_IMPORT_USER_STATUS.FAILED;
130129
String query = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable()
131130
+ " SET status = ?, error_msg = ?, updated_at = ? WHERE app_id = ? and id = ?";
131+
List<PreparedStatementValueSetter> setters = new ArrayList<>();
132132

133-
PreparedStatement setErrorStatement = con.prepareStatement(query);
134-
135-
int counter = 0;
136133
for(String bulkImportUserId : bulkImportUserIdToErrorMessage.keySet()){
137-
setErrorStatement.setString(1, errorStatus.toString());
138-
setErrorStatement.setString(2, bulkImportUserIdToErrorMessage.get(bulkImportUserId));
139-
setErrorStatement.setLong(3, System.currentTimeMillis());
140-
setErrorStatement.setString(4, appIdentifier.getAppId());
141-
setErrorStatement.setString(5, bulkImportUserId);
142-
setErrorStatement.addBatch();
143-
144-
if(counter % 100 == 0) {
145-
setErrorStatement.executeBatch();
146-
}
134+
setters.add(pst -> {
135+
pst.setString(1, errorStatus.toString());
136+
pst.setString(2, bulkImportUserIdToErrorMessage.get(bulkImportUserId));
137+
pst.setLong(3, System.currentTimeMillis());
138+
pst.setString(4, appIdentifier.getAppId());
139+
pst.setString(5, bulkImportUserId);
140+
});
147141
}
148142

149-
setErrorStatement.executeBatch();
143+
executeBatch(con, query, setters);
150144
}
151145

152146
public static List<BulkImportUser> getBulkImportUsersAndChangeStatusToProcessing(Start start,

src/main/java/io/supertokens/storage/postgresql/queries/EmailPasswordQueries.java

Lines changed: 41 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,19 @@
2626
import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException;
2727
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
2828
import io.supertokens.pluginInterface.multitenancy.TenantIdentifier;
29+
import io.supertokens.storage.postgresql.PreparedStatementValueSetter;
2930
import io.supertokens.storage.postgresql.Start;
3031
import io.supertokens.storage.postgresql.config.Config;
3132
import io.supertokens.storage.postgresql.utils.Utils;
3233

3334
import java.sql.Connection;
34-
import java.sql.PreparedStatement;
3535
import java.sql.ResultSet;
3636
import java.sql.SQLException;
3737
import java.util.*;
3838
import java.util.stream.Collectors;
3939

4040
import static io.supertokens.pluginInterface.RECIPE_ID.EMAIL_PASSWORD;
41-
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.execute;
42-
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.update;
41+
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.*;
4342
import static io.supertokens.storage.postgresql.config.Config.getConfig;
4443
import static java.lang.System.currentTimeMillis;
4544

@@ -354,59 +353,52 @@ public static void signUpMultipleForBulkImport_Transaction(Start start, Connecti
354353
"INSERT INTO " + getConfig(start).getEmailPasswordUserToTenantTable()
355354
+ "(app_id, tenant_id, user_id, email)" + " VALUES(?, ?, ?, ?)";
356355

357-
PreparedStatement appIdToUserId = sqlCon.prepareStatement(app_id_to_user_id_QUERY);
358-
PreparedStatement allAuthRecipeUsers = sqlCon.prepareStatement(all_auth_recipe_users_QUERY);
359-
PreparedStatement emailPasswordUsers = sqlCon.prepareStatement(emailpassword_users_QUERY);
360-
PreparedStatement emailPasswordUsersToTenant = sqlCon.prepareStatement(emailpassword_users_to_tenant_QUERY);
356+
List<PreparedStatementValueSetter> appIdToUserIdSetters = new ArrayList<>();
357+
List<PreparedStatementValueSetter> allAuthRecipeUsersSetters = new ArrayList<>();
358+
List<PreparedStatementValueSetter> emailPasswordUsersSetters = new ArrayList<>();
359+
List<PreparedStatementValueSetter> emailPasswordUsersToTenantSetters = new ArrayList<>();
361360

362-
int counter = 0;
363361
for (EmailPasswordImportUser user : usersToSignUp) {
364362
String userId = user.userId;
365363
TenantIdentifier tenantIdentifier = user.tenantIdentifier;
366364

367-
appIdToUserId.setString(1, tenantIdentifier.getAppId());
368-
appIdToUserId.setString(2, userId);
369-
appIdToUserId.setString(3, userId);
370-
appIdToUserId.setString(4, EMAIL_PASSWORD.toString());
371-
appIdToUserId.addBatch();
372-
373-
374-
allAuthRecipeUsers.setString(1, tenantIdentifier.getAppId());
375-
allAuthRecipeUsers.setString(2, tenantIdentifier.getTenantId());
376-
allAuthRecipeUsers.setString(3, userId);
377-
allAuthRecipeUsers.setString(4, userId);
378-
allAuthRecipeUsers.setString(5, EMAIL_PASSWORD.toString());
379-
allAuthRecipeUsers.setLong(6, user.timeJoinedMSSinceEpoch);
380-
allAuthRecipeUsers.setLong(7, user.timeJoinedMSSinceEpoch);
381-
allAuthRecipeUsers.addBatch();
382-
383-
emailPasswordUsers.setString(1, tenantIdentifier.getAppId());
384-
emailPasswordUsers.setString(2, userId);
385-
emailPasswordUsers.setString(3, user.email);
386-
emailPasswordUsers.setString(4, user.passwordHash);
387-
emailPasswordUsers.setLong(5, user.timeJoinedMSSinceEpoch);
388-
emailPasswordUsers.addBatch();
389-
390-
emailPasswordUsersToTenant.setString(1, tenantIdentifier.getAppId());
391-
emailPasswordUsersToTenant.setString(2, tenantIdentifier.getTenantId());
392-
emailPasswordUsersToTenant.setString(3, userId);
393-
emailPasswordUsersToTenant.setString(4, user.email);
394-
emailPasswordUsersToTenant.addBatch();
395-
counter++;
396-
if (counter % 100 == 0) {
397-
appIdToUserId.executeBatch();
398-
allAuthRecipeUsers.executeBatch();
399-
emailPasswordUsers.executeBatch();
400-
emailPasswordUsersToTenant.executeBatch();
401-
}
402-
}
365+
appIdToUserIdSetters.add(pst -> {
366+
pst.setString(1, tenantIdentifier.getAppId());
367+
pst.setString(2, userId);
368+
pst.setString(3, userId);
369+
pst.setString(4, EMAIL_PASSWORD.toString());
370+
});
371+
372+
allAuthRecipeUsersSetters.add(pst -> {
373+
pst.setString(1, tenantIdentifier.getAppId());
374+
pst.setString(2, tenantIdentifier.getTenantId());
375+
pst.setString(3, userId);
376+
pst.setString(4, userId);
377+
pst.setString(5, EMAIL_PASSWORD.toString());
378+
pst.setLong(6, user.timeJoinedMSSinceEpoch);
379+
pst.setLong(7, user.timeJoinedMSSinceEpoch);
380+
});
403381

404-
//execute the remaining ones
405-
appIdToUserId.executeBatch();
406-
allAuthRecipeUsers.executeBatch();
407-
emailPasswordUsers.executeBatch();
408-
emailPasswordUsersToTenant.executeBatch();
382+
emailPasswordUsersSetters.add(pst -> {
383+
pst.setString(1, tenantIdentifier.getAppId());
384+
pst.setString(2, userId);
385+
pst.setString(3, user.email);
386+
pst.setString(4, user.passwordHash);
387+
pst.setLong(5, user.timeJoinedMSSinceEpoch);
388+
});
389+
390+
emailPasswordUsersToTenantSetters.add(pst -> {
391+
pst.setString(1, tenantIdentifier.getAppId());
392+
pst.setString(2, tenantIdentifier.getTenantId());
393+
pst.setString(3, userId);
394+
pst.setString(4, user.email);
395+
});
396+
}
409397

398+
executeBatch(sqlCon, app_id_to_user_id_QUERY, appIdToUserIdSetters);
399+
executeBatch(sqlCon, all_auth_recipe_users_QUERY, allAuthRecipeUsersSetters);
400+
executeBatch(sqlCon, emailpassword_users_QUERY, emailPasswordUsersSetters);
401+
executeBatch(sqlCon, emailpassword_users_to_tenant_QUERY, emailPasswordUsersToTenantSetters);
410402
sqlCon.commit();
411403
} catch (SQLException throwables) {
412404
throw new StorageTransactionLogicException(throwables);

src/main/java/io/supertokens/storage/postgresql/queries/EmailVerificationQueries.java

Lines changed: 36 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,17 @@
2323
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
2424
import io.supertokens.pluginInterface.multitenancy.TenantIdentifier;
2525
import io.supertokens.pluginInterface.sqlStorage.TransactionConnection;
26+
import io.supertokens.storage.postgresql.PreparedStatementValueSetter;
2627
import io.supertokens.storage.postgresql.Start;
2728
import io.supertokens.storage.postgresql.config.Config;
2829
import io.supertokens.storage.postgresql.utils.Utils;
2930

3031
import java.sql.Connection;
31-
import java.sql.PreparedStatement;
3232
import java.sql.ResultSet;
3333
import java.sql.SQLException;
3434
import java.util.*;
3535

36-
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.execute;
37-
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.update;
36+
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.*;
3837
import static io.supertokens.storage.postgresql.config.Config.getConfig;
3938
import static java.lang.System.currentTimeMillis;
4039

@@ -130,41 +129,25 @@ public static void updateMultipleUsersIsEmailVerified_Transaction(Start start, C
130129
boolean isEmailVerified)
131130
throws SQLException, StorageQueryException {
132131

132+
String QUERY;
133133
if (isEmailVerified) {
134-
String QUERY = "INSERT INTO " + getConfig(start).getEmailVerificationTable()
134+
QUERY = "INSERT INTO " + getConfig(start).getEmailVerificationTable()
135135
+ "(app_id, user_id, email) VALUES(?, ?, ?)";
136-
PreparedStatement insertQuery = con.prepareStatement(QUERY);
137-
int counter = 0;
138-
for(Map.Entry<String, String> emailToUser : emailToUserIds.entrySet()){
139-
insertQuery.setString(1, appIdentifier.getAppId());
140-
insertQuery.setString(2, emailToUser.getKey());
141-
insertQuery.setString(3, emailToUser.getValue());
142-
insertQuery.addBatch();
143-
144-
counter++;
145-
if (counter % 100 == 0) {
146-
insertQuery.executeBatch();
147-
}
148-
}
149-
insertQuery.executeBatch();
150136
} else {
151-
String QUERY = "DELETE FROM " + getConfig(start).getEmailVerificationTable()
137+
QUERY = "DELETE FROM " + getConfig(start).getEmailVerificationTable()
152138
+ " WHERE app_id = ? AND user_id = ? AND email = ?";
153-
PreparedStatement deleteQuery = con.prepareStatement(QUERY);
154-
int counter = 0;
155-
for (Map.Entry<String, String> emailToUser : emailToUserIds.entrySet()) {
156-
deleteQuery.setString(1, appIdentifier.getAppId());
157-
deleteQuery.setString(2, emailToUser.getValue());
158-
deleteQuery.setString(3, emailToUser.getKey());
159-
deleteQuery.addBatch();
160-
161-
counter++;
162-
if (counter % 100 == 0) {
163-
deleteQuery.executeBatch();
164-
}
165-
}
166-
deleteQuery.executeBatch();
167139
}
140+
141+
List<PreparedStatementValueSetter> setters = new ArrayList<>();
142+
143+
for(Map.Entry<String, String> emailToUser : emailToUserIds.entrySet()){
144+
setters.add(pst -> {
145+
pst.setString(1, appIdentifier.getAppId());
146+
pst.setString(2, emailToUser.getKey());
147+
pst.setString(3, emailToUser.getValue());
148+
});
149+
}
150+
executeBatch(con, QUERY, setters);
168151
}
169152

170153
public static void deleteAllEmailVerificationTokensForUser_Transaction(Start start, Connection con,
@@ -610,30 +593,30 @@ public static void updateMultipleIsEmailVerifiedToExternalUserIds(Start start, A
610593
+ " SET user_id = ? WHERE app_id = ? AND user_id = ?";
611594
String update_email_verification_tokens_table_query = "UPDATE " + getConfig(start).getEmailVerificationTokensTable()
612595
+ " SET user_id = ? WHERE app_id = ? AND user_id = ?";
613-
PreparedStatement updateEmailVerificationQuery = sqlCon.prepareStatement(update_email_verification_table_query);
614-
PreparedStatement updateEmailVerificationTokensQuery = sqlCon.prepareStatement(update_email_verification_tokens_table_query);
615596

616-
int counter = 0;
597+
List<PreparedStatementValueSetter> emailVerificationSetters = new ArrayList<>();
598+
List<PreparedStatementValueSetter> emalVerificationTokensSetters = new ArrayList<>();
599+
617600
for (String supertokensUserId : supertokensUserIdToExternalUserId.keySet()){
618-
updateEmailVerificationQuery.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId));
619-
updateEmailVerificationQuery.setString(2, appIdentifier.getAppId());
620-
updateEmailVerificationQuery.setString(3, supertokensUserId);
621-
updateEmailVerificationQuery.addBatch();
622-
623-
updateEmailVerificationTokensQuery.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId));
624-
updateEmailVerificationTokensQuery.setString(2, appIdentifier.getAppId());
625-
updateEmailVerificationTokensQuery.setString(3, supertokensUserId);
626-
updateEmailVerificationTokensQuery.addBatch();
627-
628-
counter++;
629-
if(counter % 100 == 0) {
630-
updateEmailVerificationQuery.executeBatch();
631-
updateEmailVerificationTokensQuery.executeBatch();
632-
}
601+
emailVerificationSetters.add(pst -> {
602+
pst.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId));
603+
pst.setString(2, appIdentifier.getAppId());
604+
pst.setString(3, supertokensUserId);
605+
});
606+
607+
emalVerificationTokensSetters.add(pst -> {
608+
pst.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId));
609+
pst.setString(2, appIdentifier.getAppId());
610+
pst.setString(3, supertokensUserId);
611+
});
612+
}
613+
614+
if(emailVerificationSetters.isEmpty()){
615+
return null;
633616
}
634-
updateEmailVerificationQuery.executeBatch();
635-
updateEmailVerificationTokensQuery.executeBatch();
636617

618+
executeBatch(sqlCon, update_email_verification_table_query, emailVerificationSetters);
619+
executeBatch(sqlCon, update_email_verification_tokens_table_query, emalVerificationTokensSetters);
637620
} catch (SQLException e) {
638621
throw new StorageTransactionLogicException(e);
639622
}

0 commit comments

Comments
 (0)