Skip to content

Commit 53ef2d9

Browse files
authored
fix: Kotlin coroutine will not throw exception to outside when cancelling jobs (#59)
1 parent dbc9521 commit 53ef2d9

File tree

3 files changed

+83
-74
lines changed

3 files changed

+83
-74
lines changed

client/src/main/kotlin/io/hstream/impl/ConsumerKtImpl.kt

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,27 @@ class ConsumerKtImpl(
4242
private val executorService = Executors.newSingleThreadExecutor()
4343

4444
private suspend fun streamingFetchWithRetry(requestFlow: Flow<StreamingFetchRequest>) {
45+
// Note: A failed grpc call can throw both 'StatusException' and 'StatusRuntimeException'.
46+
// This function is for handling them.
47+
suspend fun handleGRPCException(e: Throwable) {
48+
logger.error("streamingFetch error:", e)
49+
val status = Status.fromThrowable(e)
50+
// WARNING: Use status.code to make comparison because 'Status' contains
51+
// extra information which varies from objects to objects.
52+
53+
// 'status example': Status{code=UNAVAILABLE, description=Connection closed
54+
// after GOAWAY. HTTP/2 error code: NO_ERROR, debug data:
55+
// Server shutdown, cause=null}
56+
// 'Status.UNAVAILABLE': Status{code=UNAVAILABLE, description=null, cause=null}
57+
if (status.code == Status.UNAVAILABLE.code) {
58+
delay(DefaultSettings.REQUEST_RETRY_INTERVAL_SECONDS * 1000)
59+
refreshServerUrl()
60+
streamingFetchWithRetry(requestFlow)
61+
} else {
62+
notifyFailed(HStreamDBClientException(e))
63+
}
64+
}
65+
4566
if (!isRunning) return
4667
check(serverUrl != null)
4768
val stub = HStreamApiGrpcKt.HStreamApiCoroutineStub(HStreamClientKtImpl.channelProvider.get(serverUrl))
@@ -65,29 +86,10 @@ class ConsumerKtImpl(
6586
}
6687
}
6788
}
68-
} catch (e: Exception) {
69-
// Note: a failed grpc call can throw both 'StatusException' and 'StatusRuntimeException'.
70-
when (e) {
71-
is StatusException, is StatusRuntimeException -> {
72-
logger.error("streamingFetch error:", e)
73-
val status = Status.fromThrowable(e)
74-
// WARNING: Use status.code to make comparison because 'Status' contains
75-
// extra information which varies from objects to objects.
76-
77-
// 'status example': Status{code=UNAVAILABLE, description=Connection closed
78-
// after GOAWAY. HTTP/2 error code: NO_ERROR, debug data:
79-
// Server shutdown, cause=null}
80-
// 'Status.UNAVAILABLE': Status{code=UNAVAILABLE, description=null, cause=null}
81-
if (status.code == Status.UNAVAILABLE.code) {
82-
delay(DefaultSettings.REQUEST_RETRY_INTERVAL_SECONDS * 1000)
83-
refreshServerUrl()
84-
streamingFetchWithRetry(requestFlow)
85-
} else {
86-
notifyFailed(HStreamDBClientException(e))
87-
}
88-
}
89-
else -> throw e
90-
}
89+
} catch (e: StatusException) {
90+
handleGRPCException(e)
91+
} catch (e: StatusRuntimeException) {
92+
handleGRPCException(e)
9193
}
9294
}
9395

client/src/main/kotlin/io/hstream/impl/ProducerKtImpl.kt

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,20 @@ class ProducerKtImpl(
127127
}
128128

