Skip to content

Commit 31078c1

Browse files
authored
feat: add createdTime in Stream and Subscription (#157)
1 parent 9d1c5d5 commit 31078c1

File tree

6 files changed

+57
-20
lines changed

6 files changed

+57
-20
lines changed

client/src/main/java/io/hstream/Stream.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,27 @@
33
import static com.google.common.base.Preconditions.checkArgument;
44
import static com.google.common.base.Preconditions.checkNotNull;
55

6+
import java.time.Instant;
67
import java.util.Objects;
78

89
public class Stream {
910
private String streamName;
1011
private int replicationFactor;
1112
private int backlogDuration;
1213
private int shardCount;
13-
14-
public Stream(String streamName, int replicationFactor, int backlogDuration, int shardCount) {
14+
private final Instant createdTime;
15+
16+
Stream(
17+
String streamName,
18+
int replicationFactor,
19+
int backlogDuration,
20+
int shardCount,
21+
Instant createdTime) {
1522
this.streamName = streamName;
1623
this.replicationFactor = replicationFactor;
1724
this.backlogDuration = backlogDuration;
1825
this.shardCount = shardCount;
26+
this.createdTime = createdTime;
1927
}
2028

2129
public String getStreamName() {
@@ -50,6 +58,10 @@ public void setShardCount(int shardCount) {
5058
this.shardCount = shardCount;
5159
}
5260

61+
public Instant getCreatedTime() {
62+
return createdTime;
63+
}
64+
5365
@Override
5466
public boolean equals(Object o) {
5567
if (this == o) return true;
@@ -71,6 +83,7 @@ public static final class Builder {
7183
private int replicationFactor = 1;
7284
private int backlogDuration = 3600 * 24;
7385
private int shardCount = 1;
86+
private Instant createdTime;
7487

7588
/**
7689
* @param streamName required, the name of the stream
@@ -109,11 +122,16 @@ public Builder shardCount(int shardCount) {
109122
return this;
110123
}
111124

125+
public Builder createdTime(Instant createdTime) {
126+
this.createdTime = createdTime;
127+
return this;
128+
}
129+
112130
public Stream build() {
113131
checkNotNull(streamName);
114132
checkArgument(replicationFactor >= 1 && replicationFactor <= 15);
115133
checkArgument(shardCount >= 1);
116-
return new Stream(streamName, replicationFactor, backlogDuration, shardCount);
134+
return new Stream(streamName, replicationFactor, backlogDuration, shardCount, createdTime);
117135
}
118136
}
119137

client/src/main/java/io/hstream/Subscription.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static com.google.common.base.Preconditions.*;
44

5+
import java.time.Instant;
56
import java.util.Objects;
67

78
public class Subscription {
@@ -14,6 +15,12 @@ public class Subscription {
1415

1516
private SubscriptionOffset offset;
1617

18+
public Instant getCreatedTime() {
19+
return createdTime;
20+
}
21+
22+
private final Instant createdTime;
23+
1724
public enum SubscriptionOffset {
1825
EARLIEST,
1926
LATEST,
@@ -24,12 +31,14 @@ private Subscription(
2431
String streamName,
2532
int ackTimeoutSeconds,
2633
int maxUnackedRecords,
27-
SubscriptionOffset offset) {
34+
SubscriptionOffset offset,
35+
Instant createdTime) {
2836
this.subscriptionId = subscriptionId;
2937
this.streamName = streamName;
3038
this.ackTimeoutSeconds = ackTimeoutSeconds;
3139
this.maxUnackedRecords = maxUnackedRecords;
3240
this.offset = offset;
41+
this.createdTime = createdTime;
3342
}
3443

3544
/** @return {@link Subscription.Builder} */
@@ -81,6 +90,8 @@ public static class Builder {
8190
private int maxUnackedRecords = 10000;
8291
private SubscriptionOffset offset = SubscriptionOffset.LATEST;
8392

93+
private Instant createdTime;
94+
8495
public Builder subscription(String subscriptionId) {
8596
this.subscriptionId = subscriptionId;
8697
return this;
@@ -106,13 +117,18 @@ public Builder offset(SubscriptionOffset offset) {
106117
return this;
107118
}
108119

120+
public Builder createdTime(Instant createdTime) {
121+
this.createdTime = createdTime;
122+
return this;
123+
}
124+
109125
public Subscription build() {
110126
checkNotNull(subscriptionId);
111127
checkNotNull(streamName);
112128
checkState(ackTimeoutSeconds > 0 && ackTimeoutSeconds < 36000);
113129
checkState(maxUnackedRecords > 0);
114130
return new Subscription(
115-
subscriptionId, streamName, ackTimeoutSeconds, maxUnackedRecords, offset);
131+
subscriptionId, streamName, ackTimeoutSeconds, maxUnackedRecords, offset, createdTime);
116132
}
117133
}
118134
}

client/src/main/java/io/hstream/TaskStatus.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,5 @@ public enum TaskStatus {
55
TASK_CREATED,
66
TASK_RUNNING,
77
TASK_CREATION_ABORT,
8-
TASK_CONNECTION_ABORT,
98
TASK_TERMINATED,
109
}

client/src/main/java/io/hstream/util/GrpcUtils.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import io.hstream.internal.RecordId;
77
import io.hstream.internal.SpecialOffset;
88
import io.hstream.internal.TaskStatusPB;
9+
import java.time.Instant;
910

1011
/**
1112
* A class of utility functions to convert between the GRPC generated classes and the custom classes
@@ -61,10 +62,12 @@ public static io.hstream.internal.Subscription subscriptionToGrpc(Subscription s
6162
}
6263

6364
public static Subscription subscriptionFromGrpc(io.hstream.internal.Subscription subscription) {
65+
var createdTime = subscription.getCreationTime();
6466
return Subscription.newBuilder().subscription(subscription.getSubscriptionId()).stream(
6567
subscription.getStreamName())
6668
.ackTimeoutSeconds(subscription.getAckTimeoutSeconds())
6769
.offset(subscriptionOffsetFromGrpc(subscription.getOffset()))
70+
.createdTime(Instant.ofEpochSecond(createdTime.getSeconds(), createdTime.getNanos()))
6871
.build();
6972
}
7073

@@ -78,11 +81,14 @@ public static io.hstream.internal.Stream streamToGrpc(Stream stream) {
7881
}
7982

8083
public static Stream streamFromGrpc(io.hstream.internal.Stream stream) {
81-
return new Stream(
82-
stream.getStreamName(),
83-
stream.getReplicationFactor(),
84-
stream.getBacklogDuration(),
85-
stream.getShardCount());
84+
var createdTime = stream.getCreationTime();
85+
return Stream.newBuilder()
86+
.streamName(stream.getStreamName())
87+
.replicationFactor(stream.getReplicationFactor())
88+
.backlogDuration(stream.getBacklogDuration())
89+
.shardCount(stream.getShardCount())
90+
.createdTime(Instant.ofEpochSecond(createdTime.getSeconds(), createdTime.getNanos()))
91+
.build();
8692
}
8793

8894
public static StreamShardOffset streamShardOffsetFromGrpc(
@@ -183,8 +189,6 @@ public static TaskStatus taskStatusFromInternal(TaskStatusPB statusPB) {
183189
return TaskStatus.TASK_CREATED;
184190
case TASK_RUNNING:
185191
return TaskStatus.TASK_RUNNING;
186-
case TASK_CONNECTION_ABORT:
187-
return TaskStatus.TASK_CONNECTION_ABORT;
188192
case TASK_CREATION_ABORT:
189193
return TaskStatus.TASK_CREATION_ABORT;
190194
case TASK_TERMINATED:

client/src/main/kotlin/io/hstream/impl/HStreamClientKtImpl.kt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,12 +121,12 @@ class HStreamClientKtImpl(bootstrapServerUrls: List<String>, credentials: Channe
121121
unaryCallBlocked {
122122
it.createStream(
123123
GrpcUtils.streamToGrpc(
124-
Stream(
125-
stream,
126-
replicationFactor.toInt(),
127-
backlogDuration,
128-
shardCnt
129-
)
124+
Stream.newBuilder()
125+
.streamName(stream)
126+
.replicationFactor(replicationFactor.toInt())
127+
.shardCount(shardCnt)
128+
.backlogDuration(backlogDuration)
129+
.build()
130130
)
131131
)
132132
}

client/src/main/proto

0 commit comments

Comments
 (0)