Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import s2.config.Config;
import s2.config.Endpoints;
import s2.types.Batch;
import s2.types.ReadLimit;
import s2.types.ReadSessionRequest;

public class ManagedReadSessionDemo {
Expand Down Expand Up @@ -50,7 +49,7 @@ public static void main(String[] args) throws Exception {

try (final var managedSession =
streamClient.managedReadSession(
ReadSessionRequest.newBuilder().withReadLimit(ReadLimit.count(100_000)).build(),
ReadSessionRequest.newBuilder().withHeartbeats(true).build(),
1024 * 1024 * 1024 * 5)) {

AtomicLong receivedBytes = new AtomicLong();
Expand Down
2 changes: 1 addition & 1 deletion app/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
</encoder>
</appender>

<logger additivity="false" level="info" name="s2">
<logger additivity="false" level="trace" name="s2">
<appender-ref ref="console"/>
</logger>

Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.0.14-SNAPSHOT
version=0.0.14
2 changes: 1 addition & 1 deletion s2-internal/src/main/proto
56 changes: 55 additions & 1 deletion s2-sdk/src/main/java/s2/client/ReadSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
Expand All @@ -20,6 +22,8 @@ public class ReadSession implements AutoCloseable {

private static final Logger logger = LoggerFactory.getLogger(ReadSession.class.getName());

private static final Long HEARTBEAT_THRESHOLD_NANOS = TimeUnit.SECONDS.toNanos(20);

final ScheduledExecutorService executor;
final StreamClient client;

Expand All @@ -28,6 +32,10 @@ public class ReadSession implements AutoCloseable {
final AtomicLong consumedBytes = new AtomicLong(0);
final AtomicInteger remainingAttempts;

// Liveness timer.
final AtomicLong lastEvent;
final ListenableFuture<Void> livenessDaemon;

final Consumer<ReadOutput> onResponse;
final Consumer<Throwable> onError;

Expand All @@ -46,6 +54,9 @@ public class ReadSession implements AutoCloseable {
this.request = request;
this.nextStartSeqNum = new AtomicLong(request.startSeqNum);
this.remainingAttempts = new AtomicInteger(client.config.maxRetries);
this.lastEvent = new AtomicLong(System.nanoTime());

this.livenessDaemon = request.heartbeats ? livenessDaemon() : Futures.immediateFuture(null);
this.daemon = this.retrying();
}

Expand All @@ -60,7 +71,12 @@ private ListenableFuture<Void> readSessionInner(

@Override
public void onNext(ReadSessionResponse value) {
innerOnResponse.accept(ReadOutput.fromProto(value.getOutput()));
lastEvent.set(System.nanoTime());
if (value.hasOutput()) {
innerOnResponse.accept(ReadOutput.fromProto(value.getOutput()));
} else {
logger.trace("heartbeat");
}
}

@Override
Expand All @@ -72,12 +88,48 @@ public void onError(Throwable t) {
@Override
public void onCompleted() {
logger.debug("Read session inner onCompleted");
livenessDaemon.cancel(true);
fut.set(null);
}
});
return fut;
}

private ListenableFuture<Void> livenessDaemon() {
SettableFuture<Void> livenessFuture = SettableFuture.create();
scheduleLivenessCheck(livenessFuture);
return livenessFuture;
}

private void scheduleLivenessCheck(SettableFuture<Void> livenessFuture) {
final long delay = (lastEvent.get() + HEARTBEAT_THRESHOLD_NANOS) - System.nanoTime();

logger.trace(
"Checking liveness. Next deadline: {} seconds.",
TimeUnit.SECONDS.convert(delay, TimeUnit.NANOSECONDS));
if (delay <= 0) {
this.onError.accept(
Status.DEADLINE_EXCEEDED
.withDescription("ReadSession hit local heartbeat deadline")
.asRuntimeException());
this.daemon.cancel(true);
livenessFuture.set(null);
} else {
ScheduledFuture<?> scheduledCheck =
executor.schedule(
() -> {
if (livenessFuture.isDone()) {
return;
}
scheduleLivenessCheck(livenessFuture);
},
delay,
TimeUnit.NANOSECONDS);

livenessFuture.addListener(() -> scheduledCheck.cancel(true), executor);
}
}

private ListenableFuture<Void> retrying() {

return Futures.catchingAsync(
Expand Down Expand Up @@ -107,6 +159,7 @@ private ListenableFuture<Void> retrying() {
} else {
logger.warn("readSession failed, status={}", status.getCode());
onError.accept(t);
this.livenessDaemon.cancel(true);
return Futures.immediateFuture(null);
}
},
Expand All @@ -119,6 +172,7 @@ public ListenableFuture<Void> awaitCompletion() {

@Override
public void close() {
this.livenessDaemon.cancel(true);
this.daemon.cancel(true);
}
}
23 changes: 17 additions & 6 deletions s2-sdk/src/main/java/s2/types/BasinInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,29 @@

public class BasinInfo {
public final String name;
public final String scope;
public final String cell;
public final BasinScope basinScope;
public final BasinState basinState;

BasinInfo(String name, String scope, String cell, BasinState basinState) {
BasinInfo(String name, BasinScope basinScope, BasinState basinState) {
this.name = name;
this.scope = scope;
this.cell = cell;
this.basinScope = basinScope;
this.basinState = basinState;
}

public static BasinInfo fromProto(s2.v1alpha.BasinInfo basinInfo) {
BasinScope scope;
switch (basinInfo.getScope()) {
case BASIN_SCOPE_UNSPECIFIED:
scope = BasinScope.UNSPECIFIED;
break;
case BASIN_SCOPE_AWS_US_EAST_1:
scope = BasinScope.AWS_US_EAST_1;
break;
default:
scope = BasinScope.UNKNOWN;
break;
}

BasinState state;
switch (basinInfo.getState()) {
case BASIN_STATE_UNSPECIFIED:
Expand All @@ -33,6 +44,6 @@ public static BasinInfo fromProto(s2.v1alpha.BasinInfo basinInfo) {
break;
}

return new BasinInfo(basinInfo.getName(), basinInfo.getScope(), basinInfo.getCell(), state);
return new BasinInfo(basinInfo.getName(), scope, state);
}
}
7 changes: 7 additions & 0 deletions s2-sdk/src/main/java/s2/types/BasinScope.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package s2.types;

public enum BasinScope {
UNSPECIFIED,
AWS_US_EAST_1,
UNKNOWN
}
22 changes: 12 additions & 10 deletions s2-sdk/src/main/java/s2/types/ReadSessionRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ public class ReadSessionRequest {

public final long startSeqNum;
public final ReadLimit readLimit;
public final boolean heartbeats;

protected ReadSessionRequest(long startSeqNum, ReadLimit readLimit) {
protected ReadSessionRequest(long startSeqNum, ReadLimit readLimit, boolean heartbeats) {
this.startSeqNum = startSeqNum;
this.readLimit = readLimit;
this.heartbeats = heartbeats;
}

public static ReadSessionRequestBuilder newBuilder() {
Expand All @@ -18,27 +20,22 @@ public static ReadSessionRequestBuilder newBuilder() {

public ReadSessionRequest update(long newStartSeqNum, long consumedRecords, long consumedBytes) {
return new ReadSessionRequest(
newStartSeqNum, readLimit.remaining(consumedRecords, consumedBytes));
}

public s2.v1alpha.ReadSessionRequest toProto() {
return s2.v1alpha.ReadSessionRequest.newBuilder()
.setStartSeqNum(startSeqNum)
.setLimit(readLimit.toProto())
.build();
newStartSeqNum, readLimit.remaining(consumedRecords, consumedBytes), heartbeats);
}

public s2.v1alpha.ReadSessionRequest toProto(String streamName) {
return s2.v1alpha.ReadSessionRequest.newBuilder()
.setStream(streamName)
.setStartSeqNum(startSeqNum)
.setLimit(readLimit.toProto())
.setHeartbeats(heartbeats)
.build();
}

public static class ReadSessionRequestBuilder {
private Optional<Long> startSeqNum = Optional.empty();
private Optional<ReadLimit> readLimit = Optional.empty();
private boolean heartbeats = false;

public ReadSessionRequestBuilder withStartSeqNum(long startSeqNum) {
this.startSeqNum = Optional.of(startSeqNum);
Expand All @@ -50,9 +47,14 @@ public ReadSessionRequestBuilder withReadLimit(ReadLimit readLimit) {
return this;
}

public ReadSessionRequestBuilder withHeartbeats(boolean heartbeats) {
this.heartbeats = heartbeats;
return this;
}

public ReadSessionRequest build() {
return new ReadSessionRequest(
this.startSeqNum.orElse(0L), this.readLimit.orElse(ReadLimit.NONE));
this.startSeqNum.orElse(0L), this.readLimit.orElse(ReadLimit.NONE), this.heartbeats);
}
}
}
Loading