129129
private suspend fun appendWithRetry(appendRequest: AppendRequest, tryTimes: Int): List<RecordId> {
130+
// Note: A failed grpc call can throw both 'StatusException' and 'StatusRuntimeException'.
131+
// This function is for handling them.
132+
suspend fun handleGRPCException(serverUrl: String, e: Throwable): List<RecordId> {
133+
logger.error("append with serverUrl [{}] error", serverUrl, e)
134+
val status = Status.fromThrowable(e)
135+
if (status.code == Status.UNAVAILABLE.code && tryTimes > 1) {
136+
delay(DefaultSettings.REQUEST_RETRY_INTERVAL_SECONDS * 1000)
137+
refreshServerUrl()
138+
return appendWithRetry(appendRequest, tryTimes - 1)
139+
} else {
140+
throw HStreamDBClientException(e)
141+
}
142+
}
143+
130144
check(tryTimes > 0)
131145
var serverUrl = serverUrlRef.get()
132146
if (serverUrl == null) {
@@ -138,22 +152,10 @@ class ProducerKtImpl(
138152
try {
139153
return HStreamApiGrpcKt.HStreamApiCoroutineStub(HStreamClientKtImpl.channelProvider.get(serverUrl))
140154
.append(appendRequest).recordIdsList.map(GrpcUtils::recordIdFromGrpc)
141-
} catch (e: Exception) {
142-
// Note: a failed grpc call can throw both 'StatusException' and 'StatusRuntimeException'.
143-
when (e) {
144-
is StatusException, is StatusRuntimeException -> {
145-
logger.error("append with serverUrl [{}] error", serverUrl, e)
146-
val status = Status.fromThrowable(e)
147-
if (status.code == Status.UNAVAILABLE.code && tryTimes > 1) {
148-
delay(DefaultSettings.REQUEST_RETRY_INTERVAL_SECONDS * 1000)
149-
refreshServerUrl()
150-
return appendWithRetry(appendRequest, tryTimes - 1)
151-
} else {
152-
throw HStreamDBClientException(e)
153-
}
154-
}
155-
else -> throw e
156-
}
155+
} catch (e: StatusException) {
156+
return handleGRPCException(serverUrl, e)
157+
} catch (e: StatusRuntimeException) {
158+
return handleGRPCException(serverUrl, e)
157159
}
158160
}
159161

client/src/main/kotlin/io/hstream/impl/Utils.kt

Lines changed: 40 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -23,32 +23,34 @@ import kotlin.coroutines.CoroutineContext
2323
val logger: Logger = LoggerFactory.getLogger("io.hstream.impl.Utils")
2424

2525
suspend fun <Resp> unaryCallWithCurrentUrlsCoroutine(serverUrls: List<String>, channelProvider: ChannelProvider, call: suspend (stub: HStreamApiCoroutineStub) -> Resp): Resp {
26+
// Note: A failed grpc call can throw both 'StatusException' and 'StatusRuntimeException'.
27+
// This function is for handling them.
28+
suspend fun handleGRPCException(i: Int, e: Throwable) {
29+
logger.error("call unary rpc with url [{}] error", serverUrls[i], e)
30+
val status = Status.fromThrowable(e)
31+
if (status.code == Status.UNAVAILABLE.code) {
32+
if (i == serverUrls.size - 1) {
33+
throw HStreamDBClientException(e)
34+
} else {
35+
logger.info("unary rpc will be retried with url [{}]", serverUrls[i + 1])
36+
delay(DefaultSettings.REQUEST_RETRY_INTERVAL_SECONDS * 1000)
37+
return
38+
}
39+
} else {
40+
throw HStreamDBClientException(e)
41+
}
42+
}
43+
2644
check(serverUrls.isNotEmpty())
2745
logger.debug("call unaryCallWithCurrentUrlsCoroutine with urls [{}]", serverUrls)
2846
for (i in serverUrls.indices) {
2947
val stub = HStreamApiCoroutineStub(channelProvider.get(serverUrls[i]))
3048
try {
3149
return call(stub)
32-
} catch (e: Exception) {
33-
// Note: a failed grpc call can throw both 'StatusException' and 'StatusRuntimeException'.
34-
when (e) {
35-
is StatusException, is StatusRuntimeException -> {
36-
logger.error("call unary rpc with url [{}] error", serverUrls[i], e)
37-
val status = Status.fromThrowable(e)
38-
if (status.code == Status.UNAVAILABLE.code) {
39-
if (i == serverUrls.size - 1) {
40-
throw HStreamDBClientException(e)
41-
} else {
42-
logger.info("unary rpc will be retried with url [{}]", serverUrls[i + 1])
43-
delay(DefaultSettings.REQUEST_RETRY_INTERVAL_SECONDS * 1000)
44-
continue
45-
}
46-
} else {
47-
throw HStreamDBClientException(e)
48-
}
49-
}
50-
else -> throw e
51-
}
50+
} catch (e: StatusException) {
51+
handleGRPCException(i, e)
52+
} catch (e: StatusRuntimeException) {
53+
handleGRPCException(i, e)
5254
}
5355
}
5456

@@ -72,28 +74,31 @@ suspend fun refreshClusterInfo(serverUrls: List<String>, channelProvider: Channe
7274
}
7375

7476
suspend fun <Resp> unaryCallCoroutine(urlsRef: AtomicReference<List<String>>, channelProvider: ChannelProvider, call: suspend (stub: HStreamApiCoroutineStub) -> Resp): Resp {
77+
// Note: A failed grpc call can throw both 'StatusException' and 'StatusRuntimeException'.
78+
// This function is for handling them.
79+
suspend fun handleGRPCException(urls: List<String>, e: Throwable): Resp {
80+
logger.error("unary rpc error with url [{}]", urls[0], e)
81+
val status = Status.fromThrowable(e)
82+
if (status.code == Status.UNAVAILABLE.code && urls.size > 1) {
83+
val newServerUrls = refreshClusterInfo(urls.subList(1, urls.size), channelProvider)
84+
urlsRef.set(newServerUrls)
85+
return unaryCallWithCurrentUrlsCoroutine(urlsRef.get(), channelProvider, call)
86+
} else {
87+
throw HStreamDBClientException(e)
88+
}
89+
}
90+
7591
val urls = urlsRef.get()
7692
check(urls.isNotEmpty())
7793

7894
logger.debug("unary rpc with urls [{}]", urls)
7995

8096
try {
8197
return call(HStreamApiCoroutineStub(channelProvider.get(urls[0])))
82-
} catch (e: Exception) {
83-
when (e) {
84-
is StatusException, is StatusRuntimeException -> {
85-
logger.error("unary rpc error with url [{}]", urls[0], e)
86-
val status = Status.fromThrowable(e)
87-
if (status.code == Status.UNAVAILABLE.code && urls.size > 1) {
88-
val newServerUrls = refreshClusterInfo(urls.subList(1, urls.size), channelProvider)
89-
urlsRef.set(newServerUrls)
90-
return unaryCallWithCurrentUrlsCoroutine(urlsRef.get(), channelProvider, call)
91-
} else {
92-
throw HStreamDBClientException(e)
93-
}
94-
}
95-
else -> throw e
96-
}
98+
} catch (e: StatusException) {
99+
return handleGRPCException(urls, e)
100+
} catch (e: StatusRuntimeException) {
101+
return handleGRPCException(urls, e)
97102
}
98103
}
99104

0 commit comments

Comments
 (0)