Skip to content

Commit 81124c1

Browse files
authored
feat: add a new StreamShardReader (#197)
1 parent 635ea5f commit 81124c1

File tree

8 files changed

+215
-1
lines changed

8 files changed

+215
-1
lines changed

client/src/main/java/io/hstream/HStreamClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ static HStreamClientBuilder builder() {
2525
/** @return a {@link ReaderBuilder} */
2626
ReaderBuilder newReader();
2727

28+
StreamShardReaderBuilder newStreamShardReader();
29+
2830
/**
2931
* Create a new stream with 1 replicas.
3032
*
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package io.hstream;
2+
3+
import com.google.common.util.concurrent.Service;
4+
5+
public interface StreamShardReader extends Service {}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package io.hstream;
2+
3+
public interface StreamShardReaderBuilder {
4+
5+
StreamShardReaderBuilder streamName(String streamName);
6+
7+
StreamShardReaderBuilder shardId(long shardId);
8+
9+
StreamShardReaderBuilder shardOffset(StreamShardOffset shardOffset);
10+
11+
StreamShardReaderBuilder receiver(StreamShardReaderReceiver streamShardReaderReceiver);
12+
13+
StreamShardReader build();
14+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package io.hstream;
2+
3+
public interface StreamShardReaderReceiver {
4+
5+
void process(ReceivedRecord receivedRecord);
6+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package io.hstream.impl;
2+
3+
import static com.google.common.base.Preconditions.*;
4+
import static com.google.common.base.Preconditions.checkArgument;
5+
6+
import io.hstream.StreamShardOffset;
7+
import io.hstream.StreamShardReader;
8+
import io.hstream.StreamShardReaderBuilder;
9+
import io.hstream.StreamShardReaderReceiver;
10+
11+
public class StreamShardReaderBuilderImpl implements StreamShardReaderBuilder {
12+
13+
private final HStreamClientKtImpl client;
14+
private String streamName;
15+
private long shardId;
16+
private StreamShardOffset shardOffset;
17+
18+
private StreamShardReaderReceiver receiver;
19+
20+
public StreamShardReaderBuilderImpl(HStreamClientKtImpl client) {
21+
this.client = client;
22+
}
23+
24+
@Override
25+
public StreamShardReaderBuilder streamName(String streamName) {
26+
this.streamName = streamName;
27+
return this;
28+
}
29+
30+
@Override
31+
public StreamShardReaderBuilder shardId(long shardId) {
32+
this.shardId = shardId;
33+
return this;
34+
}
35+
36+
@Override
37+
public StreamShardReaderBuilder shardOffset(StreamShardOffset shardOffset) {
38+
this.shardOffset = shardOffset;
39+
return this;
40+
}
41+
42+
@Override
43+
public StreamShardReaderBuilder receiver(StreamShardReaderReceiver streamShardReaderReceiver) {
44+
this.receiver = streamShardReaderReceiver;
45+
return this;
46+
}
47+
48+
@Override
49+
public StreamShardReader build() {
50+
checkNotNull(client);
51+
checkArgument(streamName != null, "StreamShardReaderBuilder: `streamName` should not be null");
52+
checkArgument(shardId > 0, "StreamShardReaderBuilder: `shardId` error");
53+
checkArgument(
54+
shardOffset != null, "StreamShardReaderBuilder: `shardOffset` should not be null");
55+
checkArgument(receiver != null, "StreamShardReaderBuilder: `receiver` should not be null");
56+
return new StreamShardReaderKtImpl(client, streamName, shardId, shardOffset, receiver);
57+
}
58+
}

client/src/main/kotlin/io/hstream/impl/HStreamClientKtImpl.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import io.hstream.QueryerBuilder
2121
import io.hstream.ReaderBuilder
2222
import io.hstream.Shard
2323
import io.hstream.Stream
24+
import io.hstream.StreamShardReaderBuilder
2425
import io.hstream.Subscription
2526
import io.hstream.View
2627
import io.hstream.internal.CreateQueryRequest
@@ -146,6 +147,10 @@ class HStreamClientKtImpl(
146147
return ReaderBuilderImpl(this)
147148
}
148149

150+
override fun newStreamShardReader(): StreamShardReaderBuilder {
151+
return StreamShardReaderBuilderImpl(this)
152+
}
153+
149154
override fun newQueryer(): QueryerBuilder {
150155
return QueryerBuilderImpl(this, clusterServerUrls.get(), channelProvider)
151156
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package io.hstream.impl
2+
3+
import com.google.common.util.concurrent.AbstractService
4+
import com.google.protobuf.InvalidProtocolBufferException
5+
import io.hstream.HStreamDBClientException
6+
import io.hstream.ReceivedRecord
7+
import io.hstream.Record
8+
import io.hstream.StreamShardOffset
9+
import io.hstream.StreamShardReader
10+
import io.hstream.StreamShardReaderReceiver
11+
import io.hstream.internal.LookupShardReaderRequest
12+
import io.hstream.internal.ReadShardStreamRequest
13+
import io.hstream.internal.ReadShardStreamResponse
14+
import io.hstream.util.GrpcUtils
15+
import io.hstream.util.RecordUtils
16+
import kotlinx.coroutines.CoroutineScope
17+
import kotlinx.coroutines.Dispatchers
18+
import kotlinx.coroutines.launch
19+
import org.slf4j.LoggerFactory
20+
import java.time.Instant
21+
import java.util.UUID
22+
import java.util.concurrent.Executors
23+
24+
class StreamShardReaderKtImpl(
25+
private val client: HStreamClientKtImpl,
26+
private val streamName: String,
27+
private val shardId: Long,
28+
private val shardOffset: StreamShardOffset,
29+
private val receiver: StreamShardReaderReceiver,
30+
) : AbstractService(), StreamShardReader {
31+
private val readerScope = CoroutineScope(Dispatchers.IO)
32+
private val readerName: String = UUID.randomUUID().toString()
33+
private val executorService = Executors.newSingleThreadExecutor()
34+
35+
override fun doStart() {
36+
Thread {
37+
38+
try {
39+
logger.info("streamShardReader $readerName is starting")
40+
val lookupShardReaderRequest = LookupShardReaderRequest.newBuilder()
41+
.setReaderId(readerName).build()
42+
val lookupShardReaderResp = client.unaryCallBlocked { it.lookupShardReader(lookupShardReaderRequest) }
43+
val serverUrl = lookupShardReaderResp.serverNode.host + ":" + lookupShardReaderResp.serverNode.port
44+
val respFlow = client.getCoroutineStub(serverUrl).readShardStream(
45+
ReadShardStreamRequest.newBuilder().setReaderId(readerName).setShardId(shardId)
46+
.setTimeout(1000)
47+
.setShardOffset(GrpcUtils.streamShardOffsetToGrpc(shardOffset)).build()
48+
)
49+
notifyStarted()
50+
readerScope.launch {
51+
respFlow.collect {
52+
process(it)
53+
}
54+
}
55+
} catch (e: Exception) {
56+
logger.error("steamShardReader $readerName failed to start", e)
57+
notifyFailed(HStreamDBClientException(e))
58+
}
59+
}.start()
60+
}
61+
62+
override fun doStop() {
63+
Thread {
64+
executorService.shutdownNow()
65+
notifyStopped()
66+
}
67+
.start()
68+
}
69+
70+
private fun process(value: ReadShardStreamResponse) {
71+
if (!isRunning) {
72+
return
73+
}
74+
75+
for (receivedRecord in value.receivedRecordsList) {
76+
77+
val receivedHStreamRecords = RecordUtils.decompress(receivedRecord)
78+
val createdTimestamp = receivedRecord.record.publishTime
79+
val createdTime = Instant.ofEpochSecond(createdTimestamp.seconds, createdTimestamp.nanos.toLong())
80+
for (receivedHStreamRecord in receivedHStreamRecords) {
81+
82+
executorService.submit {
83+
if (!isRunning) {
84+
return@submit
85+
}
86+
87+
try {
88+
receiver.process(toReceivedRecord(receivedHStreamRecord, createdTime))
89+
} catch (e: Exception) {
90+
notifyFailed(e)
91+
}
92+
}
93+
}
94+
}
95+
}
96+
97+
companion object {
98+
private val logger = LoggerFactory.getLogger(StreamShardReaderKtImpl::class.java)
99+
100+
private fun toReceivedRecord(receivedHStreamRecord: ReceivedHStreamRecord, createdTime: Instant): ReceivedRecord {
101+
return try {
102+
val header = RecordUtils.parseRecordHeaderFromHStreamRecord(receivedHStreamRecord.record)
103+
if (RecordUtils.isRawRecord(receivedHStreamRecord.record)) {
104+
105+
val rawRecord = RecordUtils.parseRawRecordFromHStreamRecord(receivedHStreamRecord.record)
106+
ReceivedRecord(
107+
GrpcUtils.recordIdFromGrpc(receivedHStreamRecord.recordId),
108+
Record.newBuilder().partitionKey(header.partitionKey).rawRecord(rawRecord).build(),
109+
createdTime
110+
)
111+
} else {
112+
val hRecord = RecordUtils.parseHRecordFromHStreamRecord(receivedHStreamRecord.record)
113+
ReceivedRecord(
114+
GrpcUtils.recordIdFromGrpc(receivedHStreamRecord.recordId),
115+
Record.newBuilder().partitionKey(header.partitionKey).hRecord(hRecord).build(),
116+
createdTime
117+
)
118+
}
119+
} catch (e: InvalidProtocolBufferException) {
120+
throw HStreamDBClientException.InvalidRecordException("parse HStreamRecord error", e)
121+
}
122+
}
123+
}
124+
}

client/src/main/proto

Submodule proto updated 1 file

0 commit comments

Comments
 (0)