Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/src/main/java/org/example/app/AccountDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public static void main(String[] args) throws Exception {
try (var client = Client.newBuilder(config).build()) {

var basins = client.listBasins(ListBasinsRequest.newBuilder().build()).get();
basins.elems().forEach(basin -> logger.info("basin={}", basin));
basins.elems.forEach(basin -> logger.info("basin={}", basin));

var newBasin =
client
Expand Down
10 changes: 4 additions & 6 deletions app/src/main/java/org/example/app/BasinDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@ public static void main(String[] args) throws Exception {
try (final var basinClient =
BasinClient.newBuilder(config, System.getenv("S2_BASIN")).build()) {
final var streams = basinClient.listStreams(ListStreamsRequest.newBuilder().build()).get();
streams
.elems()
.forEach(
stream -> {
logger.info("stream={}", stream);
});
streams.elems.forEach(
stream -> {
logger.info("stream={}", stream);
});

var newStream =
basinClient
Expand Down
10 changes: 7 additions & 3 deletions app/src/main/java/org/example/app/ManagedReadSessionDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@ public static void main(String[] args) throws Exception {
throw new IllegalStateException("S2_STREAM not set");
}

var config = Config.newBuilder(authToken).withEndpoints(Endpoints.fromEnvironment()).build();
var config =
Config.newBuilder(authToken)
.withEndpoints(Endpoints.fromEnvironment())
.withCompression(true)
.build();

try (final var executor = new ScheduledThreadPoolExecutor(1);
try (final var executor = new ScheduledThreadPoolExecutor(12);
final var channel = ManagedChannelFactory.forBasinOrStreamService(config, basinName)) {

final var streamClient =
Expand All @@ -47,7 +51,7 @@ public static void main(String[] args) throws Exception {
try (final var managedSession =
streamClient.managedReadSession(
ReadSessionRequest.newBuilder().withReadLimit(ReadLimit.count(100_000)).build(),
1024 * 1024 * 1024)) {
1024 * 1024 * 1024 * 5)) {

AtomicLong receivedBytes = new AtomicLong();
while (!managedSession.isClosed()) {
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.0.13-SNAPSHOT
version=0.0.13
2 changes: 1 addition & 1 deletion s2-internal/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ protobuf {

java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(17))
languageVersion.set(JavaLanguageVersion.of(11))
}
withJavadocJar()
withSourcesJar()
Expand Down
2 changes: 1 addition & 1 deletion s2-sdk/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ tasks.test {

java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(17))
languageVersion.set(JavaLanguageVersion.of(11))
}
withJavadocJar()
withSourcesJar()
Expand Down
12 changes: 8 additions & 4 deletions s2-sdk/src/main/java/s2/client/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,14 @@ public Thread newThread(Runnable r) {
}

static boolean retryableStatus(Status status) {
return switch (status.getCode()) {
case UNKNOWN, DEADLINE_EXCEEDED, UNAVAILABLE -> true;
default -> false;
};
switch (status.getCode()) {
case UNKNOWN:
case DEADLINE_EXCEEDED:
case UNAVAILABLE:
return true;
default:
return false;
}
}

public void close() {
Expand Down
5 changes: 4 additions & 1 deletion s2-sdk/src/main/java/s2/client/BasinClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import s2.auth.BearerTokenCallCredentials;
import s2.channel.BasinCompatibleChannel;
import s2.channel.ManagedChannelFactory;
Expand Down Expand Up @@ -66,7 +67,9 @@ public ListenableFuture<Paginated<StreamInfo>> listStreams(
resp ->
new Paginated<>(
resp.getHasMore(),
resp.getStreamsList().stream().map(StreamInfo::fromProto).toList()),
resp.getStreamsList().stream()
.map(StreamInfo::fromProto)
.collect(Collectors.toList())),
executor));
}

Expand Down
5 changes: 4 additions & 1 deletion s2-sdk/src/main/java/s2/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import s2.auth.BearerTokenCallCredentials;
Expand Down Expand Up @@ -61,7 +62,9 @@ public ListenableFuture<Paginated<BasinInfo>> listBasins(s2.types.ListBasinsRequ
resp ->
new Paginated<>(
resp.getHasMore(),
resp.getBasinsList().stream().map(BasinInfo::fromProto).toList()),
resp.getBasinsList().stream()
.map(BasinInfo::fromProto)
.collect(Collectors.toList())),
executor));
}

Expand Down
89 changes: 67 additions & 22 deletions s2-sdk/src/main/java/s2/client/ManagedAppendSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,17 +179,19 @@ record -> {
throw Status.CANCELLED
.withDescription("hit deadline while retransmitting")
.asRuntimeException();
} else if (notification instanceof Ack ack) {
} else if (notification instanceof Ack) {
final Ack ack = (Ack) notification;
this.remainingAttempts.set(this.client.config.maxRetries);
var correspondingInflight = inflightQueue.poll();
if (correspondingInflight == null) {
throw Status.INTERNAL.withDescription("inflight queue is empty").asRuntimeException();
} else {
validate(correspondingInflight, ack.output);
correspondingInflight.callback.set(ack.output);
this.inflightBytes.release(correspondingInflight.meteredBytes.intValue());
this.inflightBytes.release((int) correspondingInflight.meteredBytes);
}
} else if (notification instanceof Error error) {
} else if (notification instanceof Error) {
final Error error = (Error) notification;
throw new RuntimeException(error.throwable);
} else {
throw Status.INTERNAL
Expand Down Expand Up @@ -218,7 +220,8 @@ private synchronized Void cleanUp(Status fatal) throws InterruptedException {
// through them and cancel any pending batches via their callback.
while (!notificationQueue.isEmpty()) {
var entry = notificationQueue.poll();
if (entry instanceof Batch batch) {
if (entry instanceof Batch) {
final Batch batch = (Batch) entry;
batch.input.callback.setException(fatal.asRuntimeException());
}
}
Expand All @@ -228,7 +231,7 @@ private synchronized Void cleanUp(Status fatal) throws InterruptedException {
}

private void validate(InflightRecord record, AppendOutput output) {
var numRecordsForAcknowledgement = output.endSeqNum() - output.startSeqNum();
var numRecordsForAcknowledgement = output.endSeqNum - output.startSeqNum;
if (numRecordsForAcknowledgement != record.input.records.size()) {
throw Status.INTERNAL
.withDescription(
Expand Down Expand Up @@ -287,7 +290,8 @@ public void onCompleted() {
clientObserver.onError(Status.CANCELLED.asRuntimeException());
throw Status.CANCELLED.asRuntimeException();
}
} else if (notification instanceof Batch batch) {
} else if (notification instanceof Batch) {
final Batch batch = (Batch) notification;
logger.debug("notification=BATCH");
if (!inflightQueue.offer(batch.input)) {
throw Status.INTERNAL.asRuntimeException();
Expand All @@ -304,7 +308,8 @@ public void onCompleted() {
+ TimeUnit.NANOSECONDS.convert(this.client.config.requestTimeout)));
}

} else if (notification instanceof Ack ack) {
} else if (notification instanceof Ack) {
final Ack ack = (Ack) notification;
logger.debug("notification=ACK");
this.remainingAttempts.set(this.client.config.maxRetries);
var correspondingInflight = inflightQueue.poll();
Expand All @@ -313,7 +318,7 @@ public void onCompleted() {
} else {
validate(correspondingInflight, ack.output);
correspondingInflight.callback.set(ack.output);
this.inflightBytes.release(correspondingInflight.meteredBytes.intValue());
this.inflightBytes.release((int) correspondingInflight.meteredBytes);

// Reset the next deadline.
this.nextDeadlineSystemNanos.set(
Expand All @@ -323,18 +328,20 @@ public void onCompleted() {
entry.entryNanos
+ TimeUnit.NANOSECONDS.convert(this.client.config.requestTimeout)));
}
} else if (notification instanceof Error error) {
} else if (notification instanceof Error) {
final Error error = (Error) notification;
logger.debug("notification=ERROR");
clientObserver.onError(Status.CANCELLED.asRuntimeException());
throw new RuntimeException(error.throwable);

} else if (notification instanceof ClientClose close) {
} else if (notification instanceof ClientClose) {
final ClientClose close = (ClientClose) notification;
logger.debug("notification=CLIENT_CLOSE,gracefully={}", close.gracefully);
clientObserver.onCompleted();
if (!close.gracefully) {
return null;
}
} else if (notification instanceof ServerClose close) {
} else if (notification instanceof ServerClose) {
logger.debug("notification=SERVER_CLOSE");
if (acceptingAppends.get() || !inflightQueue.isEmpty() || !notificationQueue.isEmpty()) {
throw Status.INTERNAL
Expand Down Expand Up @@ -363,25 +370,63 @@ public ListenableFuture<Void> closeGracefully() {
return daemon;
}

sealed interface Notification permits Ack, Batch, ClientClose, Error, ServerClose {}
static class InflightRecord {
final AppendInput input;
final long entryNanos;
final SettableFuture<AppendOutput> callback;
final long meteredBytes;

InflightRecord(
AppendInput input,
long entryNanos,
SettableFuture<AppendOutput> callback,
long meteredBytes) {
this.input = input;
this.entryNanos = entryNanos;
this.callback = callback;
this.meteredBytes = meteredBytes;
}

record InflightRecord(
AppendInput input,
Long entryNanos,
SettableFuture<AppendOutput> callback,
Long meteredBytes) {
static InflightRecord construct(AppendInput input, Long meteredBytes) {
return new InflightRecord(input, System.nanoTime(), SettableFuture.create(), meteredBytes);
}
}

record Batch(InflightRecord input) implements Notification {}
interface Notification {}

class Ack implements Notification {
final AppendOutput output;

Ack(AppendOutput output) {
this.output = output;
}
}

record Ack(AppendOutput output) implements Notification {}
class Batch implements Notification {
final InflightRecord input;

record Error(Throwable throwable) implements Notification {}
Batch(InflightRecord input) {
this.input = input;
}
}

record ClientClose(boolean gracefully) implements Notification {}
class ClientClose implements Notification {
final boolean gracefully;

record ServerClose() implements Notification {}
ClientClose(boolean gracefully) {
this.gracefully = gracefully;
}
}

class Error implements Notification {
final Throwable throwable;

Error(Throwable throwable) {
this.throwable = throwable;
}
}

class ServerClose implements Notification {
ServerClose() {}
}
}
31 changes: 24 additions & 7 deletions s2-sdk/src/main/java/s2/client/ManagedReadSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ public class ManagedReadSession implements AutoCloseable {
readSessionRequest,
resp -> {
try {
if (resp instanceof Batch batch) {
if (resp instanceof Batch) {
final Batch batch = (Batch) resp;
bufferAvailable.acquire((int) batch.meteredBytes());
}
queue.put(new DataItem(resp));
Expand Down Expand Up @@ -74,7 +75,8 @@ private Optional<ReadOutput> getInner(Optional<ReadItem> readItem) {
var nextRead =
readItem.flatMap(
elem -> {
if (elem instanceof ErrorItem item) {
if (elem instanceof ErrorItem) {
final ErrorItem item = (ErrorItem) elem;
throw new RuntimeException(item.error);
} else if (elem instanceof EndItem) {
return Optional.empty();
Expand All @@ -83,7 +85,7 @@ private Optional<ReadOutput> getInner(Optional<ReadItem> readItem) {
}
});
nextRead
.map(nr -> (nr instanceof Batch batch) ? (int) batch.meteredBytes() : 0)
.map(nr -> (nr instanceof Batch) ? (int) ((Batch) nr).meteredBytes() : 0)
.ifPresent(bufferAvailable::release);
return nextRead;
}
Expand All @@ -98,11 +100,26 @@ public void close() throws Exception {
this.readSession.close();
}

private sealed interface ReadItem permits DataItem, ErrorItem, EndItem {}
interface ReadItem {}
;

record DataItem(ReadOutput readOutput) implements ReadItem {}
static class DataItem implements ReadItem {
final ReadOutput readOutput;

record ErrorItem(Throwable error) implements ReadItem {}
DataItem(ReadOutput readOutput) {
this.readOutput = readOutput;
}
}

record EndItem() implements ReadItem {}
static class ErrorItem implements ReadItem {
final Throwable error;

ErrorItem(Throwable error) {
this.error = error;
}
}

static class EndItem implements ReadItem {
EndItem() {}
}
}
5 changes: 3 additions & 2 deletions s2-sdk/src/main/java/s2/client/ReadSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,11 @@ private ListenableFuture<Void> retrying() {
readSessionInner(
request.update(nextStartSeqNum.get(), consumedRecords.get(), consumedBytes.get()),
resp -> {
if (resp instanceof Batch batch) {
if (resp instanceof Batch) {
final Batch batch = (Batch) resp;
var lastRecordIdx = batch.lastSeqNum();
lastRecordIdx.ifPresent(v -> nextStartSeqNum.set(v + 1));
consumedRecords.addAndGet(batch.sequencedRecordBatch().records().size());
consumedRecords.addAndGet(batch.sequencedRecordBatch.records.size());
consumedBytes.addAndGet(batch.meteredBytes());
}
this.remainingAttempts.set(client.config.maxRetries);
Expand Down
Loading
Loading