Skip to content

Commit d3bf1f7

Browse files
committed
Added RetryConfig to configure retries behavior
1 parent 767fda0 commit d3bf1f7

File tree

5 files changed

+44
-3
lines changed

5 files changed

+44
-3
lines changed

topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.slf4j.Logger;
1313

14+
import tech.ydb.common.retry.RetryConfig;
1415
import tech.ydb.core.Status;
1516

1617
/**
@@ -26,13 +27,15 @@ public abstract class GrpcStreamRetrier {
2627
private static final char[] ID_ALPHABET = "abcdefghijklmnopqrstuvwxyzABSDEFGHIJKLMNOPQRSTUVWXYZ1234567890"
2728
.toCharArray();
2829

30+
private final RetryConfig retryConfig;
2931
protected final String id;
3032
protected final AtomicBoolean isReconnecting = new AtomicBoolean(false);
3133
protected final AtomicBoolean isStopped = new AtomicBoolean(false);
3234
private final ScheduledExecutorService scheduler;
3335
protected final AtomicInteger reconnectCounter = new AtomicInteger(0);
3436

35-
protected GrpcStreamRetrier(ScheduledExecutorService scheduler) {
37+
protected GrpcStreamRetrier(RetryConfig retryConfig, ScheduledExecutorService scheduler) {
38+
this.retryConfig = retryConfig;
3639
this.scheduler = scheduler;
3740
this.id = generateRandomId(ID_LENGTH);
3841
}

topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public abstract class ReaderImpl extends GrpcStreamRetrier {
5555
private final String consumerName;
5656

5757
public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) {
58-
super(topicRpc.getScheduler());
58+
super(settings.getRetryConfig(), topicRpc.getScheduler());
5959
this.topicRpc = topicRpc;
6060
this.settings = settings;
6161
this.session = new ReadSessionImpl();

topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
import com.google.common.collect.ImmutableList;
1010

11+
import tech.ydb.common.retry.RetryConfig;
12+
1113
/**
1214
* @author Nikolay Perfilov
1315
*/
@@ -17,13 +19,15 @@ public class ReaderSettings {
1719
private final String consumerName;
1820
private final String readerName;
1921
private final List<TopicReadSettings> topics;
22+
private final RetryConfig retryConfig;
2023
private final long maxMemoryUsageBytes;
2124
private final Executor decompressionExecutor;
2225

2326
private ReaderSettings(Builder builder) {
2427
this.consumerName = builder.consumerName;
2528
this.readerName = builder.readerName;
2629
this.topics = ImmutableList.copyOf(builder.topics);
30+
this.retryConfig = builder.retryConfig;
2731
this.maxMemoryUsageBytes = builder.maxMemoryUsageBytes;
2832
this.decompressionExecutor = builder.decompressionExecutor;
2933
}
@@ -37,6 +41,10 @@ public String getReaderName() {
3741
return readerName;
3842
}
3943

44+
public RetryConfig getRetryConfig() {
45+
return retryConfig;
46+
}
47+
4048
public List<TopicReadSettings> getTopics() {
4149
return topics;
4250
}
@@ -61,6 +69,7 @@ public static class Builder {
6169
private boolean readWithoutConsumer = false;
6270
private String readerName = null;
6371
private List<TopicReadSettings> topics = new ArrayList<>();
72+
private RetryConfig retryConfig = RetryConfig.idempotentRetryForever();
6473
private long maxMemoryUsageBytes = MAX_MEMORY_USAGE_BYTES_DEFAULT;
6574
private Executor decompressionExecutor = null;
6675

@@ -81,6 +90,7 @@ public Builder withoutConsumer() {
8190

8291
/**
8392
* Set reader name for debug purposes
93+
* @param readerName name of reader
8494
* @return settings builder
8595
*/
8696
public Builder setReaderName(String readerName) {
@@ -98,6 +108,16 @@ public Builder setTopics(List<TopicReadSettings> topics) {
98108
return this;
99109
}
100110

111+
/**
112+
* Set {@link RetryConfig} to define behavior of the stream internal retries
113+
* @param config retry mode
114+
* @return settings builder
115+
*/
116+
public Builder setRetryConfig(RetryConfig config) {
117+
this.retryConfig = config;
118+
return this;
119+
}
120+
101121
public Builder setMaxMemoryUsageBytes(long maxMemoryUsageBytes) {
102122
this.maxMemoryUsageBytes = maxMemoryUsageBytes;
103123
return this;

topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package tech.ydb.topic.settings;
22

3+
import tech.ydb.common.retry.RetryConfig;
34
import tech.ydb.topic.description.Codec;
45

56
/**
@@ -14,6 +15,7 @@ public class WriterSettings {
1415
private final String messageGroupId;
1516
private final Long partitionId;
1617
private final Codec codec;
18+
private final RetryConfig retryConfig;
1719
private final long maxSendBufferMemorySize;
1820
private final int maxSendBufferMessagesCount;
1921

@@ -23,6 +25,7 @@ private WriterSettings(Builder builder) {
2325
this.messageGroupId = builder.messageGroupId;
2426
this.partitionId = builder.partitionId;
2527
this.codec = builder.codec;
28+
this.retryConfig = builder.retryConfig;
2629
this.maxSendBufferMemorySize = builder.maxSendBufferMemorySize;
2730
this.maxSendBufferMessagesCount = builder.maxSendBufferMessagesCount;
2831
}
@@ -51,6 +54,10 @@ public Codec getCodec() {
5154
return codec;
5255
}
5356

57+
public RetryConfig getRetryConfig() {
58+
return retryConfig;
59+
}
60+
5461
public long getMaxSendBufferMemorySize() {
5562
return maxSendBufferMemorySize;
5663
}
@@ -68,6 +75,7 @@ public static class Builder {
6875
private String messageGroupId = null;
6976
private Long partitionId = null;
7077
private Codec codec = Codec.GZIP;
78+
private RetryConfig retryConfig = RetryConfig.idempotentRetryForever();
7179
private long maxSendBufferMemorySize = MAX_MEMORY_USAGE_BYTES_DEFAULT;
7280
private int maxSendBufferMessagesCount = MAX_IN_FLIGHT_COUNT_DEFAULT;
7381

@@ -125,6 +133,16 @@ public Builder setCodec(Codec codec) {
125133
return this;
126134
}
127135

136+
/**
137+
* Set {@link RetryConfig} to define behavior of the stream internal retries
138+
* @param config retry mode
139+
* @return settings builder
140+
*/
141+
public Builder setRetryConfig(RetryConfig config) {
142+
this.retryConfig = config;
143+
return this;
144+
}
145+
128146
/**
129147
* Set memory usage limit for send buffer.
130148
* Writer will not accept new messages if memory usage exceeds this limit.

topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public abstract class WriterImpl extends GrpcStreamRetrier {
6666
private CompletableFuture<WriteAck> lastAcceptedMessageFuture;
6767

6868
public WriterImpl(TopicRpc topicRpc, WriterSettings settings, Executor compressionExecutor) {
69-
super(topicRpc.getScheduler());
69+
super(settings.getRetryConfig(), topicRpc.getScheduler());
7070
this.topicRpc = topicRpc;
7171
this.settings = settings;
7272
this.session = new WriteSessionImpl();

0 commit comments

Comments
 (0)