Skip to content

Commit 08ffae0

Browse files
authored
encapsulate the Stream class & documentation (#22)
* encapsulate class Stream * updated examples in README * added some javadoc
1 parent ed9dbfc commit 08ffae0

File tree

8 files changed

+139
-26
lines changed

8 files changed

+139
-26
lines changed

README.md

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -137,31 +137,27 @@ batchedProducer.flush()
137137
### Consume Data from a Stream
138138

139139
```java
140-
141140
// first, create a subscription for the stream
142141
Subscription subscription =
143-
Subscription.newBuilder()
144-
.setSubscriptionId("my_subscription")
145-
.setStreamName("test_stream")
146-
.setOffset(
147-
SubscriptionOffset.newBuilder()
148-
.setSpecialOffset(SubscriptionOffset.SpecialOffset.LATEST)
149-
.build())
150-
.build();
151-
client.createSubscription(subscription);
152-
153-
// second, create a consumer attacth to the subscription
142+
new Subscription(
143+
"my_subscription",
144+
"test_stream",
145+
new SubscriptionOffset(SubscriptionOffset.SpecialOffset.LATEST));
146+
client.createSubscription(subscription);
147+
148+
// second, create a consumer attach to the subscription
154149
Consumer consumer =
155-
client
156-
.newConsumer()
157-
.subscription("my_subscription")
158-
.rawRecordReceiver(
159-
(receivedRawRecord, responder) -> {
160-
System.out.println(receivedRawRecord.getRecordId());
161-
responder.ack();})
162-
.build();
150+
client
151+
.newConsumer()
152+
.subscription("my_subscription")
153+
.rawRecordReceiver(
154+
((receivedRawRecord, responder) -> {
155+
System.out.println(receivedRawRecord.getRecordId());
156+
responder.ack();
157+
}))
158+
.build();
163159

164-
// third, start the consumer
160+
// third, start the consumer
165161
consumer.startAsync().awaitRunning();
166162

167163
```

src/main/java/io/hstream/HStreamClient.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.hstream;
22

3-
import io.hstream.internal.Stream;
43
import java.util.List;
54

65
/** HstreamClient implement the hstream client, user can use it to interact with server */

src/main/java/io/hstream/RecordId.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,41 @@
22

33
import java.util.Objects;
44

5+
/** A class for storing identification information of data records */
56
public class RecordId {
67

8+
/** An identification number for the data record that is unique among all the data records */
79
private long batchId;
10+
/** An index number identifies the data record in its own batch */
811
private int batchIndex;
912

13+
/**
14+
* A constructor for RecordId
15+
*
16+
* @param batchId the unique identification number of the data record in the database
17+
* @param batchIndex the index of the data record in its own batch
18+
*/
1019
public RecordId(long batchId, int batchIndex) {
1120
this.batchId = batchId;
1221
this.batchIndex = batchIndex;
1322
}
1423

24+
/** get the unique identification number of the data record */
1525
public long getBatchId() {
1626
return batchId;
1727
}
1828

29+
/** get the index of the data record in its own batch */
1930
public int getBatchIndex() {
2031
return batchIndex;
2132
}
2233

34+
/** update the identification number */
2335
public void setBatchId(long batchId) {
2436
this.batchId = batchId;
2537
}
2638

39+
/** update the data record index */
2740
public void setBatchIndex(int batchIndex) {
2841
this.batchIndex = batchIndex;
2942
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package io.hstream;
2+
3+
import java.util.Objects;
4+
5+
/** A class for storing information about streams */
6+
public class Stream {
7+
private String streamName;
8+
private int replicationFactor;
9+
10+
/**
11+
* a constructor for a real-time data stream
12+
*
13+
* @param streamName the name of the stream
14+
* @param replicationFactor the number of replicas to be stored in the database
15+
*/
16+
public Stream(String streamName, int replicationFactor) {
17+
this.streamName = streamName;
18+
this.replicationFactor = replicationFactor;
19+
}
20+
21+
/** get the name of the stream */
22+
public String getStreamName() {
23+
return streamName;
24+
}
25+
26+
/** get the replication factor of the stream */
27+
public int getReplicationFactor() {
28+
return replicationFactor;
29+
}
30+
31+
/** update the name of the stream */
32+
public void setStreamName(String streamName) {
33+
this.streamName = streamName;
34+
}
35+
36+
/** update the replication factor of the stream */
37+
public void setReplicationFactor(int replicationFactor) {
38+
this.replicationFactor = replicationFactor;
39+
}
40+
41+
@Override
42+
public boolean equals(Object o) {
43+
if (this == o) {
44+
return true;
45+
}
46+
if (o == null || getClass() != o.getClass()) {
47+
return false;
48+
}
49+
Stream stream = (Stream) o;
50+
return replicationFactor == stream.replicationFactor
51+
&& Objects.equals(streamName, stream.streamName);
52+
}
53+
54+
@Override
55+
public int hashCode() {
56+
return Objects.hash(streamName, replicationFactor);
57+
}
58+
}

src/main/java/io/hstream/Subscription.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,39 +2,57 @@
22

33
import java.util.Objects;
44

5+
/** A class for storing information about subscriptions */
56
public class Subscription {
67

8+
/** An identifier for the subscription */
79
private String subscriptionId;
10+
/** The name of the stream being subscribed to */
811
private String streamName;
12+
/** The offset that indicates the position to start consuming data from the stream */
913
private SubscriptionOffset subscriptionOffset;
1014

15+
/**
16+
* A constructor for subscriptions
17+
*
18+
* @param subscriptionId An identifier for the subscription
19+
* @param streamName The name of the stream being subscribed to
20+
* @param subscriptionOffset A {@link SubscriptionOffset} to indicate the position to start
21+
* consuming data
22+
*/
1123
public Subscription(
1224
String subscriptionId, String streamName, SubscriptionOffset subscriptionOffset) {
1325
this.subscriptionId = subscriptionId;
1426
this.streamName = streamName;
1527
this.subscriptionOffset = subscriptionOffset;
1628
}
1729

30+
/** get the identifier of the subscription */
1831
public String getSubscriptionId() {
1932
return subscriptionId;
2033
}
2134

35+
/** get the name of stream being subscribed to */
2236
public String getStreamName() {
2337
return streamName;
2438
}
2539

40+
/** get the subscription offset */
2641
public SubscriptionOffset getSubscriptionOffset() {
2742
return subscriptionOffset;
2843
}
2944

45+
/** update the identifier of the subscription */
3046
public void setSubscriptionId(String subscriptionId) {
3147
this.subscriptionId = subscriptionId;
3248
}
3349

50+
/** update the name of the stream */
3451
public void setStreamName(String streamName) {
3552
this.streamName = streamName;
3653
}
3754

55+
/** update the subscription offset */
3856
public void setSubscriptionOffset(SubscriptionOffset subscriptionOffset) {
3957
this.subscriptionOffset = subscriptionOffset;
4058
}

src/main/java/io/hstream/SubscriptionOffset.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,36 @@
22

33
import java.util.Objects;
44

5+
/** A class with information about subscription offset */
56
public class SubscriptionOffset {
7+
/** Subscription offsets that can be used */
68
public enum SpecialOffset {
9+
/** Start consuming messages from the earliest position in the stream */
710
EARLIEST,
11+
/**
12+
* Ignore the history messages up to the moment of subscription and ead the messages thereafter.
13+
*/
814
LATEST,
915
UNRECOGNIZED;
1016
}
1117

1218
private SpecialOffset specialOffset;
1319

20+
/**
21+
* A constructor for subscription offset
22+
*
23+
* @param specialOffset One of the offsets defined in {@link SpecialOffset}
24+
*/
1425
public SubscriptionOffset(SpecialOffset specialOffset) {
1526
this.specialOffset = specialOffset;
1627
}
1728

29+
/** get the offset */
1830
public SpecialOffset getSpecialOffset() {
1931
return specialOffset;
2032
}
2133

34+
/** update the offset */
2235
public void setSpecialOffset(SpecialOffset specialOffset) {
2336
this.specialOffset = specialOffset;
2437
}

src/main/java/io/hstream/impl/ClientImpl.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import io.hstream.internal.DeleteSubscriptionRequest;
99
import io.hstream.internal.HStreamApiGrpc;
1010
import io.hstream.internal.ListStreamsResponse;
11-
import io.hstream.internal.Stream;
1211
import io.hstream.util.GrpcUtils;
1312
import java.util.List;
1413
import java.util.concurrent.TimeUnit;
@@ -48,9 +47,9 @@ public QueryerBuilder newQueryer() {
4847

4948
@Override
5049
public void createStream(String streamName) {
51-
Stream stream = Stream.newBuilder().setStreamName(streamName).setReplicationFactor(3).build();
50+
Stream stream = new Stream(streamName, 3);
5251

53-
blockingStub.createStream(stream);
52+
blockingStub.createStream(GrpcUtils.streamToGrpc(stream));
5453
}
5554

5655
@Override
@@ -65,7 +64,9 @@ public void deleteStream(String streamName) {
6564
public List<Stream> listStreams() {
6665
Empty empty = Empty.newBuilder().build();
6766
ListStreamsResponse listStreamsResponse = blockingStub.listStreams(empty);
68-
return listStreamsResponse.getStreamsList();
67+
return listStreamsResponse.getStreamsList().stream()
68+
.map(GrpcUtils::streamFromGrpc)
69+
.collect(Collectors.toList());
6970
}
7071

7172
@Override

src/main/java/io/hstream/util/GrpcUtils.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
import io.hstream.*;
44

5+
/**
6+
* A class of utility functions to convert between the GRPC generated classes and the custom classes
7+
* e.g. {@link RecordId}, {@link Subscription}, and {@link Stream}
8+
*/
59
public class GrpcUtils {
610

711
public static io.hstream.internal.RecordId recordIdToGrpc(RecordId recordId) {
@@ -69,4 +73,15 @@ public static Subscription subscriptionFromGrpc(io.hstream.internal.Subscription
6973
subscription.getStreamName(),
7074
subscriptionOffsetFromGrpc(subscription.getOffset()));
7175
}
76+
77+
public static io.hstream.internal.Stream streamToGrpc(Stream stream) {
78+
return io.hstream.internal.Stream.newBuilder()
79+
.setStreamName(stream.getStreamName())
80+
.setReplicationFactor(stream.getReplicationFactor())
81+
.build();
82+
}
83+
84+
public static Stream streamFromGrpc(io.hstream.internal.Stream stream) {
85+
return new Stream(stream.getStreamName(), stream.getReplicationFactor());
86+
}
7287
}

0 commit comments

Comments
 (0)