Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ public class ManagedAppendSessionDemo {
private static final Logger logger =
LoggerFactory.getLogger(ManagedAppendSessionDemo.class.getName());

// 512KiB
private static final Integer TARGET_BATCH_SIZE = 512 * 1024;
// 128KiB
private static final Integer TARGET_BATCH_SIZE = 128 * 1024;

public static void main(String[] args) throws Exception {
final var authToken = System.getenv("S2_ACCESS_TOKEN");
Expand Down
8 changes: 3 additions & 5 deletions app/src/main/java/org/example/app/ManagedReadSessionDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,10 @@ public static void main(String[] args) throws Exception {
if (elem instanceof Batch batch) {
var size = batch.meteredBytes();
logger.info(
"batch of {} bytes, seqnums {}..={} / instants {}..={}",
"batch of {} bytes, first={} ..= last={}",
size,
batch.firstSeqNum(),
batch.lastSeqNum(),
batch.firstTimestamp(),
batch.lastTimestamp());
batch.firstPosition(),
batch.lastPosition());
receivedBytes.addAndGet(size);
} else {
logger.info("non batch received: {}", elem);
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.0.16-SNAPSHOT
version=0.0.16
2 changes: 1 addition & 1 deletion s2-internal/src/main/proto
2 changes: 1 addition & 1 deletion s2-sdk/src/main/java/s2/client/ManagedAppendSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ private synchronized Void cleanUp(Status fatal) throws InterruptedException {
}

private void validate(InflightRecord record, AppendOutput output) {
var numRecordsForAcknowledgement = output.endSeqNum - output.startSeqNum;
var numRecordsForAcknowledgement = output.end.seqNum - output.start.seqNum;
if (numRecordsForAcknowledgement != record.input.records.size()) {
throw Status.INTERNAL
.withDescription(
Expand Down
4 changes: 2 additions & 2 deletions s2-sdk/src/main/java/s2/client/ReadSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ private ListenableFuture<Void> retrying() {
resp -> {
if (resp instanceof Batch) {
final Batch batch = (Batch) resp;
var lastRecordIdx = batch.lastSeqNum();
lastRecordIdx.ifPresent(v -> nextStart.set(Start.seqNum(v + 1)));
var lastPosition = batch.lastPosition();
lastPosition.ifPresent(v -> nextStart.set(Start.seqNum(v.seqNum + 1)));
consumedRecords.addAndGet(batch.sequencedRecordBatch.records.size());
consumedBytes.addAndGet(batch.meteredBytes());
}
Expand Down
8 changes: 4 additions & 4 deletions s2-sdk/src/main/java/s2/client/StreamClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import s2.types.ReadOutput;
import s2.types.ReadRequest;
import s2.types.ReadSessionRequest;
import s2.types.StreamPosition;
import s2.v1alpha.AppendRequest;
import s2.v1alpha.AppendResponse;
import s2.v1alpha.AppendSessionRequest;
import s2.v1alpha.AppendSessionResponse;
import s2.v1alpha.CheckTailRequest;
import s2.v1alpha.CheckTailResponse;
import s2.v1alpha.StreamServiceGrpc;
import s2.v1alpha.StreamServiceGrpc.StreamServiceFutureStub;
import s2.v1alpha.StreamServiceGrpc.StreamServiceStub;
Expand Down Expand Up @@ -81,9 +81,9 @@ public static StreamClientBuilder newBuilder(Config config, String basinName, St
/**
* Check the sequence number that will be assigned to the next record on a stream.
*
* @return future of the next sequence number
* @return future of the tail's position
*/
public ListenableFuture<Long> checkTail() {
public ListenableFuture<StreamPosition> checkTail() {
return withTimeout(
() ->
Futures.transform(
Expand All @@ -92,7 +92,7 @@ public ListenableFuture<Long> checkTail() {
() ->
this.futureStub.checkTail(
CheckTailRequest.newBuilder().setStream(streamName).build())),
CheckTailResponse::getNextSeqNum,
(resp) -> new StreamPosition(resp.getNextSeqNum(), resp.getLastTimestamp()),
executor));
}

Expand Down
22 changes: 11 additions & 11 deletions s2-sdk/src/main/java/s2/types/AppendOutput.java
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
package s2.types;

public class AppendOutput {
public final long startSeqNum;
public final long endSeqNum;
public final long nextSeqNum;
public final StreamPosition start;
public final StreamPosition end;
public final StreamPosition tail;

AppendOutput(long startSeqNum, long endSeqNum, long nextSeqNum) {
this.startSeqNum = startSeqNum;
this.endSeqNum = endSeqNum;
this.nextSeqNum = nextSeqNum;
AppendOutput(StreamPosition start, StreamPosition end, StreamPosition tail) {
this.start = start;
this.end = end;
this.tail = tail;
}

public static AppendOutput fromProto(s2.v1alpha.AppendOutput appendOutput) {
return new AppendOutput(
appendOutput.getStartSeqNum(), appendOutput.getEndSeqNum(), appendOutput.getNextSeqNum());
new StreamPosition(appendOutput.getStartSeqNum(), appendOutput.getStartTimestamp()),
new StreamPosition(appendOutput.getEndSeqNum(), appendOutput.getEndTimestamp()),
new StreamPosition(appendOutput.getNextSeqNum(), appendOutput.getLastTimestamp()));
}

@Override
public String toString() {
return String.format(
"AppendOutput[startSeqNum=%s, endSeqNum=%s, nextSeqNum=%s]",
startSeqNum, endSeqNum, nextSeqNum);
return String.format("AppendOutput[start=%s, end=%s, tail=%s]", start, end, tail);
}
}
27 changes: 8 additions & 19 deletions s2-sdk/src/main/java/s2/types/Batch.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package s2.types;

import java.time.Instant;
import java.util.Optional;

public final class Batch implements ReadOutput, MeteredBytes {
Expand All @@ -11,27 +10,17 @@ public final class Batch implements ReadOutput, MeteredBytes {
this.sequencedRecordBatch = sequencedRecordBatch;
}

public Optional<Long> firstSeqNum() {
return this.sequencedRecordBatch.records.stream().findFirst().map(sr -> sr.seqNum);
public Optional<StreamPosition> firstPosition() {
return this.sequencedRecordBatch.records.stream()
.findFirst()
.map(sr -> new StreamPosition(sr.seqNum, sr.timestamp));
}

public Optional<Long> lastSeqNum() {
public Optional<StreamPosition> lastPosition() {
if (!this.sequencedRecordBatch.records.isEmpty()) {
return Optional.of(
this.sequencedRecordBatch.records.get(sequencedRecordBatch.records.size() - 1).seqNum);
} else {
return Optional.empty();
}
}

public Optional<Instant> firstTimestamp() {
return this.sequencedRecordBatch.records.stream().findFirst().map(sr -> sr.timestamp);
}

public Optional<Instant> lastTimestamp() {
if (!this.sequencedRecordBatch.records.isEmpty()) {
return Optional.of(
this.sequencedRecordBatch.records.get(sequencedRecordBatch.records.size() - 1).timestamp);
var lastRecord =
this.sequencedRecordBatch.records.get(this.sequencedRecordBatch.records.size() - 1);
return Optional.of(new StreamPosition(lastRecord.seqNum, lastRecord.timestamp));
} else {
return Optional.empty();
}
Expand Down
7 changes: 3 additions & 4 deletions s2-sdk/src/main/java/s2/types/SequencedRecord.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package s2.types;

import com.google.protobuf.ByteString;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

public class SequencedRecord implements MeteredBytes {
public final long seqNum;
public final List<Header> headers;
public final ByteString body;
public final Instant timestamp;
public final long timestamp;

SequencedRecord(long seqNum, List<Header> headers, ByteString body, Instant timestamp) {
SequencedRecord(long seqNum, List<Header> headers, ByteString body, long timestamp) {
this.seqNum = seqNum;
this.headers = headers;
this.body = body;
Expand All @@ -25,7 +24,7 @@ public static SequencedRecord fromProto(s2.v1alpha.SequencedRecord sequencedReco
.map(Header::fromProto)
.collect(Collectors.toList()),
sequencedRecord.getBody(),
Instant.ofEpochMilli(sequencedRecord.getTimestamp()));
sequencedRecord.getTimestamp());
}

@Override
Expand Down
10 changes: 2 additions & 8 deletions s2-sdk/src/main/java/s2/types/Start.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package s2.types;

import java.time.Instant;

public abstract class Start {
public static SeqNum seqNum(long seqNum) {
return new SeqNum(seqNum);
}

public static Timestamp timestamp(Instant instant) {
return new Timestamp(instant.toEpochMilli());
public static Timestamp timestamp(long timestamp) {
return new Timestamp(timestamp);
}

public static TailOffset tailOffset(long tailOffset) {
Expand All @@ -29,10 +27,6 @@ public static final class Timestamp extends Start {
private Timestamp(long value) {
this.value = value;
}

public Instant toInstant() {
return Instant.ofEpochMilli(value);
}
}

public static final class TailOffset extends Start {
Expand Down
22 changes: 22 additions & 0 deletions s2-sdk/src/main/java/s2/types/StreamPosition.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package s2.types;

public class StreamPosition {
public final long seqNum;
public final long timestamp;

public StreamPosition(long seqNum, long timestamp) {
if (seqNum < 0) {
throw new IllegalArgumentException("seqNum must be non-negative, got: " + seqNum);
}
if (timestamp < 0) {
throw new IllegalArgumentException("timestamp must be non-negative, got: " + timestamp);
}
this.seqNum = seqNum;
this.timestamp = timestamp;
}

@Override
public String toString() {
return String.format("StreamPosition[seqNum=%s, timestamp=%s]", seqNum, timestamp);
}
}
Loading