Skip to content

Commit 4bf5568

Browse files
nonbbsvc-squareup-copybara
authored andcommitted
Add Hibernate VitessDialect to Misk
https://docs.google.com/document/d/1xsAqv4-FDzbcFbcu8X0_XVik0U7ZihCvl74fXOa8gAI/edit?tab=t.0#heading=h.wmejunk3y5b9 GitOrigin-RevId: 073eeb82b1d8c5ab4b47a6e98cec2eddf94e1520
1 parent ae61202 commit 4bf5568

File tree

11 files changed

+871
-35
lines changed

11 files changed

+871
-35
lines changed

misk-hibernate/api/misk-hibernate.api

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -380,12 +380,6 @@ public final class misk/hibernate/ValidationReport {
380380
public fun toString ()Ljava/lang/String;
381381
}
382382

383-
public final class misk/hibernate/VitessDialect : org/hibernate/dialect/MySQL8Dialect {
384-
public fun <init> ()V
385-
public fun buildSQLExceptionConversionDelegate ()Lorg/hibernate/exception/spi/SQLExceptionConversionDelegate;
386-
public fun useInputStreamToInsertBlob ()Z
387-
}
388-
389383
public abstract interface annotation class misk/hibernate/annotation/Keyspace : java/lang/annotation/Annotation {
390384
public abstract fun value ()Ljava/lang/String;
391385
}
@@ -527,3 +521,44 @@ public final class misk/hibernate/testing/TransacterFaultInjectorModule : misk/h
527521
public fun configureHibernate ()V
528522
}
529523

524+
public final class misk/hibernate/vitess/PoolWaiterCountExhaustedException : org/hibernate/JDBCException {
525+
public fun <init> (Ljava/sql/SQLException;)V
526+
}
527+
528+
public final class misk/hibernate/vitess/VitessDialect : org/hibernate/dialect/MySQL8Dialect {
529+
public fun <init> ()V
530+
public fun buildSQLExceptionConversionDelegate ()Lorg/hibernate/exception/spi/SQLExceptionConversionDelegate;
531+
public fun useInputStreamToInsertBlob ()Z
532+
}
533+
534+
public final class misk/hibernate/vitess/VitessExceptionDetector {
535+
public static final field INSTANCE Lmisk/hibernate/vitess/VitessExceptionDetector;
536+
public final fun isWaiterPoolExhausted (Ljava/lang/Throwable;)Z
537+
}
538+
539+
public final class misk/hibernate/vitess/VitessShardException : org/hibernate/JDBCException {
540+
public fun <init> (Lmisk/hibernate/vitess/VitessShardExceptionData;)V
541+
public final fun getExceptionData ()Lmisk/hibernate/vitess/VitessShardExceptionData;
542+
}
543+
544+
public final class misk/hibernate/vitess/VitessShardExceptionData {
545+
public fun <init> (Lmisk/vitess/Shard;Ljava/lang/String;ZZLjava/lang/Throwable;)V
546+
public final fun getCauseException ()Ljava/lang/Throwable;
547+
public final fun getExceptionMessage ()Ljava/lang/String;
548+
public final fun getShard ()Lmisk/vitess/Shard;
549+
public final fun isPrimary ()Z
550+
public final fun isShardHealthError ()Z
551+
public fun toString ()Ljava/lang/String;
552+
}
553+
554+
public final class misk/hibernate/vitess/VitessShardExceptionParser {
555+
public static final field Companion Lmisk/hibernate/vitess/VitessShardExceptionParser$Companion;
556+
public fun <init> ()V
557+
public final fun configureStackDepth (I)V
558+
public final fun parseShardInfo (Ljava/lang/Exception;)Ljava/util/Optional;
559+
}
560+
561+
public final class misk/hibernate/vitess/VitessShardExceptionParser$Companion {
562+
public final fun getLogger ()Lmu/KLogger;
563+
}
564+

