Skip to content

Commit acbf5e9

Browse files
authored
feat: add timestamp offset (#198)
1 parent 0cea064 commit acbf5e9

File tree

2 files changed

+35
-19
lines changed

2 files changed

+35
-19
lines changed

client/src/main/java/io/hstream/StreamShardOffset.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,16 @@ public enum SpecialOffset {
77
LATEST;
88
}
99

10-
private enum OffsetType {
10+
public enum OffsetType {
1111
SPECIAL,
12+
TIMESTAMP,
1213
NORMAL;
1314
}
1415

1516
private SpecialOffset specialOffset;
1617
private String recordId;
18+
private long timestamp;
19+
1720
private OffsetType offsetType;
1821

1922
public StreamShardOffset(SpecialOffset specialOffset) {
@@ -26,6 +29,11 @@ public StreamShardOffset(String recordId) {
2629
this.offsetType = OffsetType.NORMAL;
2730
}
2831

32+
public StreamShardOffset(long timestamp) {
33+
this.timestamp = timestamp;
34+
this.offsetType = OffsetType.TIMESTAMP;
35+
}
36+
2937
public boolean isSpecialOffset() {
3038
return offsetType.equals(OffsetType.SPECIAL);
3139
}
@@ -34,6 +42,14 @@ public boolean isNormalOffset() {
3442
return offsetType.equals(OffsetType.NORMAL);
3543
}
3644

45+
public boolean isTimestampOffset() {
46+
return offsetType.equals(OffsetType.TIMESTAMP);
47+
}
48+
49+
public OffsetType getOffsetType() {
50+
return offsetType;
51+
}
52+
3753
public SpecialOffset getSpecialOffset() {
3854
if (isSpecialOffset()) {
3955
return specialOffset;
@@ -49,4 +65,12 @@ public String getNormalOffset() {
4965
throw new IllegalStateException("subscriptionOffset is not normal offset");
5066
}
5167
}
68+
69+
public long getTimestampOffset() {
70+
if (isTimestampOffset()) {
71+
return timestamp;
72+
} else {
73+
throw new IllegalStateException("subscriptionOffset is not timestamp offset");
74+
}
75+
}
5276
}

client/src/main/java/io/hstream/util/GrpcUtils.java

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@
77
import com.google.protobuf.util.JsonFormat;
88
import io.hstream.*;
99
import io.hstream.internal.RecordId;
10+
import io.hstream.internal.ShardOffset;
1011
import io.hstream.internal.SpecialOffset;
1112
import io.hstream.internal.TaskStatusPB;
13+
import io.hstream.internal.TimestampOffset;
1214
import java.time.Instant;
1315
import java.util.stream.Collectors;
1416

@@ -96,24 +98,6 @@ public static Stream streamFromGrpc(io.hstream.internal.Stream stream) {
9698
.build();
9799
}
98100

99-
public static StreamShardOffset streamShardOffsetFromGrpc(
100-
io.hstream.internal.ShardOffset shardOffset) {
101-
if (shardOffset.hasSpecialOffset()) {
102-
switch (shardOffset.getSpecialOffset()) {
103-
case EARLIEST:
104-
return new StreamShardOffset(StreamShardOffset.SpecialOffset.EARLIEST);
105-
case LATEST:
106-
return new StreamShardOffset(StreamShardOffset.SpecialOffset.LATEST);
107-
default:
108-
throw new IllegalArgumentException("Unknown ShardOffset : " + shardOffset);
109-
}
110-
} else if (shardOffset.hasRecordOffset()) {
111-
return new StreamShardOffset(recordIdFromGrpc(shardOffset.getRecordOffset()));
112-
} else {
113-
throw new IllegalArgumentException("Unknown ShardOffset : " + shardOffset);
114-
}
115-
}
116-
117101
public static io.hstream.internal.ShardOffset streamShardOffsetToGrpc(
118102
StreamShardOffset shardOffset) {
119103
if (shardOffset.isSpecialOffset()) {
@@ -133,6 +117,14 @@ public static io.hstream.internal.ShardOffset streamShardOffsetToGrpc(
133117
return io.hstream.internal.ShardOffset.newBuilder()
134118
.setRecordOffset(recordIdToGrpc(shardOffset.getNormalOffset()))
135119
.build();
120+
} else if (shardOffset.isTimestampOffset()) {
121+
return ShardOffset.newBuilder()
122+
.setTimestampOffset(
123+
TimestampOffset.newBuilder()
124+
.setTimestampInMs(shardOffset.getTimestampOffset())
125+
.setStrictAccuracy(true)
126+
.build())
127+
.build();
136128
} else {
137129
throw new IllegalArgumentException("Unknown streamShardOffset : " + shardOffset);
138130
}

0 commit comments

Comments
 (0)