Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ For the demos discussed below, it will be helpful to create a new basin and stre
Start by setting some environment variables in your shell.

```bash
export S2_AUTH_TOKEN="MY-SECRET"
export S2_ACCESS_TOKEN="MY-SECRET"
export S2_BASIN="my-demo-java"
export S2_STREAM="test/1"
```
Expand Down
20 changes: 14 additions & 6 deletions app/src/main/java/org/example/app/AccountDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,21 @@
import s2.config.Config;
import s2.config.Endpoints;
import s2.types.Age;
import s2.types.BasinConfig;
import s2.types.CreateBasinRequest;
import s2.types.ListBasinsRequest;
import s2.types.StorageClass;
import s2.types.StreamConfig;
import s2.types.Timestamping;
import s2.types.TimestampingMode;

public class AccountDemo {

private static final Logger logger = LoggerFactory.getLogger(AccountDemo.class.getName());

public static void main(String[] args) throws Exception {
var config =
Config.newBuilder(System.getenv("S2_AUTH_TOKEN"))
Config.newBuilder(System.getenv("S2_ACCESS_TOKEN"))
.withEndpoints(Endpoints.fromEnvironment())
.build();

Expand All @@ -33,11 +36,16 @@ public static void main(String[] args) throws Exception {
.createBasin(
CreateBasinRequest.newBuilder()
.withBasin(UUID.randomUUID().toString())
.withDefaultStreamConfig(
StreamConfig.newBuilder()
.withRetentionPolicy(new Age(Duration.ofDays(7)))
.withStorageClass(StorageClass.STANDARD)
.build())
.withBasinConfig(
new BasinConfig(
StreamConfig.newBuilder()
.withRetentionPolicy(new Age(Duration.ofDays(7)))
.withTimestamping(
new Timestamping(TimestampingMode.CLIENT_REQUIRE, true))
.withStorageClass(StorageClass.STANDARD)
.build(),
false,
false))
.build())
.get();
logger.info("newBasin={}", newBasin);
Expand Down
2 changes: 1 addition & 1 deletion app/src/main/java/org/example/app/BasinDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class BasinDemo {

public static void main(String[] args) throws Exception {
final var config =
Config.newBuilder(System.getenv("S2_AUTH_TOKEN"))
Config.newBuilder(System.getenv("S2_ACCESS_TOKEN"))
.withEndpoints(Endpoints.fromEnvironment())
.build();

Expand Down
18 changes: 15 additions & 3 deletions app/src/main/java/org/example/app/ManagedAppendSessionDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@ public class ManagedAppendSessionDemo {
private static final Logger logger =
LoggerFactory.getLogger(ManagedAppendSessionDemo.class.getName());

// 512KiB
private static final Integer TARGET_BATCH_SIZE = 512 * 1024;

public static void main(String[] args) throws Exception {
final var authToken = System.getenv("S2_AUTH_TOKEN");
final var authToken = System.getenv("S2_ACCESS_TOKEN");
final var basinName = System.getenv("S2_BASIN");
final var streamName = System.getenv("S2_STREAM");
if (authToken == null) {
throw new IllegalStateException("S2_AUTH_TOKEN not set");
throw new IllegalStateException("S2_ACCESS_TOKEN not set");
}
if (basinName == null) {
throw new IllegalStateException("S2_BASIN not set");
Expand Down Expand Up @@ -82,7 +85,16 @@ public static void main(String[] args) throws Exception {
try {
// Generate a record with approximately 10KiB of random text.
var payload =
RandomASCIIStringGenerator.generateRandomASCIIString(i + " - ", 1024 * 10);
RandomASCIIStringGenerator.generateRandomASCIIString(i + " - ", TARGET_BATCH_SIZE);

while (futureAppendSession.remainingBufferCapacityBytes()
< (TARGET_BATCH_SIZE + payload.length())) {
// Crude backpressure mechanism; slow down the rate of payload creation by sleeping
// momentarily
// if we have hit the internal append buffer max size.
Thread.sleep(10);
}

var append =
futureAppendSession.submit(
AppendInput.newBuilder()
Expand Down
22 changes: 12 additions & 10 deletions app/src/main/java/org/example/app/ManagedReadSessionDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import s2.config.Endpoints;
import s2.types.Batch;
import s2.types.ReadSessionRequest;
import s2.types.Start;

public class ManagedReadSessionDemo {

Expand All @@ -19,11 +20,11 @@ public class ManagedReadSessionDemo {

public static void main(String[] args) throws Exception {

final var authToken = System.getenv("S2_AUTH_TOKEN");
final var authToken = System.getenv("S2_ACCESS_TOKEN");
final var basinName = System.getenv("S2_BASIN");
final var streamName = System.getenv("S2_STREAM");
if (authToken == null) {
throw new IllegalStateException("S2_AUTH_TOKEN not set");
throw new IllegalStateException("S2_ACCESS_TOKEN not set");
}
if (basinName == null) {
throw new IllegalStateException("S2_BASIN not set");
Expand All @@ -32,11 +33,7 @@ public static void main(String[] args) throws Exception {
throw new IllegalStateException("S2_STREAM not set");
}

var config =
Config.newBuilder(authToken)
.withEndpoints(Endpoints.fromEnvironment())
.withCompression(true)
.build();
var config = Config.newBuilder(authToken).withEndpoints(Endpoints.fromEnvironment()).build();

try (final var executor = new ScheduledThreadPoolExecutor(12);
final var channel = ManagedChannelFactory.forBasinOrStreamService(config, basinName)) {
Expand All @@ -49,7 +46,10 @@ public static void main(String[] args) throws Exception {

try (final var managedSession =
streamClient.managedReadSession(
ReadSessionRequest.newBuilder().withHeartbeats(true).build(),
ReadSessionRequest.newBuilder()
.withHeartbeats(true)
.withStart(Start.seqNum(0))
.build(),
1024 * 1024 * 1024 * 5)) {

AtomicLong receivedBytes = new AtomicLong();
Expand All @@ -61,10 +61,12 @@ public static void main(String[] args) throws Exception {
if (elem instanceof Batch batch) {
var size = batch.meteredBytes();
logger.info(
"batch of {} bytes, seqnums {}..={}",
"batch of {} bytes, seqnums {}..={} / instants {}..={}",
size,
batch.firstSeqNum(),
batch.lastSeqNum());
batch.lastSeqNum(),
batch.firstTimestamp(),
batch.lastTimestamp());
receivedBytes.addAndGet(size);
} else {
logger.info("non batch received: {}", elem);
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.0.15-SNAPSHOT
version=0.0.15
2 changes: 1 addition & 1 deletion s2-internal/src/main/proto
1 change: 1 addition & 0 deletions s2-sdk/src/main/java/s2/client/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ static boolean retryableStatus(Status status) {
switch (status.getCode()) {
case UNKNOWN:
case DEADLINE_EXCEEDED:
case RESOURCE_EXHAUSTED:
case UNAVAILABLE:
return true;
default:
Expand Down
9 changes: 2 additions & 7 deletions s2-sdk/src/main/java/s2/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import s2.types.CreateBasinRequest;
import s2.types.Paginated;
import s2.types.ReconfigureBasinRequest;
import s2.types.StreamConfig;
import s2.v1alpha.AccountServiceGrpc;
import s2.v1alpha.DeleteBasinRequest;
import s2.v1alpha.GetBasinConfigRequest;
Expand Down Expand Up @@ -125,9 +124,7 @@ public ListenableFuture<BasinConfig> reconfigureBasin(ReconfigureBasinRequest re
withStaticRetries(
config.maxRetries,
() -> this.futureStub.reconfigureBasin(reconfigure.toProto())),
resp ->
new BasinConfig(
StreamConfig.fromProto(resp.getConfig().getDefaultStreamConfig())),
resp -> BasinConfig.fromProto(resp.getConfig()),
executor));
}

Expand All @@ -146,9 +143,7 @@ public ListenableFuture<BasinConfig> getBasinConfig(String basin) {
() ->
this.futureStub.getBasinConfig(
GetBasinConfigRequest.newBuilder().setBasin(basin).build())),
resp ->
new BasinConfig(
StreamConfig.fromProto(resp.getConfig().getDefaultStreamConfig())),
resp -> BasinConfig.fromProto(resp.getConfig()),
executor));
}

Expand Down
8 changes: 6 additions & 2 deletions s2-sdk/src/main/java/s2/client/ManagedAppendSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ public class ManagedAppendSession implements AutoCloseable {
this.remainingAttempts = new AtomicInteger(this.client.config.maxRetries);
}

public Integer remainingBufferCapacityBytes() {
return this.inflightBytes.availablePermits();
}

private ListenableFuture<Void> retryingDaemon() {
return Futures.catchingAsync(
executor.submit(this::daemon),
Expand Down Expand Up @@ -370,6 +374,8 @@ public ListenableFuture<Void> closeGracefully() {
return daemon;
}

interface Notification {}

static class InflightRecord {
final AppendInput input;
final long entryNanos;
Expand All @@ -392,8 +398,6 @@ static InflightRecord construct(AppendInput input, Long meteredBytes) {
}
}

interface Notification {}

class Ack implements Notification {
final AppendOutput output;

Expand Down
1 change: 0 additions & 1 deletion s2-sdk/src/main/java/s2/client/ManagedReadSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ public void close() throws Exception {
}

interface ReadItem {}
;

static class DataItem implements ReadItem {
final ReadOutput readOutput;
Expand Down
10 changes: 6 additions & 4 deletions s2-sdk/src/main/java/s2/client/ReadSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import s2.types.Batch;
import s2.types.ReadOutput;
import s2.types.ReadSessionRequest;
import s2.types.Start;
import s2.v1alpha.ReadSessionResponse;

public class ReadSession implements AutoCloseable {
Expand All @@ -27,7 +29,7 @@ public class ReadSession implements AutoCloseable {
final ScheduledExecutorService executor;
final StreamClient client;

final AtomicLong nextStartSeqNum;
final AtomicReference<Start> nextStart;
final AtomicLong consumedRecords = new AtomicLong();
final AtomicLong consumedBytes = new AtomicLong(0);
final AtomicInteger remainingAttempts;
Expand All @@ -52,7 +54,7 @@ public class ReadSession implements AutoCloseable {
this.onResponse = onResponse;
this.onError = onError;
this.request = request;
this.nextStartSeqNum = new AtomicLong(request.startSeqNum);
this.nextStart = new AtomicReference<>(request.start);
this.remainingAttempts = new AtomicInteger(client.config.maxRetries);
this.lastEvent = new AtomicLong(System.nanoTime());

Expand Down Expand Up @@ -134,12 +136,12 @@ private ListenableFuture<Void> retrying() {

return Futures.catchingAsync(
readSessionInner(
request.update(nextStartSeqNum.get(), consumedRecords.get(), consumedBytes.get()),
request.update(nextStart.get(), consumedRecords.get(), consumedBytes.get()),
resp -> {
if (resp instanceof Batch) {
final Batch batch = (Batch) resp;
var lastRecordIdx = batch.lastSeqNum();
lastRecordIdx.ifPresent(v -> nextStartSeqNum.set(v + 1));
lastRecordIdx.ifPresent(v -> nextStart.set(Start.seqNum(v + 1)));
consumedRecords.addAndGet(batch.sequencedRecordBatch.records.size());
consumedBytes.addAndGet(batch.meteredBytes());
}
Expand Down
13 changes: 6 additions & 7 deletions s2-sdk/src/main/java/s2/types/AppendInput.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package s2.types;

import com.google.protobuf.ByteString;
import java.io.Serializable;
import java.util.List;
import java.util.Optional;

public class AppendInput implements MeteredBytes, Serializable {
public final List<AppendRecord> records;
public final Optional<Long> matchSeqNum;
public final Optional<ByteString> fencingToken;
public final Optional<String> fencingToken;

private AppendInput(
List<AppendRecord> records, Optional<Long> matchSeqNum, Optional<ByteString> fencingToken) {
List<AppendRecord> records, Optional<Long> matchSeqNum, Optional<String> fencingToken) {
this.records = records;
this.matchSeqNum = matchSeqNum;
this.fencingToken = fencingToken;
Expand All @@ -38,7 +37,7 @@ public s2.v1alpha.AppendInput toProto(String streamName) {
public static class AppendInputBuilder {
private List<AppendRecord> records = List.of();
private Optional<Long> matchSeqNum = Optional.empty();
private Optional<ByteString> fencingToken = Optional.empty();
private Optional<String> fencingToken = Optional.empty();

public AppendInputBuilder withRecords(List<AppendRecord> records) {
this.records = records;
Expand All @@ -50,7 +49,7 @@ public AppendInputBuilder withMatchSeqNum(Long matchSeqNum) {
return this;
}

public AppendInputBuilder withFencingToken(ByteString fencingToken) {
public AppendInputBuilder withFencingToken(String fencingToken) {
this.fencingToken = Optional.of(fencingToken);
return this;
}
Expand All @@ -64,8 +63,8 @@ public AppendInput build() {
});
fencingToken.ifPresent(
token -> {
if (token.size() > 16) {
throw new IllegalArgumentException("fencingToken must be less than 16 bytes");
if (token.length() > 36) {
throw new IllegalArgumentException("fencingToken must be 36 or fewer UTF-8 bytes");
}
});
var provisional = new AppendInput(records, matchSeqNum, fencingToken);
Expand Down
16 changes: 15 additions & 1 deletion s2-sdk/src/main/java/s2/types/BasinConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,28 @@

public class BasinConfig {
public final StreamConfig defaultStreamConfig;
public final boolean createStreamOnAppend;
public final boolean createStreamOnRead;

public BasinConfig(StreamConfig defaultStreamConfig) {
public BasinConfig(
StreamConfig defaultStreamConfig, boolean createStreamOnAppend, boolean createStreamOnRead) {
this.defaultStreamConfig = defaultStreamConfig;
this.createStreamOnAppend = createStreamOnAppend;
this.createStreamOnRead = createStreamOnRead;
}

public static BasinConfig fromProto(s2.v1alpha.BasinConfig basinConfig) {
return new BasinConfig(
StreamConfig.fromProto(basinConfig.getDefaultStreamConfig()),
basinConfig.getCreateStreamOnAppend(),
basinConfig.getCreateStreamOnRead());
}

public s2.v1alpha.BasinConfig toProto() {
return s2.v1alpha.BasinConfig.newBuilder()
.setDefaultStreamConfig(defaultStreamConfig.toProto())
.setCreateStreamOnAppend(createStreamOnAppend)
.setCreateStreamOnRead(createStreamOnRead)
.build();
}
}
14 changes: 14 additions & 0 deletions s2-sdk/src/main/java/s2/types/Batch.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package s2.types;

import java.time.Instant;
import java.util.Optional;

public final class Batch implements ReadOutput, MeteredBytes {
Expand All @@ -23,6 +24,19 @@ public Optional<Long> lastSeqNum() {
}
}

public Optional<Instant> firstTimestamp() {
return this.sequencedRecordBatch.records.stream().findFirst().map(sr -> sr.timestamp);
}

public Optional<Instant> lastTimestamp() {
if (!this.sequencedRecordBatch.records.isEmpty()) {
return Optional.of(
this.sequencedRecordBatch.records.get(sequencedRecordBatch.records.size() - 1).timestamp);
} else {
return Optional.empty();
}
}

@Override
public long meteredBytes() {
return this.sequencedRecordBatch.meteredBytes();
Expand Down
Loading
Loading