Skip to content

Commit 2f1ce3f

Browse files
committed
a
1 parent c6c65e9 commit 2f1ce3f

File tree

10 files changed

+54
-41
lines changed

10 files changed

+54
-41
lines changed

app/src/main/java/org/example/app/AccountDemo.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import s2.config.Config;
99
import s2.config.Endpoints;
1010
import s2.types.Age;
11+
import s2.types.BasinConfig;
1112
import s2.types.CreateBasinRequest;
1213
import s2.types.ListBasinsRequest;
1314
import s2.types.StorageClass;
@@ -33,11 +34,14 @@ public static void main(String[] args) throws Exception {
3334
.createBasin(
3435
CreateBasinRequest.newBuilder()
3536
.withBasin(UUID.randomUUID().toString())
36-
.withDefaultStreamConfig(
37-
StreamConfig.newBuilder()
38-
.withRetentionPolicy(new Age(Duration.ofDays(7)))
39-
.withStorageClass(StorageClass.STANDARD)
40-
.build())
37+
.withBasinConfig(
38+
new BasinConfig(
39+
StreamConfig.newBuilder()
40+
.withRetentionPolicy(new Age(Duration.ofDays(7)))
41+
.withStorageClass(StorageClass.STANDARD)
42+
.build(),
43+
false,
44+
false))
4145
.build())
4246
.get();
4347
logger.info("newBasin={}", newBasin);

s2-sdk/src/main/java/s2/client/Client.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import s2.types.CreateBasinRequest;
2121
import s2.types.Paginated;
2222
import s2.types.ReconfigureBasinRequest;
23-
import s2.types.StreamConfig;
2423
import s2.v1alpha.AccountServiceGrpc;
2524
import s2.v1alpha.DeleteBasinRequest;
2625
import s2.v1alpha.GetBasinConfigRequest;
@@ -125,9 +124,7 @@ public ListenableFuture<BasinConfig> reconfigureBasin(ReconfigureBasinRequest re
125124
withStaticRetries(
126125
config.maxRetries,
127126
() -> this.futureStub.reconfigureBasin(reconfigure.toProto())),
128-
resp ->
129-
new BasinConfig(
130-
StreamConfig.fromProto(resp.getConfig().getDefaultStreamConfig())),
127+
resp -> BasinConfig.fromProto(resp.getConfig()),
131128
executor));
132129
}
133130

@@ -146,9 +143,7 @@ public ListenableFuture<BasinConfig> getBasinConfig(String basin) {
146143
() ->
147144
this.futureStub.getBasinConfig(
148145
GetBasinConfigRequest.newBuilder().setBasin(basin).build())),
149-
resp ->
150-
new BasinConfig(
151-
StreamConfig.fromProto(resp.getConfig().getDefaultStreamConfig())),
146+
resp -> BasinConfig.fromProto(resp.getConfig()),
152147
executor));
153148
}
154149

s2-sdk/src/main/java/s2/client/ManagedAppendSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,8 @@ public ListenableFuture<Void> closeGracefully() {
374374
return daemon;
375375
}
376376

377+
interface Notification {}
378+
377379
static class InflightRecord {
378380
final AppendInput input;
379381
final long entryNanos;
@@ -396,8 +398,6 @@ static InflightRecord construct(AppendInput input, Long meteredBytes) {
396398
}
397399
}
398400

