Skip to content

Commit b1a6f9d

Browse files
authored
update Producers with the new sharding model (#126)
1 parent 322e8b0 commit b1a6f9d

File tree

3 files changed

+86
-56
lines changed

3 files changed

+86
-56
lines changed

client/src/main/kotlin/io/hstream/impl/BufferedProducerKtImpl.kt

Lines changed: 36 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ class BufferedProducerKtImpl(
2929
private val flowControlSetting: FlowControlSetting,
3030
) : ProducerKtImpl(client, stream), BufferedProducer {
3131
private var lock = ReentrantLock()
32-
private var orderingBuffer: HashMap<String, Records> = HashMap()
33-
private var orderingFutures: HashMap<String, Futures> = HashMap()
34-
private var orderingBytesSize: HashMap<String, Int> = HashMap()
35-
private var orderingJobs: HashMap<String, Job> = HashMap()
32+
private var shardAppendBuffer: HashMap<Long, Records> = HashMap()
33+
private var shardAppendFutures: HashMap<Long, Futures> = HashMap()
34+
private var shardAppendBytesSize: HashMap<Long, Int> = HashMap()
35+
private var shardAppendJobs: HashMap<Long, Job> = HashMap()
3636
private var batchScope: CoroutineScope = CoroutineScope(Dispatchers.Default)
3737

3838
private val flowController: FlowController? = if (flowControlSetting.bytesLimit > 0) FlowController(flowControlSetting.bytesLimit) else null
@@ -41,7 +41,7 @@ class BufferedProducerKtImpl(
4141
private var closed: Boolean = false
4242

4343
private val scheduler = Executors.newScheduledThreadPool(1)
44-
private var timerServices: HashMap<String, ScheduledFuture<*>> = HashMap()
44+
private var timerServices: HashMap<Long, ScheduledFuture<*>> = HashMap()
4545

4646
override fun writeInternal(hStreamRecord: HStreamRecord): CompletableFuture<String> {
4747
if (closed) {
@@ -59,56 +59,57 @@ class BufferedProducerKtImpl(
5959
}
6060

6161
val recordFuture = CompletableFuture<String>()
62-
val key = hStreamRecord.header.key
63-
if (!orderingBuffer.containsKey(key)) {
64-
orderingBuffer[key] = LinkedList()
65-
orderingFutures[key] = LinkedList()
66-
orderingBytesSize[key] = 0
62+
val partitionKey = hStreamRecord.header.key
63+
val shardId = calculateShardIdByPartitionKey(partitionKey)
64+
if (!shardAppendBuffer.containsKey(shardId)) {
65+
shardAppendBuffer[shardId] = LinkedList()
66+
shardAppendFutures[shardId] = LinkedList()
67+
shardAppendBytesSize[shardId] = 0
6768
if (batchSetting.ageLimit > 0) {
68-
timerServices[key] =
69-
scheduler.schedule({ flushForKey(key) }, batchSetting.ageLimit, TimeUnit.MILLISECONDS)
69+
timerServices[shardId] =
70+
scheduler.schedule({ flushForShard(shardId) }, batchSetting.ageLimit, TimeUnit.MILLISECONDS)
7071
}
7172
}
72-
orderingBuffer[key]!!.add(hStreamRecord)
73-
orderingFutures[key]!!.add(recordFuture)
74-
orderingBytesSize[key] = orderingBytesSize[key]!! + hStreamRecord.payload.size()
75-
if (isFull(key)) {
76-
flushForKey(key)
73+
shardAppendBuffer[shardId]!!.add(hStreamRecord)
74+
shardAppendFutures[shardId]!!.add(recordFuture)
75+
shardAppendBytesSize[shardId] = shardAppendBytesSize[shardId]!! + hStreamRecord.payload.size()
76+
if (isFull(shardId)) {
77+
flushForShard(shardId)
7778
}
7879
return recordFuture
7980
}
8081
}
8182

82-
private fun isFull(key: String): Boolean {
83-
val recordCount = orderingBuffer[key]!!.size
84-
val bytesSize = orderingBytesSize[key]!!
83+
private fun isFull(shardId: Long): Boolean {
84+
val recordCount = shardAppendBuffer[shardId]!!.size
85+
val bytesSize = shardAppendBytesSize[shardId]!!
8586
return batchSetting.recordCountLimit in 1..recordCount || batchSetting.bytesLimit in 1..bytesSize
8687
}
8788

8889
override fun flush() {
8990
lock.withLock {
90-
for (key in orderingBuffer.keys.toList()) {
91-
flushForKey(key)
91+
for (shard in shardAppendBuffer.keys.toList()) {
92+
flushForShard(shard)
9293
}
9394
}
9495
}
9596

96-
private fun flushForKey(key: String) {
97+
private fun flushForShard(shardId: Long) {
9798
lock.withLock {
98-
val records = orderingBuffer[key]!!
99-
val futures = orderingFutures[key]!!
100-
val recordsBytesSize = orderingBytesSize[key]!!
101-
logger.info("ready to flush recordBuffer for key:$key, current buffer size is [{}]", records.size)
102-
orderingBuffer.remove(key)
103-
orderingFutures.remove(key)
104-
orderingBytesSize.remove(key)
105-
timerServices[key]?.cancel(true)
106-
timerServices.remove(key)
107-
val job = orderingJobs[key]
108-
orderingJobs[key] = batchScope.launch {
99+
val records = shardAppendBuffer[shardId]!!
100+
val futures = shardAppendFutures[shardId]!!
101+
val recordsBytesSize = shardAppendBytesSize[shardId]!!
102+
logger.info("ready to flush recordBuffer for shard:$shardId, current buffer size is [{}]", records.size)
103+
shardAppendBuffer.remove(shardId)
104+
shardAppendFutures.remove(shardId)
105+
shardAppendBytesSize.remove(shardId)
106+
timerServices[shardId]?.cancel(true)
107+
timerServices.remove(shardId)
108+
val job = shardAppendJobs[shardId]
109+
shardAppendJobs[shardId] = batchScope.launch {
109110
job?.join()
110111
writeSingleKeyHStreamRecords(records, futures)
111-
logger.info("wrote batch for key:$key")
112+
logger.info("wrote batch for shard:$shardId")
112113
flowController?.release(recordsBytesSize)
113114
}
114115
}

client/src/main/kotlin/io/hstream/impl/ProducerKtImpl.kt

Lines changed: 49 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ import io.hstream.Producer
99
import io.hstream.Record
1010
import io.hstream.internal.AppendRequest
1111
import io.hstream.internal.HStreamRecord
12-
import io.hstream.internal.LookupStreamRequest
12+
import io.hstream.internal.ListShardsRequest
13+
import io.hstream.internal.LookupShardRequest
14+
import io.hstream.internal.Shard
1315
import io.hstream.util.GrpcUtils
1416
import io.hstream.util.RecordUtils
1517
import kotlinx.coroutines.CoroutineScope
@@ -19,39 +21,49 @@ import kotlinx.coroutines.launch
1921
import kotlinx.coroutines.sync.Mutex
2022
import kotlinx.coroutines.sync.withLock
2123
import org.slf4j.LoggerFactory
24+
import java.math.BigInteger
25+
import java.nio.charset.StandardCharsets
2226
import java.util.concurrent.CompletableFuture
2327
import kotlin.collections.HashMap
2428

2529
open class ProducerKtImpl(private val client: HStreamClientKtImpl, private val stream: String) : Producer {
26-
private val serverUrls: HashMap<String, String> = HashMap()
30+
private val serverUrls: HashMap<Long, String> = HashMap()
2731
private val serverUrlsLock: Mutex = Mutex()
32+
private val shards: List<Shard>
2833

29-
private suspend fun lookupServerUrl(orderingKey: String, forceUpdate: Boolean = false): String {
34+
init {
35+
val listShardRequest = ListShardsRequest.newBuilder()
36+
.setStreamName(stream)
37+
.build()
38+
val listShardResponse = client.unaryCallBlocked { it.listShards(listShardRequest) }
39+
shards = listShardResponse.shardsList
40+
}
41+
42+
private suspend fun lookupServerUrl(shardId: Long, forceUpdate: Boolean = false): String {
3043
if (forceUpdate) {
31-
return updateServerUrl(orderingKey)
44+
return updateServerUrl(shardId)
3245
}
3346
val server: String? = serverUrlsLock.withLock {
34-
return@withLock serverUrls[orderingKey]
47+
return@withLock serverUrls[shardId]
3548
}
3649
if (server != null) {
3750
return server
3851
}
39-
return updateServerUrl(orderingKey)
52+
return updateServerUrl(shardId)
4053
}
4154

42-
private suspend fun updateServerUrl(orderingKey: String): String {
43-
val req = LookupStreamRequest.newBuilder()
44-
.setStreamName(stream)
45-
.setOrderingKey(orderingKey)
55+
private suspend fun updateServerUrl(shardId: Long): String {
56+
val req = LookupShardRequest.newBuilder()
57+
.setShardId(shardId)
4658
.build()
4759
val server = client.unaryCallCoroutine {
48-
val serverNode = it.lookupStream(req).serverNode
60+
val serverNode = it.lookupShard(req).serverNode
4961
return@unaryCallCoroutine "${serverNode.host}:${serverNode.port}"
5062
}
5163
serverUrlsLock.withLock {
52-
serverUrls[orderingKey] = server
64+
serverUrls[shardId] = server
5365
}
54-
logger.debug("updateServerUrl, key:$orderingKey, server:$server")
66+
logger.debug("updateServerUrl, key:$shardId, server:$server")
5567
return server
5668
}
5769

@@ -83,9 +95,24 @@ open class ProducerKtImpl(private val client: HStreamClientKtImpl, private val s
8395
return future
8496
}
8597

98+
protected fun calculateShardIdByPartitionKey(partitionKey: String): Long {
99+
val hashcode = com.google.common.hash.Hashing.md5().hashString(partitionKey, StandardCharsets.UTF_8)
100+
val hashValue = BigInteger(hashcode.toString(), 16)
101+
for (shard in shards) {
102+
val start = BigInteger(shard.startHashRangeKey)
103+
val end = BigInteger(shard.endHashRangeKey)
104+
if (hashValue.compareTo(start) >= 0 && hashValue.compareTo(end) <= 0) {
105+
return shard.shardId
106+
}
107+
}
108+
109+
check(false)
110+
return -1
111+
}
112+
86113
private suspend fun appendWithRetry(
87114
appendRequest: AppendRequest,
88-
orderingKey: String,
115+
partitionKey: String,
89116
tryTimes: Int,
90117
forceUpdate: Boolean = false
91118
): List<String> {
@@ -96,22 +123,24 @@ open class ProducerKtImpl(private val client: HStreamClientKtImpl, private val s
96123
val status = Status.fromThrowable(e)
97124
if (status.code == Status.UNAVAILABLE.code && tryTimes > 1) {
98125
delay(DefaultSettings.REQUEST_RETRY_INTERVAL_SECONDS * 1000)
99-
return appendWithRetry(appendRequest, orderingKey, tryTimes - 1, true)
126+
return appendWithRetry(appendRequest, partitionKey, tryTimes - 1, true)
100127
} else {
101128
throw HStreamDBClientException(e)
102129
}
103130
}
104131

105132
check(tryTimes > 0)
106-
val serverUrl = lookupServerUrl(orderingKey, forceUpdate)
133+
134+
val shardId = calculateShardIdByPartitionKey(partitionKey)
135+
val serverUrl = lookupServerUrl(shardId, forceUpdate)
107136
logger.info("try append with serverUrl [{}], current left tryTimes is [{}]", serverUrl, tryTimes)
108-
try {
109-
return client.getCoroutineStub(serverUrl)
137+
return try {
138+
client.getCoroutineStub(serverUrl)
110139
.append(appendRequest).recordIdsList.map(GrpcUtils::recordIdFromGrpc)
111140
} catch (e: StatusException) {
112-
return handleGRPCException(serverUrl, e)
141+
handleGRPCException(serverUrl, e)
113142
} catch (e: StatusRuntimeException) {
114-
return handleGRPCException(serverUrl, e)
143+
handleGRPCException(serverUrl, e)
115144
}
116145
}
117146

client/src/main/proto

Submodule proto updated 1 file

0 commit comments

Comments (0)