misk-hibernate/src/main/kotlin/misk/hibernate/VitessDialect.kt

Lines changed: 0 additions & 28 deletions
This file was deleted.
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package misk.hibernate.vitess
2+
3+
import java.sql.SQLException
4+
import org.hibernate.JDBCException
5+
6+
class PoolWaiterCountExhaustedException(root: SQLException?) : JDBCException(
7+
"Vitess pool waiter count exhausted",
8+
root
9+
)
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package misk.hibernate.vitess
2+
3+
import misk.vitess.CowriteException
4+
import org.hibernate.cfg.Environment
5+
import org.hibernate.dialect.Dialect
6+
import org.hibernate.dialect.MySQL8Dialect
7+
import org.hibernate.dialect.function.NoArgSQLFunction
8+
import org.hibernate.exception.ConstraintViolationException
9+
import org.hibernate.exception.GenericJDBCException
10+
import org.hibernate.exception.spi.SQLExceptionConversionDelegate
11+
import org.hibernate.type.StandardBasicTypes
12+
import java.sql.SQLException
13+
import java.util.Optional
14+
15+
class VitessDialect : MySQL8Dialect() {
16+
private val vitessShardExceptionParser
17+
: VitessShardExceptionParser = VitessShardExceptionParser()
18+
19+
init {
20+
// Statement batching is not implemented yet
21+
getDefaultProperties().setProperty(
22+
Environment.STATEMENT_BATCH_SIZE,
23+
Dialect.NO_BATCH
24+
)
25+
26+
registerKeyword("virtual")
27+
registerKeyword("status")
28+
registerFunction(
29+
"current_timestamp",
30+
NoArgSQLFunction("current_timestamp", StandardBasicTypes.TIMESTAMP)
31+
)
32+
}
33+
34+
override fun useInputStreamToInsertBlob(): Boolean {
35+
return false
36+
}
37+
38+
override fun buildSQLExceptionConversionDelegate(): SQLExceptionConversionDelegate {
39+
return SQLExceptionConversionDelegate { sqlException: SQLException, message: String, sql: String? ->
40+
val exceptionMessage = sqlException.message
41+
if (exceptionMessage != null && exceptionMessage.contains("Duplicate entry")) {
42+
return@SQLExceptionConversionDelegate ConstraintViolationException(
43+
message,
44+
sqlException,
45+
sql,
46+
null
47+
)
48+
} else if (exceptionMessage != null && exceptionMessage.contains("multi-db transaction attempted")) {
49+
throw CowriteException(message, sqlException)
50+
} else if (VitessExceptionDetector.isWaiterPoolExhausted(sqlException)) {
51+
return@SQLExceptionConversionDelegate PoolWaiterCountExhaustedException(sqlException)
52+
} else {
53+
val vitessShardException: Optional<VitessShardExceptionData> =
54+
vitessShardExceptionParser.parseShardInfo(sqlException)
55+
if (vitessShardException.isPresent()) {
56+
return@SQLExceptionConversionDelegate VitessShardException(vitessShardException.get())
57+
}
58+
return@SQLExceptionConversionDelegate GenericJDBCException(
59+
sqlException.message,
60+
sqlException,
61+
sql
62+
)
63+
}
64+
}
65+
}
66+
}
67+
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package misk.hibernate.vitess
2+
3+
object VitessExceptionDetector {
4+
private const val WAITER_POOL_EXCEPTION_STRING = "pool waiter count exceeded"
5+
6+
fun isWaiterPoolExhausted(e: Throwable?): Boolean {
7+
var ex = e
8+
var i = 0
9+
while (ex != null && i < 100) {
10+
if (ex.message != null && ex.message!!.contains(WAITER_POOL_EXCEPTION_STRING)) {
11+
return true
12+
}
13+
ex = ex.cause
14+
++i
15+
}
16+
return false
17+
}
18+
}
19+
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package misk.hibernate.vitess
2+
3+
import org.hibernate.JDBCException
4+
import java.sql.SQLException
5+
6+
/**
7+
* Custom exception indicating an error related to a specific Vitess shard operation.
8+
* Wraps details provided by [VitessShardExceptionData].
9+
*/
10+
class VitessShardException(
11+
/** Holds detailed information about the Vitess shard error. */
12+
val exceptionData: VitessShardExceptionData
13+
) : JDBCException(
14+
// Construct the message for the superclass
15+
"Vitess Shard Error: ${exceptionData.exceptionMessage}",
16+
// Construct the SQLException cause for the superclass
17+
SQLException(exceptionData.causeException)
18+
)
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package misk.hibernate.vitess
2+
3+
import misk.vitess.Shard
4+
5+
class VitessShardExceptionData(
6+
val shard: Shard,
7+
val exceptionMessage: String,
8+
val isShardHealthError: Boolean,
9+
val isPrimary: Boolean,
10+
val causeException: Throwable
11+
) {
12+
override fun toString(): String {
13+
return String.format(
14+
"VitessShardExceptionData{shard=%s, isShardHealthError=%s, isPrimary=%s}",
15+
shard.toString(), isShardHealthError, isPrimary
16+
)
17+
}
18+
}
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
package misk.hibernate.vitess
2+
3+
import com.google.common.annotations.VisibleForTesting
4+
import misk.vitess.Shard
5+
import wisp.logging.getLogger
6+
import java.util.Optional
7+
import java.util.function.Consumer
8+
import java.util.regex.Matcher
9+
import java.util.regex.Pattern
10+
11+
class VitessShardExceptionParser {
12+
/*
13+
* This method is used to check if the exception is a shard exception. It will also do a general
14+
* first level check on all causes to see if it's a shard health error and parse it out.
15+
*
16+
* Example exception message:
17+
* java.sql.SQLException: target: sharded_keyspaces.-40.primary: vttablet: rpc error: code =
18+
* Aborted desc = transaction 1729618751317964257: not found (CallerID: hsig4pj3doiu6hpew0jk)
19+
*
20+
* Returns: VitessShardException(sharded_keyspaces/-40)
21+
*/
22+
fun parseShardInfo(exception: Exception): Optional<VitessShardExceptionData> {
23+
var vitessShardException: Optional<VitessShardExceptionData>
24+
var cause = exception.cause
25+
var message = exception.message
26+
27+
vitessShardException = parseVitessShardException(exception, message!!)
28+
29+
vitessShardException.ifPresent({ vitessShardException: VitessShardExceptionData ->
30+
this.logShardInfo(
31+
vitessShardException
32+
)
33+
})
34+
35+
// Check if first error is a primary shard health error, if so we return
36+
if (isPrimaryShardAndContainsHealthError(vitessShardException)) {
37+
return vitessShardException
38+
}
39+
40+
var i = 0
41+
while (cause != null && i < MAX_STACK_DEPTH) {
42+
message = cause.message
43+
if (message == null) {
44+
cause = cause.cause
45+
i++
46+
continue
47+
}
48+
49+
vitessShardException = parseVitessShardException(cause, message)
50+
51+
if (isPrimaryShardAndContainsHealthError(vitessShardException)) {
52+
vitessShardException.ifPresent(
53+
{ vitessShardException: VitessShardExceptionData ->
54+
this.logShardInfo(
55+
vitessShardException
56+
)
57+
})
58+
return vitessShardException
59+
}
60+
61+
cause = cause.cause
62+
i++
63+
}
64+
if (vitessShardException.isPresent()) vitessShardException.ifPresent(
65+
{ vitessShardException: VitessShardExceptionData ->
66+
this.logShardInfo(
67+
vitessShardException
68+
)
69+
})
70+
71+
return vitessShardException
72+
}
73+
74+
private fun parseVitessShardException(
75+
exception: Throwable,
76+
message: String
77+
): Optional<VitessShardExceptionData> {
78+
val shardString = getShardString(message)
79+
val isShardHealthError = isShardHealthErrorCheck(message)
80+
81+
if (shardString.isNotEmpty()) {
82+
val isPrimary = message.contains(".primary:")
83+
val shard: Shard = Shard.parse(shardString)!!
84+
85+
return Optional.of<VitessShardExceptionData>(
86+
VitessShardExceptionData(
87+
shard,
88+
message,
89+
isShardHealthError,
90+
isPrimary,
91+
exception
92+
)
93+
)
94+
}
95+
return Optional.empty<VitessShardExceptionData>()
96+
}
97+
98+
private fun isPrimaryShardAndContainsHealthError(shardException: Optional<VitessShardExceptionData>): Boolean {
99+
if (shardException.isPresent()) {
100+
val exception: VitessShardExceptionData = shardException.get()
101+
return exception.isPrimary && exception.isShardHealthError
102+
}
103+
return false
104+
}
105+
106+
private fun isShardHealthErrorCheck(message: String): Boolean {
107+
// Check if message contains at least one included pattern
108+
return INCLUDED_MESSAGES.stream()
109+
.anyMatch { pattern: String? -> Pattern.compile(pattern).matcher(message).find() }
110+
}
111+
112+
private fun getShardString(message: String): String {
113+
val matcher = SHARD_PATTERN.matcher(message)
114+
return if (matcher.find()) parseShardString(matcher) else ""
115+
}
116+
117+
/*
118+
* This method is used to parse the shard information from the exception message.
119+
*
120+
* Example exception message:
121+
* java.sql.SQLException: target: sharded_keyspaces.-40.primary: vttablet: rpc error: code = A
122+
* borted desc = transaction 1729618751317964257: not found (CallerID: hsig4pj3doiu6hpew0jk)
123+
*
124+
* Returns: sharded_keyspaces/-40
125+
*/
126+
private fun parseShardString(matcher: Matcher): String {
127+
// Group 1 is the keyspace (sharded_keyspaces)
128+
// Group 2 is the shard (-40, 40-80, ff-, de-df, etc.)
129+
val keyspace = matcher.group(1)
130+
val shard = matcher.group(2)
131+
132+
return "$keyspace/$shard"
133+
}
134+
135+
private fun logShardInfo(vitessShardException: VitessShardExceptionData) {
136+
logger.warn(
137+
("""vitessShardException: ${vitessShardException.shard}
138+
| message: ${vitessShardException.causeException.message}
139+
| isprimary:${vitessShardException.isPrimary}
140+
| isShardHealthError: ${vitessShardException.isShardHealthError}""".trimMargin()
141+
)
142+
)
143+
}
144+
145+
@VisibleForTesting fun configureStackDepth(maxStackDepth: Int) {
146+
MAX_STACK_DEPTH = maxStackDepth
147+
}
148+
149+
companion object {
150+
private var MAX_STACK_DEPTH =
151+
300 // Making the stack depth intentionally high to avoid missing root cause
152+
153+
private val SHARD_PATTERN: Pattern = Pattern.compile(
154+
"target:\\s+([^.]+)\\.(-?[0-9]+|[0-9a-f]+-[0-9a-f]+|[0-9]+(?:-[0-9]+)?|[0-9a-f]+-|[0-9a-f]+)\\.(primary|replica):"
155+
)
156+
157+
private val INCLUDED_MESSAGES = listOf(
158+
"due to context deadline exceeded",
159+
"primary is not serving",
160+
"code = Aborted desc = transaction.*not found",
161+
"code = Aborted desc = transaction.*ended at.*\\(unlocked closed connection\\)",
162+
"operation not allowed in state NOT_SERVING"
163+
)
164+
165+
val logger = getLogger<VitessShardExceptionParser>()
166+
}
167+
}
168+

0 commit comments

Comments
 (0)