Skip to content

Commit e5d445d

Browse files
committed
updates
1 parent 934228f commit e5d445d

27 files changed

+359
-98
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ For the demos discussed below, it will be helpful to create a new basin and stre
9999
Start by setting some environment variables in your shell.
100100

101101
```bash
102-
export S2_AUTH_TOKEN="MY-SECRET"
102+
export S2_ACCESS_TOKEN="MY-SECRET"
103103
export S2_BASIN="my-demo-java"
104104
export S2_STREAM="test/1"
105105
```

app/src/main/java/org/example/app/AccountDemo.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,21 @@
88
import s2.config.Config;
99
import s2.config.Endpoints;
1010
import s2.types.Age;
11+
import s2.types.BasinConfig;
1112
import s2.types.CreateBasinRequest;
1213
import s2.types.ListBasinsRequest;
1314
import s2.types.StorageClass;
1415
import s2.types.StreamConfig;
16+
import s2.types.Timestamping;
17+
import s2.types.TimestampingMode;
1518

1619
public class AccountDemo {
1720

1821
private static final Logger logger = LoggerFactory.getLogger(AccountDemo.class.getName());
1922

2023
public static void main(String[] args) throws Exception {
2124
var config =
22-
Config.newBuilder(System.getenv("S2_AUTH_TOKEN"))
25+
Config.newBuilder(System.getenv("S2_ACCESS_TOKEN"))
2326
.withEndpoints(Endpoints.fromEnvironment())
2427
.build();
2528

@@ -33,11 +36,16 @@ public static void main(String[] args) throws Exception {
3336
.createBasin(
3437
CreateBasinRequest.newBuilder()
3538
.withBasin(UUID.randomUUID().toString())
36-
.withDefaultStreamConfig(
37-
StreamConfig.newBuilder()
38-
.withRetentionPolicy(new Age(Duration.ofDays(7)))
39-
.withStorageClass(StorageClass.STANDARD)
40-
.build())
39+
.withBasinConfig(
40+
new BasinConfig(
41+
StreamConfig.newBuilder()
42+
.withRetentionPolicy(new Age(Duration.ofDays(7)))
43+
.withTimestamping(
44+
new Timestamping(TimestampingMode.CLIENT_REQUIRE, true))
45+
.withStorageClass(StorageClass.STANDARD)
46+
.build(),
47+
false,
48+
false))
4149
.build())
4250
.get();
4351
logger.info("newBasin={}", newBasin);

app/src/main/java/org/example/app/BasinDemo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ public class BasinDemo {
1515

1616
public static void main(String[] args) throws Exception {
1717
final var config =
18-
Config.newBuilder(System.getenv("S2_AUTH_TOKEN"))
18+
Config.newBuilder(System.getenv("S2_ACCESS_TOKEN"))
1919
.withEndpoints(Endpoints.fromEnvironment())
2020
.build();
2121

app/src/main/java/org/example/app/ManagedAppendSessionDemo.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,15 @@ public class ManagedAppendSessionDemo {
2525
private static final Logger logger =
2626
LoggerFactory.getLogger(ManagedAppendSessionDemo.class.getName());
2727

28+
// 512KiB
29+
private static final Integer TARGET_BATCH_SIZE = 512 * 1024;
30+
2831
public static void main(String[] args) throws Exception {
29-
final var authToken = System.getenv("S2_AUTH_TOKEN");
32+
final var authToken = System.getenv("S2_ACCESS_TOKEN");
3033
final var basinName = System.getenv("S2_BASIN");
3134
final var streamName = System.getenv("S2_STREAM");
3235
if (authToken == null) {
33-
throw new IllegalStateException("S2_AUTH_TOKEN not set");
36+
throw new IllegalStateException("S2_ACCESS_TOKEN not set");
3437
}
3538
if (basinName == null) {
3639
throw new IllegalStateException("S2_BASIN not set");
@@ -82,7 +85,16 @@ public static void main(String[] args) throws Exception {
8285
try {
8386
// Generate a record with approximately 10KiB of random text.
8487
var payload =
85-
RandomASCIIStringGenerator.generateRandomASCIIString(i + " - ", 1024 * 10);
88+
RandomASCIIStringGenerator.generateRandomASCIIString(i + " - ", TARGET_BATCH_SIZE);
89+
90+
while (futureAppendSession.remainingBufferCapacityBytes()
91+
< (TARGET_BATCH_SIZE + payload.length())) {
92+
// Crude backpressure mechanism; slow down the rate of payload creation by sleeping
93+
// momentarily
94+
// if we have hit the internal append buffer max size.
95+
Thread.sleep(10);
96+
}
97+
8698
var append =
8799
futureAppendSession.submit(
88100
AppendInput.newBuilder()

app/src/main/java/org/example/app/ManagedReadSessionDemo.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import s2.config.Endpoints;
1212
import s2.types.Batch;
1313
import s2.types.ReadSessionRequest;
14+
import s2.types.Start;
1415

1516
public class ManagedReadSessionDemo {
1617

@@ -19,11 +20,11 @@ public class ManagedReadSessionDemo {
1920

2021
public static void main(String[] args) throws Exception {
2122

22-
final var authToken = System.getenv("S2_AUTH_TOKEN");
23+
final var authToken = System.getenv("S2_ACCESS_TOKEN");
2324
final var basinName = System.getenv("S2_BASIN");
2425
final var streamName = System.getenv("S2_STREAM");
2526
if (authToken == null) {
26-
throw new IllegalStateException("S2_AUTH_TOKEN not set");
27+
throw new IllegalStateException("S2_ACCESS_TOKEN not set");
2728
}
2829
if (basinName == null) {
2930
throw new IllegalStateException("S2_BASIN not set");
@@ -32,11 +33,7 @@ public static void main(String[] args) throws Exception {
3233
throw new IllegalStateException("S2_STREAM not set");
3334
}
3435

35-
var config =
36-
Config.newBuilder(authToken)
37-
.withEndpoints(Endpoints.fromEnvironment())
38-
.withCompression(true)
39-
.build();
36+
var config = Config.newBuilder(authToken).withEndpoints(Endpoints.fromEnvironment()).build();
4037

4138
try (final var executor = new ScheduledThreadPoolExecutor(12);
4239
final var channel = ManagedChannelFactory.forBasinOrStreamService(config, basinName)) {
@@ -49,7 +46,10 @@ public static void main(String[] args) throws Exception {
4946

5047
try (final var managedSession =
5148
streamClient.managedReadSession(
52-
ReadSessionRequest.newBuilder().withHeartbeats(true).build(),
49+
ReadSessionRequest.newBuilder()
50+
.withHeartbeats(true)
51+
.withStart(Start.seqNum(0))
52+
.build(),
5353
1024 * 1024 * 1024 * 5)) {
5454

5555
AtomicLong receivedBytes = new AtomicLong();
@@ -61,10 +61,12 @@ public static void main(String[] args) throws Exception {
6161
if (elem instanceof Batch batch) {
6262
var size = batch.meteredBytes();
6363
logger.info(
64-
"batch of {} bytes, seqnums {}..={}",
64+
"batch of {} bytes, seqnums {}..={} / instants {}..={}",
6565
size,
6666
batch.firstSeqNum(),
67-
batch.lastSeqNum());
67+
batch.lastSeqNum(),
68+
batch.firstTimestamp(),
69+
batch.lastTimestamp());
6870
receivedBytes.addAndGet(size);
6971
} else {
7072
logger.info("non batch received: {}", elem);

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.0.15-SNAPSHOT
1+
version=0.0.15

s2-internal/src/main/proto

Submodule proto updated from aea316a to 4183c36

s2-sdk/src/main/java/s2/client/BaseClient.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ static boolean retryableStatus(Status status) {
5555
switch (status.getCode()) {
5656
case UNKNOWN:
5757
case DEADLINE_EXCEEDED:
58+
case RESOURCE_EXHAUSTED:
5859
case UNAVAILABLE:
5960
return true;
6061
default:

s2-sdk/src/main/java/s2/client/Client.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import s2.types.CreateBasinRequest;
2121
import s2.types.Paginated;
2222
import s2.types.ReconfigureBasinRequest;
23-
import s2.types.StreamConfig;
2423
import s2.v1alpha.AccountServiceGrpc;
2524
import s2.v1alpha.DeleteBasinRequest;
2625
import s2.v1alpha.GetBasinConfigRequest;
@@ -125,9 +124,7 @@ public ListenableFuture<BasinConfig> reconfigureBasin(ReconfigureBasinRequest re
125124
withStaticRetries(
126125
config.maxRetries,
127126
() -> this.futureStub.reconfigureBasin(reconfigure.toProto())),
128-
resp ->
129-
new BasinConfig(
130-
StreamConfig.fromProto(resp.getConfig().getDefaultStreamConfig())),
127+
resp -> BasinConfig.fromProto(resp.getConfig()),
131128
executor));
132129
}
133130

@@ -146,9 +143,7 @@ public ListenableFuture<BasinConfig> getBasinConfig(String basin) {
146143
() ->
147144
this.futureStub.getBasinConfig(
148145
GetBasinConfigRequest.newBuilder().setBasin(basin).build())),
149-
resp ->
150-
new BasinConfig(
151-
StreamConfig.fromProto(resp.getConfig().getDefaultStreamConfig())),
146+
resp -> BasinConfig.fromProto(resp.getConfig()),
152147
executor));
153148
}
154149

s2-sdk/src/main/java/s2/client/ManagedAppendSession.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ public class ManagedAppendSession implements AutoCloseable {
5757
this.remainingAttempts = new AtomicInteger(this.client.config.maxRetries);
5858
}
5959

60+
public Integer remainingBufferCapacityBytes() {
61+
return this.inflightBytes.availablePermits();
62+
}
63+
6064
private ListenableFuture<Void> retryingDaemon() {
6165
return Futures.catchingAsync(
6266
executor.submit(this::daemon),
@@ -370,6 +374,8 @@ public ListenableFuture<Void> closeGracefully() {
370374
return daemon;
371375
}
372376

377+
interface Notification {}
378+
373379
static class InflightRecord {
374380
final AppendInput input;
375381
final long entryNanos;
@@ -392,8 +398,6 @@ static InflightRecord construct(AppendInput input, Long meteredBytes) {
392398
}
393399
}
394400

395-
interface Notification {}
396-
397401
class Ack implements Notification {
398402
final AppendOutput output;
399403

0 commit comments

Comments
 (0)