diff --git a/app/src/main/java/org/example/app/ManagedReadSessionDemo.java b/app/src/main/java/org/example/app/ManagedReadSessionDemo.java index 1d4daef..471f19b 100644 --- a/app/src/main/java/org/example/app/ManagedReadSessionDemo.java +++ b/app/src/main/java/org/example/app/ManagedReadSessionDemo.java @@ -10,7 +10,6 @@ import s2.config.Config; import s2.config.Endpoints; import s2.types.Batch; -import s2.types.ReadLimit; import s2.types.ReadSessionRequest; public class ManagedReadSessionDemo { @@ -50,7 +49,7 @@ public static void main(String[] args) throws Exception { try (final var managedSession = streamClient.managedReadSession( - ReadSessionRequest.newBuilder().withReadLimit(ReadLimit.count(100_000)).build(), + ReadSessionRequest.newBuilder().withHeartbeats(true).build(), 1024 * 1024 * 1024 * 5)) { AtomicLong receivedBytes = new AtomicLong(); diff --git a/app/src/main/resources/logback.xml b/app/src/main/resources/logback.xml index 261510c..0a2a18a 100644 --- a/app/src/main/resources/logback.xml +++ b/app/src/main/resources/logback.xml @@ -7,7 +7,7 @@ - + diff --git a/gradle.properties b/gradle.properties index b4a8bb7..a42b5b0 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=0.0.14-SNAPSHOT \ No newline at end of file +version=0.0.14 \ No newline at end of file diff --git a/s2-internal/src/main/proto b/s2-internal/src/main/proto index c6a481b..aea316a 160000 --- a/s2-internal/src/main/proto +++ b/s2-internal/src/main/proto @@ -1 +1 @@ -Subproject commit c6a481b0912ec80b327086b37f22affd077480b0 +Subproject commit aea316a72958fe4ffb00cdb30681d0f5836012c6 diff --git a/s2-sdk/src/main/java/s2/client/ReadSession.java b/s2-sdk/src/main/java/s2/client/ReadSession.java index 4d0f0fa..53ecd52 100644 --- a/s2-sdk/src/main/java/s2/client/ReadSession.java +++ b/s2-sdk/src/main/java/s2/client/ReadSession.java @@ -6,6 +6,8 @@ import io.grpc.Status; import io.grpc.stub.StreamObserver; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -20,6 +22,8 @@ public class ReadSession implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(ReadSession.class.getName()); + private static final Long HEARTBEAT_THRESHOLD_NANOS = TimeUnit.SECONDS.toNanos(20); + final ScheduledExecutorService executor; final StreamClient client; @@ -28,6 +32,10 @@ public class ReadSession implements AutoCloseable { final AtomicLong consumedBytes = new AtomicLong(0); final AtomicInteger remainingAttempts; + // Liveness timer. + final AtomicLong lastEvent; + final ListenableFuture livenessDaemon; + final Consumer onResponse; final Consumer onError; @@ -46,6 +54,9 @@ public class ReadSession implements AutoCloseable { this.request = request; this.nextStartSeqNum = new AtomicLong(request.startSeqNum); this.remainingAttempts = new AtomicInteger(client.config.maxRetries); + this.lastEvent = new AtomicLong(System.nanoTime()); + + this.livenessDaemon = request.heartbeats ? livenessDaemon() : Futures.immediateFuture(null); this.daemon = this.retrying(); } @@ -60,7 +71,12 @@ private ListenableFuture readSessionInner( @Override public void onNext(ReadSessionResponse value) { - innerOnResponse.accept(ReadOutput.fromProto(value.getOutput())); + lastEvent.set(System.nanoTime()); + if (value.hasOutput()) { + innerOnResponse.accept(ReadOutput.fromProto(value.getOutput())); + } else { + logger.trace("heartbeat"); + } } @Override @@ -72,12 +88,48 @@ public void onError(Throwable t) { @Override public void onCompleted() { logger.debug("Read session inner onCompleted"); + livenessDaemon.cancel(true); fut.set(null); } }); return fut; } + private ListenableFuture livenessDaemon() { + SettableFuture livenessFuture = SettableFuture.create(); + scheduleLivenessCheck(livenessFuture); + return livenessFuture; + } + + private void scheduleLivenessCheck(SettableFuture livenessFuture) { + final long delay = (lastEvent.get() + HEARTBEAT_THRESHOLD_NANOS) - System.nanoTime(); + + logger.trace( + "Checking liveness. Next deadline: {} seconds.", + TimeUnit.SECONDS.convert(delay, TimeUnit.NANOSECONDS)); + if (delay <= 0) { + this.onError.accept( + Status.DEADLINE_EXCEEDED + .withDescription("ReadSession hit local heartbeat deadline") + .asRuntimeException()); + this.daemon.cancel(true); + livenessFuture.set(null); + } else { + ScheduledFuture scheduledCheck = + executor.schedule( + () -> { + if (livenessFuture.isDone()) { + return; + } + scheduleLivenessCheck(livenessFuture); + }, + delay, + TimeUnit.NANOSECONDS); + + livenessFuture.addListener(() -> scheduledCheck.cancel(true), executor); + } + } + private ListenableFuture retrying() { return Futures.catchingAsync( @@ -107,6 +159,7 @@ private ListenableFuture retrying() { } else { logger.warn("readSession failed, status={}", status.getCode()); onError.accept(t); + this.livenessDaemon.cancel(true); return Futures.immediateFuture(null); } }, @@ -119,6 +172,7 @@ public ListenableFuture awaitCompletion() { @Override public void close() { + this.livenessDaemon.cancel(true); this.daemon.cancel(true); } } diff --git a/s2-sdk/src/main/java/s2/types/BasinInfo.java b/s2-sdk/src/main/java/s2/types/BasinInfo.java index d72f22b..5747116 100644 --- a/s2-sdk/src/main/java/s2/types/BasinInfo.java +++ b/s2-sdk/src/main/java/s2/types/BasinInfo.java @@ -2,18 +2,29 @@ public class BasinInfo { public final String name; - public final String scope; - public final String cell; + public final BasinScope basinScope; public final BasinState basinState; - BasinInfo(String name, String scope, String cell, BasinState basinState) { + BasinInfo(String name, BasinScope basinScope, BasinState basinState) { this.name = name; - this.scope = scope; - this.cell = cell; + this.basinScope = basinScope; this.basinState = basinState; } public static BasinInfo fromProto(s2.v1alpha.BasinInfo basinInfo) { + BasinScope scope; + switch (basinInfo.getScope()) { + case BASIN_SCOPE_UNSPECIFIED: + scope = BasinScope.UNSPECIFIED; + break; + case BASIN_SCOPE_AWS_US_EAST_1: + scope = BasinScope.AWS_US_EAST_1; + break; + default: + scope = BasinScope.UNKNOWN; + break; + } + BasinState state; switch (basinInfo.getState()) { case BASIN_STATE_UNSPECIFIED: @@ -33,6 +44,6 @@ public static BasinInfo fromProto(s2.v1alpha.BasinInfo basinInfo) { break; } - return new BasinInfo(basinInfo.getName(), basinInfo.getScope(), basinInfo.getCell(), state); + return new BasinInfo(basinInfo.getName(), scope, state); } } diff --git a/s2-sdk/src/main/java/s2/types/BasinScope.java b/s2-sdk/src/main/java/s2/types/BasinScope.java new file mode 100644 index 0000000..51eb30f --- /dev/null +++ b/s2-sdk/src/main/java/s2/types/BasinScope.java @@ -0,0 +1,7 @@ +package s2.types; + +public enum BasinScope { + UNSPECIFIED, + AWS_US_EAST_1, + UNKNOWN +} diff --git a/s2-sdk/src/main/java/s2/types/ReadSessionRequest.java b/s2-sdk/src/main/java/s2/types/ReadSessionRequest.java index 47af245..3ad8481 100644 --- a/s2-sdk/src/main/java/s2/types/ReadSessionRequest.java +++ b/s2-sdk/src/main/java/s2/types/ReadSessionRequest.java @@ -6,10 +6,12 @@ public class ReadSessionRequest { public final long startSeqNum; public final ReadLimit readLimit; + public final boolean heartbeats; - protected ReadSessionRequest(long startSeqNum, ReadLimit readLimit) { + protected ReadSessionRequest(long startSeqNum, ReadLimit readLimit, boolean heartbeats) { this.startSeqNum = startSeqNum; this.readLimit = readLimit; + this.heartbeats = heartbeats; } public static ReadSessionRequestBuilder newBuilder() { @@ -18,14 +20,7 @@ public static ReadSessionRequestBuilder newBuilder() { public ReadSessionRequest update(long newStartSeqNum, long consumedRecords, long consumedBytes) { return new ReadSessionRequest( - newStartSeqNum, readLimit.remaining(consumedRecords, consumedBytes)); - } - - public s2.v1alpha.ReadSessionRequest toProto() { - return s2.v1alpha.ReadSessionRequest.newBuilder() - .setStartSeqNum(startSeqNum) - .setLimit(readLimit.toProto()) - .build(); + newStartSeqNum, readLimit.remaining(consumedRecords, consumedBytes), heartbeats); } public s2.v1alpha.ReadSessionRequest toProto(String streamName) { @@ -33,12 +28,14 @@ public s2.v1alpha.ReadSessionRequest toProto(String streamName) { .setStream(streamName) .setStartSeqNum(startSeqNum) .setLimit(readLimit.toProto()) + .setHeartbeats(heartbeats) .build(); } public static class ReadSessionRequestBuilder { private Optional startSeqNum = Optional.empty(); private Optional readLimit = Optional.empty(); + private boolean heartbeats = false; public ReadSessionRequestBuilder withStartSeqNum(long startSeqNum) { this.startSeqNum = Optional.of(startSeqNum); @@ -50,9 +47,14 @@ public ReadSessionRequestBuilder withReadLimit(ReadLimit readLimit) { return this; } + public ReadSessionRequestBuilder withHeartbeats(boolean heartbeats) { + this.heartbeats = heartbeats; + return this; + } + public ReadSessionRequest build() { return new ReadSessionRequest( - this.startSeqNum.orElse(0L), this.readLimit.orElse(ReadLimit.NONE)); + this.startSeqNum.orElse(0L), this.readLimit.orElse(ReadLimit.NONE), this.heartbeats); } } }