diff --git a/app/src/main/java/org/example/app/ManagedAppendSessionDemo.java b/app/src/main/java/org/example/app/ManagedAppendSessionDemo.java index 676c6cd..0bef595 100644 --- a/app/src/main/java/org/example/app/ManagedAppendSessionDemo.java +++ b/app/src/main/java/org/example/app/ManagedAppendSessionDemo.java @@ -25,8 +25,8 @@ public class ManagedAppendSessionDemo { private static final Logger logger = LoggerFactory.getLogger(ManagedAppendSessionDemo.class.getName()); - // 512KiB - private static final Integer TARGET_BATCH_SIZE = 512 * 1024; + // 128KiB + private static final Integer TARGET_BATCH_SIZE = 128 * 1024; public static void main(String[] args) throws Exception { final var authToken = System.getenv("S2_ACCESS_TOKEN"); diff --git a/app/src/main/java/org/example/app/ManagedReadSessionDemo.java b/app/src/main/java/org/example/app/ManagedReadSessionDemo.java index 6859b62..d5d24cd 100644 --- a/app/src/main/java/org/example/app/ManagedReadSessionDemo.java +++ b/app/src/main/java/org/example/app/ManagedReadSessionDemo.java @@ -61,12 +61,10 @@ public static void main(String[] args) throws Exception { if (elem instanceof Batch batch) { var size = batch.meteredBytes(); logger.info( - "batch of {} bytes, seqnums {}..={} / instants {}..={}", + "batch of {} bytes, first={} ..= last={}", size, - batch.firstSeqNum(), - batch.lastSeqNum(), - batch.firstTimestamp(), - batch.lastTimestamp()); + batch.firstPosition(), + batch.lastPosition()); receivedBytes.addAndGet(size); } else { logger.info("non batch received: {}", elem); diff --git a/gradle.properties b/gradle.properties index 55ab57e..c85d851 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=0.0.16-SNAPSHOT \ No newline at end of file +version=0.0.16 \ No newline at end of file diff --git a/s2-internal/src/main/proto b/s2-internal/src/main/proto index 4183c36..b6add44 160000 --- a/s2-internal/src/main/proto +++ b/s2-internal/src/main/proto @@ -1 +1 @@ -Subproject commit 4183c36ef4b34d8626e01873eafbaf9583aae067 +Subproject commit b6add440765c24318a61c3d3c47efaf909844b9d diff --git a/s2-sdk/src/main/java/s2/client/ManagedAppendSession.java b/s2-sdk/src/main/java/s2/client/ManagedAppendSession.java index 6c661cb..4a32ee6 100644 --- a/s2-sdk/src/main/java/s2/client/ManagedAppendSession.java +++ b/s2-sdk/src/main/java/s2/client/ManagedAppendSession.java @@ -235,7 +235,7 @@ private synchronized Void cleanUp(Status fatal) throws InterruptedException { } private void validate(InflightRecord record, AppendOutput output) { - var numRecordsForAcknowledgement = output.endSeqNum - output.startSeqNum; + var numRecordsForAcknowledgement = output.end.seqNum - output.start.seqNum; if (numRecordsForAcknowledgement != record.input.records.size()) { throw Status.INTERNAL .withDescription( diff --git a/s2-sdk/src/main/java/s2/client/ReadSession.java b/s2-sdk/src/main/java/s2/client/ReadSession.java index c725f44..84730a0 100644 --- a/s2-sdk/src/main/java/s2/client/ReadSession.java +++ b/s2-sdk/src/main/java/s2/client/ReadSession.java @@ -140,8 +140,8 @@ private ListenableFuture retrying() { resp -> { if (resp instanceof Batch) { final Batch batch = (Batch) resp; - var lastRecordIdx = batch.lastSeqNum(); - lastRecordIdx.ifPresent(v -> nextStart.set(Start.seqNum(v + 1))); + var lastPosition = batch.lastPosition(); + lastPosition.ifPresent(v -> nextStart.set(Start.seqNum(v.seqNum + 1))); consumedRecords.addAndGet(batch.sequencedRecordBatch.records.size()); consumedBytes.addAndGet(batch.meteredBytes()); } diff --git a/s2-sdk/src/main/java/s2/client/StreamClient.java b/s2-sdk/src/main/java/s2/client/StreamClient.java index 606f358..32c4ec8 100644 --- a/s2-sdk/src/main/java/s2/client/StreamClient.java +++ b/s2-sdk/src/main/java/s2/client/StreamClient.java @@ -21,12 +21,12 @@ import s2.types.ReadOutput; import s2.types.ReadRequest; import s2.types.ReadSessionRequest; +import s2.types.StreamPosition; import s2.v1alpha.AppendRequest; import s2.v1alpha.AppendResponse; import s2.v1alpha.AppendSessionRequest; import s2.v1alpha.AppendSessionResponse; import s2.v1alpha.CheckTailRequest; -import s2.v1alpha.CheckTailResponse; import s2.v1alpha.StreamServiceGrpc; import s2.v1alpha.StreamServiceGrpc.StreamServiceFutureStub; import s2.v1alpha.StreamServiceGrpc.StreamServiceStub; @@ -81,9 +81,9 @@ public static StreamClientBuilder newBuilder(Config config, String basinName, St /** * Check the sequence number that will be assigned to the next record on a stream. * - * @return future of the next sequence number + * @return future of the tail's position */ - public ListenableFuture checkTail() { + public ListenableFuture checkTail() { return withTimeout( () -> Futures.transform( @@ -92,7 +92,7 @@ public ListenableFuture checkTail() { () -> this.futureStub.checkTail( CheckTailRequest.newBuilder().setStream(streamName).build())), - CheckTailResponse::getNextSeqNum, + (resp) -> new StreamPosition(resp.getNextSeqNum(), resp.getLastTimestamp()), executor)); } diff --git a/s2-sdk/src/main/java/s2/types/AppendOutput.java b/s2-sdk/src/main/java/s2/types/AppendOutput.java index b1e71a6..071b204 100644 --- a/s2-sdk/src/main/java/s2/types/AppendOutput.java +++ b/s2-sdk/src/main/java/s2/types/AppendOutput.java @@ -1,25 +1,25 @@ package s2.types; public class AppendOutput { - public final long startSeqNum; - public final long endSeqNum; - public final long nextSeqNum; + public final StreamPosition start; + public final StreamPosition end; + public final StreamPosition tail; - AppendOutput(long startSeqNum, long endSeqNum, long nextSeqNum) { - this.startSeqNum = startSeqNum; - this.endSeqNum = endSeqNum; - this.nextSeqNum = nextSeqNum; + AppendOutput(StreamPosition start, StreamPosition end, StreamPosition tail) { + this.start = start; + this.end = end; + this.tail = tail; } public static AppendOutput fromProto(s2.v1alpha.AppendOutput appendOutput) { return new AppendOutput( - appendOutput.getStartSeqNum(), appendOutput.getEndSeqNum(), appendOutput.getNextSeqNum()); + new StreamPosition(appendOutput.getStartSeqNum(), appendOutput.getStartTimestamp()), + new StreamPosition(appendOutput.getEndSeqNum(), appendOutput.getEndTimestamp()), + new StreamPosition(appendOutput.getNextSeqNum(), appendOutput.getLastTimestamp())); } @Override public String toString() { - return String.format( - "AppendOutput[startSeqNum=%s, endSeqNum=%s, nextSeqNum=%s]", - startSeqNum, endSeqNum, nextSeqNum); + return String.format("AppendOutput[start=%s, end=%s, tail=%s]", start, end, tail); } } diff --git a/s2-sdk/src/main/java/s2/types/Batch.java b/s2-sdk/src/main/java/s2/types/Batch.java index 2ddc525..01d59b9 100644 --- a/s2-sdk/src/main/java/s2/types/Batch.java +++ b/s2-sdk/src/main/java/s2/types/Batch.java @@ -1,6 +1,5 @@ package s2.types; -import java.time.Instant; import java.util.Optional; public final class Batch implements ReadOutput, MeteredBytes { @@ -11,27 +10,17 @@ public final class Batch implements ReadOutput, MeteredBytes { this.sequencedRecordBatch = sequencedRecordBatch; } - public Optional firstSeqNum() { - return this.sequencedRecordBatch.records.stream().findFirst().map(sr -> sr.seqNum); + public Optional firstPosition() { + return this.sequencedRecordBatch.records.stream() + .findFirst() + .map(sr -> new StreamPosition(sr.seqNum, sr.timestamp)); } - public Optional lastSeqNum() { + public Optional lastPosition() { if (!this.sequencedRecordBatch.records.isEmpty()) { - return Optional.of( - this.sequencedRecordBatch.records.get(sequencedRecordBatch.records.size() - 1).seqNum); - } else { - return Optional.empty(); - } - } - - public Optional firstTimestamp() { - return this.sequencedRecordBatch.records.stream().findFirst().map(sr -> sr.timestamp); - } - - public Optional lastTimestamp() { - if (!this.sequencedRecordBatch.records.isEmpty()) { - return Optional.of( - this.sequencedRecordBatch.records.get(sequencedRecordBatch.records.size() - 1).timestamp); + var lastRecord = + this.sequencedRecordBatch.records.get(this.sequencedRecordBatch.records.size() - 1); + return Optional.of(new StreamPosition(lastRecord.seqNum, lastRecord.timestamp)); } else { return Optional.empty(); } diff --git a/s2-sdk/src/main/java/s2/types/SequencedRecord.java b/s2-sdk/src/main/java/s2/types/SequencedRecord.java index 5cc8273..9a8a492 100644 --- a/s2-sdk/src/main/java/s2/types/SequencedRecord.java +++ b/s2-sdk/src/main/java/s2/types/SequencedRecord.java @@ -1,7 +1,6 @@ package s2.types; import com.google.protobuf.ByteString; -import java.time.Instant; import java.util.List; import java.util.stream.Collectors; @@ -9,9 +8,9 @@ public class SequencedRecord implements MeteredBytes { public final long seqNum; public final List
headers; public final ByteString body; - public final Instant timestamp; + public final long timestamp; - SequencedRecord(long seqNum, List
headers, ByteString body, Instant timestamp) { + SequencedRecord(long seqNum, List
headers, ByteString body, long timestamp) { this.seqNum = seqNum; this.headers = headers; this.body = body; @@ -25,7 +24,7 @@ public static SequencedRecord fromProto(s2.v1alpha.SequencedRecord sequencedReco .map(Header::fromProto) .collect(Collectors.toList()), sequencedRecord.getBody(), - Instant.ofEpochMilli(sequencedRecord.getTimestamp())); + sequencedRecord.getTimestamp()); } @Override diff --git a/s2-sdk/src/main/java/s2/types/Start.java b/s2-sdk/src/main/java/s2/types/Start.java index 31f77c2..aa8ee14 100644 --- a/s2-sdk/src/main/java/s2/types/Start.java +++ b/s2-sdk/src/main/java/s2/types/Start.java @@ -1,14 +1,12 @@ package s2.types; -import java.time.Instant; - public abstract class Start { public static SeqNum seqNum(long seqNum) { return new SeqNum(seqNum); } - public static Timestamp timestamp(Instant instant) { - return new Timestamp(instant.toEpochMilli()); + public static Timestamp timestamp(long timestamp) { + return new Timestamp(timestamp); } public static TailOffset tailOffset(long tailOffset) { @@ -29,10 +27,6 @@ public static final class Timestamp extends Start { private Timestamp(long value) { this.value = value; } - - public Instant toInstant() { - return Instant.ofEpochMilli(value); - } } public static final class TailOffset extends Start { diff --git a/s2-sdk/src/main/java/s2/types/StreamPosition.java b/s2-sdk/src/main/java/s2/types/StreamPosition.java new file mode 100644 index 0000000..546a88a --- /dev/null +++ b/s2-sdk/src/main/java/s2/types/StreamPosition.java @@ -0,0 +1,22 @@ +package s2.types; + +public class StreamPosition { + public final long seqNum; + public final long timestamp; + + public StreamPosition(long seqNum, long timestamp) { + if (seqNum < 0) { + throw new IllegalArgumentException("seqNum must be non-negative, got: " + seqNum); + } + if (timestamp < 0) { + throw new IllegalArgumentException("timestamp must be non-negative, got: " + timestamp); + } + this.seqNum = seqNum; + this.timestamp = timestamp; + } + + @Override + public String toString() { + return String.format("StreamPosition[seqNum=%s, timestamp=%s]", seqNum, timestamp); + } +}