@@ -7,9 +7,9 @@ import io.hstream.FlowControlSetting
77import io.hstream.HStreamDBClientException
88import io.hstream.internal.HStreamRecord
99import kotlinx.coroutines.CoroutineScope
10+ import kotlinx.coroutines.Deferred
1011import kotlinx.coroutines.Dispatchers
11- import kotlinx.coroutines.Job
12- import kotlinx.coroutines.launch
12+ import kotlinx.coroutines.async
1313import org.slf4j.LoggerFactory
1414import java.util.LinkedList
1515import java.util.concurrent.CompletableFuture
@@ -34,7 +34,7 @@ class BufferedProducerKtImpl(
3434 private var shardAppendBuffer: HashMap <Long , Records > = HashMap ()
3535 private var shardAppendFutures: HashMap <Long , Futures > = HashMap ()
3636 private var shardAppendBytesSize: HashMap <Long , Int > = HashMap ()
37- private var shardAppendJobs : HashMap <Long , Job > = HashMap ()
37+ private var shardAppendResults : HashMap <Long , Deferred < Unit > > = HashMap ()
3838 private var batchScope: CoroutineScope = CoroutineScope (Dispatchers .Default )
3939
4040 private val flowController: FlowController ? = if (flowControlSetting.bytesLimit > 0 ) FlowController (flowControlSetting.bytesLimit) else null
@@ -109,13 +109,15 @@ class BufferedProducerKtImpl(
109109 shardAppendBytesSize.remove(shardId)
110110 timerServices[shardId]?.cancel(true )
111111 timerServices.remove(shardId)
112- var job = shardAppendJobs[shardId]
113- shardAppendJobs[shardId] = batchScope.launch {
114- job?.join()
115- job = null
112+ var result = shardAppendResults[shardId]
113+ shardAppendResults[shardId] = batchScope.async {
114+ // TODO: handle exception
115+ result?.await()
116+ result = null
116117 writeShard(shardId, records, futures)
117118 logger.info(" wrote batch for shard:$shardId " )
118119 flowController?.release(recordsBytesSize)
120+ Unit
119121 }
120122 }
121123 }
0 commit comments