Skip to content

Commit 1f237bd

Browse files
authored
add requestTimeoutMs for Client, Producer, Reader (#166)
1 parent 3361e2b commit 1f237bd

14 files changed

+97
-24
lines changed

client/src/main/java/io/hstream/BufferedProducerBuilder.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,7 @@ public interface BufferedProducerBuilder {
3131
*/
3232
BufferedProducerBuilder compressionType(CompressionType compressionType);
3333

34+
BufferedProducerBuilder requestTimeoutMs(long timeoutMs);
35+
3436
BufferedProducer build();
3537
}

client/src/main/java/io/hstream/HStreamClientBuilder.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,5 +51,7 @@ public interface HStreamClientBuilder {
5151
*/
5252
HStreamClientBuilder tlsCertPath(String certPath);
5353

54+
HStreamClientBuilder requestTimeoutMs(long timeoutMs);
55+
5456
HStreamClient build();
5557
}

client/src/main/java/io/hstream/ProducerBuilder.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,7 @@ public interface ProducerBuilder {
55

66
ProducerBuilder stream(String streamName);
77

8+
ProducerBuilder requestTimeoutMs(long timeoutMs);
9+
810
Producer build();
911
}

client/src/main/java/io/hstream/ReaderBuilder.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,7 @@ public interface ReaderBuilder {
1313

1414
ReaderBuilder readerId(String readerId);
1515

16+
ReaderBuilder requestTimeoutMs(long timeoutMs);
17+
1618
Reader build();
1719
}

client/src/main/java/io/hstream/impl/BufferedProducerBuilderImpl.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.hstream.impl;
22

3+
import com.google.common.base.Preconditions;
34
import io.hstream.BatchSetting;
45
import io.hstream.BufferedProducer;
56
import io.hstream.BufferedProducerBuilder;
@@ -15,6 +16,8 @@ public class BufferedProducerBuilderImpl implements BufferedProducerBuilder {
1516
private FlowControlSetting flowControlSetting = FlowControlSetting.newBuilder().build();
1617
private CompressionType compressionType = CompressionType.NONE;
1718

19+
private long requestTimeoutMs = DefaultSettings.GRPC_CALL_TIMEOUT_MS;
20+
1821
public BufferedProducerBuilderImpl(HStreamClientKtImpl client) {
1922
this.client = client;
2023
}
@@ -43,6 +46,12 @@ public BufferedProducerBuilder compressionType(CompressionType compressionType)
4346
return this;
4447
}
4548

49+
@Override
50+
public BufferedProducerBuilder requestTimeoutMs(long timeoutMs) {
51+
this.requestTimeoutMs = timeoutMs;
52+
return this;
53+
}
54+
4655
@Override
4756
public BufferedProducer build() {
4857
if (streamName == null) {
@@ -56,7 +65,8 @@ public BufferedProducer build() {
5665
"BatchSetting.ageLimit:[%d] should not be greater than flowControlSetting.bytesLimit:[%d]",
5766
batchBytes, flowBytes));
5867
}
68+
Preconditions.checkArgument(requestTimeoutMs > 0);
5969
return new BufferedProducerKtImpl(
60-
client, streamName, batchSetting, flowControlSetting, compressionType);
70+
client, streamName, requestTimeoutMs, batchSetting, flowControlSetting, compressionType);
6171
}
6272
}

client/src/main/java/io/hstream/impl/DefaultSettings.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ public class DefaultSettings {
88

99
public static final String DEFAULT_PARTITION_KEY = "";
1010

11-
public static final long GRPC_CALL_TIMEOUT_SECONDS = 5;
11+
public static final long GRPC_CALL_TIMEOUT_MS = 5000;
1212
}

