Skip to content

Commit 09fe259

Browse files
aparajonsvc-squareup-copybara
authored andcommitted
VitessTestDb: harden schema apply logic
GitOrigin-RevId: 85fe78af2e8a20c3ce940baa7e2539afa1edaf28
1 parent 10d11e4 commit 09fe259

File tree

2 files changed

+69
-11
lines changed

2 files changed

+69
-11
lines changed

misk-vitess/src/testFixtures/kotlin/misk/vitess/testing/internal/VitessQueryExecutor.kt

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import misk.vitess.testing.VitessTableType
55
import misk.vitess.testing.VitessTestDbException
66
import java.sql.Connection
77
import java.sql.DriverManager
8+
import java.sql.SQLRecoverableException
9+
import java.sql.SQLTransientException
810
import java.sql.Statement
911

1012
/**
@@ -36,6 +38,49 @@ internal class VitessQueryExecutor(
3638
return connection.use { conn -> executeUpdate(conn, query, target) }
3739
}
3840

41+
/**
42+
* Execute update with retry logic to handle retryable failures.
43+
*/
44+
fun executeUpdateWithRetries(query: String, target: String = "@primary"): Int {
45+
var lastException: Exception? = null
46+
val maxRetries = 3
47+
val retryDelayMs = 500L
48+
49+
val retryableErrorMessages = listOf(
50+
"Keyspace does not have exactly one shard",
51+
)
52+
53+
for (attempt in 1..maxRetries) {
54+
try {
55+
return executeUpdate(query, target)
56+
} catch (e: Exception) {
57+
lastException = e
58+
59+
// Check if this is a retryable exception type or message
60+
val isRetryableException = e.cause is SQLRecoverableException ||
61+
e.cause is SQLTransientException ||
62+
e is SQLRecoverableException ||
63+
e is SQLTransientException
64+
65+
val isRetryableMessage = retryableErrorMessages.any { errorMessage ->
66+
e.message?.contains(errorMessage, ignoreCase = true) == true
67+
}
68+
69+
if ((isRetryableException || isRetryableMessage) && attempt < maxRetries) {
70+
Thread.sleep(retryDelayMs * attempt)
71+
} else {
72+
throw e // Re-throw if not a retry-able error or max retries reached
73+
}
74+
}
75+
}
76+
77+
// If we get here, all retries failed
78+
throw VitessQueryExecutorException(
79+
"Failed to executeUpdate after `$maxRetries` attempts. Query: `$query`, Last error: `${lastException?.message}`",
80+
lastException
81+
)
82+
}
83+
3984
fun executeTransaction(query: String, target: String = "@primary"): Boolean {
4085
val connection = getVtgateConnection()
4186
return connection.use { conn ->

misk-vitess/src/testFixtures/kotlin/misk/vitess/testing/internal/VitessSchemaApplier.kt

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -136,15 +136,28 @@ internal class VitessSchemaApplier(
136136
val ddlUpdates = ConcurrentLinkedQueue<DdlUpdate>()
137137
val vschemaUpdates = ConcurrentLinkedQueue<VSchemaUpdate>()
138138

139-
// Run all keyspace operations in parallel to speed up schema change processing.
140-
val executorService = Executors.newFixedThreadPool(keyspaces.size)
139+
// Apply VSchemas in parallel but wait for all to complete before proceeding to DDL's.
140+
val vschemaExecutorService = Executors.newFixedThreadPool(keyspaces.size)
141+
val vschemaFutures = mutableListOf<Future<VSchemaUpdate>>()
142+
143+
keyspaces.forEach { keyspace ->
144+
vschemaFutures.add(
145+
vschemaExecutorService.submit<VSchemaUpdate> {
146+
processVschema(keyspace)
147+
}
148+
)
149+
}
150+
151+
vschemaFutures.forEach { future ->
152+
vschemaUpdates.add(future.get())
153+
}
154+
vschemaExecutorService.shutdown()
155+
156+
val ddlExecutorService = Executors.newFixedThreadPool(keyspaces.size)
141157
val futures = mutableListOf<Future<Unit>>()
142158
keyspaces.forEach { keyspace ->
143159
futures.add(
144-
executorService.submit<Unit> {
145-
val vschemaUpdate = processVschema(keyspace)
146-
vschemaUpdates.add(vschemaUpdate)
147-
160+
ddlExecutorService.submit<Unit> {
148161
var schemaChangesProcessedForKeyspace: Boolean
149162
if (enableDeclarativeSchemaChanges) {
150163
schemaChangesProcessedForKeyspace = applyDeclarativeSchemaChanges(keyspace, vitessQueryExecutor, ddlUpdates)
@@ -161,7 +174,7 @@ internal class VitessSchemaApplier(
161174
}
162175

163176
futures.forEach { it.get() }
164-
executorService.shutdown()
177+
ddlExecutorService.shutdown()
165178

166179
initializeSequenceTables(vitessQueryExecutor)
167180
println("🔧 Schema is applied.")
@@ -236,9 +249,9 @@ internal class VitessSchemaApplier(
236249
keyspace.ddlCommands
237250
.sortedBy { it.first }
238251
.map { it.second }
239-
.forEach {
240-
vitessQueryExecutor.executeUpdate(query = it, target = keyspace.name)
241-
ddlUpdates.add(DdlUpdate(ddl = it, keyspace = keyspace.name))
252+
.forEach { ddl ->
253+
vitessQueryExecutor.executeUpdateWithRetries(ddl, keyspace.name)
254+
ddlUpdates.add(DdlUpdate(ddl = ddl, keyspace = keyspace.name))
242255
}
243256

244257
return true
@@ -286,7 +299,7 @@ internal class VitessSchemaApplier(
286299

287300
private fun applyDdlCommands(ddl: String, keyspace: VitessKeyspace, vitessQueryExecutor: VitessQueryExecutor) {
288301
printDebug("Applying schema changes:\n$ddl")
289-
vitessQueryExecutor.executeUpdate(ddl, keyspace.name)
302+
vitessQueryExecutor.executeUpdateWithRetries(ddl, keyspace.name)
290303
}
291304

292305
private fun executeDockerCommand(command: List<String>, keyspace: String): String {

0 commit comments

Comments
 (0)