Skip to content

Commit aa3477a

Browse files
authored
feat: update protocol (#212)
1 parent ede54bc commit aa3477a

File tree

6 files changed

+49
-12
lines changed

6 files changed

+49
-12
lines changed

client/src/main/java/io/hstream/HStreamClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,4 +173,6 @@ static HStreamClientBuilder builder() {
173173
String getConnectorLogs(String name, int begin, int count);
174174

175175
void deleteConnector(String name);
176+
177+
String getTailRecordId(String streamName, long shardId);
176178
}

client/src/main/java/io/hstream/StreamShardReaderBuilder.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ public interface StreamShardReaderBuilder {
66

77
StreamShardReaderBuilder shardId(long shardId);
88

9-
StreamShardReaderBuilder shardOffset(StreamShardOffset shardOffset);
9+
StreamShardReaderBuilder from(StreamShardOffset shardOffset);
10+
StreamShardReaderBuilder maxReadBatches(long maxReadBatches);
11+
StreamShardReaderBuilder until(StreamShardOffset until);
1012

1113
StreamShardReaderBuilder receiver(StreamShardReaderReceiver streamShardReaderReceiver);
1214
StreamShardReaderBuilder batchReceiver(StreamShardReaderBatchReceiver streamShardReaderReceiver);

client/src/main/java/io/hstream/impl/StreamShardReaderBuilderImpl.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ public class StreamShardReaderBuilderImpl implements StreamShardReaderBuilder {
1010
private final HStreamClientKtImpl client;
1111
private String streamName;
1212
private long shardId;
13-
private StreamShardOffset shardOffset;
13+
private StreamShardOffset from;
14+
long maxReadBatches;
15+
StreamShardOffset until;
1416

1517
private StreamShardReaderReceiver receiver;
1618
private StreamShardReaderBatchReceiver batchReceiver;
@@ -32,8 +34,20 @@ public StreamShardReaderBuilder shardId(long shardId) {
3234
}
3335

3436
@Override
35-
public StreamShardReaderBuilder shardOffset(StreamShardOffset shardOffset) {
36-
this.shardOffset = shardOffset;
37+
public StreamShardReaderBuilder from(StreamShardOffset shardOffset) {
38+
this.from = shardOffset;
39+
return this;
40+
}
41+
42+
@Override
43+
public StreamShardReaderBuilder maxReadBatches(long maxReadBatches) {
44+
this.maxReadBatches = maxReadBatches;
45+
return this;
46+
}
47+
48+
@Override
49+
public StreamShardReaderBuilder until(StreamShardOffset until) {
50+
this.until = until;
3751
return this;
3852
}
3953

@@ -55,9 +69,9 @@ public StreamShardReader build() {
5569
checkArgument(streamName != null, "StreamShardReaderBuilder: `streamName` should not be null");
5670
checkArgument(shardId > 0, "StreamShardReaderBuilder: `shardId` error");
5771
checkArgument(
58-
shardOffset != null, "StreamShardReaderBuilder: `shardOffset` should not be null");
72+
from != null, "StreamShardReaderBuilder: `from` should not be null");
5973
checkArgument(receiver != null || batchReceiver != null,
6074
"StreamShardReaderBuilder: `receiver` should not be both null");
61-
return new StreamShardReaderKtImpl(client, streamName, shardId, shardOffset, receiver, batchReceiver);
75+
return new StreamShardReaderKtImpl(client, streamName, shardId, from, maxReadBatches, until, receiver, batchReceiver);
6276
}
6377
}

client/src/main/kotlin/io/hstream/impl/HStreamClientKtImpl.kt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import io.hstream.internal.GetConnectorSpecRequest
3737
import io.hstream.internal.GetQueryRequest
3838
import io.hstream.internal.GetStreamRequest
3939
import io.hstream.internal.GetSubscriptionRequest
40+
import io.hstream.internal.GetTailRecordIdRequest
4041
import io.hstream.internal.GetViewRequest
4142
import io.hstream.internal.HStreamApiGrpcKt
4243
import io.hstream.internal.ListConnectorsRequest
@@ -414,6 +415,17 @@ class HStreamClientKtImpl(
414415
}
415416
}
416417

418+
override fun getTailRecordId(streamName: String?, shardId: Long): String {
419+
checkNotNull(streamName)
420+
return unaryCallBlocked {
421+
val recordId = it.getTailRecordId(GetTailRecordIdRequest.newBuilder()
422+
.setStreamName(streamName)
423+
.setShardId(shardId)
424+
.build()).tailRecordId
425+
GrpcUtils.recordIdFromGrpc(recordId)
426+
}
427+
}
428+
417429
private suspend fun lookupSubscriptionServerUrl(subscriptionId: String?): String {
418430
return unaryCallCoroutine {
419431
val req: LookupSubscriptionRequest =

client/src/main/kotlin/io/hstream/impl/StreamShardReaderKtImpl.kt

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ class StreamShardReaderKtImpl(
2727
private val client: HStreamClientKtImpl,
2828
private val streamName: String,
2929
private val shardId: Long,
30-
private val shardOffset: StreamShardOffset,
30+
private val from: StreamShardOffset,
31+
private val maxReadBatches: Long,
32+
private val until: StreamShardOffset?,
3133
private val receiver: StreamShardReaderReceiver?,
3234
private val batchReceiver: StreamShardReaderBatchReceiver?,
3335
) : AbstractService(), StreamShardReader {
@@ -44,10 +46,15 @@ class StreamShardReaderKtImpl(
4446
.setReaderId(readerName).build()
4547
val lookupShardReaderResp = client.unaryCallBlocked { it.lookupShardReader(lookupShardReaderRequest) }
4648
val serverUrl = lookupShardReaderResp.serverNode.host + ":" + lookupShardReaderResp.serverNode.port
47-
val respFlow = client.getCoroutineStub(serverUrl).readShardStream(
48-
ReadShardStreamRequest.newBuilder().setReaderId(readerName).setShardId(shardId)
49-
.setShardOffset(GrpcUtils.streamShardOffsetToGrpc(shardOffset)).build()
50-
)
49+
val readerBuilder = ReadShardStreamRequest.newBuilder()
50+
.setReaderId(readerName)
51+
.setShardId(shardId)
52+
.setFrom(GrpcUtils.streamShardOffsetToGrpc(from))
53+
.setMaxReadBatches(maxReadBatches)
54+
if (until != null) {
55+
readerBuilder.until = GrpcUtils.streamShardOffsetToGrpc(until)
56+
}
57+
val respFlow = client.getCoroutineStub(serverUrl).readShardStream(readerBuilder.build())
5158
notifyStarted()
5259
readerScope.launch {
5360
respFlow.collect {

client/src/main/proto

0 commit comments

Comments
 (0)