diff --git a/README.md b/README.md index 8ca5d42..79e8c36 100644 --- a/README.md +++ b/README.md @@ -99,7 +99,7 @@ For the demos discussed below, it will be helpful to create a new basin and stre Start by setting some environment variables in your shell. ```bash -export S2_AUTH_TOKEN="MY-SECRET" +export S2_ACCESS_TOKEN="MY-SECRET" export S2_BASIN="my-demo-java" export S2_STREAM="test/1" ``` diff --git a/app/src/main/java/org/example/app/AccountDemo.java b/app/src/main/java/org/example/app/AccountDemo.java index 65b30c4..d8399ce 100644 --- a/app/src/main/java/org/example/app/AccountDemo.java +++ b/app/src/main/java/org/example/app/AccountDemo.java @@ -8,10 +8,13 @@ import s2.config.Config; import s2.config.Endpoints; import s2.types.Age; +import s2.types.BasinConfig; import s2.types.CreateBasinRequest; import s2.types.ListBasinsRequest; import s2.types.StorageClass; import s2.types.StreamConfig; +import s2.types.Timestamping; +import s2.types.TimestampingMode; public class AccountDemo { @@ -19,7 +22,7 @@ public class AccountDemo { public static void main(String[] args) throws Exception { var config = - Config.newBuilder(System.getenv("S2_AUTH_TOKEN")) + Config.newBuilder(System.getenv("S2_ACCESS_TOKEN")) .withEndpoints(Endpoints.fromEnvironment()) .build(); @@ -33,11 +36,16 @@ public static void main(String[] args) throws Exception { .createBasin( CreateBasinRequest.newBuilder() .withBasin(UUID.randomUUID().toString()) - .withDefaultStreamConfig( - StreamConfig.newBuilder() - .withRetentionPolicy(new Age(Duration.ofDays(7))) - .withStorageClass(StorageClass.STANDARD) - .build()) + .withBasinConfig( + new BasinConfig( + StreamConfig.newBuilder() + .withRetentionPolicy(new Age(Duration.ofDays(7))) + .withTimestamping( + new Timestamping(TimestampingMode.CLIENT_REQUIRE, true)) + .withStorageClass(StorageClass.STANDARD) + .build(), + false, + false)) .build()) .get(); logger.info("newBasin={}", newBasin); diff --git a/app/src/main/java/org/example/app/BasinDemo.java b/app/src/main/java/org/example/app/BasinDemo.java index 56e7be1..1939e3b 100644 --- a/app/src/main/java/org/example/app/BasinDemo.java +++ b/app/src/main/java/org/example/app/BasinDemo.java @@ -15,7 +15,7 @@ public class BasinDemo { public static void main(String[] args) throws Exception { final var config = - Config.newBuilder(System.getenv("S2_AUTH_TOKEN")) + Config.newBuilder(System.getenv("S2_ACCESS_TOKEN")) .withEndpoints(Endpoints.fromEnvironment()) .build(); diff --git a/app/src/main/java/org/example/app/ManagedAppendSessionDemo.java b/app/src/main/java/org/example/app/ManagedAppendSessionDemo.java index a19d8d2..676c6cd 100644 --- a/app/src/main/java/org/example/app/ManagedAppendSessionDemo.java +++ b/app/src/main/java/org/example/app/ManagedAppendSessionDemo.java @@ -25,12 +25,15 @@ public class ManagedAppendSessionDemo { private static final Logger logger = LoggerFactory.getLogger(ManagedAppendSessionDemo.class.getName()); + // 512KiB + private static final Integer TARGET_BATCH_SIZE = 512 * 1024; + public static void main(String[] args) throws Exception { - final var authToken = System.getenv("S2_AUTH_TOKEN"); + final var authToken = System.getenv("S2_ACCESS_TOKEN"); final var basinName = System.getenv("S2_BASIN"); final var streamName = System.getenv("S2_STREAM"); if (authToken == null) { - throw new IllegalStateException("S2_AUTH_TOKEN not set"); + throw new IllegalStateException("S2_ACCESS_TOKEN not set"); } if (basinName == null) { throw new IllegalStateException("S2_BASIN not set"); @@ -82,7 +85,16 @@ public static void main(String[] args) throws Exception { try { // Generate a record with approximately 10KiB of random text. var payload = - RandomASCIIStringGenerator.generateRandomASCIIString(i + " - ", 1024 * 10); + RandomASCIIStringGenerator.generateRandomASCIIString(i + " - ", TARGET_BATCH_SIZE); + + while (futureAppendSession.remainingBufferCapacityBytes() + < (TARGET_BATCH_SIZE + payload.length())) { + // Crude backpressure mechanism; slow down the rate of payload creation by sleeping + // momentarily + // if we have hit the internal append buffer max size. + Thread.sleep(10); + } + var append = futureAppendSession.submit( AppendInput.newBuilder() diff --git a/app/src/main/java/org/example/app/ManagedReadSessionDemo.java b/app/src/main/java/org/example/app/ManagedReadSessionDemo.java index 471f19b..6859b62 100644 --- a/app/src/main/java/org/example/app/ManagedReadSessionDemo.java +++ b/app/src/main/java/org/example/app/ManagedReadSessionDemo.java @@ -11,6 +11,7 @@ import s2.config.Endpoints; import s2.types.Batch; import s2.types.ReadSessionRequest; +import s2.types.Start; public class ManagedReadSessionDemo { @@ -19,11 +20,11 @@ public class ManagedReadSessionDemo { public static void main(String[] args) throws Exception { - final var authToken = System.getenv("S2_AUTH_TOKEN"); + final var authToken = System.getenv("S2_ACCESS_TOKEN"); final var basinName = System.getenv("S2_BASIN"); final var streamName = System.getenv("S2_STREAM"); if (authToken == null) { - throw new IllegalStateException("S2_AUTH_TOKEN not set"); + throw new IllegalStateException("S2_ACCESS_TOKEN not set"); } if (basinName == null) { throw new IllegalStateException("S2_BASIN not set"); @@ -32,11 +33,7 @@ public static void main(String[] args) throws Exception { throw new IllegalStateException("S2_STREAM not set"); } - var config = - Config.newBuilder(authToken) - .withEndpoints(Endpoints.fromEnvironment()) - .withCompression(true) - .build(); + var config = Config.newBuilder(authToken).withEndpoints(Endpoints.fromEnvironment()).build(); try (final var executor = new ScheduledThreadPoolExecutor(12); final var channel = ManagedChannelFactory.forBasinOrStreamService(config, basinName)) { @@ -49,7 +46,10 @@ public static void main(String[] args) throws Exception { try (final var managedSession = streamClient.managedReadSession( - ReadSessionRequest.newBuilder().withHeartbeats(true).build(), + ReadSessionRequest.newBuilder() + .withHeartbeats(true) + .withStart(Start.seqNum(0)) + .build(), 1024 * 1024 * 1024 * 5)) { AtomicLong receivedBytes = new AtomicLong(); @@ -61,10 +61,12 @@ public static void main(String[] args) throws Exception { if (elem instanceof Batch batch) { var size = batch.meteredBytes(); logger.info( - "batch of {} bytes, seqnums {}..={}", + "batch of {} bytes, seqnums {}..={} / instants {}..={}", size, batch.firstSeqNum(), - batch.lastSeqNum()); + batch.lastSeqNum(), + batch.firstTimestamp(), + batch.lastTimestamp()); receivedBytes.addAndGet(size); } else { logger.info("non batch received: {}", elem); diff --git a/gradle.properties b/gradle.properties index 268a339..847cca8 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=0.0.15-SNAPSHOT \ No newline at end of file +version=0.0.15 \ No newline at end of file diff --git a/s2-internal/src/main/proto b/s2-internal/src/main/proto index aea316a..4183c36 160000 --- a/s2-internal/src/main/proto +++ b/s2-internal/src/main/proto @@ -1 +1 @@ -Subproject commit aea316a72958fe4ffb00cdb30681d0f5836012c6 +Subproject commit 4183c36ef4b34d8626e01873eafbaf9583aae067 diff --git a/s2-sdk/src/main/java/s2/client/BaseClient.java b/s2-sdk/src/main/java/s2/client/BaseClient.java index 29079a0..6957cdd 100644 --- a/s2-sdk/src/main/java/s2/client/BaseClient.java +++ b/s2-sdk/src/main/java/s2/client/BaseClient.java @@ -55,6 +55,7 @@ static boolean retryableStatus(Status status) { switch (status.getCode()) { case UNKNOWN: case DEADLINE_EXCEEDED: + case RESOURCE_EXHAUSTED: case UNAVAILABLE: return true; default: diff --git a/s2-sdk/src/main/java/s2/client/Client.java b/s2-sdk/src/main/java/s2/client/Client.java index b844841..ecac440 100644 --- a/s2-sdk/src/main/java/s2/client/Client.java +++ b/s2-sdk/src/main/java/s2/client/Client.java @@ -20,7 +20,6 @@ import s2.types.CreateBasinRequest; import s2.types.Paginated; import s2.types.ReconfigureBasinRequest; -import s2.types.StreamConfig; import s2.v1alpha.AccountServiceGrpc; import s2.v1alpha.DeleteBasinRequest; import s2.v1alpha.GetBasinConfigRequest; @@ -125,9 +124,7 @@ public ListenableFuture reconfigureBasin(ReconfigureBasinRequest re withStaticRetries( config.maxRetries, () -> this.futureStub.reconfigureBasin(reconfigure.toProto())), - resp -> - new BasinConfig( - StreamConfig.fromProto(resp.getConfig().getDefaultStreamConfig())), + resp -> BasinConfig.fromProto(resp.getConfig()), executor)); } @@ -146,9 +143,7 @@ public ListenableFuture getBasinConfig(String basin) { () -> this.futureStub.getBasinConfig( GetBasinConfigRequest.newBuilder().setBasin(basin).build())), - resp -> - new BasinConfig( - StreamConfig.fromProto(resp.getConfig().getDefaultStreamConfig())), + resp -> BasinConfig.fromProto(resp.getConfig()), executor)); } diff --git a/s2-sdk/src/main/java/s2/client/ManagedAppendSession.java b/s2-sdk/src/main/java/s2/client/ManagedAppendSession.java index f84a2d2..6c661cb 100644 --- a/s2-sdk/src/main/java/s2/client/ManagedAppendSession.java +++ b/s2-sdk/src/main/java/s2/client/ManagedAppendSession.java @@ -57,6 +57,10 @@ public class ManagedAppendSession implements AutoCloseable { this.remainingAttempts = new AtomicInteger(this.client.config.maxRetries); } + public Integer remainingBufferCapacityBytes() { + return this.inflightBytes.availablePermits(); + } + private ListenableFuture retryingDaemon() { return Futures.catchingAsync( executor.submit(this::daemon), @@ -370,6 +374,8 @@ public ListenableFuture closeGracefully() { return daemon; } + interface Notification {} + static class InflightRecord { final AppendInput input; final long entryNanos; @@ -392,8 +398,6 @@ static InflightRecord construct(AppendInput input, Long meteredBytes) { } } - interface Notification {} - class Ack implements Notification { final AppendOutput output; diff --git a/s2-sdk/src/main/java/s2/client/ManagedReadSession.java b/s2-sdk/src/main/java/s2/client/ManagedReadSession.java index 2eec65c..02d11a6 100644 --- a/s2-sdk/src/main/java/s2/client/ManagedReadSession.java +++ b/s2-sdk/src/main/java/s2/client/ManagedReadSession.java @@ -101,7 +101,6 @@ public void close() throws Exception { } interface ReadItem {} - ; static class DataItem implements ReadItem { final ReadOutput readOutput; diff --git a/s2-sdk/src/main/java/s2/client/ReadSession.java b/s2-sdk/src/main/java/s2/client/ReadSession.java index 53ecd52..c725f44 100644 --- a/s2-sdk/src/main/java/s2/client/ReadSession.java +++ b/s2-sdk/src/main/java/s2/client/ReadSession.java @@ -10,12 +10,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import s2.types.Batch; import s2.types.ReadOutput; import s2.types.ReadSessionRequest; +import s2.types.Start; import s2.v1alpha.ReadSessionResponse; public class ReadSession implements AutoCloseable { @@ -27,7 +29,7 @@ public class ReadSession implements AutoCloseable { final ScheduledExecutorService executor; final StreamClient client; - final AtomicLong nextStartSeqNum; + final AtomicReference nextStart; final AtomicLong consumedRecords = new AtomicLong(); final AtomicLong consumedBytes = new AtomicLong(0); final AtomicInteger remainingAttempts; @@ -52,7 +54,7 @@ public class ReadSession implements AutoCloseable { this.onResponse = onResponse; this.onError = onError; this.request = request; - this.nextStartSeqNum = new AtomicLong(request.startSeqNum); + this.nextStart = new AtomicReference<>(request.start); this.remainingAttempts = new AtomicInteger(client.config.maxRetries); this.lastEvent = new AtomicLong(System.nanoTime()); @@ -134,12 +136,12 @@ private ListenableFuture retrying() { return Futures.catchingAsync( readSessionInner( - request.update(nextStartSeqNum.get(), consumedRecords.get(), consumedBytes.get()), + request.update(nextStart.get(), consumedRecords.get(), consumedBytes.get()), resp -> { if (resp instanceof Batch) { final Batch batch = (Batch) resp; var lastRecordIdx = batch.lastSeqNum(); - lastRecordIdx.ifPresent(v -> nextStartSeqNum.set(v + 1)); + lastRecordIdx.ifPresent(v -> nextStart.set(Start.seqNum(v + 1))); consumedRecords.addAndGet(batch.sequencedRecordBatch.records.size()); consumedBytes.addAndGet(batch.meteredBytes()); } diff --git a/s2-sdk/src/main/java/s2/types/AppendInput.java b/s2-sdk/src/main/java/s2/types/AppendInput.java index ff152f6..1a1fb9c 100644 --- a/s2-sdk/src/main/java/s2/types/AppendInput.java +++ b/s2-sdk/src/main/java/s2/types/AppendInput.java @@ -1,6 +1,5 @@ package s2.types; -import com.google.protobuf.ByteString; import java.io.Serializable; import java.util.List; import java.util.Optional; @@ -8,10 +7,10 @@ public class AppendInput implements MeteredBytes, Serializable { public final List records; public final Optional matchSeqNum; - public final Optional fencingToken; + public final Optional fencingToken; private AppendInput( - List records, Optional matchSeqNum, Optional fencingToken) { + List records, Optional matchSeqNum, Optional fencingToken) { this.records = records; this.matchSeqNum = matchSeqNum; this.fencingToken = fencingToken; @@ -38,7 +37,7 @@ public s2.v1alpha.AppendInput toProto(String streamName) { public static class AppendInputBuilder { private List records = List.of(); private Optional matchSeqNum = Optional.empty(); - private Optional fencingToken = Optional.empty(); + private Optional fencingToken = Optional.empty(); public AppendInputBuilder withRecords(List records) { this.records = records; @@ -50,7 +49,7 @@ public AppendInputBuilder withMatchSeqNum(Long matchSeqNum) { return this; } - public AppendInputBuilder withFencingToken(ByteString fencingToken) { + public AppendInputBuilder withFencingToken(String fencingToken) { this.fencingToken = Optional.of(fencingToken); return this; } @@ -64,8 +63,8 @@ public AppendInput build() { }); fencingToken.ifPresent( token -> { - if (token.size() > 16) { - throw new IllegalArgumentException("fencingToken must be less than 16 bytes"); + if (token.length() > 36) { + throw new IllegalArgumentException("fencingToken must be 36 or fewer UTF-8 bytes"); } }); var provisional = new AppendInput(records, matchSeqNum, fencingToken); diff --git a/s2-sdk/src/main/java/s2/types/BasinConfig.java b/s2-sdk/src/main/java/s2/types/BasinConfig.java index 3c45f6f..1273f7b 100644 --- a/s2-sdk/src/main/java/s2/types/BasinConfig.java +++ b/s2-sdk/src/main/java/s2/types/BasinConfig.java @@ -2,14 +2,28 @@ public class BasinConfig { public final StreamConfig defaultStreamConfig; + public final boolean createStreamOnAppend; + public final boolean createStreamOnRead; - public BasinConfig(StreamConfig defaultStreamConfig) { + public BasinConfig( + StreamConfig defaultStreamConfig, boolean createStreamOnAppend, boolean createStreamOnRead) { this.defaultStreamConfig = defaultStreamConfig; + this.createStreamOnAppend = createStreamOnAppend; + this.createStreamOnRead = createStreamOnRead; + } + + public static BasinConfig fromProto(s2.v1alpha.BasinConfig basinConfig) { + return new BasinConfig( + StreamConfig.fromProto(basinConfig.getDefaultStreamConfig()), + basinConfig.getCreateStreamOnAppend(), + basinConfig.getCreateStreamOnRead()); } public s2.v1alpha.BasinConfig toProto() { return s2.v1alpha.BasinConfig.newBuilder() .setDefaultStreamConfig(defaultStreamConfig.toProto()) + .setCreateStreamOnAppend(createStreamOnAppend) + .setCreateStreamOnRead(createStreamOnRead) .build(); } } diff --git a/s2-sdk/src/main/java/s2/types/Batch.java b/s2-sdk/src/main/java/s2/types/Batch.java index b30801b..2ddc525 100644 --- a/s2-sdk/src/main/java/s2/types/Batch.java +++ b/s2-sdk/src/main/java/s2/types/Batch.java @@ -1,5 +1,6 @@ package s2.types; +import java.time.Instant; import java.util.Optional; public final class Batch implements ReadOutput, MeteredBytes { @@ -23,6 +24,19 @@ public Optional lastSeqNum() { } } + public Optional firstTimestamp() { + return this.sequencedRecordBatch.records.stream().findFirst().map(sr -> sr.timestamp); + } + + public Optional lastTimestamp() { + if (!this.sequencedRecordBatch.records.isEmpty()) { + return Optional.of( + this.sequencedRecordBatch.records.get(sequencedRecordBatch.records.size() - 1).timestamp); + } else { + return Optional.empty(); + } + } + @Override public long meteredBytes() { return this.sequencedRecordBatch.meteredBytes(); diff --git a/s2-sdk/src/main/java/s2/types/CreateBasinRequest.java b/s2-sdk/src/main/java/s2/types/CreateBasinRequest.java index b5d56f9..dec2e50 100644 --- a/s2-sdk/src/main/java/s2/types/CreateBasinRequest.java +++ b/s2-sdk/src/main/java/s2/types/CreateBasinRequest.java @@ -56,8 +56,8 @@ public CreateBasinRequestBuilder withBasin(String basin) { return this; } - public CreateBasinRequestBuilder withDefaultStreamConfig(StreamConfig config) { - this.config = Optional.of(new BasinConfig(config)); + public CreateBasinRequestBuilder withBasinConfig(BasinConfig config) { + this.config = Optional.of(config); return this; } @@ -75,7 +75,7 @@ public CreateBasinRequest build() { this.basin.ifPresent(BasinUtils::validateBasinName); return new CreateBasinRequest( this.basin, - this.config.orElse(new BasinConfig(StreamConfig.newBuilder().build())), + this.config.orElse(new BasinConfig(StreamConfig.newBuilder().build(), false, false)), this.assignment); } } diff --git a/s2-sdk/src/main/java/s2/types/Header.java b/s2-sdk/src/main/java/s2/types/Header.java index ac7e0c5..281c86d 100644 --- a/s2-sdk/src/main/java/s2/types/Header.java +++ b/s2-sdk/src/main/java/s2/types/Header.java @@ -11,11 +11,11 @@ public Header(ByteString name, ByteString value) { this.value = value; } - public s2.v1alpha.Header toProto() { - return s2.v1alpha.Header.newBuilder().setName(name).setValue(value).build(); - } - public static Header fromProto(s2.v1alpha.Header protoHeader) { return new Header(protoHeader.getName(), protoHeader.getValue()); } + + public s2.v1alpha.Header toProto() { + return s2.v1alpha.Header.newBuilder().setName(name).setValue(value).build(); + } } diff --git a/s2-sdk/src/main/java/s2/types/ReadOutput.java b/s2-sdk/src/main/java/s2/types/ReadOutput.java index 3772d7c..a239e89 100644 --- a/s2-sdk/src/main/java/s2/types/ReadOutput.java +++ b/s2-sdk/src/main/java/s2/types/ReadOutput.java @@ -5,8 +5,6 @@ static ReadOutput fromProto(s2.v1alpha.ReadOutput readOutput) { switch (readOutput.getOutputCase()) { case BATCH: return new Batch(SequencedRecordBatch.fromProto(readOutput.getBatch())); - case FIRST_SEQ_NUM: - return new FirstSeqNum(readOutput.getFirstSeqNum()); case NEXT_SEQ_NUM: return new NextSeqNum(readOutput.getNextSeqNum()); } diff --git a/s2-sdk/src/main/java/s2/types/ReadRequest.java b/s2-sdk/src/main/java/s2/types/ReadRequest.java index 7671d9d..2822668 100644 --- a/s2-sdk/src/main/java/s2/types/ReadRequest.java +++ b/s2-sdk/src/main/java/s2/types/ReadRequest.java @@ -4,11 +4,11 @@ public class ReadRequest { - public final long startSeqNum; + public final Start start; public final ReadLimit readLimit; - protected ReadRequest(long startSeqNum, ReadLimit readLimit) { - this.startSeqNum = startSeqNum; + protected ReadRequest(Start start, ReadLimit readLimit) { + this.start = start; this.readLimit = readLimit; } @@ -17,19 +17,28 @@ public static ReadRequestBuilder newBuilder() { } public s2.v1alpha.ReadRequest toProto(String streamName) { - return s2.v1alpha.ReadRequest.newBuilder() - .setStream(streamName) - .setStartSeqNum(startSeqNum) - .setLimit(readLimit.toProto()) - .build(); + s2.v1alpha.ReadRequest.Builder builder = + s2.v1alpha.ReadRequest.newBuilder().setStream(streamName).setLimit(readLimit.toProto()); + + if (start instanceof Start.SeqNum) { + builder.setSeqNum(((Start.SeqNum) start).value); + } else if (start instanceof Start.Timestamp) { + builder.setTimestamp(((Start.Timestamp) start).value); + } else if (start instanceof Start.TailOffset) { + builder.setTailOffset(((Start.TailOffset) start).value); + } else { + throw new IllegalStateException("Unknown start type: " + start.getClass().getName()); + } + + return builder.build(); } public static class ReadRequestBuilder { - private Optional startSeqNum = Optional.empty(); + private Optional start = Optional.empty(); private Optional readLimit = Optional.empty(); - public ReadRequestBuilder withStartSeqNum(long startSeqNum) { - this.startSeqNum = Optional.of(startSeqNum); + public ReadRequestBuilder withStart(Start start) { + this.start = Optional.of(start); return this; } @@ -40,7 +49,8 @@ public ReadRequestBuilder withReadLimit(ReadLimit readLimit) { public ReadRequest build() { this.readLimit.ifPresent(ReadLimit::validateUnary); - return new ReadRequest(this.startSeqNum.orElse(0L), this.readLimit.orElse(ReadLimit.NONE)); + return new ReadRequest( + this.start.orElseGet(() -> Start.seqNum(0L)), this.readLimit.orElse(ReadLimit.NONE)); } } } diff --git a/s2-sdk/src/main/java/s2/types/ReadSessionRequest.java b/s2-sdk/src/main/java/s2/types/ReadSessionRequest.java index 3ad8481..b610cf3 100644 --- a/s2-sdk/src/main/java/s2/types/ReadSessionRequest.java +++ b/s2-sdk/src/main/java/s2/types/ReadSessionRequest.java @@ -4,12 +4,12 @@ public class ReadSessionRequest { - public final long startSeqNum; + public final Start start; public final ReadLimit readLimit; public final boolean heartbeats; - protected ReadSessionRequest(long startSeqNum, ReadLimit readLimit, boolean heartbeats) { - this.startSeqNum = startSeqNum; + protected ReadSessionRequest(Start start, ReadLimit readLimit, boolean heartbeats) { + this.start = start; this.readLimit = readLimit; this.heartbeats = heartbeats; } @@ -18,27 +18,38 @@ public static ReadSessionRequestBuilder newBuilder() { return new ReadSessionRequestBuilder(); } - public ReadSessionRequest update(long newStartSeqNum, long consumedRecords, long consumedBytes) { + public ReadSessionRequest update(Start start, long consumedRecords, long consumedBytes) { return new ReadSessionRequest( - newStartSeqNum, readLimit.remaining(consumedRecords, consumedBytes), heartbeats); + start, readLimit.remaining(consumedRecords, consumedBytes), heartbeats); } public s2.v1alpha.ReadSessionRequest toProto(String streamName) { - return s2.v1alpha.ReadSessionRequest.newBuilder() - .setStream(streamName) - .setStartSeqNum(startSeqNum) - .setLimit(readLimit.toProto()) - .setHeartbeats(heartbeats) - .build(); + s2.v1alpha.ReadSessionRequest.Builder builder = + s2.v1alpha.ReadSessionRequest.newBuilder() + .setStream(streamName) + .setLimit(readLimit.toProto()) + .setHeartbeats(heartbeats); + + if (start instanceof Start.SeqNum) { + builder.setSeqNum(((Start.SeqNum) start).value); + } else if (start instanceof Start.Timestamp) { + builder.setTimestamp(((Start.Timestamp) start).value); + } else if (start instanceof Start.TailOffset) { + builder.setTailOffset(((Start.TailOffset) start).value); + } else { + throw new IllegalStateException("Unknown Start type: " + start.getClass().getName()); + } + + return builder.build(); } public static class ReadSessionRequestBuilder { - private Optional startSeqNum = Optional.empty(); + private Optional start = Optional.empty(); private Optional readLimit = Optional.empty(); private boolean heartbeats = false; - public ReadSessionRequestBuilder withStartSeqNum(long startSeqNum) { - this.startSeqNum = Optional.of(startSeqNum); + public ReadSessionRequestBuilder withStart(Start start) { + this.start = Optional.of(start); return this; } @@ -54,7 +65,9 @@ public ReadSessionRequestBuilder withHeartbeats(boolean heartbeats) { public ReadSessionRequest build() { return new ReadSessionRequest( - this.startSeqNum.orElse(0L), this.readLimit.orElse(ReadLimit.NONE), this.heartbeats); + this.start.orElseGet(() -> Start.seqNum(0L)), + this.readLimit.orElse(ReadLimit.NONE), + this.heartbeats); } } } diff --git a/s2-sdk/src/main/java/s2/types/SequencedRecord.java b/s2-sdk/src/main/java/s2/types/SequencedRecord.java index d111f64..5cc8273 100644 --- a/s2-sdk/src/main/java/s2/types/SequencedRecord.java +++ b/s2-sdk/src/main/java/s2/types/SequencedRecord.java @@ -1,6 +1,7 @@ package s2.types; import com.google.protobuf.ByteString; +import java.time.Instant; import java.util.List; import java.util.stream.Collectors; @@ -8,19 +9,13 @@ public class SequencedRecord implements MeteredBytes { public final long seqNum; public final List
headers; public final ByteString body; + public final Instant timestamp; - SequencedRecord(long seqNum, List
headers, ByteString body) { + SequencedRecord(long seqNum, List
headers, ByteString body, Instant timestamp) { this.seqNum = seqNum; this.headers = headers; this.body = body; - } - - @Override - public long meteredBytes() { - return 8 - + (2L * this.headers.size()) - + this.headers.stream().map(h -> h.name.size() + h.value.size()).reduce(0, Integer::sum) - + this.body.size(); + this.timestamp = timestamp; } public static SequencedRecord fromProto(s2.v1alpha.SequencedRecord sequencedRecord) { @@ -29,6 +24,15 @@ public static SequencedRecord fromProto(s2.v1alpha.SequencedRecord sequencedReco sequencedRecord.getHeadersList().stream() .map(Header::fromProto) .collect(Collectors.toList()), - sequencedRecord.getBody()); + sequencedRecord.getBody(), + Instant.ofEpochMilli(sequencedRecord.getTimestamp())); + } + + @Override + public long meteredBytes() { + return 8 + + (2L * this.headers.size()) + + this.headers.stream().map(h -> h.name.size() + h.value.size()).reduce(0, Integer::sum) + + this.body.size(); } } diff --git a/s2-sdk/src/main/java/s2/types/Start.java b/s2-sdk/src/main/java/s2/types/Start.java new file mode 100644 index 0000000..31f77c2 --- /dev/null +++ b/s2-sdk/src/main/java/s2/types/Start.java @@ -0,0 +1,45 @@ +package s2.types; + +import java.time.Instant; + +public abstract class Start { + public static SeqNum seqNum(long seqNum) { + return new SeqNum(seqNum); + } + + public static Timestamp timestamp(Instant instant) { + return new Timestamp(instant.toEpochMilli()); + } + + public static TailOffset tailOffset(long tailOffset) { + return new TailOffset(tailOffset); + } + + public static final class SeqNum extends Start { + public final long value; + + private SeqNum(long value) { + this.value = value; + } + } + + public static final class Timestamp extends Start { + public final long value; + + private Timestamp(long value) { + this.value = value; + } + + public Instant toInstant() { + return Instant.ofEpochMilli(value); + } + } + + public static final class TailOffset extends Start { + public final long value; + + private TailOffset(long value) { + this.value = value; + } + } +} diff --git a/s2-sdk/src/main/java/s2/types/StreamConfig.java b/s2-sdk/src/main/java/s2/types/StreamConfig.java index e558e87..3d89c4b 100644 --- a/s2-sdk/src/main/java/s2/types/StreamConfig.java +++ b/s2-sdk/src/main/java/s2/types/StreamConfig.java @@ -8,10 +8,15 @@ public class StreamConfig { public final StorageClass storageClass; public final Optional retentionPolicy; + public final Optional timestamping; - StreamConfig(StorageClass storageClass, Optional retentionPolicy) { + StreamConfig( + StorageClass storageClass, + Optional retentionPolicy, + Optional timestamping) { this.storageClass = storageClass; this.retentionPolicy = retentionPolicy; + this.timestamping = timestamping; } public static StreamConfig fromProto(s2.v1alpha.StreamConfig proto) { @@ -36,7 +41,13 @@ public static StreamConfig fromProto(s2.v1alpha.StreamConfig proto) { if (proto.getRetentionPolicyCase() == RetentionPolicyCase.AGE) { retentionPolicy = Optional.of(new Age(Duration.ofSeconds(proto.getAge()))); } - return new StreamConfig(storageClass, retentionPolicy); + + Optional timestamping = Optional.empty(); + if (proto.hasTimestamping()) { + timestamping = Optional.of(Timestamping.fromProto(proto.getTimestamping())); + } + + return new StreamConfig(storageClass, retentionPolicy, timestamping); } public static StreamConfigBuilder newBuilder() { @@ -44,12 +55,43 @@ public static StreamConfigBuilder newBuilder() { } public s2.v1alpha.StreamConfig toProto() { - return s2.v1alpha.StreamConfig.newBuilder().build(); + var builder = s2.v1alpha.StreamConfig.newBuilder(); + final s2.v1alpha.StorageClass storageClass; + switch (this.storageClass) { + case UNKNOWN: + throw new IllegalArgumentException("Unknown storage class: " + this.storageClass); + case STANDARD: + storageClass = s2.v1alpha.StorageClass.STORAGE_CLASS_STANDARD; + break; + case EXPRESS: + storageClass = s2.v1alpha.StorageClass.STORAGE_CLASS_EXPRESS; + break; + case UNSPECIFIED: + default: + storageClass = s2.v1alpha.StorageClass.STORAGE_CLASS_UNSPECIFIED; + break; + } + + builder.setStorageClass(storageClass); + + if (retentionPolicy.isPresent()) { + RetentionPolicy retentionPolicy = this.retentionPolicy.get(); + if (retentionPolicy instanceof Age) { + builder.setAge(((Age) retentionPolicy).age.getSeconds()); + } else { + throw new IllegalArgumentException("Invalid retention policy: " + retentionPolicy); + } + } + + this.timestamping.ifPresent(ts -> builder.setTimestamping(ts.toProto())); + + return builder.build(); } public static class StreamConfigBuilder { private Optional storageClass = Optional.empty(); private Optional retentionPolicy = Optional.empty(); + private Optional timestamping = Optional.empty(); public StreamConfigBuilder withStorageClass(StorageClass storageClass) { this.storageClass = Optional.of(storageClass); @@ -61,8 +103,14 @@ public StreamConfigBuilder withRetentionPolicy(RetentionPolicy retentionPolicy) return this; } + public StreamConfigBuilder withTimestamping(Timestamping timestamping) { + this.timestamping = Optional.of(timestamping); + return this; + } + public StreamConfig build() { - return new StreamConfig(this.storageClass.orElse(StorageClass.EXPRESS), this.retentionPolicy); + return new StreamConfig( + this.storageClass.orElse(StorageClass.EXPRESS), this.retentionPolicy, this.timestamping); } } } diff --git a/s2-sdk/src/main/java/s2/types/Timestamping.java b/s2-sdk/src/main/java/s2/types/Timestamping.java new file mode 100644 index 0000000..f4ef3ac --- /dev/null +++ b/s2-sdk/src/main/java/s2/types/Timestamping.java @@ -0,0 +1,68 @@ +package s2.types; + +import s2.v1alpha.StreamConfig; + +public class Timestamping { + public final TimestampingMode mode; + public final boolean uncapped; + + /** + * Instantiates a new Timestamping. + * + * @param mode selected timestamping behavior + * @param uncapped if client-specified timestamps should be allowed to exceed the arrival time + */ + public Timestamping(TimestampingMode mode, boolean uncapped) { + this.mode = mode; + this.uncapped = uncapped; + } + + public static Timestamping fromProto(StreamConfig.Timestamping proto) { + final TimestampingMode mode; + switch (proto.getMode()) { + case TIMESTAMPING_MODE_UNSPECIFIED: + mode = TimestampingMode.UNSPECIFIED; + break; + case TIMESTAMPING_MODE_CLIENT_PREFER: + mode = TimestampingMode.CLIENT_PREFER; + break; + case TIMESTAMPING_MODE_CLIENT_REQUIRE: + mode = TimestampingMode.CLIENT_REQUIRE; + break; + case TIMESTAMPING_MODE_ARRIVAL: + mode = TimestampingMode.ARRIVAL; + break; + case UNRECOGNIZED: + default: + mode = TimestampingMode.UNKNOWN; + break; + } + + return new Timestamping(mode, proto.getUncapped()); + } + + public StreamConfig.Timestamping toProto() { + final s2.v1alpha.TimestampingMode timestampingMode; + switch (mode) { + case UNSPECIFIED: + timestampingMode = s2.v1alpha.TimestampingMode.TIMESTAMPING_MODE_UNSPECIFIED; + break; + case CLIENT_PREFER: + timestampingMode = s2.v1alpha.TimestampingMode.TIMESTAMPING_MODE_CLIENT_PREFER; + break; + case CLIENT_REQUIRE: + timestampingMode = s2.v1alpha.TimestampingMode.TIMESTAMPING_MODE_CLIENT_REQUIRE; + break; + case ARRIVAL: + timestampingMode = s2.v1alpha.TimestampingMode.TIMESTAMPING_MODE_ARRIVAL; + break; + default: + throw new IllegalArgumentException("Unexpected value for timestamping mode: " + mode); + } + + return StreamConfig.Timestamping.newBuilder() + .setMode(timestampingMode) + .setUncapped(uncapped) + .build(); + } +} diff --git a/s2-sdk/src/main/java/s2/types/TimestampingMode.java b/s2-sdk/src/main/java/s2/types/TimestampingMode.java new file mode 100644 index 0000000..20592d5 --- /dev/null +++ b/s2-sdk/src/main/java/s2/types/TimestampingMode.java @@ -0,0 +1,12 @@ +package s2.types; + +public enum TimestampingMode { + UNKNOWN, + UNSPECIFIED, + /// Prefer client-specified timestamp if present otherwise use arrival time. + CLIENT_PREFER, + /// Require a client-specified timestamp and reject the append if it is missing. + CLIENT_REQUIRE, + /// Use the arrival time and ignore any client-specified timestamp. + ARRIVAL +} diff --git a/s2-sdk/src/test/java/s2/client/ReadSessionTest.java b/s2-sdk/src/test/java/s2/client/ReadSessionTest.java index c5be008..8e989c6 100644 --- a/s2-sdk/src/test/java/s2/client/ReadSessionTest.java +++ b/s2-sdk/src/test/java/s2/client/ReadSessionTest.java @@ -20,6 +20,7 @@ import s2.types.ReadLimit; import s2.types.ReadOutput; import s2.types.ReadSessionRequest; +import s2.types.Start; import s2.v1alpha.StreamService.MockReadSessionStreamService; public class ReadSessionTest { @@ -61,7 +62,7 @@ public void tearDown() throws Exception { public void testReadSession() throws Exception { ReadSessionRequest request = ReadSessionRequest.newBuilder() - .withStartSeqNum(0) + .withStart(Start.seqNum(0)) .withReadLimit(ReadLimit.count(25)) .build(); diff --git a/s2-sdk/src/test/java/s2/v1alpha/StreamService/MockReadSessionStreamService.java b/s2-sdk/src/test/java/s2/v1alpha/StreamService/MockReadSessionStreamService.java index dc20958..447622f 100644 --- a/s2-sdk/src/test/java/s2/v1alpha/StreamService/MockReadSessionStreamService.java +++ b/s2-sdk/src/test/java/s2/v1alpha/StreamService/MockReadSessionStreamService.java @@ -18,14 +18,25 @@ public void readSession( ReadSessionRequest request, StreamObserver responseObserver) { System.out.println("MockStreamService.readSession req " + request); - var startSeqNum = request.getStartSeqNum(); + long startSeqNum = 0; + switch (request.getStartCase()) { + case SEQ_NUM: + startSeqNum = request.getSeqNum(); + break; + case TIMESTAMP: + case TAIL_OFFSET: + case START_NOT_SET: + startSeqNum = 0; + break; + } + var limit = request.getLimit().getCount(); if (!(limit > 0)) { throw new RuntimeException("count must be set"); } for (var seqNum = startSeqNum; seqNum < startSeqNum + limit; seqNum++) { if (calls.getAndIncrement() % 10 == 0) { - responseObserver.onError(new RuntimeException("I messed up!")); + responseObserver.onError(new RuntimeException("Response observer failed")); return; } else { var batch = @@ -35,6 +46,7 @@ public void readSession( .addRecords( SequencedRecord.newBuilder() .setSeqNum(seqNum) + .setTimestamp(System.currentTimeMillis()) .setBody(ByteString.copyFromUtf8(String.format("fake %s", seqNum))) .build()) .build())