Skip to content

Commit 7b83aef

Browse files
authored
feat: add experimental StreamKeyReader (#215)
1 parent 28107fe commit 7b83aef

File tree

7 files changed

+263
-21
lines changed

7 files changed

+263
-21
lines changed

client/src/main/java/io/hstream/HStreamClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ static HStreamClientBuilder builder() {
2929

3030
StreamShardReaderBuilder newStreamShardReader();
3131

32+
StreamKeyReaderBuilder newStreamKeyReader();
33+
3234
/**
3335
* Create a new stream with 1 replicas.
3436
*
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.hstream;
2+
3+
4+
public interface StreamKeyReader extends AutoCloseable {
5+
6+
boolean hasNext();
7+
ReceivedRecord next();
8+
9+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package io.hstream;
2+
3+
public interface StreamKeyReaderBuilder {
4+
5+
StreamKeyReaderBuilder streamName(String streamName);
6+
7+
StreamKeyReaderBuilder key(String key);
8+
9+
StreamKeyReaderBuilder from(StreamShardOffset from);
10+
11+
StreamKeyReaderBuilder until(StreamShardOffset until);
12+
StreamKeyReaderBuilder bufferSize(int bufferSize);
13+
14+
StreamKeyReader build();
15+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package io.hstream.impl;
2+
3+
import io.hstream.*;
4+
5+
import static com.google.common.base.Preconditions.checkArgument;
6+
import static com.google.common.base.Preconditions.checkNotNull;
7+
8+
public class StreamKeyReaderBuilderImpl implements StreamKeyReaderBuilder {
9+
10+
private final HStreamClientKtImpl client;
11+
private String streamName;
12+
private String key;
13+
private StreamShardOffset from = new StreamShardOffset(StreamShardOffset.SpecialOffset.EARLIEST);
14+
private StreamShardOffset until = new StreamShardOffset(StreamShardOffset.SpecialOffset.LATEST);
15+
16+
int bufferSize = 100;
17+
18+
public StreamKeyReaderBuilderImpl(HStreamClientKtImpl client) {
19+
this.client = client;
20+
}
21+
22+
@Override
23+
public StreamKeyReaderBuilder streamName(String streamName) {
24+
this.streamName = streamName;
25+
return this;
26+
}
27+
28+
@Override
29+
public StreamKeyReaderBuilder key(String key) {
30+
this.key = key;
31+
return this;
32+
}
33+
34+
35+
@Override
36+
public StreamKeyReaderBuilder from(StreamShardOffset shardOffset) {
37+
this.from = shardOffset;
38+
return this;
39+
}
40+
41+
@Override
42+
public StreamKeyReaderBuilder until(StreamShardOffset until) {
43+
this.until = until;
44+
return this;
45+
}
46+
47+
@Override
48+
public StreamKeyReaderBuilder bufferSize(int bufferSize) {
49+
this.bufferSize = bufferSize;
50+
return this;
51+
}
52+
53+
@Override
54+
public StreamKeyReader build() {
55+
checkNotNull(client);
56+
checkArgument(streamName != null, "StreamKeyReaderBuilder: `streamName` should not be null");
57+
checkArgument(key != null, "StreamKeyReaderBuilder: `key` should not be null");
58+
checkArgument(
59+
from != null, "StreamKeyReaderBuilder: `from` should not be null");
60+
checkArgument(
61+
until != null, "StreamKeyReaderBuilder: `from` should not be null");
62+
checkArgument(bufferSize > 0);
63+
return new StreamKeyReaderKtImpl(client, streamName, key, from, until, bufferSize);
64+
}
65+
}

client/src/main/kotlin/io/hstream/impl/HStreamClientKtImpl.kt

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,26 +4,7 @@ import com.google.common.base.Preconditions.checkArgument
44
import com.google.common.util.concurrent.MoreExecutors
55
import com.google.protobuf.Empty
66
import io.grpc.ChannelCredentials
7-
import io.hstream.BufferedProducerBuilder
8-
import io.hstream.Cluster
9-
import io.hstream.Connector
10-
import io.hstream.ConsumerBuilder
11-
import io.hstream.ConsumerInformation
12-
import io.hstream.CreateConnectorRequest
13-
import io.hstream.GetStreamResponse
14-
import io.hstream.GetSubscriptionResponse
15-
import io.hstream.HRecord
16-
import io.hstream.HStreamClient
17-
import io.hstream.HStreamDBClientException
18-
import io.hstream.ProducerBuilder
19-
import io.hstream.Query
20-
import io.hstream.QueryerBuilder
21-
import io.hstream.ReaderBuilder
22-
import io.hstream.Shard
23-
import io.hstream.Stream
24-
import io.hstream.StreamShardReaderBuilder
25-
import io.hstream.Subscription
26-
import io.hstream.View
7+
import io.hstream.*
278
import io.hstream.internal.CreateQueryRequest
289
import io.hstream.internal.DeleteConnectorRequest
2910
import io.hstream.internal.DeleteQueryRequest
@@ -153,6 +134,10 @@ class HStreamClientKtImpl(
153134
return StreamShardReaderBuilderImpl(this)
154135
}
155136

137+
override fun newStreamKeyReader(): StreamKeyReaderBuilder {
138+
return StreamKeyReaderBuilderImpl(this)
139+
}
140+
156141
override fun newQueryer(): QueryerBuilder {
157142
return QueryerBuilderImpl(this, clusterServerUrls.get(), channelProvider)
158143
}
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
package io.hstream.impl
2+
3+
import com.google.protobuf.InvalidProtocolBufferException
4+
import io.hstream.*
5+
import io.hstream.ReceivedRecord
6+
import io.hstream.internal.*
7+
import io.hstream.util.GrpcUtils
8+
import io.hstream.util.RecordUtils
9+
import kotlinx.coroutines.*
10+
import kotlinx.coroutines.flow.MutableSharedFlow
11+
import org.slf4j.LoggerFactory
12+
import java.time.Instant
13+
import java.util.UUID
14+
import java.util.concurrent.ArrayBlockingQueue
15+
import java.util.concurrent.Executors
16+
import java.util.concurrent.TimeUnit
17+
import java.util.concurrent.atomic.AtomicBoolean
18+
import java.util.concurrent.atomic.AtomicReference
19+
20+
class StreamKeyReaderKtImpl(
21+
private val client: HStreamClientKtImpl,
22+
private val streamName: String,
23+
private val key: String,
24+
private val from: StreamShardOffset,
25+
private val until: StreamShardOffset?,
26+
private val bufferSize: Int,
27+
) : StreamKeyReader {
28+
private val readerScope = CoroutineScope(Dispatchers.IO)
29+
private val readerName: String = UUID.randomUUID().toString()
30+
private val executorService = Executors.newSingleThreadExecutor()
31+
private val requestFlow = MutableSharedFlow<ReadStreamByKeyRequest>()
32+
private val buffer = ArrayBlockingQueue<ReceivedRecord>(bufferSize)
33+
private val exceptionRef: AtomicReference<Exception> = AtomicReference(null)
34+
private val isStopped: AtomicBoolean = AtomicBoolean(false)
35+
36+
init {
37+
doStart()
38+
}
39+
40+
private fun doStart() {
41+
logger.info("streamKeyReader $readerName is starting")
42+
val lookupRequest = LookupResourceRequest.newBuilder()
43+
.setResType(ResourceType.ResStream)
44+
.setResId(streamName)
45+
.build()
46+
val lookupResp = client.unaryCallBlocked { it.lookupResource(lookupRequest) }
47+
val serverUrl = lookupResp.host + ":" + lookupResp.port
48+
val requestBuilder = ReadStreamByKeyRequest.newBuilder()
49+
.setReaderId(readerName)
50+
.setStreamName(streamName)
51+
.setKey(key)
52+
.setFrom(GrpcUtils.streamShardOffsetToGrpc(from))
53+
.setReadRecordCount(bufferSize.toLong())
54+
if (until != null) {
55+
requestBuilder.until = GrpcUtils.streamShardOffsetToGrpc(until)
56+
}
57+
val respFlow = client.getCoroutineStub(serverUrl).readStreamByKey(requestFlow)
58+
readerScope.launch {
59+
launch {
60+
// wait until rpc called
61+
while (requestFlow.subscriptionCount.value == 0) {
62+
delay(100)
63+
}
64+
try{
65+
requestFlow.emit(requestBuilder.build())
66+
} catch (e: Exception) {
67+
logger.error("steamKeyReader $readerName failed", e)
68+
exceptionRef.compareAndSet(null, e)
69+
isStopped.set(true)
70+
}
71+
}
72+
launch {
73+
try {
74+
respFlow.collect {
75+
saveToBuffer(it)
76+
}
77+
} catch (e: Exception) {
78+
logger.error("steamKeyReader $readerName failed", e)
79+
exceptionRef.compareAndSet(null, e)
80+
isStopped.set(true)
81+
}
82+
// wait for saveToBuffer complete
83+
delay(100)
84+
isStopped.set(true)
85+
logger.info("server stopped")
86+
87+
}
88+
}
89+
}
90+
91+
override fun close() {
92+
readerScope.cancel()
93+
executorService.shutdownNow()
94+
logger.info("StreamKeyReader $readerName closed")
95+
}
96+
97+
override fun hasNext() : Boolean {
98+
return !isStopped.get() || !buffer.isEmpty()
99+
}
100+
101+
override fun next(): ReceivedRecord? {
102+
103+
var res: ReceivedRecord? = null
104+
105+
while (res == null) {
106+
res = buffer.poll(100, TimeUnit.MILLISECONDS)
107+
108+
if(res == null) {
109+
val e = exceptionRef.get()
110+
if(e != null) throw e
111+
112+
if(isStopped.get()) return null
113+
}
114+
}
115+
116+
readerScope.launch {
117+
try{
118+
requestFlow.emit(ReadStreamByKeyRequest.newBuilder().setReadRecordCount(1).build())
119+
} catch (e: Exception) {
120+
logger.error("steamKeyReader $readerName failed", e)
121+
exceptionRef.compareAndSet(null, e)
122+
isStopped.set(true)
123+
}
124+
}
125+
126+
return res
127+
}
128+
129+
private fun saveToBuffer(value: ReadStreamByKeyResponse) {
130+
for ((i, receivedRecord) in value.receivedRecordsList.withIndex()) {
131+
val recordId = value.getRecordIds(i)
132+
executorService.submit {
133+
val res = toReceivedRecord(receivedRecord, recordId, Instant.now())
134+
buffer.put(res)
135+
}
136+
}
137+
}
138+
139+
companion object {
140+
private val logger = LoggerFactory.getLogger(StreamKeyReaderKtImpl::class.java)
141+
142+
private fun toReceivedRecord(hStreamRecord: HStreamRecord, recordId: RecordId, createdTime: Instant): ReceivedRecord {
143+
return try {
144+
val header = RecordUtils.parseRecordHeaderFromHStreamRecord(hStreamRecord)
145+
if (RecordUtils.isRawRecord(hStreamRecord)) {
146+
147+
val rawRecord = RecordUtils.parseRawRecordFromHStreamRecord(hStreamRecord)
148+
ReceivedRecord(
149+
GrpcUtils.recordIdFromGrpc(recordId),
150+
Record.newBuilder().partitionKey(header.partitionKey).rawRecord(rawRecord).build(),
151+
createdTime
152+
)
153+
} else {
154+
val hRecord = RecordUtils.parseHRecordFromHStreamRecord(hStreamRecord)
155+
ReceivedRecord(
156+
GrpcUtils.recordIdFromGrpc(recordId),
157+
Record.newBuilder().partitionKey(header.partitionKey).hRecord(hRecord).build(),
158+
createdTime
159+
)
160+
}
161+
} catch (e: InvalidProtocolBufferException) {
162+
throw HStreamDBClientException.InvalidRecordException("parse HStreamRecord error", e)
163+
}
164+
}
165+
}
166+
}

client/src/main/proto

0 commit comments

Comments
 (0)