Skip to content

Commit 8f0eec2

Browse files
authored
fix: unit of requestTimeoutMs (#203)
1 parent 541959b commit 8f0eec2

File tree

3 files changed

+5
-5
lines changed

3 files changed

+5
-5
lines changed

client/src/main/kotlin/io/hstream/impl/HStreamClientKtImpl.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,8 @@ class HStreamClientKtImpl(
113113
return HStreamApiGrpcKt.HStreamApiCoroutineStub(channelProvider.get(url))
114114
}
115115

116-
fun getCoroutineStubWithTimeout(url: String, timeoutSeconds: Long = requestTimeoutMs): HStreamApiGrpcKt.HStreamApiCoroutineStub {
117-
return HStreamApiGrpcKt.HStreamApiCoroutineStub(channelProvider.get(url)).withDeadlineAfter(timeoutSeconds, TimeUnit.SECONDS)
116+
fun getCoroutineStubWithTimeoutMs(url: String, timeoutMs: Long = requestTimeoutMs): HStreamApiGrpcKt.HStreamApiCoroutineStub {
117+
return HStreamApiGrpcKt.HStreamApiCoroutineStub(channelProvider.get(url)).withDeadlineAfter(timeoutMs, TimeUnit.MILLISECONDS)
118118
}
119119

120120
init {

client/src/main/kotlin/io/hstream/impl/ProducerKtImpl.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ open class ProducerKtImpl(
141141
val serverUrl = lookupServerUrl(shardId, forceUpdate)
142142
logger.debug("try append with serverUrl [{}], current left tryTimes is [{}]", serverUrl, tryTimes)
143143
return try {
144-
client.getCoroutineStubWithTimeout(serverUrl, requestTimeoutMs)
144+
client.getCoroutineStubWithTimeoutMs(serverUrl, requestTimeoutMs)
145145
.append(appendRequest).recordIdsList.map(GrpcUtils::recordIdFromGrpc)
146146
} catch (e: StatusException) {
147147
handleGRPCException(serverUrl, e)

client/src/main/kotlin/io/hstream/impl/ReaderKtImpl.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class ReaderKtImpl(
5151
readerScope.launch {
5252
try {
5353
val readShardRequest = ReadShardRequest.newBuilder().setReaderId(readerId).setMaxRecords(maxRecords).build()
54-
val readShardResponse = client.getCoroutineStubWithTimeout(serverUrl, requestTimeoutMs)
54+
val readShardResponse = client.getCoroutineStubWithTimeoutMs(serverUrl, requestTimeoutMs)
5555
.readShard(readShardRequest)
5656
val res = readShardResponse.receivedRecordsList.flatMap {
5757
RecordUtils.decompress(it).map { receivedHStreamRecord ->
@@ -83,7 +83,7 @@ class ReaderKtImpl(
8383
.setReaderId(readerId)
8484
.build()
8585
runBlocking {
86-
client.getCoroutineStubWithTimeout(serverUrl, requestTimeoutMs).deleteShardReader(deleteShardReaderRequest)
86+
client.getCoroutineStubWithTimeoutMs(serverUrl, requestTimeoutMs).deleteShardReader(deleteShardReaderRequest)
8787
}
8888

8989
logger.info("Reader [{}] closed", readerId)

0 commit comments

Comments
 (0)