Skip to content

Commit 84803c2

Browse files
authored
feat: add StreamBuilder and Replace Record with ReceivedRecord in Reader (#152)
feat: add StreamBuilder and HStreamClient.createStream(Stream stream)
1 parent 320d9dc commit 84803c2

File tree

6 files changed

+110
-7
lines changed

6 files changed

+110
-7
lines changed

client/src/main/java/io/hstream/HStreamClient.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,13 @@ static HStreamClientBuilder builder() {
5959
*/
6060
void createStream(String stream, short replicationFactor, int shardCount, int backlogDuration);
6161

62+
/**
63+
* Create a new stream.
64+
*
65+
* @param stream Stream Object, you should use {@link Stream.Builder} to build it.
66+
*/
67+
void createStream(Stream stream);
68+
6269
/**
6370
* List shards in a stream.
6471
*

client/src/main/java/io/hstream/Reader.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@
77
public interface Reader extends AutoCloseable {
88

99
/**
10-
* Read {@link Record}s from a stream shard.
10+
* Read {@link ReceivedRecord}s from a stream shard.
1111
*
1212
* @param maxRecords the max number of the returned records
13-
* @return the {@link Record}s wrapped in a {@link CompletableFuture}
13+
* @return the {@link ReceivedRecord}s wrapped in a {@link CompletableFuture}
1414
*/
15-
CompletableFuture<List<Record>> read(int maxRecords);
15+
CompletableFuture<List<ReceivedRecord>> read(int maxRecords);
1616
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package io.hstream;
2+
3+
public class ReceivedRecord {
4+
String recordId;
5+
Record record;
6+
7+
public ReceivedRecord(String recordId, Record record) {
8+
this.recordId = recordId;
9+
this.record = record;
10+
}
11+
12+
public String getRecordId() {
13+
return recordId;
14+
}
15+
16+
public void setRecordId(String recordId) {
17+
this.recordId = recordId;
18+
}
19+
20+
public Record getRecord() {
21+
return record;
22+
}
23+
24+
public void setRecord(Record record) {
25+
this.record = record;
26+
}
27+
}

client/src/main/java/io/hstream/Stream.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package io.hstream;
22

3+
import static com.google.common.base.Preconditions.checkArgument;
4+
import static com.google.common.base.Preconditions.checkNotNull;
5+
36
import java.util.Objects;
47

58
public class Stream {
@@ -62,4 +65,59 @@ public boolean equals(Object o) {
6265
public int hashCode() {
6366
return Objects.hash(streamName, replicationFactor, backlogDuration, shardCount);
6467
}
68+
69+
public static final class Builder {
70+
private String streamName;
71+
private int replicationFactor = 1;
72+
private int backlogDuration = 3600 * 24;
73+
private int shardCount = 1;
74+
75+
/**
76+
* @param streamName required, the name of the stream
77+
* @return Stream.Builder instance
78+
*/
79+
public Builder streamName(String streamName) {
80+
this.streamName = streamName;
81+
return this;
82+
}
83+
84+
/**
85+
* @param replicationFactor optional(default: 1), replication factor of the stream
86+
* @return Stream.Builder instance
87+
*/
88+
public Builder replicationFactor(int replicationFactor) {
89+
this.replicationFactor = replicationFactor;
90+
return this;
91+
}
92+
93+
/**
94+
* @param backlogDuration optional(default: 3600 * 24), backlog duration(in seconds) of the
95+
* stream
96+
* @return Stream.Builder instance
97+
*/
98+
public Builder backlogDuration(int backlogDuration) {
99+
this.backlogDuration = backlogDuration;
100+
return this;
101+
}
102+
103+
/**
104+
* @param shardCount optional(default: 1), number of shards in the stream
105+
* @return Stream.Builder instance
106+
*/
107+
public Builder shardCount(int shardCount) {
108+
this.shardCount = shardCount;
109+
return this;
110+
}
111+
112+
public Stream build() {
113+
checkNotNull(streamName);
114+
checkArgument(replicationFactor >= 1 && replicationFactor <= 15);
115+
checkArgument(shardCount >= 1);
116+
return new Stream(streamName, replicationFactor, backlogDuration, shardCount);
117+
}
118+
}
119+
120+
public static Builder newBuilder() {
121+
return new Builder();
122+
}
65123
}

client/src/main/kotlin/io/hstream/impl/HStreamClientKtImpl.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,15 @@ class HStreamClientKtImpl(bootstrapServerUrls: List<String>, credentials: Channe
121121
}
122122
}
123123

124+
override fun createStream(stream: Stream?) {
125+
checkNotNull(stream)
126+
unaryCallBlocked {
127+
it.createStream(
128+
GrpcUtils.streamToGrpc(stream)
129+
)
130+
}
131+
}
132+
124133
override fun listShards(streamName: String?): List<Shard> {
125134
checkNotNull(streamName)
126135
val listShardsRequest = ListShardsRequest.newBuilder().setStreamName(streamName).build()

client/src/main/kotlin/io/hstream/impl/ReaderKtImpl.kt

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package io.hstream.impl
22

33
import io.hstream.HStreamDBClientException
44
import io.hstream.Reader
5+
import io.hstream.ReceivedRecord
56
import io.hstream.Record
67
import io.hstream.StreamShardOffset
78
import io.hstream.internal.CreateShardReaderRequest
@@ -43,8 +44,8 @@ class ReaderKtImpl(
4344
logger.info("created Reader [{}] for stream [{}] shard [{}]", readerId, streamName, shardId)
4445
}
4546

46-
override fun read(maxRecords: Int): CompletableFuture<MutableList<Record>> {
47-
val readFuture = CompletableFuture<MutableList<Record>>()
47+
override fun read(maxRecords: Int): CompletableFuture<MutableList<ReceivedRecord>> {
48+
val readFuture = CompletableFuture<MutableList<ReceivedRecord>>()
4849
readerScope.launch {
4950
try {
5051
val readShardRequest = ReadShardRequest.newBuilder().setReaderId(readerId).setMaxRecords(maxRecords).build()
@@ -54,16 +55,17 @@ class ReaderKtImpl(
5455
RecordUtils.decompress(it).map { receivedHStreamRecord ->
5556
val hStreamRecord = receivedHStreamRecord.record
5657
val header = RecordUtils.parseRecordHeaderFromHStreamRecord(hStreamRecord)
57-
if (RecordUtils.isRawRecord(hStreamRecord)) {
58+
val record = if (RecordUtils.isRawRecord(hStreamRecord)) {
5859
val rawRecord = RecordUtils.parseRawRecordFromHStreamRecord(hStreamRecord)
5960
Record.newBuilder().rawRecord(rawRecord).partitionKey(header.partitionKey).build()
6061
} else {
6162
val hRecord = RecordUtils.parseHRecordFromHStreamRecord(hStreamRecord)
6263
Record.newBuilder().hRecord(hRecord).partitionKey(header.partitionKey).build()
6364
}
65+
ReceivedRecord(GrpcUtils.recordIdFromGrpc(receivedHStreamRecord.recordId), record)
6466
}
6567
}
66-
readFuture.complete(res as MutableList<Record>?)
68+
readFuture.complete(res as MutableList<ReceivedRecord>?)
6769
} catch (e: Throwable) {
6870
readFuture.completeExceptionally(HStreamDBClientException(e))
6971
}

0 commit comments

Comments
 (0)