Skip to content

Commit 9fdc4cf

Browse files
committed
add builder for Subscription
1 parent 54c6dd0 commit 9fdc4cf

File tree

5 files changed

+100
-92
lines changed

5 files changed

+100
-92
lines changed

client/src/main/java/io/hstream/Subscription.java

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,12 @@
55
/** A class for storing information about subscriptions */
66
public class Subscription {
77

8-
/** An identifier for the subscription */
98
private String subscriptionId;
10-
/** The name of the stream being subscribed to */
119
private String streamName;
12-
/** The offset that indicates the position to start consuming data from the stream */
1310
private SubscriptionOffset subscriptionOffset;
14-
1511
private int ackTimeoutSeconds;
1612

17-
/**
18-
* A constructor for subscriptions
19-
*
20-
* @param subscriptionId An identifier for the subscription
21-
* @param streamName The name of the stream being subscribed to
22-
* @param subscriptionOffset A {@link SubscriptionOffset} to indicate the position to start
23-
* consuming data
24-
*/
25-
public Subscription(
13+
private Subscription(
2614
String subscriptionId,
2715
String streamName,
2816
SubscriptionOffset subscriptionOffset,
@@ -33,17 +21,19 @@ public Subscription(
3321
this.ackTimeoutSeconds = ackTimeoutSeconds;
3422
}
3523

36-
/** get the identifier of the subscription */
24+
/** @return {@link Subscription.Builder} */
25+
public static Builder newBuilder() {
26+
return new Builder();
27+
}
28+
3729
public String getSubscriptionId() {
3830
return subscriptionId;
3931
}
4032

41-
/** get the name of stream being subscribed to */
4233
public String getStreamName() {
4334
return streamName;
4435
}
4536

46-
/** get the subscription offset */
4737
public SubscriptionOffset getSubscriptionOffset() {
4838
return subscriptionOffset;
4939
}
@@ -52,25 +42,6 @@ public int getAckTimeoutSeconds() {
5242
return ackTimeoutSeconds;
5343
}
5444

55-
/** update the identifier of the subscription */
56-
public void setSubscriptionId(String subscriptionId) {
57-
this.subscriptionId = subscriptionId;
58-
}
59-
60-
/** update the name of the stream */
61-
public void setStreamName(String streamName) {
62-
this.streamName = streamName;
63-
}
64-
65-
/** update the subscription offset */
66-
public void setSubscriptionOffset(SubscriptionOffset subscriptionOffset) {
67-
this.subscriptionOffset = subscriptionOffset;
68-
}
69-
70-
public void setAckTimeoutSeconds(int ackTimeoutSeconds) {
71-
this.ackTimeoutSeconds = ackTimeoutSeconds;
72-
}
73-
7445
@Override
7546
public boolean equals(Object o) {
7647
if (this == o) return true;
@@ -86,4 +57,36 @@ public boolean equals(Object o) {
8657
public int hashCode() {
8758
return Objects.hash(subscriptionId, streamName, subscriptionOffset, ackTimeoutSeconds);
8859
}
60+
61+
public static class Builder {
62+
63+
private String subscriptionId;
64+
private String streamName;
65+
private SubscriptionOffset subscriptionOffset;
66+
private int ackTimeoutSeconds;
67+
68+
public Builder subscription(String subscriptionId) {
69+
this.subscriptionId = subscriptionId;
70+
return this;
71+
}
72+
73+
public Builder stream(String streamName) {
74+
this.streamName = streamName;
75+
return this;
76+
}
77+
78+
public Builder offset(SubscriptionOffset subscriptionOffset) {
79+
this.subscriptionOffset = subscriptionOffset;
80+
return this;
81+
}
82+
83+
public Builder ackTimeoutSeconds(int ackTimeoutSeconds) {
84+
this.ackTimeoutSeconds = ackTimeoutSeconds;
85+
return this;
86+
}
87+
88+
public Subscription build() {
89+
return new Subscription(subscriptionId, streamName, subscriptionOffset, ackTimeoutSeconds);
90+
}
91+
}
8992
}
Lines changed: 30 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,52 @@
11
package io.hstream;
22

3-
import java.util.Objects;
4-
5-
/** A class with information about subscription offset */
63
public class SubscriptionOffset {
7-
/** Subscription offsets that can be used */
4+
85
public enum SpecialOffset {
9-
/** Start consuming messages from the earliest position in the stream */
106
EARLIEST,
11-
/**
12-
* Ignore the history messages up to the moment of subscription and ead the messages thereafter.
13-
*/
14-
LATEST,
15-
UNRECOGNIZED;
7+
LATEST;
8+
}
9+
10+
private enum OffsetType {
11+
SPECIAL,
12+
NORMAL;
1613
}
1714

1815
private SpecialOffset specialOffset;
16+
private RecordId recordId;
17+
private OffsetType offsetType;
1918

20-
/**
21-
* A constructor for subscription offset
22-
*
23-
* @param specialOffset One of the offsets defined in {@link SpecialOffset}
24-
*/
2519
public SubscriptionOffset(SpecialOffset specialOffset) {
2620
this.specialOffset = specialOffset;
21+
this.offsetType = OffsetType.SPECIAL;
2722
}
2823

29-
/** get the offset */
30-
public SpecialOffset getSpecialOffset() {
31-
return specialOffset;
24+
public SubscriptionOffset(RecordId recordId) {
25+
this.recordId = recordId;
26+
this.offsetType = OffsetType.NORMAL;
3227
}
3328

34-
/** update the offset */
35-
public void setSpecialOffset(SpecialOffset specialOffset) {
36-
this.specialOffset = specialOffset;
29+
public boolean isSpecialOffset() {
30+
return offsetType.equals(OffsetType.SPECIAL);
3731
}
3832

39-
@Override
40-
public boolean equals(Object o) {
41-
if (this == o) {
42-
return true;
43-
}
44-
if (o == null || getClass() != o.getClass()) {
45-
return false;
33+
public boolean isNormalOffset() {
34+
return offsetType.equals(OffsetType.NORMAL);
35+
}
36+
37+
public SpecialOffset getSpecialOffset() {
38+
if (isSpecialOffset()) {
39+
return specialOffset;
40+
} else {
41+
throw new IllegalStateException("subscriptionOffset is not specialOffset");
4642
}
47-
SubscriptionOffset that = (SubscriptionOffset) o;
48-
return specialOffset == that.specialOffset;
4943
}
5044

51-
@Override
52-
public int hashCode() {
53-
return Objects.hash(specialOffset);
45+
public RecordId getNormalOffset() {
46+
if (isSpecialOffset()) {
47+
return recordId;
48+
} else {
49+
throw new IllegalStateException("subscriptionOffset is not normal offset");
50+
}
5451
}
5552
}

client/src/main/java/io/hstream/impl/QueryerImpl.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,14 @@ public void onNext(CreateQueryStreamResponse value) {
6060
value.getStreamQuery().getId(),
6161
value.getQueryStream().getStreamName());
6262

63-
client.createSubscription(
64-
new Subscription(
65-
STREAM_QUERY_SUBSCRIPTION_PREFIX + resultStreamNameSuffix,
66-
STREAM_QUERY_STREAM_PREFIX + resultStreamNameSuffix,
67-
new SubscriptionOffset(SubscriptionOffset.SpecialOffset.EARLIEST),
68-
10));
63+
Subscription subscription =
64+
Subscription.newBuilder()
65+
.subscription(STREAM_QUERY_SUBSCRIPTION_PREFIX + resultStreamNameSuffix)
66+
.stream(STREAM_QUERY_STREAM_PREFIX + resultStreamNameSuffix)
67+
.offset(new SubscriptionOffset(SubscriptionOffset.SpecialOffset.EARLIEST))
68+
.ackTimeoutSeconds(10)
69+
.build();
70+
client.createSubscription(subscription);
6971

7072
queryInnerConsumer =
7173
client

client/src/main/java/io/hstream/util/GrpcUtils.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ public static io.hstream.internal.SubscriptionOffset.SpecialOffset specialOffset
2626
return io.hstream.internal.SubscriptionOffset.SpecialOffset.EARLIST;
2727
case LATEST:
2828
return io.hstream.internal.SubscriptionOffset.SpecialOffset.LATEST;
29-
case UNRECOGNIZED:
30-
return io.hstream.internal.SubscriptionOffset.SpecialOffset.UNRECOGNIZED;
3129
default:
3230
throw new IllegalArgumentException();
3331
}
@@ -40,23 +38,32 @@ public static SubscriptionOffset.SpecialOffset specialOffsetFromGrpc(
4038
return SubscriptionOffset.SpecialOffset.EARLIEST;
4139
case LATEST:
4240
return SubscriptionOffset.SpecialOffset.LATEST;
43-
case UNRECOGNIZED:
44-
return SubscriptionOffset.SpecialOffset.UNRECOGNIZED;
4541
default:
4642
throw new IllegalArgumentException();
4743
}
4844
}
4945

5046
public static io.hstream.internal.SubscriptionOffset subscriptionOffsetToGrpc(
5147
SubscriptionOffset offset) {
52-
return io.hstream.internal.SubscriptionOffset.newBuilder()
53-
.setSpecialOffset(specialOffsetToGrpc(offset.getSpecialOffset()))
54-
.build();
48+
if (offset.isSpecialOffset()) {
49+
return io.hstream.internal.SubscriptionOffset.newBuilder()
50+
.setSpecialOffset(specialOffsetToGrpc(offset.getSpecialOffset()))
51+
.build();
52+
53+
} else {
54+
return io.hstream.internal.SubscriptionOffset.newBuilder()
55+
.setRecordOffset(recordIdToGrpc(offset.getNormalOffset()))
56+
.build();
57+
}
5558
}
5659

5760
public static SubscriptionOffset subscriptionOffsetFromGrpc(
5861
io.hstream.internal.SubscriptionOffset offset) {
59-
return new SubscriptionOffset(specialOffsetFromGrpc(offset.getSpecialOffset()));
62+
if (offset.hasRecordOffset()) {
63+
return new SubscriptionOffset(recordIdFromGrpc(offset.getRecordOffset()));
64+
} else {
65+
return new SubscriptionOffset(specialOffsetFromGrpc(offset.getSpecialOffset()));
66+
}
6067
}
6168

6269
public static io.hstream.internal.Subscription subscriptionToGrpc(Subscription subscription) {
@@ -69,11 +76,11 @@ public static io.hstream.internal.Subscription subscriptionToGrpc(Subscription s
6976
}
7077

7178
public static Subscription subscriptionFromGrpc(io.hstream.internal.Subscription subscription) {
72-
return new Subscription(
73-
subscription.getSubscriptionId(),
74-
subscription.getStreamName(),
75-
subscriptionOffsetFromGrpc(subscription.getOffset()),
76-
subscription.getAckTimeoutSeconds());
79+
return Subscription.newBuilder().subscription(subscription.getSubscriptionId()).stream(
80+
subscription.getStreamName())
81+
.offset(subscriptionOffsetFromGrpc(subscription.getOffset()))
82+
.ackTimeoutSeconds(subscription.getAckTimeoutSeconds())
83+
.build();
7784
}
7885

7986
public static io.hstream.internal.Stream streamToGrpc(Stream stream) {

client/src/test/java/io/hstream/HStreamClientTest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,10 @@ public void setUp() {
2828
testSubscriptionId = TEST_SUBSCRIPTION_PREFIX + suffix;
2929
client.createStream(testStreamName);
3030
Subscription subscription =
31-
new Subscription(
32-
testSubscriptionId,
33-
testStreamName,
34-
new SubscriptionOffset(SubscriptionOffset.SpecialOffset.LATEST),
35-
10);
31+
Subscription.newBuilder().subscription(testSubscriptionId).stream(testStreamName)
32+
.offset(new SubscriptionOffset(SubscriptionOffset.SpecialOffset.LATEST))
33+
.ackTimeoutSeconds(10)
34+
.build();
3635
client.createSubscription(subscription);
3736
}
3837

0 commit comments

Comments
 (0)