Skip to content

Commit c6c65e9

Browse files
committed
updates
1 parent 934228f commit c6c65e9

File tree

19 files changed

+179
-63
lines changed

19 files changed

+179
-63
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ For the demos discussed below, it will be helpful to create a new basin and stre
9999
Start by setting some environment variables in your shell.
100100

101101
```bash
102-
export S2_AUTH_TOKEN="MY-SECRET"
102+
export S2_ACCESS_TOKEN="MY-SECRET"
103103
export S2_BASIN="my-demo-java"
104104
export S2_STREAM="test/1"
105105
```

app/src/main/java/org/example/app/AccountDemo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public class AccountDemo {
1919

2020
public static void main(String[] args) throws Exception {
2121
var config =
22-
Config.newBuilder(System.getenv("S2_AUTH_TOKEN"))
22+
Config.newBuilder(System.getenv("S2_ACCESS_TOKEN"))
2323
.withEndpoints(Endpoints.fromEnvironment())
2424
.build();
2525

app/src/main/java/org/example/app/BasinDemo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ public class BasinDemo {
1515

1616
public static void main(String[] args) throws Exception {
1717
final var config =
18-
Config.newBuilder(System.getenv("S2_AUTH_TOKEN"))
18+
Config.newBuilder(System.getenv("S2_ACCESS_TOKEN"))
1919
.withEndpoints(Endpoints.fromEnvironment())
2020
.build();
2121

app/src/main/java/org/example/app/ManagedAppendSessionDemo.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,15 @@ public class ManagedAppendSessionDemo {
2525
private static final Logger logger =
2626
LoggerFactory.getLogger(ManagedAppendSessionDemo.class.getName());
2727

28+
// 512KiB
29+
private static final Integer TARGET_BATCH_SIZE = 512 * 1024;
30+
2831
public static void main(String[] args) throws Exception {
29-
final var authToken = System.getenv("S2_AUTH_TOKEN");
32+
final var authToken = System.getenv("S2_ACCESS_TOKEN");
3033
final var basinName = System.getenv("S2_BASIN");
3134
final var streamName = System.getenv("S2_STREAM");
3235
if (authToken == null) {
33-
throw new IllegalStateException("S2_AUTH_TOKEN not set");
36+
throw new IllegalStateException("S2_ACCESS_TOKEN not set");
3437
}
3538
if (basinName == null) {
3639
throw new IllegalStateException("S2_BASIN not set");
@@ -82,7 +85,16 @@ public static void main(String[] args) throws Exception {
8285
try {
8386
// Generate a record with approximately 10KiB of random text.
8487
var payload =
85-
RandomASCIIStringGenerator.generateRandomASCIIString(i + " - ", 1024 * 10);
88+
RandomASCIIStringGenerator.generateRandomASCIIString(i + " - ", TARGET_BATCH_SIZE);
89+
90+
while (futureAppendSession.remainingBufferCapacityBytes()
91+
< (TARGET_BATCH_SIZE + payload.length())) {
92+
// Crude backpressure mechanism; slow down the rate of payload creation by sleeping
93+
// momentarily
94+
// if we have hit the internal append buffer max size.
95+
Thread.sleep(10);
96+
}
97+
8698
var append =
8799
futureAppendSession.submit(
88100
AppendInput.newBuilder()

app/src/main/java/org/example/app/ManagedReadSessionDemo.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import s2.config.Endpoints;
1212
import s2.types.Batch;
1313
import s2.types.ReadSessionRequest;
14+
import s2.types.Start;
1415

1516
public class ManagedReadSessionDemo {
1617

@@ -19,11 +20,11 @@ public class ManagedReadSessionDemo {
1920

2021
public static void main(String[] args) throws Exception {
2122

22-
final var authToken = System.getenv("S2_AUTH_TOKEN");
23+
final var authToken = System.getenv("S2_ACCESS_TOKEN");
2324
final var basinName = System.getenv("S2_BASIN");
2425
final var streamName = System.getenv("S2_STREAM");
2526
if (authToken == null) {
26-
throw new IllegalStateException("S2_AUTH_TOKEN not set");
27+
throw new IllegalStateException("S2_ACCESS_TOKEN not set");
2728
}
2829
if (basinName == null) {
2930
throw new IllegalStateException("S2_BASIN not set");
@@ -32,11 +33,7 @@ public static void main(String[] args) throws Exception {
3233
throw new IllegalStateException("S2_STREAM not set");
3334
}
3435

35-
var config =
36-
Config.newBuilder(authToken)
37-
.withEndpoints(Endpoints.fromEnvironment())
38-
.withCompression(true)
39-
.build();
36+
var config = Config.newBuilder(authToken).withEndpoints(Endpoints.fromEnvironment()).build();
4037

4138
try (final var executor = new ScheduledThreadPoolExecutor(12);
4239
final var channel = ManagedChannelFactory.forBasinOrStreamService(config, basinName)) {
@@ -49,7 +46,10 @@ public static void main(String[] args) throws Exception {
4946

5047
try (final var managedSession =
5148
streamClient.managedReadSession(
52-
ReadSessionRequest.newBuilder().withHeartbeats(true).build(),
49+
ReadSessionRequest.newBuilder()
50+
.withHeartbeats(true)
51+
.withStart(Start.seqNum(0))
52+
.build(),
5353
1024 * 1024 * 1024 * 5)) {
5454

5555
AtomicLong receivedBytes = new AtomicLong();
@@ -61,10 +61,12 @@ public static void main(String[] args) throws Exception {
6161
if (elem instanceof Batch batch) {
6262
var size = batch.meteredBytes();
6363
logger.info(
64-
"batch of {} bytes, seqnums {}..={}",
64+
"batch of {} bytes, seqnums {}..={} / instants {}..={}",
6565
size,
6666
batch.firstSeqNum(),
67-
batch.lastSeqNum());
67+
batch.lastSeqNum(),
68+
batch.firstTimestamp(),
69+
batch.lastTimestamp());
6870
receivedBytes.addAndGet(size);
6971
} else {
7072
logger.info("non batch received: {}", elem);

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.0.15-SNAPSHOT
1+
version=0.0.15

s2-internal/src/main/proto

Submodule proto updated from aea316a to 4183c36

s2-sdk/src/main/java/s2/client/BaseClient.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ static boolean retryableStatus(Status status) {
5555
switch (status.getCode()) {
5656
case UNKNOWN:
5757
case DEADLINE_EXCEEDED:
58+
case RESOURCE_EXHAUSTED:
5859
case UNAVAILABLE:
5960
return true;
6061
default:

s2-sdk/src/main/java/s2/client/ManagedAppendSession.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ public class ManagedAppendSession implements AutoCloseable {
5757
this.remainingAttempts = new AtomicInteger(this.client.config.maxRetries);
5858
}
5959

60+
public Integer remainingBufferCapacityBytes() {
61+
return this.inflightBytes.availablePermits();
62+
}
63+
6064
private ListenableFuture<Void> retryingDaemon() {
6165
return Futures.catchingAsync(
6266
executor.submit(this::daemon),

s2-sdk/src/main/java/s2/client/ReadSession.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,14 @@
1010
import java.util.concurrent.TimeUnit;
1111
import java.util.concurrent.atomic.AtomicInteger;
1212
import java.util.concurrent.atomic.AtomicLong;
13+
import java.util.concurrent.atomic.AtomicReference;
1314
import java.util.function.Consumer;
1415
import org.slf4j.Logger;
1516
import org.slf4j.LoggerFactory;
1617
import s2.types.Batch;
1718
import s2.types.ReadOutput;
1819
import s2.types.ReadSessionRequest;
20+
import s2.types.Start;
1921
import s2.v1alpha.ReadSessionResponse;
2022

2123
public class ReadSession implements AutoCloseable {
@@ -27,7 +29,7 @@ public class ReadSession implements AutoCloseable {
2729
final ScheduledExecutorService executor;
2830
final StreamClient client;
2931

30-
final AtomicLong nextStartSeqNum;
32+
final AtomicReference<Start> nextStart;
3133
final AtomicLong consumedRecords = new AtomicLong();
3234
final AtomicLong consumedBytes = new AtomicLong(0);
3335
final AtomicInteger remainingAttempts;
@@ -52,7 +54,7 @@ public class ReadSession implements AutoCloseable {
5254
this.onResponse = onResponse;
5355
this.onError = onError;
5456
this.request = request;
55-
this.nextStartSeqNum = new AtomicLong(request.startSeqNum);
57+
this.nextStart = new AtomicReference<>(request.start);
5658
this.remainingAttempts = new AtomicInteger(client.config.maxRetries);
5759
this.lastEvent = new AtomicLong(System.nanoTime());
5860

@@ -134,12 +136,12 @@ private ListenableFuture<Void> retrying() {
134136

135137
return Futures.catchingAsync(
136138
readSessionInner(
137-
request.update(nextStartSeqNum.get(), consumedRecords.get(), consumedBytes.get()),
139+
request.update(nextStart.get(), consumedRecords.get(), consumedBytes.get()),
138140
resp -> {
139141
if (resp instanceof Batch) {
140142
final Batch batch = (Batch) resp;
141143
var lastRecordIdx = batch.lastSeqNum();
142-
lastRecordIdx.ifPresent(v -> nextStartSeqNum.set(v + 1));
144+
lastRecordIdx.ifPresent(v -> nextStart.set(Start.seqNum(v + 1)));
143145
consumedRecords.addAndGet(batch.sequencedRecordBatch.records.size());
144146
consumedBytes.addAndGet(batch.meteredBytes());
145147
}

0 commit comments

Comments
 (0)