diff --git a/app/src/main/java/org/example/app/AccountDemo.java b/app/src/main/java/org/example/app/AccountDemo.java index d87c78f..65b30c4 100644 --- a/app/src/main/java/org/example/app/AccountDemo.java +++ b/app/src/main/java/org/example/app/AccountDemo.java @@ -26,7 +26,7 @@ public static void main(String[] args) throws Exception { try (var client = Client.newBuilder(config).build()) { var basins = client.listBasins(ListBasinsRequest.newBuilder().build()).get(); - basins.elems().forEach(basin -> logger.info("basin={}", basin)); + basins.elems.forEach(basin -> logger.info("basin={}", basin)); var newBasin = client diff --git a/app/src/main/java/org/example/app/BasinDemo.java b/app/src/main/java/org/example/app/BasinDemo.java index 3219bb2..56e7be1 100644 --- a/app/src/main/java/org/example/app/BasinDemo.java +++ b/app/src/main/java/org/example/app/BasinDemo.java @@ -22,12 +22,10 @@ public static void main(String[] args) throws Exception { try (final var basinClient = BasinClient.newBuilder(config, System.getenv("S2_BASIN")).build()) { final var streams = basinClient.listStreams(ListStreamsRequest.newBuilder().build()).get(); - streams - .elems() - .forEach( - stream -> { - logger.info("stream={}", stream); - }); + streams.elems.forEach( + stream -> { + logger.info("stream={}", stream); + }); var newStream = basinClient diff --git a/app/src/main/java/org/example/app/ManagedReadSessionDemo.java b/app/src/main/java/org/example/app/ManagedReadSessionDemo.java index 84323d9..1d4daef 100644 --- a/app/src/main/java/org/example/app/ManagedReadSessionDemo.java +++ b/app/src/main/java/org/example/app/ManagedReadSessionDemo.java @@ -33,9 +33,13 @@ public static void main(String[] args) throws Exception { throw new IllegalStateException("S2_STREAM not set"); } - var config = Config.newBuilder(authToken).withEndpoints(Endpoints.fromEnvironment()).build(); + var config = + Config.newBuilder(authToken) + .withEndpoints(Endpoints.fromEnvironment()) + .withCompression(true) + .build(); - try (final var executor = new ScheduledThreadPoolExecutor(1); + try (final var executor = new ScheduledThreadPoolExecutor(12); final var channel = ManagedChannelFactory.forBasinOrStreamService(config, basinName)) { final var streamClient = @@ -47,7 +51,7 @@ public static void main(String[] args) throws Exception { try (final var managedSession = streamClient.managedReadSession( ReadSessionRequest.newBuilder().withReadLimit(ReadLimit.count(100_000)).build(), - 1024 * 1024 * 1024)) { + 1024 * 1024 * 1024 * 5)) { AtomicLong receivedBytes = new AtomicLong(); while (!managedSession.isClosed()) { diff --git a/gradle.properties b/gradle.properties index e702813..07f1a23 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=0.0.13-SNAPSHOT \ No newline at end of file +version=0.0.13 \ No newline at end of file diff --git a/s2-internal/build.gradle.kts b/s2-internal/build.gradle.kts index b73c456..a20c9e6 100644 --- a/s2-internal/build.gradle.kts +++ b/s2-internal/build.gradle.kts @@ -54,7 +54,7 @@ protobuf { java { toolchain { - languageVersion.set(JavaLanguageVersion.of(17)) + languageVersion.set(JavaLanguageVersion.of(11)) } withJavadocJar() withSourcesJar() diff --git a/s2-sdk/build.gradle.kts b/s2-sdk/build.gradle.kts index 194c7ed..a7d5bb7 100644 --- a/s2-sdk/build.gradle.kts +++ b/s2-sdk/build.gradle.kts @@ -45,7 +45,7 @@ tasks.test { java { toolchain { - languageVersion.set(JavaLanguageVersion.of(17)) + languageVersion.set(JavaLanguageVersion.of(11)) } withJavadocJar() withSourcesJar() diff --git a/s2-sdk/src/main/java/s2/client/BaseClient.java b/s2-sdk/src/main/java/s2/client/BaseClient.java index ee6d264..29079a0 100644 --- a/s2-sdk/src/main/java/s2/client/BaseClient.java +++ b/s2-sdk/src/main/java/s2/client/BaseClient.java @@ -52,10 +52,14 @@ public Thread newThread(Runnable r) { } static boolean retryableStatus(Status status) { - return switch (status.getCode()) { - case UNKNOWN, DEADLINE_EXCEEDED, UNAVAILABLE -> true; - default -> false; - }; + switch (status.getCode()) { + case UNKNOWN: + case DEADLINE_EXCEEDED: + case UNAVAILABLE: + return true; + default: + return false; + } } public void close() { diff --git a/s2-sdk/src/main/java/s2/client/BasinClient.java b/s2-sdk/src/main/java/s2/client/BasinClient.java index 6d7551d..2f51b22 100644 --- a/s2-sdk/src/main/java/s2/client/BasinClient.java +++ b/s2-sdk/src/main/java/s2/client/BasinClient.java @@ -8,6 +8,7 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Collectors; import s2.auth.BearerTokenCallCredentials; import s2.channel.BasinCompatibleChannel; import s2.channel.ManagedChannelFactory; @@ -66,7 +67,9 @@ public ListenableFuture> listStreams( resp -> new Paginated<>( resp.getHasMore(), - resp.getStreamsList().stream().map(StreamInfo::fromProto).toList()), + resp.getStreamsList().stream() + .map(StreamInfo::fromProto) + .collect(Collectors.toList())), executor)); } diff --git a/s2-sdk/src/main/java/s2/client/Client.java b/s2-sdk/src/main/java/s2/client/Client.java index 1e39934..b844841 100644 --- a/s2-sdk/src/main/java/s2/client/Client.java +++ b/s2-sdk/src/main/java/s2/client/Client.java @@ -8,6 +8,7 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import s2.auth.BearerTokenCallCredentials; @@ -61,7 +62,9 @@ public ListenableFuture> listBasins(s2.types.ListBasinsRequ resp -> new Paginated<>( resp.getHasMore(), - resp.getBasinsList().stream().map(BasinInfo::fromProto).toList()), + resp.getBasinsList().stream() + .map(BasinInfo::fromProto) + .collect(Collectors.toList())), executor)); } diff --git a/s2-sdk/src/main/java/s2/client/ManagedAppendSession.java b/s2-sdk/src/main/java/s2/client/ManagedAppendSession.java index f506cc9..f84a2d2 100644 --- a/s2-sdk/src/main/java/s2/client/ManagedAppendSession.java +++ b/s2-sdk/src/main/java/s2/client/ManagedAppendSession.java @@ -179,7 +179,8 @@ record -> { throw Status.CANCELLED .withDescription("hit deadline while retransmitting") .asRuntimeException(); - } else if (notification instanceof Ack ack) { + } else if (notification instanceof Ack) { + final Ack ack = (Ack) notification; this.remainingAttempts.set(this.client.config.maxRetries); var correspondingInflight = inflightQueue.poll(); if (correspondingInflight == null) { @@ -187,9 +188,10 @@ record -> { } else { validate(correspondingInflight, ack.output); correspondingInflight.callback.set(ack.output); - this.inflightBytes.release(correspondingInflight.meteredBytes.intValue()); + this.inflightBytes.release((int) correspondingInflight.meteredBytes); } - } else if (notification instanceof Error error) { + } else if (notification instanceof Error) { + final Error error = (Error) notification; throw new RuntimeException(error.throwable); } else { throw Status.INTERNAL @@ -218,7 +220,8 @@ private synchronized Void cleanUp(Status fatal) throws InterruptedException { // through them and cancel any pending batches via their callback. while (!notificationQueue.isEmpty()) { var entry = notificationQueue.poll(); - if (entry instanceof Batch batch) { + if (entry instanceof Batch) { + final Batch batch = (Batch) entry; batch.input.callback.setException(fatal.asRuntimeException()); } } @@ -228,7 +231,7 @@ private synchronized Void cleanUp(Status fatal) throws InterruptedException { } private void validate(InflightRecord record, AppendOutput output) { - var numRecordsForAcknowledgement = output.endSeqNum() - output.startSeqNum(); + var numRecordsForAcknowledgement = output.endSeqNum - output.startSeqNum; if (numRecordsForAcknowledgement != record.input.records.size()) { throw Status.INTERNAL .withDescription( @@ -287,7 +290,8 @@ public void onCompleted() { clientObserver.onError(Status.CANCELLED.asRuntimeException()); throw Status.CANCELLED.asRuntimeException(); } - } else if (notification instanceof Batch batch) { + } else if (notification instanceof Batch) { + final Batch batch = (Batch) notification; logger.debug("notification=BATCH"); if (!inflightQueue.offer(batch.input)) { throw Status.INTERNAL.asRuntimeException(); @@ -304,7 +308,8 @@ public void onCompleted() { + TimeUnit.NANOSECONDS.convert(this.client.config.requestTimeout))); } - } else if (notification instanceof Ack ack) { + } else if (notification instanceof Ack) { + final Ack ack = (Ack) notification; logger.debug("notification=ACK"); this.remainingAttempts.set(this.client.config.maxRetries); var correspondingInflight = inflightQueue.poll(); @@ -313,7 +318,7 @@ public void onCompleted() { } else { validate(correspondingInflight, ack.output); correspondingInflight.callback.set(ack.output); - this.inflightBytes.release(correspondingInflight.meteredBytes.intValue()); + this.inflightBytes.release((int) correspondingInflight.meteredBytes); // Reset the next deadline. this.nextDeadlineSystemNanos.set( @@ -323,18 +328,20 @@ public void onCompleted() { entry.entryNanos + TimeUnit.NANOSECONDS.convert(this.client.config.requestTimeout))); } - } else if (notification instanceof Error error) { + } else if (notification instanceof Error) { + final Error error = (Error) notification; logger.debug("notification=ERROR"); clientObserver.onError(Status.CANCELLED.asRuntimeException()); throw new RuntimeException(error.throwable); - } else if (notification instanceof ClientClose close) { + } else if (notification instanceof ClientClose) { + final ClientClose close = (ClientClose) notification; logger.debug("notification=CLIENT_CLOSE,gracefully={}", close.gracefully); clientObserver.onCompleted(); if (!close.gracefully) { return null; } - } else if (notification instanceof ServerClose close) { + } else if (notification instanceof ServerClose) { logger.debug("notification=SERVER_CLOSE"); if (acceptingAppends.get() || !inflightQueue.isEmpty() || !notificationQueue.isEmpty()) { throw Status.INTERNAL @@ -363,25 +370,63 @@ public ListenableFuture closeGracefully() { return daemon; } - sealed interface Notification permits Ack, Batch, ClientClose, Error, ServerClose {} + static class InflightRecord { + final AppendInput input; + final long entryNanos; + final SettableFuture callback; + final long meteredBytes; + + InflightRecord( + AppendInput input, + long entryNanos, + SettableFuture callback, + long meteredBytes) { + this.input = input; + this.entryNanos = entryNanos; + this.callback = callback; + this.meteredBytes = meteredBytes; + } - record InflightRecord( - AppendInput input, - Long entryNanos, - SettableFuture callback, - Long meteredBytes) { static InflightRecord construct(AppendInput input, Long meteredBytes) { return new InflightRecord(input, System.nanoTime(), SettableFuture.create(), meteredBytes); } } - record Batch(InflightRecord input) implements Notification {} + interface Notification {} + + class Ack implements Notification { + final AppendOutput output; + + Ack(AppendOutput output) { + this.output = output; + } + } - record Ack(AppendOutput output) implements Notification {} + class Batch implements Notification { + final InflightRecord input; - record Error(Throwable throwable) implements Notification {} + Batch(InflightRecord input) { + this.input = input; + } + } - record ClientClose(boolean gracefully) implements Notification {} + class ClientClose implements Notification { + final boolean gracefully; - record ServerClose() implements Notification {} + ClientClose(boolean gracefully) { + this.gracefully = gracefully; + } + } + + class Error implements Notification { + final Throwable throwable; + + Error(Throwable throwable) { + this.throwable = throwable; + } + } + + class ServerClose implements Notification { + ServerClose() {} + } } diff --git a/s2-sdk/src/main/java/s2/client/ManagedReadSession.java b/s2-sdk/src/main/java/s2/client/ManagedReadSession.java index ef2e69e..2eec65c 100644 --- a/s2-sdk/src/main/java/s2/client/ManagedReadSession.java +++ b/s2-sdk/src/main/java/s2/client/ManagedReadSession.java @@ -26,7 +26,8 @@ public class ManagedReadSession implements AutoCloseable { readSessionRequest, resp -> { try { - if (resp instanceof Batch batch) { + if (resp instanceof Batch) { + final Batch batch = (Batch) resp; bufferAvailable.acquire((int) batch.meteredBytes()); } queue.put(new DataItem(resp)); @@ -74,7 +75,8 @@ private Optional getInner(Optional readItem) { var nextRead = readItem.flatMap( elem -> { - if (elem instanceof ErrorItem item) { + if (elem instanceof ErrorItem) { + final ErrorItem item = (ErrorItem) elem; throw new RuntimeException(item.error); } else if (elem instanceof EndItem) { return Optional.empty(); @@ -83,7 +85,7 @@ private Optional getInner(Optional readItem) { } }); nextRead - .map(nr -> (nr instanceof Batch batch) ? (int) batch.meteredBytes() : 0) + .map(nr -> (nr instanceof Batch) ? (int) ((Batch) nr).meteredBytes() : 0) .ifPresent(bufferAvailable::release); return nextRead; } @@ -98,11 +100,26 @@ public void close() throws Exception { this.readSession.close(); } - private sealed interface ReadItem permits DataItem, ErrorItem, EndItem {} + interface ReadItem {} + ; - record DataItem(ReadOutput readOutput) implements ReadItem {} + static class DataItem implements ReadItem { + final ReadOutput readOutput; - record ErrorItem(Throwable error) implements ReadItem {} + DataItem(ReadOutput readOutput) { + this.readOutput = readOutput; + } + } - record EndItem() implements ReadItem {} + static class ErrorItem implements ReadItem { + final Throwable error; + + ErrorItem(Throwable error) { + this.error = error; + } + } + + static class EndItem implements ReadItem { + EndItem() {} + } } diff --git a/s2-sdk/src/main/java/s2/client/ReadSession.java b/s2-sdk/src/main/java/s2/client/ReadSession.java index 61d6243..4d0f0fa 100644 --- a/s2-sdk/src/main/java/s2/client/ReadSession.java +++ b/s2-sdk/src/main/java/s2/client/ReadSession.java @@ -84,10 +84,11 @@ private ListenableFuture retrying() { readSessionInner( request.update(nextStartSeqNum.get(), consumedRecords.get(), consumedBytes.get()), resp -> { - if (resp instanceof Batch batch) { + if (resp instanceof Batch) { + final Batch batch = (Batch) resp; var lastRecordIdx = batch.lastSeqNum(); lastRecordIdx.ifPresent(v -> nextStartSeqNum.set(v + 1)); - consumedRecords.addAndGet(batch.sequencedRecordBatch().records().size()); + consumedRecords.addAndGet(batch.sequencedRecordBatch.records.size()); consumedBytes.addAndGet(batch.meteredBytes()); } this.remainingAttempts.set(client.config.maxRetries); diff --git a/s2-sdk/src/main/java/s2/client/StreamClient.java b/s2-sdk/src/main/java/s2/client/StreamClient.java index 1487605..606f358 100644 --- a/s2-sdk/src/main/java/s2/client/StreamClient.java +++ b/s2-sdk/src/main/java/s2/client/StreamClient.java @@ -14,6 +14,7 @@ import s2.auth.BearerTokenCallCredentials; import s2.channel.BasinCompatibleChannel; import s2.channel.ManagedChannelFactory; +import s2.config.AppendRetryPolicy; import s2.config.Config; import s2.types.AppendInput; import s2.types.AppendOutput; @@ -21,6 +22,7 @@ import s2.types.ReadRequest; import s2.types.ReadSessionRequest; import s2.v1alpha.AppendRequest; +import s2.v1alpha.AppendResponse; import s2.v1alpha.AppendSessionRequest; import s2.v1alpha.AppendSessionResponse; import s2.v1alpha.CheckTailRequest; @@ -97,7 +99,7 @@ public ListenableFuture checkTail() { /** * Retrieve a batch of records from a stream, using the unary read RPC. * - * @see s2.client.StreamClient#readSession + * @see StreamClient#readSession * @param request the request * @return future of the read result */ @@ -115,14 +117,13 @@ public ListenableFuture read(ReadRequest request) { * Retrieve batches of records from a stream continuously. * *

This entryway into a read session does internally perform retries (if configured via {@link - * s2.config.Config#maxRetries}). It does not handle any form of backpressure or flow control - * directly. + * Config#maxRetries}). It does not handle any form of backpressure or flow control directly. * *

The stream is interacted with via callbacks, which delegate to an underlying GRPC StreamObserver * class. * - * @see s2.client.StreamClient#managedReadSession + * @see StreamClient#managedReadSession * @param request the request * @param onResponse function to run, sequentially, on each successful message * @param onError function to run on an error @@ -136,15 +137,15 @@ public ReadSession readSession( /** * Retrieve batches of records from a stream continuously, using a buffered queue-backed iterator. * - *

This entryway into a read session, similar to {@link s2.client.StreamClient#readSession}, - * will retry internally if configured. + *

This entryway into a read session, similar to {@link StreamClient#readSession}, will retry + * internally if configured. * *

The GRPC streaming response will be buffered, based on the `maxBufferedBytes` param, * preventing situations where the result of a read accumulate faster than a user can handle. * *

Results are interacted with via an interator-like API, rather than via callbacks. * - * @see s2.client.StreamClient#readSession + * @see StreamClient#readSession * @param request the request * @param maxBufferedBytes the max allowed amount of read response metered bytes to keep in the * buffer @@ -158,35 +159,40 @@ public ManagedReadSession managedReadSession( /** * Append a batch of records to a stream, using the unary append RPC. * - *

Note that the choice of {@link s2.config.Config#appendRetryPolicy} is important. Since - * appends are not idempotent by default, retries could cause duplicates in a stream. If - * you use-case cannot tolerate the potential of duplicate records, make sure to select {@link - * s2.config.AppendRetryPolicy#NO_SIDE_EFFECTS}. + *

Note that the choice of {@link Config#appendRetryPolicy} is important. Since appends are not + * idempotent by default, retries could cause duplicates in a stream. If your use-case + * cannot tolerate the potential of duplicate records, make sure to select {@link + * AppendRetryPolicy#NO_SIDE_EFFECTS}. * - * @see s2.config.Config#appendRetryPolicy - * @see s2.config.AppendRetryPolicy + * @see Config#appendRetryPolicy + * @see AppendRetryPolicy * @param request the request * @return future of the append response */ public ListenableFuture append(AppendInput request) { + ListenableFuture future; + switch (config.appendRetryPolicy) { + case ALL: + future = + withStaticRetries( + config.maxRetries, + () -> + this.futureStub.append( + AppendRequest.newBuilder().setInput(request.toProto(streamName)).build())); + break; + case NO_SIDE_EFFECTS: + future = + this.futureStub.append( + AppendRequest.newBuilder().setInput(request.toProto(streamName)).build()); + break; + default: + throw new UnsupportedOperationException( + "Unsupported append retry policy: " + config.appendRetryPolicy); + } return withTimeout( () -> Futures.transform( - switch (config.appendRetryPolicy) { - case ALL -> - withStaticRetries( - config.maxRetries, - () -> - this.futureStub.append( - AppendRequest.newBuilder() - .setInput(request.toProto(streamName)) - .build())); - case NO_SIDE_EFFECTS -> - this.futureStub.append( - AppendRequest.newBuilder().setInput(request.toProto(streamName)).build()); - }, - response -> AppendOutput.fromProto(response.getOutput()), - executor)); + future, response -> AppendOutput.fromProto(response.getOutput()), executor)); } /** @@ -243,13 +249,13 @@ public void onCompleted() { *

Unlike with {@link StreamClient#appendSession}, this session will attempt to retry * intermittent failures if so elected. * - *

Note that the choice of {@link s2.config.Config#appendRetryPolicy} is important. Since - * appends are not idempotent by default, retries could cause duplicates in a stream. If - * you use-case cannot tolerate the potential of duplicate records, make sure to select {@link - * s2.config.AppendRetryPolicy#NO_SIDE_EFFECTS}. + *

Note that the choice of {@link Config#appendRetryPolicy} is important. Since appends are not + * idempotent by default, retries could cause duplicates in a stream. If you use-case + * cannot tolerate the potential of duplicate records, make sure to select {@link + * AppendRetryPolicy#NO_SIDE_EFFECTS}. * - * @see s2.config.Config#appendRetryPolicy - * @see s2.config.AppendRetryPolicy + * @see Config#appendRetryPolicy + * @see AppendRetryPolicy * @return the managed append session */ public ManagedAppendSession managedAppendSession() { @@ -293,6 +299,16 @@ public StreamClient build() { } } - public record AppendSessionRequestStream( - Consumer onNext, Consumer onError, Runnable onComplete) {} + public static class AppendSessionRequestStream { + final Consumer onNext; + final Consumer onError; + final Runnable onComplete; + + AppendSessionRequestStream( + Consumer onNext, Consumer onError, Runnable onComplete) { + this.onNext = onNext; + this.onError = onError; + this.onComplete = onComplete; + } + } } diff --git a/s2-sdk/src/main/java/s2/config/BasinEndpoint.java b/s2-sdk/src/main/java/s2/config/BasinEndpoint.java index 6cce94b..89809d4 100644 --- a/s2-sdk/src/main/java/s2/config/BasinEndpoint.java +++ b/s2-sdk/src/main/java/s2/config/BasinEndpoint.java @@ -2,7 +2,7 @@ import java.util.Objects; -public abstract sealed class BasinEndpoint permits ParentZone, Direct { +public abstract class BasinEndpoint { public final Address address; BasinEndpoint(Address address) { diff --git a/s2-sdk/src/main/java/s2/config/Endpoints.java b/s2-sdk/src/main/java/s2/config/Endpoints.java index bbf270b..b1bf461 100644 --- a/s2-sdk/src/main/java/s2/config/Endpoints.java +++ b/s2-sdk/src/main/java/s2/config/Endpoints.java @@ -48,7 +48,8 @@ public static Endpoints forCloud(Cloud cloud) { } public boolean singleEndpoint() { - if (this.basin instanceof Direct direct) { + if (this.basin instanceof Direct) { + final Direct direct = (Direct) this.basin; return this.account.equals(direct.address); } else if (this.basin instanceof ParentZone) { return false; diff --git a/s2-sdk/src/main/java/s2/types/AppendOutput.java b/s2-sdk/src/main/java/s2/types/AppendOutput.java index 1d64140..b1e71a6 100644 --- a/s2-sdk/src/main/java/s2/types/AppendOutput.java +++ b/s2-sdk/src/main/java/s2/types/AppendOutput.java @@ -1,8 +1,25 @@ package s2.types; -public record AppendOutput(long startSeqNum, long endSeqNum, long nextSeqNum) { +public class AppendOutput { + public final long startSeqNum; + public final long endSeqNum; + public final long nextSeqNum; + + AppendOutput(long startSeqNum, long endSeqNum, long nextSeqNum) { + this.startSeqNum = startSeqNum; + this.endSeqNum = endSeqNum; + this.nextSeqNum = nextSeqNum; + } + public static AppendOutput fromProto(s2.v1alpha.AppendOutput appendOutput) { return new AppendOutput( appendOutput.getStartSeqNum(), appendOutput.getEndSeqNum(), appendOutput.getNextSeqNum()); } + + @Override + public String toString() { + return String.format( + "AppendOutput[startSeqNum=%s, endSeqNum=%s, nextSeqNum=%s]", + startSeqNum, endSeqNum, nextSeqNum); + } } diff --git a/s2-sdk/src/main/java/s2/types/AppendRecord.java b/s2-sdk/src/main/java/s2/types/AppendRecord.java index 546e798..34a35aa 100644 --- a/s2-sdk/src/main/java/s2/types/AppendRecord.java +++ b/s2-sdk/src/main/java/s2/types/AppendRecord.java @@ -24,7 +24,7 @@ public static AppendRecordBuilder newBuilder() { public long meteredBytes() { return 8 + (2L * this.headers.size()) - + this.headers.stream().map(h -> h.name().size() + h.value().size()).reduce(0, Integer::sum) + + this.headers.stream().map(h -> h.name.size() + h.value.size()).reduce(0, Integer::sum) + this.body.size(); } diff --git a/s2-sdk/src/main/java/s2/types/BasinConfig.java b/s2-sdk/src/main/java/s2/types/BasinConfig.java index 24523a4..3c45f6f 100644 --- a/s2-sdk/src/main/java/s2/types/BasinConfig.java +++ b/s2-sdk/src/main/java/s2/types/BasinConfig.java @@ -1,6 +1,12 @@ package s2.types; -public record BasinConfig(StreamConfig defaultStreamConfig) { +public class BasinConfig { + public final StreamConfig defaultStreamConfig; + + public BasinConfig(StreamConfig defaultStreamConfig) { + this.defaultStreamConfig = defaultStreamConfig; + } + public s2.v1alpha.BasinConfig toProto() { return s2.v1alpha.BasinConfig.newBuilder() .setDefaultStreamConfig(defaultStreamConfig.toProto()) diff --git a/s2-sdk/src/main/java/s2/types/BasinInfo.java b/s2-sdk/src/main/java/s2/types/BasinInfo.java index 3cd31e6..d72f22b 100644 --- a/s2-sdk/src/main/java/s2/types/BasinInfo.java +++ b/s2-sdk/src/main/java/s2/types/BasinInfo.java @@ -1,15 +1,38 @@ package s2.types; -public record BasinInfo(String name, String scope, String cell, BasinState state) { +public class BasinInfo { + public final String name; + public final String scope; + public final String cell; + public final BasinState basinState; + + BasinInfo(String name, String scope, String cell, BasinState basinState) { + this.name = name; + this.scope = scope; + this.cell = cell; + this.basinState = basinState; + } + public static BasinInfo fromProto(s2.v1alpha.BasinInfo basinInfo) { - var state = - switch (basinInfo.getState()) { - case BASIN_STATE_UNSPECIFIED -> BasinState.UNSPECIFIED; - case BASIN_STATE_ACTIVE -> BasinState.ACTIVE; - case BASIN_STATE_CREATING -> BasinState.CREATING; - case BASIN_STATE_DELETING -> BasinState.DELETING; - default -> BasinState.UNKNOWN; - }; + BasinState state; + switch (basinInfo.getState()) { + case BASIN_STATE_UNSPECIFIED: + state = BasinState.UNSPECIFIED; + break; + case BASIN_STATE_ACTIVE: + state = BasinState.ACTIVE; + break; + case BASIN_STATE_CREATING: + state = BasinState.CREATING; + break; + case BASIN_STATE_DELETING: + state = BasinState.DELETING; + break; + default: + state = BasinState.UNKNOWN; + break; + } + return new BasinInfo(basinInfo.getName(), basinInfo.getScope(), basinInfo.getCell(), state); } } diff --git a/s2-sdk/src/main/java/s2/types/Batch.java b/s2-sdk/src/main/java/s2/types/Batch.java index a043a11..b30801b 100644 --- a/s2-sdk/src/main/java/s2/types/Batch.java +++ b/s2-sdk/src/main/java/s2/types/Batch.java @@ -2,19 +2,22 @@ import java.util.Optional; -public record Batch(SequencedRecordBatch sequencedRecordBatch) implements ReadOutput, MeteredBytes { +public final class Batch implements ReadOutput, MeteredBytes { + + public final SequencedRecordBatch sequencedRecordBatch; + + Batch(SequencedRecordBatch sequencedRecordBatch) { + this.sequencedRecordBatch = sequencedRecordBatch; + } public Optional firstSeqNum() { - return this.sequencedRecordBatch.records().stream().findFirst().map(SequencedRecord::seqNum); + return this.sequencedRecordBatch.records.stream().findFirst().map(sr -> sr.seqNum); } public Optional lastSeqNum() { - if (!this.sequencedRecordBatch.records().isEmpty()) { + if (!this.sequencedRecordBatch.records.isEmpty()) { return Optional.of( - this.sequencedRecordBatch - .records() - .get(sequencedRecordBatch.records().size() - 1) - .seqNum()); + this.sequencedRecordBatch.records.get(sequencedRecordBatch.records.size() - 1).seqNum); } else { return Optional.empty(); } diff --git a/s2-sdk/src/main/java/s2/types/CreateBasinRequest.java b/s2-sdk/src/main/java/s2/types/CreateBasinRequest.java index aa26119..b5d56f9 100644 --- a/s2-sdk/src/main/java/s2/types/CreateBasinRequest.java +++ b/s2-sdk/src/main/java/s2/types/CreateBasinRequest.java @@ -26,7 +26,7 @@ public s2.v1alpha.CreateBasinRequest toProto() { return builder.build(); } - abstract static sealed class BasinAssignment permits Scope, Cell { + abstract static class BasinAssignment { public final String value; BasinAssignment(String value) { diff --git a/s2-sdk/src/main/java/s2/types/FirstSeqNum.java b/s2-sdk/src/main/java/s2/types/FirstSeqNum.java index 7eaa28c..16d6548 100644 --- a/s2-sdk/src/main/java/s2/types/FirstSeqNum.java +++ b/s2-sdk/src/main/java/s2/types/FirstSeqNum.java @@ -1,3 +1,10 @@ package s2.types; -public record FirstSeqNum(long value) implements ReadOutput {} +public final class FirstSeqNum implements ReadOutput { + + public final long value; + + public FirstSeqNum(long value) { + this.value = value; + } +} diff --git a/s2-sdk/src/main/java/s2/types/Header.java b/s2-sdk/src/main/java/s2/types/Header.java index 7f526df..ac7e0c5 100644 --- a/s2-sdk/src/main/java/s2/types/Header.java +++ b/s2-sdk/src/main/java/s2/types/Header.java @@ -2,13 +2,20 @@ import com.google.protobuf.ByteString; -public record Header(ByteString name, ByteString value) { +public class Header { + public final ByteString name; + public final ByteString value; - public static Header fromProto(s2.v1alpha.Header header) { - return new Header(header.getName(), header.getValue()); + public Header(ByteString name, ByteString value) { + this.name = name; + this.value = value; } public s2.v1alpha.Header toProto() { return s2.v1alpha.Header.newBuilder().setName(name).setValue(value).build(); } + + public static Header fromProto(s2.v1alpha.Header protoHeader) { + return new Header(protoHeader.getName(), protoHeader.getValue()); + } } diff --git a/s2-sdk/src/main/java/s2/types/NextSeqNum.java b/s2-sdk/src/main/java/s2/types/NextSeqNum.java index 062b3b2..8e44361 100644 --- a/s2-sdk/src/main/java/s2/types/NextSeqNum.java +++ b/s2-sdk/src/main/java/s2/types/NextSeqNum.java @@ -1,3 +1,9 @@ package s2.types; -public record NextSeqNum(long value) implements ReadOutput {} +public final class NextSeqNum implements ReadOutput { + public final long value; + + public NextSeqNum(long value) { + this.value = value; + } +} diff --git a/s2-sdk/src/main/java/s2/types/Paginated.java b/s2-sdk/src/main/java/s2/types/Paginated.java index 7025d2d..639e5c0 100644 --- a/s2-sdk/src/main/java/s2/types/Paginated.java +++ b/s2-sdk/src/main/java/s2/types/Paginated.java @@ -2,4 +2,12 @@ import java.util.List; -public record Paginated(boolean hasMore, List elems) {} +public class Paginated { + public final boolean hasMore; + public final List elems; + + public Paginated(boolean hasMore, List elems) { + this.hasMore = hasMore; + this.elems = elems; + } +} diff --git a/s2-sdk/src/main/java/s2/types/ReadOutput.java b/s2-sdk/src/main/java/s2/types/ReadOutput.java index 61fb602..3772d7c 100644 --- a/s2-sdk/src/main/java/s2/types/ReadOutput.java +++ b/s2-sdk/src/main/java/s2/types/ReadOutput.java @@ -1,18 +1,15 @@ package s2.types; -public sealed interface ReadOutput permits Batch, FirstSeqNum, NextSeqNum { +public interface ReadOutput { static ReadOutput fromProto(s2.v1alpha.ReadOutput readOutput) { switch (readOutput.getOutputCase()) { - case BATCH -> { + case BATCH: return new Batch(SequencedRecordBatch.fromProto(readOutput.getBatch())); - } - case FIRST_SEQ_NUM -> { + case FIRST_SEQ_NUM: return new FirstSeqNum(readOutput.getFirstSeqNum()); - } - case NEXT_SEQ_NUM -> { + case NEXT_SEQ_NUM: return new NextSeqNum(readOutput.getNextSeqNum()); - } } - throw new IllegalStateException("Unrecognized readOutput case: " + readOutput.getOutputCase()); + throw new IllegalStateException("Unrecognized readOutput case: " + readOutput); } } diff --git a/s2-sdk/src/main/java/s2/types/RetentionPolicy.java b/s2-sdk/src/main/java/s2/types/RetentionPolicy.java index 1a3359d..c4baf62 100644 --- a/s2-sdk/src/main/java/s2/types/RetentionPolicy.java +++ b/s2-sdk/src/main/java/s2/types/RetentionPolicy.java @@ -1,3 +1,3 @@ package s2.types; -public abstract sealed class RetentionPolicy permits Age {} +public abstract class RetentionPolicy {} diff --git a/s2-sdk/src/main/java/s2/types/SequencedRecord.java b/s2-sdk/src/main/java/s2/types/SequencedRecord.java index f581026..d111f64 100644 --- a/s2-sdk/src/main/java/s2/types/SequencedRecord.java +++ b/s2-sdk/src/main/java/s2/types/SequencedRecord.java @@ -2,21 +2,33 @@ import com.google.protobuf.ByteString; import java.util.List; +import java.util.stream.Collectors; -public record SequencedRecord(Long seqNum, List

headers, ByteString body) - implements MeteredBytes { - public static SequencedRecord fromProto(s2.v1alpha.SequencedRecord sequencedRecord) { - return new SequencedRecord( - sequencedRecord.getSeqNum(), - sequencedRecord.getHeadersList().stream().map(Header::fromProto).toList(), - sequencedRecord.getBody()); +public class SequencedRecord implements MeteredBytes { + public final long seqNum; + public final List
headers; + public final ByteString body; + + SequencedRecord(long seqNum, List
headers, ByteString body) { + this.seqNum = seqNum; + this.headers = headers; + this.body = body; } @Override public long meteredBytes() { return 8 + (2L * this.headers.size()) - + this.headers.stream().map(h -> h.name().size() + h.value().size()).reduce(0, Integer::sum) + + this.headers.stream().map(h -> h.name.size() + h.value.size()).reduce(0, Integer::sum) + this.body.size(); } + + public static SequencedRecord fromProto(s2.v1alpha.SequencedRecord sequencedRecord) { + return new SequencedRecord( + sequencedRecord.getSeqNum(), + sequencedRecord.getHeadersList().stream() + .map(Header::fromProto) + .collect(Collectors.toList()), + sequencedRecord.getBody()); + } } diff --git a/s2-sdk/src/main/java/s2/types/SequencedRecordBatch.java b/s2-sdk/src/main/java/s2/types/SequencedRecordBatch.java index a7923e2..8879624 100644 --- a/s2-sdk/src/main/java/s2/types/SequencedRecordBatch.java +++ b/s2-sdk/src/main/java/s2/types/SequencedRecordBatch.java @@ -1,12 +1,20 @@ package s2.types; import java.util.List; +import java.util.stream.Collectors; -public record SequencedRecordBatch(List records) implements MeteredBytes { +public class SequencedRecordBatch implements MeteredBytes { + public final List records; + + SequencedRecordBatch(List records) { + this.records = records; + } public static SequencedRecordBatch fromProto(s2.v1alpha.SequencedRecordBatch batch) { return new SequencedRecordBatch( - batch.getRecordsList().stream().map(SequencedRecord::fromProto).toList()); + batch.getRecordsList().stream() + .map(SequencedRecord::fromProto) + .collect(Collectors.toList())); } @Override diff --git a/s2-sdk/src/main/java/s2/types/StreamConfig.java b/s2-sdk/src/main/java/s2/types/StreamConfig.java index 5eb73b1..e558e87 100644 --- a/s2-sdk/src/main/java/s2/types/StreamConfig.java +++ b/s2-sdk/src/main/java/s2/types/StreamConfig.java @@ -2,6 +2,7 @@ import java.time.Duration; import java.util.Optional; +import s2.v1alpha.StreamConfig.RetentionPolicyCase; public class StreamConfig { @@ -14,18 +15,27 @@ public class StreamConfig { } public static StreamConfig fromProto(s2.v1alpha.StreamConfig proto) { - var storageClass = - switch (proto.getStorageClass()) { - case STORAGE_CLASS_UNSPECIFIED -> StorageClass.UNSPECIFIED; - case STORAGE_CLASS_STANDARD -> StorageClass.STANDARD; - case STORAGE_CLASS_EXPRESS -> StorageClass.EXPRESS; - default -> StorageClass.UNKNOWN; - }; - Optional retentionPolicy = - switch (proto.getRetentionPolicyCase()) { - case AGE -> Optional.of(new Age(Duration.ofSeconds(proto.getAge()))); - case RETENTIONPOLICY_NOT_SET -> Optional.empty(); - }; + StorageClass storageClass; + switch (proto.getStorageClass()) { + case STORAGE_CLASS_UNSPECIFIED: + storageClass = StorageClass.UNSPECIFIED; + break; + case STORAGE_CLASS_STANDARD: + storageClass = StorageClass.STANDARD; + break; + case STORAGE_CLASS_EXPRESS: + storageClass = StorageClass.EXPRESS; + break; + default: + storageClass = StorageClass.UNKNOWN; + break; + } + + Optional retentionPolicy = Optional.empty(); + + if (proto.getRetentionPolicyCase() == RetentionPolicyCase.AGE) { + retentionPolicy = Optional.of(new Age(Duration.ofSeconds(proto.getAge()))); + } return new StreamConfig(storageClass, retentionPolicy); } diff --git a/s2-sdk/src/main/java/s2/types/StreamInfo.java b/s2-sdk/src/main/java/s2/types/StreamInfo.java index 8bd03e6..0672baa 100644 --- a/s2-sdk/src/main/java/s2/types/StreamInfo.java +++ b/s2-sdk/src/main/java/s2/types/StreamInfo.java @@ -3,7 +3,17 @@ import java.time.Instant; import java.util.Optional; -public record StreamInfo(String name, Instant createdAt, Optional deletedAt) { +public class StreamInfo { + public final String name; + public final Instant createdAt; + public final Optional deletedAt; + + private StreamInfo(String name, Instant createdAt, Optional deletedAt) { + this.name = name; + this.createdAt = createdAt; + this.deletedAt = deletedAt; + } + public static StreamInfo fromProto(s2.v1alpha.StreamInfo streamInfo) { var createdAt = Instant.ofEpochSecond(streamInfo.getCreatedAt()); Optional deletedAt = diff --git a/s2-sdk/src/test/java/s2/client/ReadSessionTest.java b/s2-sdk/src/test/java/s2/client/ReadSessionTest.java index 49031be..c5be008 100644 --- a/s2-sdk/src/test/java/s2/client/ReadSessionTest.java +++ b/s2-sdk/src/test/java/s2/client/ReadSessionTest.java @@ -9,6 +9,7 @@ import java.util.ArrayList; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Collectors; import java.util.stream.IntStream; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -79,10 +80,10 @@ public void testReadSession() throws Exception { System.out.println(received); var flattenedRecords = received.stream() - .flatMap(o -> ((Batch) o).sequencedRecordBatch().records().stream()) - .toList(); + .flatMap(o -> ((Batch) o).sequencedRecordBatch.records.stream()) + .collect(Collectors.toList()); assertThat(flattenedRecords.size()).isEqualTo(25); IntStream.range(0, flattenedRecords.size()) - .forEach(i -> assertThat(flattenedRecords.get(i).seqNum()).isEqualTo(i)); + .forEach(i -> assertThat(flattenedRecords.get(i).seqNum).isEqualTo(i)); } }