Skip to content

Commit 0fdaf3d

Browse files
committed
fix calling consumer's receiver
* capture receiver's exception to prevent the thread from unexpectedly exiting
1 parent 8c2c727 commit 0fdaf3d

File tree

4 files changed

+103
-14
lines changed

4 files changed

+103
-14
lines changed

.github/workflows/main.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ jobs:
2222
args: '--set-exit-if-changed'
2323

2424
- name: Prepare test env
25-
run: script/prepare-test-env-latest.sh
25+
run: script/prepare-test-env.sh
2626

2727
- name: Set up JDK 11
2828
uses: actions/setup-java@v2

script/prepare-test-env.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
set -e
44

5-
DEFAULT_HSTREAM_DOCKER_TAG="v0.5.1.0"
5+
DEFAULT_HSTREAM_DOCKER_TAG="v0.5.2.0"
66

77
docker pull hstreamdb/hstream:${DEFAULT_HSTREAM_DOCKER_TAG} || true
88

src/main/java/io/hstream/impl/ConsumerImpl.java

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.google.common.util.concurrent.AbstractService;
44
import com.google.common.util.concurrent.ThreadFactoryBuilder;
55
import com.google.protobuf.InvalidProtocolBufferException;
6+
import io.grpc.StatusRuntimeException;
67
import io.grpc.stub.StreamObserver;
78
import io.hstream.*;
89
import io.hstream.util.RecordUtils;
@@ -92,31 +93,62 @@ public void onNext(SubscribeResponse response) {
9293

9394
executorService.submit(
9495
() -> {
95-
do {
96+
awaitRunning();
97+
while (isRunning()) {
9698
logger.info("start fetch and processing ...");
97-
FetchResponse fetchResponse = grpcBlockingStub.fetch(fetchRequest);
99+
FetchResponse fetchResponse;
100+
try {
101+
fetchResponse = grpcBlockingStub.fetch(fetchRequest);
102+
} catch (StatusRuntimeException e) {
103+
logger.error("fetch records error", e);
104+
throw new HStreamDBClientException.ConsumerException(
105+
"fetch records error", e);
106+
}
107+
98108
logger.info("fetched {} records", fetchResponse.getReceivedRecordsCount());
99109
for (ReceivedRecord receivedRecord : fetchResponse.getReceivedRecordsList()) {
100110
if (RecordUtils.isRawRecord(receivedRecord)) {
101111
logger.info("ready to process rawRecord");
102-
rawRecordReceiver.processRawRecord(
103-
toReceivedRawRecord(receivedRecord),
104-
new ResponderImpl(
105-
grpcBlockingStub, subscriptionId, receivedRecord.getRecordId()));
112+
try {
113+
rawRecordReceiver.processRawRecord(
114+
toReceivedRawRecord(receivedRecord),
115+
new ResponderImpl(
116+
grpcBlockingStub, subscriptionId, receivedRecord.getRecordId()));
117+
} catch (Exception e) {
118+
logger.error("process rawRecord error", e);
119+
}
106120
} else {
107121
logger.info("ready to process hrecord");
108-
hRecordReceiver.processHRecord(
109-
toReceivedHRecord(receivedRecord),
110-
new ResponderImpl(
111-
grpcBlockingStub, subscriptionId, receivedRecord.getRecordId()));
122+
try {
123+
hRecordReceiver.processHRecord(
124+
toReceivedHRecord(receivedRecord),
125+
new ResponderImpl(
126+
grpcBlockingStub, subscriptionId, receivedRecord.getRecordId()));
127+
128+
} catch (Exception e) {
129+
logger.error("process hrecord error", e);
130+
}
112131
}
113132
}
114133
logger.info("processed {} records", fetchResponse.getReceivedRecordsCount());
115-
} while (isRunning());
134+
}
116135
});
117136

118137
scheduledExecutorService.scheduleAtFixedRate(
119-
() -> grpcStub.sendConsumerHeartbeat(consumerHeartbeatRequest, heartbeatObserver),
138+
() -> {
139+
awaitRunning();
140+
if (!isRunning()) {
141+
return;
142+
}
143+
144+
try {
145+
grpcStub.sendConsumerHeartbeat(consumerHeartbeatRequest, heartbeatObserver);
146+
} catch (StatusRuntimeException e) {
147+
logger.error("send heartbeat error", e);
148+
throw new HStreamDBClientException.ConsumerException(
149+
"send heart beat error", e);
150+
}
151+
},
120152
0,
121153
1,
122154
TimeUnit.SECONDS);

src/test/java/io/hstream/HStreamClientTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,63 @@ public void cleanUp() {
4040
client.deleteStream(TEST_STREAM);
4141
}
4242

43+
// @Test
44+
// public void testReceiverException() throws Exception {
45+
// Consumer consumer =
46+
// client
47+
// .newConsumer()
48+
// .subscription(TEST_SUBSCRIPTION)
49+
// .rawRecordReceiver(
50+
// (receivedRawRecord, responder) -> {
51+
// throw new RuntimeException("receiver exception");
52+
// })
53+
// .build();
54+
// Service.Listener listener =
55+
// new Service.Listener() {
56+
// @Override
57+
// public void starting() {
58+
// super.starting();
59+
// }
60+
61+
// @Override
62+
// public void running() {
63+
// super.running();
64+
// }
65+
66+
// @Override
67+
// public void stopping(Service.State from) {
68+
// super.stopping(from);
69+
// }
70+
71+
// @Override
72+
// public void terminated(Service.State from) {
73+
// super.terminated(from);
74+
// }
75+
76+
// @Override
77+
// public void failed(Service.State from, Throwable failure) {
78+
// logger.error("consumer failed from state {} ", from, failure);
79+
// super.failed(from, failure);
80+
// // consumer.stopAsync();
81+
// }
82+
// };
83+
84+
// consumer.addListener(listener, Executors.newSingleThreadExecutor());
85+
// consumer.startAsync().awaitRunning();
86+
87+
// Producer producer = client.newProducer().stream(TEST_STREAM).build();
88+
// Random random = new Random();
89+
// byte[] rawRecord = new byte[100];
90+
// for (int i = 0; i < 5; ++i) {
91+
// Thread.sleep(5000);
92+
// random.nextBytes(rawRecord);
93+
// producer.write(rawRecord);
94+
// }
95+
96+
// consumer.awaitTerminated(1, TimeUnit.SECONDS);
97+
// consumer.stopAsync().awaitTerminated();
98+
// }
99+
43100
@Test
44101
public void testWriteRawRecord() throws Exception {
45102
CompletableFuture<RecordId> recordIdFuture = new CompletableFuture<>();

0 commit comments

Comments
 (0)