client/src/main/java/io/hstream/impl/HStreamClientBuilderImpl.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ public class HStreamClientBuilderImpl implements HStreamClientBuilder {
2323
private String keyPath;
2424
private String certPath;
2525

26+
private long requestTimeoutMs = DefaultSettings.GRPC_CALL_TIMEOUT_MS;
27+
2628
@Override
2729
public HStreamClientBuilder serviceUrl(String serviceUrl) {
2830
this.serviceUrl = serviceUrl;
@@ -59,9 +61,16 @@ public HStreamClientBuilder tlsCertPath(String certPath) {
5961
return this;
6062
}
6163

64+
@Override
65+
public HStreamClientBuilder requestTimeoutMs(long timeoutMs) {
66+
this.requestTimeoutMs = timeoutMs;
67+
return this;
68+
}
69+
6270
@Override
6371
public HStreamClient build() {
6472
checkNotNull(serviceUrl);
73+
checkArgument(requestTimeoutMs > 0);
6574
Pair<UrlSchema, List<String>> schemaHosts = parseServerUrls(serviceUrl);
6675
// FIXME: remove enableTls option
6776
if (schemaHosts.getKey().equals(UrlSchema.HSTREAMS) && !enableTls) {
@@ -74,12 +83,13 @@ public HStreamClient build() {
7483
if (enableTlsAuthentication) {
7584
credentialsBuilder = credentialsBuilder.keyManager(new File(certPath), new File(keyPath));
7685
}
77-
return new HStreamClientKtImpl(schemaHosts.getRight(), credentialsBuilder.build());
86+
return new HStreamClientKtImpl(
87+
schemaHosts.getRight(), requestTimeoutMs, credentialsBuilder.build());
7888
} catch (IOException e) {
7989
throw new HStreamDBClientException(String.format("invalid tls options, %s", e));
8090
}
8191
}
82-
return new HStreamClientKtImpl(schemaHosts.getRight(), null);
92+
return new HStreamClientKtImpl(schemaHosts.getRight(), requestTimeoutMs, null);
8393
}
8494

8595
private Pair<UrlSchema, List<String>> parseServerUrls(String url) {

client/src/main/java/io/hstream/impl/ProducerBuilderImpl.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ public class ProducerBuilderImpl implements ProducerBuilder {
99

1010
private final HStreamClientKtImpl client;
1111
private String streamName;
12+
private long requestTimeoutMs = DefaultSettings.GRPC_CALL_TIMEOUT_MS;
1213

1314
public ProducerBuilderImpl(HStreamClientKtImpl client) {
1415
this.client = client;
@@ -20,9 +21,16 @@ public ProducerBuilder stream(String streamName) {
2021
return this;
2122
}
2223

24+
@Override
25+
public ProducerBuilder requestTimeoutMs(long timeoutMs) {
26+
this.requestTimeoutMs = timeoutMs;
27+
return this;
28+
}
29+
2330
@Override
2431
public Producer build() {
2532
checkNotNull(streamName);
26-
return new ProducerKtImpl(client, streamName);
33+
checkArgument(requestTimeoutMs > 0);
34+
return new ProducerKtImpl(client, streamName, requestTimeoutMs);
2735
}
2836
}

client/src/main/java/io/hstream/impl/ReaderBuilderImpl.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package io.hstream.impl;
22

3-
import static com.google.common.base.Preconditions.checkNotNull;
4-
import static com.google.common.base.Preconditions.checkState;
3+
import static com.google.common.base.Preconditions.*;
54

65
import io.hstream.Reader;
76
import io.hstream.ReaderBuilder;
@@ -19,6 +18,8 @@ public class ReaderBuilderImpl implements ReaderBuilder {
1918
private int timeoutMs = 0;
2019
private String readerId = UUID.randomUUID().toString();
2120

21+
private long requestTimeoutMs = DefaultSettings.GRPC_CALL_TIMEOUT_MS;
22+
2223
public ReaderBuilderImpl(HStreamClientKtImpl client) {
2324
this.client = client;
2425
}
@@ -53,6 +54,12 @@ public ReaderBuilder readerId(String readerId) {
5354
return this;
5455
}
5556

57+
@Override
58+
public ReaderBuilder requestTimeoutMs(long timeoutMs) {
59+
this.requestTimeoutMs = timeoutMs;
60+
return this;
61+
}
62+
5663
@Override
5764
public Reader build() {
5865
checkNotNull(client);
@@ -61,6 +68,8 @@ public Reader build() {
6168
checkNotNull(shardOffset);
6269
checkState(timeoutMs >= 0);
6370
checkNotNull(readerId);
64-
return new ReaderKtImpl(client, streamName, shardId, shardOffset, timeoutMs, readerId);
71+
checkArgument(requestTimeoutMs > 0);
72+
return new ReaderKtImpl(
73+
client, streamName, shardId, shardOffset, timeoutMs, readerId, requestTimeoutMs);
6574
}
6675
}

client/src/main/kotlin/io/hstream/impl/BufferedProducerKtImpl.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,11 @@ typealias Futures = MutableList<CompletableFuture<String>>
2626
class BufferedProducerKtImpl(
2727
client: HStreamClientKtImpl,
2828
stream: String,
29+
requestTimeoutMs: Long,
2930
private val batchSetting: BatchSetting,
3031
private val flowControlSetting: FlowControlSetting,
3132
private val compressionType: CompressionType,
32-
) : ProducerKtImpl(client, stream), BufferedProducer {
33+
) : ProducerKtImpl(client, stream, requestTimeoutMs), BufferedProducer {
3334
private var lock = ReentrantLock()
3435
private var shardAppendBuffer: HashMap<Long, Records> = HashMap()
3536
private var shardAppendFutures: HashMap<Long, Futures> = HashMap()

0 commit comments

Comments
 (0)