Skip to content

Commit 4a49925

Browse files
authored
feat: add StreamShardReaderBatchReceiver, and add lookup for query/view apis (#209)
1 parent 218da96 commit 4a49925

File tree

5 files changed

+44
-12
lines changed

5 files changed

+44
-12
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package io.hstream;
2+
3+
import java.util.List;
4+
5+
public interface StreamShardReaderBatchReceiver {
6+
7+
void process(List<ReceivedRecord> receivedRecords);
8+
}

client/src/main/java/io/hstream/StreamShardReaderBuilder.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ public interface StreamShardReaderBuilder {
99
StreamShardReaderBuilder shardOffset(StreamShardOffset shardOffset);
1010

1111
StreamShardReaderBuilder receiver(StreamShardReaderReceiver streamShardReaderReceiver);
12+
StreamShardReaderBuilder batchReceiver(StreamShardReaderBatchReceiver streamShardReaderReceiver);
1213

1314
StreamShardReader build();
1415
}

client/src/main/java/io/hstream/impl/StreamShardReaderBuilderImpl.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,7 @@
33
import static com.google.common.base.Preconditions.*;
44
import static com.google.common.base.Preconditions.checkArgument;
55

6-
import io.hstream.StreamShardOffset;
7-
import io.hstream.StreamShardReader;
8-
import io.hstream.StreamShardReaderBuilder;
9-
import io.hstream.StreamShardReaderReceiver;
6+
import io.hstream.*;
107

118
public class StreamShardReaderBuilderImpl implements StreamShardReaderBuilder {
129

@@ -16,6 +13,7 @@ public class StreamShardReaderBuilderImpl implements StreamShardReaderBuilder {
1613
private StreamShardOffset shardOffset;
1714

1815
private StreamShardReaderReceiver receiver;
16+
private StreamShardReaderBatchReceiver batchReceiver;
1917

2018
public StreamShardReaderBuilderImpl(HStreamClientKtImpl client) {
2119
this.client = client;
@@ -45,14 +43,21 @@ public StreamShardReaderBuilder receiver(StreamShardReaderReceiver streamShardRe
4543
return this;
4644
}
4745

46+
@Override
47+
public StreamShardReaderBuilder batchReceiver(StreamShardReaderBatchReceiver batchReceiver) {
48+
this.batchReceiver = batchReceiver;
49+
return this;
50+
}
51+
4852
@Override
4953
public StreamShardReader build() {
5054
checkNotNull(client);
5155
checkArgument(streamName != null, "StreamShardReaderBuilder: `streamName` should not be null");
5256
checkArgument(shardId > 0, "StreamShardReaderBuilder: `shardId` error");
5357
checkArgument(
5458
shardOffset != null, "StreamShardReaderBuilder: `shardOffset` should not be null");
55-
checkArgument(receiver != null, "StreamShardReaderBuilder: `receiver` should not be null");
56-
return new StreamShardReaderKtImpl(client, streamName, shardId, shardOffset, receiver);
59+
checkArgument(receiver != null || batchReceiver != null,
60+
"StreamShardReaderBuilder: `receiver` should not be both null");
61+
return new StreamShardReaderKtImpl(client, streamName, shardId, shardOffset, receiver, batchReceiver);
5762
}
5863
}

client/src/main/kotlin/io/hstream/impl/HStreamClientKtImpl.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ class HStreamClientKtImpl(
310310

311311
override fun terminateQuery(name: String?) {
312312
checkNotNull(name)
313-
unaryCallBlocked {
313+
unaryCallBlockedWithLookup(ResourceType.ResQuery, name) {
314314
it.terminateQuery(TerminateQueryRequest.newBuilder().setQueryId(name).build())
315315
}
316316
}
@@ -325,14 +325,14 @@ class HStreamClientKtImpl(
325325
}
326326

327327
override fun getView(name: String?): View {
328-
return unaryCallBlocked {
328+
return unaryCallBlockedWithLookup(ResourceType.ResView, name) {
329329
val result = it.getView(GetViewRequest.newBuilder().setViewId(name).build())
330330
GrpcUtils.viewFromInternal(result)
331331
}
332332
}
333333

334334
override fun deleteView(name: String?) {
335-
unaryCallBlocked {
335+
return unaryCallBlockedWithLookup(ResourceType.ResView, name) {
336336
it.deleteView(DeleteViewRequest.newBuilder().setViewId(name).build())
337337
}
338338
}

client/src/main/kotlin/io/hstream/impl/StreamShardReaderKtImpl.kt

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import io.hstream.ReceivedRecord
77
import io.hstream.Record
88
import io.hstream.StreamShardOffset
99
import io.hstream.StreamShardReader
10+
import io.hstream.StreamShardReaderBatchReceiver
1011
import io.hstream.StreamShardReaderReceiver
1112
import io.hstream.internal.LookupShardReaderRequest
1213
import io.hstream.internal.ReadShardStreamRequest
@@ -20,13 +21,15 @@ import org.slf4j.LoggerFactory
2021
import java.time.Instant
2122
import java.util.UUID
2223
import java.util.concurrent.Executors
24+
import kotlin.streams.toList
2325

2426
class StreamShardReaderKtImpl(
2527
private val client: HStreamClientKtImpl,
2628
private val streamName: String,
2729
private val shardId: Long,
2830
private val shardOffset: StreamShardOffset,
29-
private val receiver: StreamShardReaderReceiver,
31+
private val receiver: StreamShardReaderReceiver?,
32+
private val batchReceiver: StreamShardReaderBatchReceiver?,
3033
) : AbstractService(), StreamShardReader {
3134
private val readerScope = CoroutineScope(Dispatchers.IO)
3235
private val readerName: String = UUID.randomUUID().toString()
@@ -76,19 +79,34 @@ class StreamShardReaderKtImpl(
7679
val receivedHStreamRecords = RecordUtils.decompress(receivedRecord)
7780
val createdTimestamp = receivedRecord.record.publishTime
7881
val createdTime = Instant.ofEpochSecond(createdTimestamp.seconds, createdTimestamp.nanos.toLong())
79-
for (receivedHStreamRecord in receivedHStreamRecords) {
8082

83+
if (batchReceiver != null) {
8184
executorService.submit {
8285
if (!isRunning) {
8386
return@submit
8487
}
8588

8689
try {
87-
receiver.process(toReceivedRecord(receivedHStreamRecord, createdTime))
90+
val records = receivedHStreamRecords.stream().map { toReceivedRecord(it, createdTime) }.toList()
91+
batchReceiver.process(records)
8892
} catch (e: Exception) {
8993
notifyFailed(e)
9094
}
9195
}
96+
} else {
97+
for (receivedHStreamRecord in receivedHStreamRecords) {
98+
executorService.submit {
99+
if (!isRunning) {
100+
return@submit
101+
}
102+
103+
try {
104+
receiver!!.process(toReceivedRecord(receivedHStreamRecord, createdTime))
105+
} catch (e: Exception) {
106+
notifyFailed(e)
107+
}
108+
}
109+
}
92110
}
93111
}
94112
}

0 commit comments

Comments
 (0)