399-
interface Notification {}
400-
401401
class Ack implements Notification {
402402
final AppendOutput output;
403403

s2-sdk/src/main/java/s2/client/ManagedReadSession.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@ public void close() throws Exception {
101101
}
102102

103103
interface ReadItem {}
104-
;
105104

106105
static class DataItem implements ReadItem {
107106
final ReadOutput readOutput;

s2-sdk/src/main/java/s2/types/BasinConfig.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,28 @@
22

33
public class BasinConfig {
44
public final StreamConfig defaultStreamConfig;
5+
public final boolean createStreamOnAppend;
6+
public final boolean createStreamOnRead;
57

6-
public BasinConfig(StreamConfig defaultStreamConfig) {
8+
public BasinConfig(
9+
StreamConfig defaultStreamConfig, boolean createStreamOnAppend, boolean createStreamOnRead) {
710
this.defaultStreamConfig = defaultStreamConfig;
11+
this.createStreamOnAppend = createStreamOnAppend;
12+
this.createStreamOnRead = createStreamOnRead;
13+
}
14+
15+
public static BasinConfig fromProto(s2.v1alpha.BasinConfig basinConfig) {
16+
return new BasinConfig(
17+
StreamConfig.fromProto(basinConfig.getDefaultStreamConfig()),
18+
basinConfig.getCreateStreamOnAppend(),
19+
basinConfig.getCreateStreamOnRead());
820
}
921

1022
public s2.v1alpha.BasinConfig toProto() {
1123
return s2.v1alpha.BasinConfig.newBuilder()
1224
.setDefaultStreamConfig(defaultStreamConfig.toProto())
25+
.setCreateStreamOnAppend(createStreamOnAppend)
26+
.setCreateStreamOnRead(createStreamOnRead)
1327
.build();
1428
}
1529
}

s2-sdk/src/main/java/s2/types/CreateBasinRequest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ public CreateBasinRequestBuilder withBasin(String basin) {
5656
return this;
5757
}
5858

59-
public CreateBasinRequestBuilder withDefaultStreamConfig(StreamConfig config) {
60-
this.config = Optional.of(new BasinConfig(config));
59+
public CreateBasinRequestBuilder withBasinConfig(BasinConfig config) {
60+
this.config = Optional.of(config);
6161
return this;
6262
}
6363

@@ -75,7 +75,7 @@ public CreateBasinRequest build() {
7575
this.basin.ifPresent(BasinUtils::validateBasinName);
7676
return new CreateBasinRequest(
7777
this.basin,
78-
this.config.orElse(new BasinConfig(StreamConfig.newBuilder().build())),
78+
this.config.orElse(new BasinConfig(StreamConfig.newBuilder().build(), false, false)),
7979
this.assignment);
8080
}
8181
}

s2-sdk/src/main/java/s2/types/Header.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@ public Header(ByteString name, ByteString value) {
1111
this.value = value;
1212
}
1313

14-
public s2.v1alpha.Header toProto() {
15-
return s2.v1alpha.Header.newBuilder().setName(name).setValue(value).build();
16-
}
17-
1814
public static Header fromProto(s2.v1alpha.Header protoHeader) {
1915
return new Header(protoHeader.getName(), protoHeader.getValue());
2016
}
17+
18+
public s2.v1alpha.Header toProto() {
19+
return s2.v1alpha.Header.newBuilder().setName(name).setValue(value).build();
20+
}
2121
}

s2-sdk/src/main/java/s2/types/ReadOutput.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,4 @@ static ReadOutput fromProto(s2.v1alpha.ReadOutput readOutput) {
1010
}
1111
throw new IllegalStateException("Unrecognized readOutput case: " + readOutput);
1212
}
13-
}
13+
}

s2-sdk/src/main/java/s2/types/ReadSessionRequest.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,12 @@ public ReadSessionRequest update(Start start, long consumedRecords, long consume
2424
}
2525

2626
public s2.v1alpha.ReadSessionRequest toProto(String streamName) {
27-
s2.v1alpha.ReadSessionRequest.Builder builder = s2.v1alpha.ReadSessionRequest.newBuilder()
28-
.setStream(streamName)
29-
.setLimit(readLimit.toProto())
30-
.setHeartbeats(heartbeats);
31-
27+
s2.v1alpha.ReadSessionRequest.Builder builder =
28+
s2.v1alpha.ReadSessionRequest.newBuilder()
29+
.setStream(streamName)
30+
.setLimit(readLimit.toProto())
31+
.setHeartbeats(heartbeats);
32+
3233
if (start instanceof Start.SeqNum) {
3334
builder.setSeqNum(((Start.SeqNum) start).value);
3435
} else if (start instanceof Start.Timestamp) {
@@ -38,7 +39,7 @@ public s2.v1alpha.ReadSessionRequest toProto(String streamName) {
3839
} else {
3940
throw new IllegalStateException("Unknown Start type: " + start.getClass().getName());
4041
}
41-
42+
4243
return builder.build();
4344
}
4445

@@ -64,9 +65,9 @@ public ReadSessionRequestBuilder withHeartbeats(boolean heartbeats) {
6465

6566
public ReadSessionRequest build() {
6667
return new ReadSessionRequest(
67-
this.start.orElseGet(() -> Start.seqNum(0L)),
68-
this.readLimit.orElse(ReadLimit.NONE),
68+
this.start.orElseGet(() -> Start.seqNum(0L)),
69+
this.readLimit.orElse(ReadLimit.NONE),
6970
this.heartbeats);
7071
}
7172
}
72-
}
73+
}

s2-sdk/src/main/java/s2/types/SequencedRecord.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,6 @@ public class SequencedRecord implements MeteredBytes {
1818
this.timestamp = timestamp;
1919
}
2020

21-
@Override
22-
public long meteredBytes() {
23-
return 8
24-
+ (2L * this.headers.size())
25-
+ this.headers.stream().map(h -> h.name.size() + h.value.size()).reduce(0, Integer::sum)
26-
+ this.body.size();
27-
}
28-
2921
public static SequencedRecord fromProto(s2.v1alpha.SequencedRecord sequencedRecord) {
3022
return new SequencedRecord(
3123
sequencedRecord.getSeqNum(),
@@ -35,4 +27,12 @@ public static SequencedRecord fromProto(s2.v1alpha.SequencedRecord sequencedReco
3527
sequencedRecord.getBody(),
3628
Instant.ofEpochMilli(sequencedRecord.getTimestamp()));
3729
}
30+
31+
@Override
32+
public long meteredBytes() {
33+
return 8
34+
+ (2L * this.headers.size())
35+
+ this.headers.stream().map(h -> h.name.size() + h.value.size()).reduce(0, Integer::sum)
36+
+ this.body.size();
37+
}
3838
}

0 commit comments

Comments
 (0)