Skip to content

Commit b3c7fd6

Browse files
authored
Merge pull request #371 from alex268/add_retry_handler
Added simple handler for topic's internal retryable streams
2 parents bc59ac1 + eabb8fb commit b3c7fd6

File tree

6 files changed

+42
-7
lines changed

6 files changed

+42
-7
lines changed

table/src/test/java/tech/ydb/table/values/DecimalValueTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,6 @@ public void allTypeInfiniteAndNan() {
192192
BigDecimal scaledInf = new BigDecimal(inf, scale);
193193
BigDecimal scaledNan = new BigDecimal(nan, scale);
194194

195-
System.out.println("Nan for " + scaled + " -> " + scaledNan);
196-
197195
assertIsInf(scaled.newValue(scaledInf));
198196
assertIsNegInf(scaled.newValue(scaledInf.negate()));
199197

topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.util.concurrent.TimeUnit;
99
import java.util.concurrent.atomic.AtomicBoolean;
1010
import java.util.concurrent.atomic.AtomicInteger;
11+
import java.util.function.BiConsumer;
1112

1213
import org.slf4j.Logger;
1314

@@ -29,12 +30,15 @@ public abstract class GrpcStreamRetrier {
2930
protected final String id;
3031
protected final AtomicBoolean isReconnecting = new AtomicBoolean(false);
3132
protected final AtomicBoolean isStopped = new AtomicBoolean(false);
32-
private final ScheduledExecutorService scheduler;
3333
protected final AtomicInteger reconnectCounter = new AtomicInteger(0);
3434

35-
protected GrpcStreamRetrier(ScheduledExecutorService scheduler) {
35+
private final ScheduledExecutorService scheduler;
36+
private final BiConsumer<Status, Throwable> errorsHandler;
37+
38+
protected GrpcStreamRetrier(ScheduledExecutorService scheduler, BiConsumer<Status, Throwable> errorsHandler) {
3639
this.scheduler = scheduler;
3740
this.id = generateRandomId(ID_LENGTH);
41+
this.errorsHandler = errorsHandler;
3842
}
3943

4044
protected abstract Logger getLogger();
@@ -127,6 +131,10 @@ protected void onSessionClosed(Status status, Throwable th) {
127131
}
128132
}
129133

134+
if (errorsHandler != null) {
135+
errorsHandler.accept(status, th);
136+
}
137+
130138
if (!isStopped.get()) {
131139
tryScheduleReconnect();
132140
} else {

topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public abstract class ReaderImpl extends GrpcStreamRetrier {
5555
private final String consumerName;
5656

5757
public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) {
58-
super(topicRpc.getScheduler());
58+
super(topicRpc.getScheduler(), settings.getErrorsHandler());
5959
this.topicRpc = topicRpc;
6060
this.settings = settings;
6161
this.session = new ReadSessionImpl();

topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@
33
import java.util.ArrayList;
44
import java.util.List;
55
import java.util.concurrent.Executor;
6+
import java.util.function.BiConsumer;
67

78
import javax.annotation.Nullable;
89

910
import com.google.common.collect.ImmutableList;
1011

12+
import tech.ydb.core.Status;
13+
1114
/**
1215
* @author Nikolay Perfilov
1316
*/
@@ -19,13 +22,15 @@ public class ReaderSettings {
1922
private final List<TopicReadSettings> topics;
2023
private final long maxMemoryUsageBytes;
2124
private final Executor decompressionExecutor;
25+
private final BiConsumer<Status, Throwable> errorsHandler;
2226

2327
private ReaderSettings(Builder builder) {
2428
this.consumerName = builder.consumerName;
2529
this.readerName = builder.readerName;
2630
this.topics = ImmutableList.copyOf(builder.topics);
2731
this.maxMemoryUsageBytes = builder.maxMemoryUsageBytes;
2832
this.decompressionExecutor = builder.decompressionExecutor;
33+
this.errorsHandler = builder.errorsHandler;
2934
}
3035

3136
public String getConsumerName() {
@@ -41,6 +46,10 @@ public List<TopicReadSettings> getTopics() {
4146
return topics;
4247
}
4348

49+
public BiConsumer<Status, Throwable> getErrorsHandler() {
50+
return errorsHandler;
51+
}
52+
4453
public long getMaxMemoryUsageBytes() {
4554
return maxMemoryUsageBytes;
4655
}
@@ -63,6 +72,7 @@ public static class Builder {
6372
private List<TopicReadSettings> topics = new ArrayList<>();
6473
private long maxMemoryUsageBytes = MAX_MEMORY_USAGE_BYTES_DEFAULT;
6574
private Executor decompressionExecutor = null;
75+
private BiConsumer<Status, Throwable> errorsHandler = null;
6676

6777
public Builder setConsumerName(String consumerName) {
6878
this.consumerName = consumerName;
@@ -103,6 +113,11 @@ public Builder setMaxMemoryUsageBytes(long maxMemoryUsageBytes) {
103113
return this;
104114
}
105115

116+
public Builder setErrorsHandler(BiConsumer<Status, Throwable> handler) {
117+
this.errorsHandler = handler;
118+
return this;
119+
}
120+
106121
/**
107122
* Set executor for decompression tasks.
108123
* If not set, default executor will be used.

topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package tech.ydb.topic.settings;
22

3+
import java.util.function.BiConsumer;
4+
5+
import tech.ydb.core.Status;
36
import tech.ydb.topic.description.Codec;
47

58
/**
@@ -16,6 +19,7 @@ public class WriterSettings {
1619
private final Codec codec;
1720
private final long maxSendBufferMemorySize;
1821
private final int maxSendBufferMessagesCount;
22+
private final BiConsumer<Status, Throwable> errorsHandler;
1923

2024
private WriterSettings(Builder builder) {
2125
this.topicPath = builder.topicPath;
@@ -25,6 +29,7 @@ private WriterSettings(Builder builder) {
2529
this.codec = builder.codec;
2630
this.maxSendBufferMemorySize = builder.maxSendBufferMemorySize;
2731
this.maxSendBufferMessagesCount = builder.maxSendBufferMessagesCount;
32+
this.errorsHandler = builder.errorsHandler;
2833
}
2934

3035
public static Builder newBuilder() {
@@ -43,6 +48,10 @@ public String getMessageGroupId() {
4348
return messageGroupId;
4449
}
4550

51+
public BiConsumer<Status, Throwable> getErrorsHandler() {
52+
return errorsHandler;
53+
}
54+
4655
public Long getPartitionId() {
4756
return partitionId;
4857
}
@@ -70,6 +79,7 @@ public static class Builder {
7079
private Codec codec = Codec.GZIP;
7180
private long maxSendBufferMemorySize = MAX_MEMORY_USAGE_BYTES_DEFAULT;
7281
private int maxSendBufferMessagesCount = MAX_IN_FLIGHT_COUNT_DEFAULT;
82+
private BiConsumer<Status, Throwable> errorsHandler = null;
7383

7484
/**
7585
* Set path to a topic to write to
@@ -148,9 +158,13 @@ public Builder setMaxSendBufferMessagesCount(int maxMessagesCount) {
148158
return this;
149159
}
150160

161+
public Builder setErrorsHandler(BiConsumer<Status, Throwable> handler) {
162+
this.errorsHandler = handler;
163+
return this;
164+
}
165+
151166
public WriterSettings build() {
152167
return new WriterSettings(this);
153168
}
154-
155169
}
156170
}

topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public abstract class WriterImpl extends GrpcStreamRetrier {
6666
private CompletableFuture<WriteAck> lastAcceptedMessageFuture;
6767

6868
public WriterImpl(TopicRpc topicRpc, WriterSettings settings, Executor compressionExecutor) {
69-
super(topicRpc.getScheduler());
69+
super(topicRpc.getScheduler(), settings.getErrorsHandler());
7070
this.topicRpc = topicRpc;
7171
this.settings = settings;
7272
this.session = new WriteSessionImpl();

0 commit comments

Comments
 (0)