Skip to content

Commit 403a4c4

Browse files
authored
update Append (#132)
* update Append * update proto
1 parent 672b992 commit 403a4c4

File tree

5 files changed

+20
-11
lines changed

5 files changed

+20
-11
lines changed

client/build.gradle.kts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,14 @@ tasks.test {
6565
}
6666
}
6767

68+
sourceSets {
69+
main {
70+
proto {
71+
exclude("api/*")
72+
}
73+
}
74+
}
75+
6876
protobuf {
6977
protoc {
7078
artifact = "com.google.protobuf:protoc:3.19.2"

client/src/main/java/io/hstream/HStreamClient.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ static HStreamClientBuilder builder() {
6363
* List shards in a stream.
6464
*
6565
* @param streamName the name of the stream
66+
* @return a list of {@link Shard}s.
6667
*/
6768
List<Shard> listShards(String streamName);
6869

client/src/main/kotlin/io/hstream/impl/BufferedProducerKtImpl.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,17 +108,17 @@ class BufferedProducerKtImpl(
108108
val job = shardAppendJobs[shardId]
109109
shardAppendJobs[shardId] = batchScope.launch {
110110
job?.join()
111-
writeSingleKeyHStreamRecords(records, futures)
111+
writeShard(shardId, records, futures)
112112
logger.info("wrote batch for shard:$shardId")
113113
flowController?.release(recordsBytesSize)
114114
}
115115
}
116116
}
117117

118118
// only can be called by flush()
119-
private suspend fun writeSingleKeyHStreamRecords(records: Records, futures: Futures) {
119+
private suspend fun writeShard(shardId: Long, records: Records, futures: Futures) {
120120
try {
121-
val ids = super.writeHStreamRecords(records, records[0].header.key)
121+
val ids = super.writeHStreamRecords(records, shardId)
122122
for (i in ids.indices) {
123123
futures[i].complete(ids[i])
124124
}

client/src/main/kotlin/io/hstream/impl/ProducerKtImpl.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ open class ProducerKtImpl(private val client: HStreamClientKtImpl, private val s
8686
val future = CompletableFuture<String>()
8787
writeRecordScope.launch {
8888
try {
89-
val ids = writeHStreamRecords(listOf(hStreamRecord), hStreamRecord.header.key)
89+
val shardId = calculateShardIdByPartitionKey(hStreamRecord.header.key)
90+
val ids = writeHStreamRecords(listOf(hStreamRecord), shardId)
9091
future.complete(ids[0])
9192
} catch (e: Throwable) {
9293
future.completeExceptionally(e)
@@ -112,7 +113,7 @@ open class ProducerKtImpl(private val client: HStreamClientKtImpl, private val s
112113

113114
private suspend fun appendWithRetry(
114115
appendRequest: AppendRequest,
115-
partitionKey: String,
116+
shardId: Long,
116117
tryTimes: Int,
117118
forceUpdate: Boolean = false
118119
): List<String> {
@@ -123,15 +124,14 @@ open class ProducerKtImpl(private val client: HStreamClientKtImpl, private val s
123124
val status = Status.fromThrowable(e)
124125
if (status.code == Status.UNAVAILABLE.code && tryTimes > 1) {
125126
delay(DefaultSettings.REQUEST_RETRY_INTERVAL_SECONDS * 1000)
126-
return appendWithRetry(appendRequest, partitionKey, tryTimes - 1, true)
127+
return appendWithRetry(appendRequest, shardId, tryTimes - 1, true)
127128
} else {
128129
throw HStreamDBClientException(e)
129130
}
130131
}
131132

132133
check(tryTimes > 0)
133134

134-
val shardId = calculateShardIdByPartitionKey(partitionKey)
135135
val serverUrl = lookupServerUrl(shardId, forceUpdate)
136136
logger.info("try append with serverUrl [{}], current left tryTimes is [{}]", serverUrl, tryTimes)
137137
return try {
@@ -144,9 +144,9 @@ open class ProducerKtImpl(private val client: HStreamClientKtImpl, private val s
144144
}
145145
}
146146

147-
protected suspend fun writeHStreamRecords(hStreamRecords: List<HStreamRecord>, key: String): List<String> {
148-
val appendRequest = AppendRequest.newBuilder().setStreamName(stream).addAllRecords(hStreamRecords).build()
149-
return appendWithRetry(appendRequest, key, DefaultSettings.APPEND_RETRY_MAX_TIMES)
147+
protected suspend fun writeHStreamRecords(hStreamRecords: List<HStreamRecord>, shardId: Long): List<String> {
148+
val appendRequest = AppendRequest.newBuilder().setStreamName(stream).setShardId(shardId).addAllRecords(hStreamRecords).build()
149+
return appendWithRetry(appendRequest, shardId, DefaultSettings.APPEND_RETRY_MAX_TIMES)
150150
}
151151

152152
companion object {

0 commit comments

Comments
 (0)