@@ -18,13 +18,16 @@ import io.hstream.internal.StreamingFetchRequest
1818import io.hstream.internal.StreamingFetchResponse
1919import io.hstream.util.GrpcUtils
2020import io.hstream.util.RecordUtils
21+ import kotlinx.coroutines.CancellationException
22+ import kotlinx.coroutines.CoroutineScope
23+ import kotlinx.coroutines.Dispatchers
24+ import kotlinx.coroutines.Job
2125import kotlinx.coroutines.coroutineScope
2226import kotlinx.coroutines.delay
2327import kotlinx.coroutines.flow.MutableSharedFlow
2428import kotlinx.coroutines.flow.collect
2529import kotlinx.coroutines.launch
2630import org.slf4j.LoggerFactory
27- import java.util.concurrent.CompletableFuture
2831import java.util.concurrent.Executors
2932import java.util.concurrent.TimeUnit
3033
@@ -37,7 +40,8 @@ class ConsumerKtImpl(
3740 private val ackBufferSize : Int ,
3841 private val ackAgeLimit : Long
3942) : AbstractService(), Consumer {
40- private lateinit var fetchFuture: CompletableFuture <Unit >
43+ private val fetchScope = CoroutineScope (Dispatchers .IO )
44+ private lateinit var fetchJob: Job
4145 private val executorService = Executors .newSingleThreadExecutor()
4246 private val requestFlow = MutableSharedFlow <StreamingFetchRequest >()
4347 private val ackSender = AckSender (subscriptionId, requestFlow, consumerName, ackBufferSize, ackAgeLimit)
@@ -92,6 +96,11 @@ class ConsumerKtImpl(
9296 handleGRPCException(e)
9397 } catch (e: StatusRuntimeException ) {
9498 handleGRPCException(e)
99+ } catch (e: CancellationException ) {
100+ logger.info(" streamingFetch is canceled" )
101+ } catch (e: Throwable ) {
102+ logger.info(" streaming fetch failed, {}" , e)
103+ notifyFailed(HStreamDBClientException (e))
95104 }
96105 }
97106
@@ -160,10 +169,8 @@ class ConsumerKtImpl(
160169 try {
161170 logger.info(" consumer [{}] is starting" , consumerName)
162171 notifyStarted()
163- fetchFuture = (futureForIO { streamingFetchWithRetry(requestFlow) }).handle { _, err ->
164- if (err != null ) {
165- notifyFailed(HStreamDBClientException (err))
166- }
172+ fetchJob = fetchScope.launch {
173+ streamingFetchWithRetry(requestFlow)
167174 }
168175 logger.info(" consumer [{}] is started" , consumerName)
169176 } catch (e: Exception ) {
@@ -178,7 +185,7 @@ class ConsumerKtImpl(
178185 logger.info(" consumer [{}] is stopping" , consumerName)
179186
180187 ackSender.close()
181- fetchFuture .cancel(false )
188+ fetchJob .cancel()
182189 executorService.shutdown()
183190 try {
184191 executorService.awaitTermination(30 , TimeUnit .SECONDS )
0 commit comments