Skip to content

Commit 69038f3

Browse files
authored
add default timeout for grpc call (#160)
1 parent 5e61fed commit 69038f3

File tree

5 files changed

+12
-4
lines changed

5 files changed

+12
-4
lines changed

client/src/main/java/io/hstream/impl/DefaultSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,6 @@ public class DefaultSettings {
77
public static final int APPEND_RETRY_MAX_TIMES = 5;
88

99
public static final String DEFAULT_PARTITION_KEY = "";
10+
11+
public static final long GRPC_CALL_TIMEOUT_SECONDS = 5;
1012
}

client/src/main/kotlin/io/hstream/impl/HStreamClientKtImpl.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import io.hstream.util.GrpcUtils
3535
import kotlinx.coroutines.runBlocking
3636
import org.slf4j.LoggerFactory
3737
import java.util.concurrent.CompletableFuture
38+
import java.util.concurrent.TimeUnit
3839
import java.util.concurrent.atomic.AtomicReference
3940
import kotlin.streams.toList
4041

@@ -62,6 +63,10 @@ class HStreamClientKtImpl(bootstrapServerUrls: List<String>, credentials: Channe
6263
return HStreamApiGrpcKt.HStreamApiCoroutineStub(channelProvider.get(url))
6364
}
6465

66+
fun getCoroutineStubWithTimeout(url: String, timeoutSeconds: Long): HStreamApiGrpcKt.HStreamApiCoroutineStub {
67+
return HStreamApiGrpcKt.HStreamApiCoroutineStub(channelProvider.get(url)).withDeadlineAfter(timeoutSeconds, TimeUnit.SECONDS)
68+
}
69+
6570
init {
6671
logger.info("client init with bootstrapServerUrls [{}]", bootstrapServerUrls)
6772
val describeClusterResponse = unaryCallWithCurrentUrls(

client/src/main/kotlin/io/hstream/impl/ProducerKtImpl.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ open class ProducerKtImpl(private val client: HStreamClientKtImpl, private val s
137137
val serverUrl = lookupServerUrl(shardId, forceUpdate)
138138
logger.debug("try append with serverUrl [{}], current left tryTimes is [{}]", serverUrl, tryTimes)
139139
return try {
140-
client.getCoroutineStub(serverUrl)
140+
client.getCoroutineStubWithTimeout(serverUrl, DefaultSettings.GRPC_CALL_TIMEOUT_SECONDS)
141141
.append(appendRequest).recordIdsList.map(GrpcUtils::recordIdFromGrpc)
142142
} catch (e: StatusException) {
143143
handleGRPCException(serverUrl, e)

client/src/main/kotlin/io/hstream/impl/ReaderKtImpl.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class ReaderKtImpl(
5050
readerScope.launch {
5151
try {
5252
val readShardRequest = ReadShardRequest.newBuilder().setReaderId(readerId).setMaxRecords(maxRecords).build()
53-
val readShardResponse = client.getCoroutineStub(serverUrl)
53+
val readShardResponse = client.getCoroutineStubWithTimeout(serverUrl, DefaultSettings.GRPC_CALL_TIMEOUT_SECONDS)
5454
.readShard(readShardRequest)
5555
val res = readShardResponse.receivedRecordsList.flatMap {
5656
RecordUtils.decompress(it).map { receivedHStreamRecord ->
@@ -82,7 +82,7 @@ class ReaderKtImpl(
8282
.setReaderId(readerId)
8383
.build()
8484
runBlocking {
85-
client.getCoroutineStub(serverUrl).deleteShardReader(deleteShardReaderRequest)
85+
client.getCoroutineStubWithTimeout(serverUrl, DefaultSettings.GRPC_CALL_TIMEOUT_SECONDS).deleteShardReader(deleteShardReaderRequest)
8686
}
8787

8888
logger.info("Reader [{}] closed", readerId)

client/src/main/kotlin/io/hstream/impl/Utils.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import kotlinx.coroutines.future.future
1717
import org.slf4j.Logger
1818
import org.slf4j.LoggerFactory
1919
import java.util.concurrent.CompletableFuture
20+
import java.util.concurrent.TimeUnit
2021
import java.util.concurrent.atomic.AtomicReference
2122
import kotlin.coroutines.CoroutineContext
2223

@@ -94,7 +95,7 @@ suspend fun <Resp> unaryCallCoroutine(urlsRef: AtomicReference<List<String>>, ch
9495
logger.debug("unary rpc with urls [{}]", urls)
9596

9697
try {
97-
return call(HStreamApiCoroutineStub(channelProvider.get(urls[0])))
98+
return call(HStreamApiCoroutineStub(channelProvider.get(urls[0])).withDeadlineAfter(DefaultSettings.GRPC_CALL_TIMEOUT_SECONDS, TimeUnit.SECONDS))
9899
} catch (e: StatusException) {
99100
return handleGRPCException(urls, e)
100101
} catch (e: StatusRuntimeException) {

0 commit comments

Comments
 (0)