Skip to content

Commit f2f9eed

Browse files
committed
add status assertion in new ut case
1 parent 33e0f0d commit f2f9eed

File tree

4 files changed

+64
-29
lines changed

4 files changed

+64
-29
lines changed

externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ class ExecuteStatement(
129129
if (jdbcStatement != null) {
130130
dialect.cancelStatement(jdbcStatement)
131131
jdbcStatement = null
132+
} else {
133+
warn(s"The cancel operation $statementId might be ignore due to jdbcStatement is null.")
132134
}
133135
}
134136
}

externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/OperationWithEngineSuite.scala

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import org.apache.kyuubi.config.KyuubiConf
2323
import org.apache.kyuubi.engine.jdbc.connection.ConnectionProvider
2424
import org.apache.kyuubi.operation.HiveJDBCTestHelper
2525
import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
26+
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TOperationState.{CANCELED_STATE, RUNNING_STATE}
2627

2728
class OperationWithEngineSuite extends MySQLOperationSuite with HiveJDBCTestHelper {
2829

@@ -87,16 +88,24 @@ class OperationWithEngineSuite extends MySQLOperationSuite with HiveJDBCTestHelp
8788
// The SQL will sleep 120s
8889
tExecuteStatementReq.setStatement("SELECT sleep(120)")
8990
tExecuteStatementReq.setRunAsync(true)
90-
val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq)
91-
assert(tExecuteStatementResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
92-
93-
Thread.sleep(1000) // wait for statement to start executing
94-
95-
val tCancelOperationReq = new TCancelOperationReq()
96-
tCancelOperationReq.setOperationHandle(tExecuteStatementResp.getOperationHandle)
97-
val TCancelOperationReq = client.CancelOperation(tCancelOperationReq)
98-
assert(TCancelOperationReq.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
99-
// If the statement is not cancelled successfully, will block here until 120s
91+
val executeResp = client.ExecuteStatement(tExecuteStatementReq)
92+
assert(executeResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
93+
94+
assertOperationStatusIn(
95+
client,
96+
executeResp.getOperationHandle,
97+
Set(RUNNING_STATE),
98+
5)
99+
100+
val cancelResp =
101+
client.CancelOperation(new TCancelOperationReq(executeResp.getOperationHandle))
102+
assert(cancelResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
103+
104+
assertOperationStatusIn(
105+
client,
106+
executeResp.getOperationHandle,
107+
Set(CANCELED_STATE),
108+
5)
100109
}
101110
}
102111
}

externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/StarRocksOperationWithEngineSuite.scala

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.apache.kyuubi.config.KyuubiConf
2424
import org.apache.kyuubi.engine.jdbc.connection.ConnectionProvider
2525
import org.apache.kyuubi.operation.HiveJDBCTestHelper
2626
import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
27+
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TOperationState.{CANCELED_STATE, RUNNING_STATE}
2728

2829
class StarRocksOperationWithEngineSuite extends StarRocksOperationSuite with HiveJDBCTestHelper {
2930

@@ -83,21 +84,28 @@ class StarRocksOperationWithEngineSuite extends StarRocksOperationSuite with Hiv
8384
test("starrocks - JDBC ExecuteStatement cancel operation should kill SQL statement") {
8485
failAfter(20.seconds) {
8586
withSessionHandle { (client, handle) =>
86-
val tExecuteStatementReq = new TExecuteStatementReq()
87-
tExecuteStatementReq.setSessionHandle(handle)
87+
val executeReq = new TExecuteStatementReq()
88+
executeReq.setSessionHandle(handle)
8889
// The SQL will sleep 120s
89-
tExecuteStatementReq.setStatement("SELECT sleep(120)")
90-
tExecuteStatementReq.setRunAsync(true)
91-
val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq)
92-
93-
Thread.sleep(1000) // wait for statement to start executing
94-
95-
val tCancelOperationReq = new TCancelOperationReq()
96-
tCancelOperationReq.setOperationHandle(tExecuteStatementResp.getOperationHandle)
97-
98-
val tFetchResultsResp = client.CancelOperation(tCancelOperationReq)
99-
assert(tFetchResultsResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
100-
// If the statement is not cancelled successfully, will block here until 120s
90+
executeReq.setStatement("SELECT sleep(120)")
91+
executeReq.setRunAsync(true)
92+
val executeResp = client.ExecuteStatement(executeReq)
93+
94+
assertOperationStatusIn(
95+
client,
96+
executeResp.getOperationHandle,
97+
Set(RUNNING_STATE),
98+
5)
99+
100+
val cancelResp =
101+
client.CancelOperation(new TCancelOperationReq(executeResp.getOperationHandle))
102+
assert(cancelResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
103+
104+
assertOperationStatusIn(
105+
client,
106+
executeResp.getOperationHandle,
107+
Set(CANCELED_STATE),
108+
5)
101109
}
102110
}
103111
}

kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,27 @@ trait HiveJDBCTestHelper extends JDBCTestHelper {
124124
}
125125

126126
def waitForOperationToComplete(client: Iface, op: TOperationHandle): Unit = {
127-
val req = new TGetOperationStatusReq(op)
128-
var state = client.GetOperationStatus(req).getOperationState
129-
eventually(timeout(90.seconds), interval(100.milliseconds)) {
130-
state = client.GetOperationStatus(req).getOperationState
131-
assert(!Set(INITIALIZED_STATE, PENDING_STATE, RUNNING_STATE).contains(state))
127+
assertOperationStatusIn(
128+
client,
129+
op,
130+
Set(
131+
FINISHED_STATE,
132+
CANCELED_STATE,
133+
CLOSED_STATE,
134+
ERROR_STATE,
135+
UKNOWN_STATE,
136+
TIMEDOUT_STATE),
137+
90)
138+
}
139+
140+
def assertOperationStatusIn(
141+
client: Iface,
142+
op: TOperationHandle,
143+
status: Set[TOperationState],
144+
timeoutInSeconds: Int): Unit = {
145+
eventually(timeout(timeoutInSeconds.seconds), interval(100.milliseconds)) {
146+
val state = client.GetOperationStatus(new TGetOperationStatusReq(op)).getOperationState
147+
assert(status.contains(state))
132148
}
133149
}
134150

0 commit comments

Comments
 (0)