Skip to content

Commit 7ec32e8

Browse files
authored
feat: Support shards stage1 (#72)
1 parent 869239d commit 7ec32e8

File tree

17 files changed

+995
-224
lines changed

17 files changed

+995
-224
lines changed

.github/workflows/main.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ jobs:
3434
run: ./gradlew ktlintCheck
3535

3636
- name: Prepare test env
37-
run: script/prepare-test-env.sh
37+
run: script/prepare-test-env-latest.sh
3838

3939
- name: Build and test with Gradle
4040
run: ./gradlew build

client/src/main/java/io/hstream/Producer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,12 @@ public interface Producer {
2020
* @return the {@link RecordId} wrapped in a {@link CompletableFuture}
2121
*/
2222
CompletableFuture<RecordId> write(HRecord hRecord);
23+
24+
/**
25+
* Write a {@link Record}.
26+
*
27+
* @param record {@link Record}
28+
* @return the {@link RecordId} wrapped in a {@link CompletableFuture}
29+
*/
30+
CompletableFuture<RecordId> write(Record record);
2331
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package io.hstream;
2+
3+
import static com.google.common.base.Preconditions.checkArgument;
4+
5+
public class Record {
6+
7+
private String key;
8+
private byte[] rawRecord;
9+
private HRecord hRecord;
10+
private boolean isRawRecord;
11+
12+
public static Builder newBuilder() {
13+
return new Builder();
14+
}
15+
16+
private Record(String key, byte[] rawRecord) {
17+
isRawRecord = true;
18+
this.key = key;
19+
this.rawRecord = rawRecord;
20+
}
21+
22+
private Record(String key, HRecord hRecord) {
23+
isRawRecord = false;
24+
this.key = key;
25+
this.hRecord = hRecord;
26+
}
27+
28+
public String getKey() {
29+
return key;
30+
}
31+
32+
public byte[] getRawRecord() {
33+
checkArgument(isRawRecord);
34+
return rawRecord;
35+
}
36+
37+
public HRecord getHRecord() {
38+
checkArgument(!isRawRecord);
39+
return hRecord;
40+
}
41+
42+
public boolean isRawRecord() {
43+
return isRawRecord;
44+
}
45+
46+
public static class Builder {
47+
private String key;
48+
private byte[] rawRecord;
49+
private HRecord hRecord;
50+
51+
public Builder key(String key) {
52+
this.key = key;
53+
return this;
54+
}
55+
56+
public Builder rawRecord(byte[] rawRecord) {
57+
this.rawRecord = rawRecord;
58+
return this;
59+
}
60+
61+
public Builder hRecord(HRecord hRecord) {
62+
this.hRecord = hRecord;
63+
return this;
64+
}
65+
66+
public Record build() {
67+
checkArgument(
68+
(rawRecord != null && hRecord == null) || (rawRecord == null && hRecord != null));
69+
if (rawRecord != null) {
70+
return new Record(key, rawRecord);
71+
} else {
72+
return new Record(key, hRecord);
73+
}
74+
}
75+
}
76+
}

client/src/main/java/io/hstream/Subscription.java

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,11 @@ public class Subscription {
88

99
private String subscriptionId;
1010
private String streamName;
11-
private SubscriptionOffset subscriptionOffset;
1211
private int ackTimeoutSeconds;
1312

14-
private Subscription(
15-
String subscriptionId,
16-
String streamName,
17-
SubscriptionOffset subscriptionOffset,
18-
int ackTimeoutSeconds) {
13+
private Subscription(String subscriptionId, String streamName, int ackTimeoutSeconds) {
1914
this.subscriptionId = subscriptionId;
2015
this.streamName = streamName;
21-
this.subscriptionOffset = subscriptionOffset;
2216
this.ackTimeoutSeconds = ackTimeoutSeconds;
2317
}
2418

@@ -35,10 +29,6 @@ public String getStreamName() {
3529
return streamName;
3630
}
3731

38-
public SubscriptionOffset getSubscriptionOffset() {
39-
return subscriptionOffset;
40-
}
41-
4232
public int getAckTimeoutSeconds() {
4333
return ackTimeoutSeconds;
4434
}
@@ -50,20 +40,18 @@ public boolean equals(Object o) {
5040
Subscription that = (Subscription) o;
5141
return ackTimeoutSeconds == that.ackTimeoutSeconds
5242
&& subscriptionId.equals(that.subscriptionId)
53-
&& streamName.equals(that.streamName)
54-
&& subscriptionOffset.equals(that.subscriptionOffset);
43+
&& streamName.equals(that.streamName);
5544
}
5645

5746
@Override
5847
public int hashCode() {
59-
return Objects.hash(subscriptionId, streamName, subscriptionOffset, ackTimeoutSeconds);
48+
return Objects.hash(subscriptionId, streamName, ackTimeoutSeconds);
6049
}
6150

6251
public static class Builder {
6352

6453
private String subscriptionId;
6554
private String streamName;
66-
private SubscriptionOffset subscriptionOffset;
6755
private int ackTimeoutSeconds = 600;
6856

6957
public Builder subscription(String subscriptionId) {
@@ -76,11 +64,6 @@ public Builder stream(String streamName) {
7664
return this;
7765
}
7866

79-
public Builder offset(SubscriptionOffset subscriptionOffset) {
80-
this.subscriptionOffset = subscriptionOffset;
81-
return this;
82-
}
83-
8467
public Builder ackTimeoutSeconds(int ackTimeoutSeconds) {
8568
this.ackTimeoutSeconds = ackTimeoutSeconds;
8669
return this;
@@ -89,9 +72,8 @@ public Builder ackTimeoutSeconds(int ackTimeoutSeconds) {
8972
public Subscription build() {
9073
checkNotNull(subscriptionId);
9174
checkNotNull(streamName);
92-
checkNotNull(subscriptionOffset);
9375
checkState(ackTimeoutSeconds > 0 && ackTimeoutSeconds < 36000);
94-
return new Subscription(subscriptionId, streamName, subscriptionOffset, ackTimeoutSeconds);
76+
return new Subscription(subscriptionId, streamName, ackTimeoutSeconds);
9577
}
9678
}
9779
}

client/src/main/java/io/hstream/SubscriptionOffset.java

Lines changed: 0 additions & 52 deletions
This file was deleted.

client/src/main/java/io/hstream/impl/DefaultSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,6 @@ public class DefaultSettings {
55
public static final long REQUEST_RETRY_INTERVAL_SECONDS = 5;
66

77
public static final int APPEND_RETRY_MAX_TIMES = 5;
8+
9+
public static final String DEFAULT_ORDERING_KEY = "__default__";
810
}

client/src/main/java/io/hstream/impl/QueryerImpl.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import io.grpc.stub.StreamObserver;
55
import io.hstream.*;
66
import io.hstream.Subscription;
7-
import io.hstream.SubscriptionOffset;
87
import io.hstream.internal.*;
98
import io.hstream.internal.Stream;
109
import java.util.List;
@@ -76,7 +75,6 @@ public void onNext(CreateQueryStreamResponse value) {
7675
Subscription.newBuilder()
7776
.subscription(STREAM_QUERY_SUBSCRIPTION_PREFIX + resultStreamNameSuffix)
7877
.stream(STREAM_QUERY_STREAM_PREFIX + resultStreamNameSuffix)
79-
.offset(new SubscriptionOffset(SubscriptionOffset.SpecialOffset.EARLIEST))
8078
.ackTimeoutSeconds(10)
8179
.build();
8280
client.createSubscription(subscription);

client/src/main/java/io/hstream/util/GrpcUtils.java

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -19,66 +19,17 @@ public static RecordId recordIdFromGrpc(io.hstream.internal.RecordId recordId) {
1919
return new RecordId(recordId.getBatchId(), recordId.getBatchIndex());
2020
}
2121

22-
public static io.hstream.internal.SubscriptionOffset.SpecialOffset specialOffsetToGrpc(
23-
SubscriptionOffset.SpecialOffset offset) {
24-
switch (offset) {
25-
case EARLIEST:
26-
return io.hstream.internal.SubscriptionOffset.SpecialOffset.EARLIST;
27-
case LATEST:
28-
return io.hstream.internal.SubscriptionOffset.SpecialOffset.LATEST;
29-
default:
30-
throw new IllegalArgumentException();
31-
}
32-
}
33-
34-
public static SubscriptionOffset.SpecialOffset specialOffsetFromGrpc(
35-
io.hstream.internal.SubscriptionOffset.SpecialOffset offset) {
36-
switch (offset) {
37-
case EARLIST:
38-
return SubscriptionOffset.SpecialOffset.EARLIEST;
39-
case LATEST:
40-
return SubscriptionOffset.SpecialOffset.LATEST;
41-
default:
42-
throw new IllegalArgumentException();
43-
}
44-
}
45-
46-
public static io.hstream.internal.SubscriptionOffset subscriptionOffsetToGrpc(
47-
SubscriptionOffset offset) {
48-
if (offset.isSpecialOffset()) {
49-
return io.hstream.internal.SubscriptionOffset.newBuilder()
50-
.setSpecialOffset(specialOffsetToGrpc(offset.getSpecialOffset()))
51-
.build();
52-
53-
} else {
54-
return io.hstream.internal.SubscriptionOffset.newBuilder()
55-
.setRecordOffset(recordIdToGrpc(offset.getNormalOffset()))
56-
.build();
57-
}
58-
}
59-
60-
public static SubscriptionOffset subscriptionOffsetFromGrpc(
61-
io.hstream.internal.SubscriptionOffset offset) {
62-
if (offset.hasRecordOffset()) {
63-
return new SubscriptionOffset(recordIdFromGrpc(offset.getRecordOffset()));
64-
} else {
65-
return new SubscriptionOffset(specialOffsetFromGrpc(offset.getSpecialOffset()));
66-
}
67-
}
68-
6922
public static io.hstream.internal.Subscription subscriptionToGrpc(Subscription subscription) {
7023
return io.hstream.internal.Subscription.newBuilder()
7124
.setSubscriptionId(subscription.getSubscriptionId())
7225
.setStreamName(subscription.getStreamName())
73-
.setOffset(subscriptionOffsetToGrpc(subscription.getSubscriptionOffset()))
7426
.setAckTimeoutSeconds(subscription.getAckTimeoutSeconds())
7527
.build();
7628
}
7729

7830
public static Subscription subscriptionFromGrpc(io.hstream.internal.Subscription subscription) {
7931
return Subscription.newBuilder().subscription(subscription.getSubscriptionId()).stream(
8032
subscription.getStreamName())
81-
.offset(subscriptionOffsetFromGrpc(subscription.getOffset()))
8233
.ackTimeoutSeconds(subscription.getAckTimeoutSeconds())
8334
.build();
8435
}

client/src/main/java/io/hstream/util/RecordUtils.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import com.google.protobuf.Struct;
66
import com.google.protobuf.util.JsonFormat;
77
import io.hstream.*;
8+
import io.hstream.Record;
9+
import io.hstream.impl.DefaultSettings;
810
import io.hstream.internal.HStreamRecord;
911
import io.hstream.internal.HStreamRecordHeader;
1012
import io.hstream.internal.ReceivedRecord;
@@ -18,7 +20,10 @@ public class RecordUtils {
1820

1921
public static HStreamRecord buildHStreamRecordFromRawRecord(byte[] rawRecord) {
2022
HStreamRecordHeader header =
21-
HStreamRecordHeader.newBuilder().setFlag(HStreamRecordHeader.Flag.RAW).build();
23+
HStreamRecordHeader.newBuilder()
24+
.setFlag(HStreamRecordHeader.Flag.RAW)
25+
.setKey(DefaultSettings.DEFAULT_ORDERING_KEY)
26+
.build();
2227
return HStreamRecord.newBuilder()
2328
.setHeader(header)
2429
.setPayload(ByteString.copyFrom(rawRecord))
@@ -28,7 +33,10 @@ public static HStreamRecord buildHStreamRecordFromRawRecord(byte[] rawRecord) {
2833
public static HStreamRecord buildHStreamRecordFromHRecord(HRecord hRecord) {
2934
try {
3035
HStreamRecordHeader header =
31-
HStreamRecordHeader.newBuilder().setFlag(HStreamRecordHeader.Flag.JSON).build();
36+
HStreamRecordHeader.newBuilder()
37+
.setFlag(HStreamRecordHeader.Flag.JSON)
38+
.setKey(DefaultSettings.DEFAULT_ORDERING_KEY)
39+
.build();
3240
String json = JsonFormat.printer().print(hRecord.getDelegate());
3341
logger.debug("hrecord to json: {}", json);
3442
return HStreamRecord.newBuilder()
@@ -40,6 +48,19 @@ public static HStreamRecord buildHStreamRecordFromHRecord(HRecord hRecord) {
4048
}
4149
}
4250

51+
public static HStreamRecord buildHStreamRecordFromRecord(Record record) {
52+
HStreamRecord hStreamRecord =
53+
record.isRawRecord()
54+
? buildHStreamRecordFromRawRecord(record.getRawRecord())
55+
: buildHStreamRecordFromHRecord(record.getHRecord());
56+
if (record.getKey() == null) {
57+
return hStreamRecord;
58+
}
59+
HStreamRecordHeader newHeader =
60+
HStreamRecordHeader.newBuilder(hStreamRecord.getHeader()).setKey(record.getKey()).build();
61+
return HStreamRecord.newBuilder(hStreamRecord).setHeader(newHeader).build();
62+
}
63+
4364
public static byte[] parseRawRecordFromHStreamRecord(HStreamRecord hStreamRecord) {
4465
HStreamRecordHeader.Flag flag = hStreamRecord.getHeader().getFlag();
4566
if (!flag.equals(HStreamRecordHeader.Flag.RAW)) {

0 commit comments

Comments
 (0)