Skip to content

Commit fd595e6

Browse files
authored
fix error when producer write failed (#36)
1 parent 3a82119 commit fd595e6

File tree

1 file changed

+13
-5
lines changed

1 file changed

+13
-5
lines changed

client/src/main/java/io/hstream/impl/ProducerImpl.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,18 @@ private void flush() {
8686
logger.info("start flush recordBuffer, current buffer size is: {}", recordBufferCount);
8787

8888
writeHStreamRecords(recordBuffer)
89-
.thenAccept(
90-
recordIds -> {
91-
for (int i = 0; i < recordIds.size(); ++i) {
92-
futures.get(i).complete(recordIds.get(i));
89+
.handle(
90+
(recordIds, exception) -> {
91+
if (exception == null) {
92+
for (int i = 0; i < recordIds.size(); ++i) {
93+
futures.get(i).complete(recordIds.get(i));
94+
}
95+
} else {
96+
for (int i = 0; i < futures.size(); ++i) {
97+
futures.get(i).completeExceptionally(exception);
98+
}
9399
}
100+
return null;
94101
})
95102
.join();
96103

@@ -125,7 +132,8 @@ public void onNext(AppendResponse appendResponse) {
125132

126133
@Override
127134
public void onError(Throwable t) {
128-
throw new HStreamDBClientException(t);
135+
logger.warn("write records error: ", t);
136+
completableFuture.completeExceptionally(new HStreamDBClientException(t));
129137
}
130138

131139
@Override

0 commit comments

Comments
 (0)