diff --git a/app/build.gradle.kts b/app/build.gradle.kts index 5b3191e..a50e2bb 100644 --- a/app/build.gradle.kts +++ b/app/build.gradle.kts @@ -52,6 +52,6 @@ tasks.test { java { withSourcesJar() toolchain { - languageVersion.set(JavaLanguageVersion.of(17)) + languageVersion.set(JavaLanguageVersion.of(19)) } } diff --git a/app/src/main/java/org/example/app/AccountDemo.java b/app/src/main/java/org/example/app/AccountDemo.java index 4855761..d87c78f 100644 --- a/app/src/main/java/org/example/app/AccountDemo.java +++ b/app/src/main/java/org/example/app/AccountDemo.java @@ -23,7 +23,7 @@ public static void main(String[] args) throws Exception { .withEndpoints(Endpoints.fromEnvironment()) .build(); - try (var client = new Client(config)) { + try (var client = Client.newBuilder(config).build()) { var basins = client.listBasins(ListBasinsRequest.newBuilder().build()).get(); basins.elems().forEach(basin -> logger.info("basin={}", basin)); diff --git a/app/src/main/java/org/example/app/BasinDemo.java b/app/src/main/java/org/example/app/BasinDemo.java index 99e3276..3219bb2 100644 --- a/app/src/main/java/org/example/app/BasinDemo.java +++ b/app/src/main/java/org/example/app/BasinDemo.java @@ -2,7 +2,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import s2.client.Client; +import s2.client.BasinClient; import s2.config.Config; import s2.config.Endpoints; import s2.types.CreateStreamRequest; @@ -14,15 +14,14 @@ public class BasinDemo { private static final Logger logger = LoggerFactory.getLogger(BasinDemo.class.getName()); public static void main(String[] args) throws Exception { - var config = + final var config = Config.newBuilder(System.getenv("S2_AUTH_TOKEN")) .withEndpoints(Endpoints.fromEnvironment()) .build(); - try (var client = new Client(config)) { - - var basinClient = client.basinClient("my-first-basin"); - var streams = basinClient.listStreams(ListStreamsRequest.newBuilder().build()).get(); + try (final var basinClient = + BasinClient.newBuilder(config, System.getenv("S2_BASIN")).build()) { + final var streams = basinClient.listStreams(ListStreamsRequest.newBuilder().build()).get(); streams .elems() .forEach( diff --git a/app/src/main/java/org/example/app/ManagedAppendSessionDemo.java b/app/src/main/java/org/example/app/ManagedAppendSessionDemo.java index 90833b5..a19d8d2 100644 --- a/app/src/main/java/org/example/app/ManagedAppendSessionDemo.java +++ b/app/src/main/java/org/example/app/ManagedAppendSessionDemo.java @@ -11,7 +11,8 @@ import java.util.concurrent.LinkedBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import s2.client.Client; +import s2.channel.ManagedChannelFactory; +import s2.client.StreamClient; import s2.config.AppendRetryPolicy; import s2.config.Config; import s2.config.Endpoints; @@ -21,27 +22,6 @@ public class ManagedAppendSessionDemo { - static class RandomASCIIStringGenerator { - private static final String ASCII_PRINTABLE_CHARACTERS = - "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789"; - - private static final Random RANDOM = new Random(); - - public static String generateRandomASCIIString(String prefix, int length) { - if (length < 0) { - throw new IllegalArgumentException("Length cannot be negative."); - } - - StringBuilder sb = new StringBuilder(length); - sb.append(prefix); - for (int i = 0; i < length - prefix.length(); i++) { - int index = RANDOM.nextInt(ASCII_PRINTABLE_CHARACTERS.length()); - sb.append(ASCII_PRINTABLE_CHARACTERS.charAt(index)); - } - return sb.toString(); - } - } - private static final Logger logger = LoggerFactory.getLogger(ManagedAppendSessionDemo.class.getName()); @@ -69,69 +49,88 @@ public static void main(String[] args) throws Exception { final LinkedBlockingQueue> pendingAppends = new LinkedBlockingQueue<>(); - var executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor()); - var consumer = - executor.submit( - () -> { - try { - while (true) { - var output = pendingAppends.take().get(); - if (output == null) { - logger.info("consumer closing"); - break; + try (final var executor = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(4)); + final var channel = ManagedChannelFactory.forBasinOrStreamService(config, basinName)) { + + final var consumer = + executor.submit( + () -> { + try { + while (true) { + var output = pendingAppends.take().get(); + if (output == null) { + logger.info("consumer closing"); + break; + } + logger.info("consumer got: {}", output); } - logger.info("consumer got: {}", output); + } catch (Exception e) { + logger.error("consumer failed", e); } - } catch (Exception e) { - logger.error("consumer failed", e); - } - }); - - try (var client = new Client(config)) { - - final var streamClient = client.basinClient(basinName).streamClient(streamName); - final var futureAppendSession = streamClient.managedAppendSession(); - - for (var i = 0; i < 50_000; i++) { - try { - // Generate a record with approximately 10KiB of random text. - var payload = RandomASCIIStringGenerator.generateRandomASCIIString(i + " - ", 1024 * 10); - var append = - futureAppendSession.submit( - AppendInput.newBuilder() - .withRecords( - List.of( - AppendRecord.newBuilder() - .withBody(payload.getBytes(StandardCharsets.UTF_8)) - .build())) - .build(), - // Duration is how long we are willing to wait to receive a future. - Duration.ofSeconds(1)); - - pendingAppends.add(append); - } catch (RuntimeException e) { - logger.error("producer failed", e); - pendingAppends.add(Futures.immediateFailedFuture(e)); - break; + }); + + final var streamClient = + StreamClient.newBuilder(config, basinName, streamName) + .withExecutor(executor) + .withChannel(channel) + .build(); + + try (final var futureAppendSession = streamClient.managedAppendSession()) { + + for (var i = 0; i < 50_000; i++) { + try { + // Generate a record with approximately 10KiB of random text. + var payload = + RandomASCIIStringGenerator.generateRandomASCIIString(i + " - ", 1024 * 10); + var append = + futureAppendSession.submit( + AppendInput.newBuilder() + .withRecords( + List.of( + AppendRecord.newBuilder() + .withBody(payload.getBytes(StandardCharsets.UTF_8)) + .build())) + .build(), + // Duration is how long we are willing to wait to receive a future. + Duration.ofSeconds(10)); + + pendingAppends.add(append); + } catch (RuntimeException e) { + logger.error("producer failed", e); + pendingAppends.add(Futures.immediateFailedFuture(e)); + break; + } } + + logger.info("finished submitting all appends"); + + // Signal to the consumer that no further appends are happening. + pendingAppends.add(Futures.immediateFuture(null)); } - logger.info("finished submitting all appends"); + consumer.get(); + } + } - // Signal to the consumer that no further appends are happening. - pendingAppends.add(Futures.immediateFuture(null)); + static class RandomASCIIStringGenerator { + private static final String ASCII_PRINTABLE_CHARACTERS = + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789"; + + private static final Random RANDOM = new Random(); - logger.info("starting graceful close"); - try { - futureAppendSession.closeGracefully().get(); - } catch (Exception e) { - logger.error("caught exception during close", e); + public static String generateRandomASCIIString(String prefix, int length) { + if (length < 0) { + throw new IllegalArgumentException("Length cannot be negative."); } - logger.info("finished closing"); - } - // Await the consumer future. - consumer.get(); - executor.shutdown(); + StringBuilder sb = new StringBuilder(length); + sb.append(prefix); + for (int i = 0; i < length - prefix.length(); i++) { + int index = RANDOM.nextInt(ASCII_PRINTABLE_CHARACTERS.length()); + sb.append(ASCII_PRINTABLE_CHARACTERS.charAt(index)); + } + return sb.toString(); + } } } diff --git a/app/src/main/java/org/example/app/ManagedReadSessionDemo.java b/app/src/main/java/org/example/app/ManagedReadSessionDemo.java index 41fcfc3..84323d9 100644 --- a/app/src/main/java/org/example/app/ManagedReadSessionDemo.java +++ b/app/src/main/java/org/example/app/ManagedReadSessionDemo.java @@ -1,10 +1,12 @@ package org.example.app; import java.time.Duration; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import s2.client.Client; +import s2.channel.ManagedChannelFactory; +import s2.client.StreamClient; import s2.config.Config; import s2.config.Endpoints; import s2.types.Batch; @@ -33,38 +35,44 @@ public static void main(String[] args) throws Exception { var config = Config.newBuilder(authToken).withEndpoints(Endpoints.fromEnvironment()).build(); - try (var client = new Client(config)) { + try (final var executor = new ScheduledThreadPoolExecutor(1); + final var channel = ManagedChannelFactory.forBasinOrStreamService(config, basinName)) { - var streamClient = client.basinClient(basinName).streamClient(streamName); + final var streamClient = + StreamClient.newBuilder(config, basinName, streamName) + .withExecutor(executor) + .withChannel(channel) + .build(); - var managedSession = + try (final var managedSession = streamClient.managedReadSession( ReadSessionRequest.newBuilder().withReadLimit(ReadLimit.count(100_000)).build(), - 1024 * 1024 * 1024); + 1024 * 1024 * 1024)) { - AtomicLong receivedBytes = new AtomicLong(); - while (!managedSession.isClosed()) { - // Poll for up to 1 minute. - var resp = managedSession.get(Duration.ofSeconds(60)); - resp.ifPresentOrElse( - elem -> { - if (elem instanceof Batch batch) { - var size = batch.meteredBytes(); - logger.info( - "batch of {} bytes, seqnums {}..={}", - size, - batch.firstSeqNum(), - batch.lastSeqNum()); - receivedBytes.addAndGet(size); - } else { - logger.info("non batch received: {}", elem); - } - }, - () -> { - logger.info("no batch"); - }); + AtomicLong receivedBytes = new AtomicLong(); + while (!managedSession.isClosed()) { + // Poll for up to 1 minute. + var resp = managedSession.get(Duration.ofSeconds(60)); + resp.ifPresentOrElse( + elem -> { + if (elem instanceof Batch batch) { + var size = batch.meteredBytes(); + logger.info( + "batch of {} bytes, seqnums {}..={}", + size, + batch.firstSeqNum(), + batch.lastSeqNum()); + receivedBytes.addAndGet(size); + } else { + logger.info("non batch received: {}", elem); + } + }, + () -> { + logger.info("no batch"); + }); + } + logger.info("finished, received {} bytes in total", receivedBytes.get()); } - logger.info("finished, received {} bytes in total", receivedBytes.get()); } } } diff --git a/app/src/main/resources/logback.xml b/app/src/main/resources/logback.xml index a74093b..261510c 100644 --- a/app/src/main/resources/logback.xml +++ b/app/src/main/resources/logback.xml @@ -7,7 +7,7 @@ - + diff --git a/gradle.properties b/gradle.properties index 5e640e1..e702813 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=0.0.12 \ No newline at end of file +version=0.0.13-SNAPSHOT \ No newline at end of file diff --git a/s2-sdk/src/main/java/s2/channel/AccountChannel.java b/s2-sdk/src/main/java/s2/channel/AccountChannel.java new file mode 100644 index 0000000..8991140 --- /dev/null +++ b/s2-sdk/src/main/java/s2/channel/AccountChannel.java @@ -0,0 +1,16 @@ +package s2.channel; + +import io.grpc.ManagedChannel; + +public final class AccountChannel extends AutoClosableManagedChannel + implements AccountCompatibleChannel { + + AccountChannel(ManagedChannel managedChannel) { + super(managedChannel); + } + + @Override + public AutoClosableManagedChannel getChannel() { + return this; + } +} diff --git a/s2-sdk/src/main/java/s2/channel/AccountCompatibleChannel.java b/s2-sdk/src/main/java/s2/channel/AccountCompatibleChannel.java new file mode 100644 index 0000000..aad6747 --- /dev/null +++ b/s2-sdk/src/main/java/s2/channel/AccountCompatibleChannel.java @@ -0,0 +1,5 @@ +package s2.channel; + +public interface AccountCompatibleChannel { + AutoClosableManagedChannel getChannel(); +} diff --git a/s2-sdk/src/main/java/s2/channel/AutoClosableManagedChannel.java b/s2-sdk/src/main/java/s2/channel/AutoClosableManagedChannel.java new file mode 100644 index 0000000..be9c3c3 --- /dev/null +++ b/s2-sdk/src/main/java/s2/channel/AutoClosableManagedChannel.java @@ -0,0 +1,32 @@ +package s2.channel; + +import io.grpc.ManagedChannel; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AutoClosableManagedChannel implements AutoCloseable { + + private static final Logger logger = LoggerFactory.getLogger(AutoClosableManagedChannel.class); + public final ManagedChannel managedChannel; + + public AutoClosableManagedChannel(ManagedChannel managedChannel) { + this.managedChannel = managedChannel; + } + + @Override + public void close() { + managedChannel.shutdown(); + try { + if (!managedChannel.awaitTermination(5, TimeUnit.SECONDS)) { + managedChannel.shutdownNow(); + if (!managedChannel.awaitTermination(5, TimeUnit.SECONDS)) { + logger.warn("Channel did not terminate within 10s total"); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + managedChannel.shutdownNow(); + } + } +} diff --git a/s2-sdk/src/main/java/s2/channel/BasinChannel.java b/s2-sdk/src/main/java/s2/channel/BasinChannel.java new file mode 100644 index 0000000..419a497 --- /dev/null +++ b/s2-sdk/src/main/java/s2/channel/BasinChannel.java @@ -0,0 +1,16 @@ +package s2.channel; + +import io.grpc.ManagedChannel; + +public final class BasinChannel extends AutoClosableManagedChannel + implements BasinCompatibleChannel { + + BasinChannel(ManagedChannel channel) { + super(channel); + } + + @Override + public AutoClosableManagedChannel getChannel() { + return this; + } +} diff --git a/s2-sdk/src/main/java/s2/channel/BasinCompatibleChannel.java b/s2-sdk/src/main/java/s2/channel/BasinCompatibleChannel.java new file mode 100644 index 0000000..8682b0c --- /dev/null +++ b/s2-sdk/src/main/java/s2/channel/BasinCompatibleChannel.java @@ -0,0 +1,5 @@ +package s2.channel; + +public interface BasinCompatibleChannel { + AutoClosableManagedChannel getChannel(); +} diff --git a/s2-sdk/src/main/java/s2/channel/CombinedChannel.java b/s2-sdk/src/main/java/s2/channel/CombinedChannel.java new file mode 100644 index 0000000..2eef03b --- /dev/null +++ b/s2-sdk/src/main/java/s2/channel/CombinedChannel.java @@ -0,0 +1,16 @@ +package s2.channel; + +import io.grpc.ManagedChannel; + +public final class CombinedChannel extends AutoClosableManagedChannel + implements AccountCompatibleChannel, BasinCompatibleChannel { + + CombinedChannel(ManagedChannel managedChannel) { + super(managedChannel); + } + + @Override + public AutoClosableManagedChannel getChannel() { + return this; + } +} diff --git a/s2-sdk/src/main/java/s2/channel/ManagedChannelFactory.java b/s2-sdk/src/main/java/s2/channel/ManagedChannelFactory.java new file mode 100644 index 0000000..9c20400 --- /dev/null +++ b/s2-sdk/src/main/java/s2/channel/ManagedChannelFactory.java @@ -0,0 +1,29 @@ +package s2.channel; + +import io.grpc.ManagedChannelBuilder; +import s2.config.Config; + +public class ManagedChannelFactory { + public static AccountChannel forAccountService(Config config) { + return new AccountChannel( + ManagedChannelBuilder.forAddress( + config.endpoints.account.host, config.endpoints.account.port) + .build()); + } + + public static BasinChannel forBasinOrStreamService(Config config, String basinName) { + return new BasinChannel( + ManagedChannelBuilder.forTarget(config.endpoints.basin.toTarget(basinName)).build()); + } + + public static CombinedChannel forCombinedChannel(Config config) { + if (!config.endpoints.singleEndpoint()) { + throw new IllegalArgumentException( + "Combined channel cannot be used when account and basin endpoints differ."); + } + return new CombinedChannel( + ManagedChannelBuilder.forAddress( + config.endpoints.account.host, config.endpoints.account.port) + .build()); + } +} diff --git a/s2-sdk/src/main/java/s2/channel/package-info.java b/s2-sdk/src/main/java/s2/channel/package-info.java new file mode 100644 index 0000000..79bfd3e --- /dev/null +++ b/s2-sdk/src/main/java/s2/channel/package-info.java @@ -0,0 +1,2 @@ +/** Utilities and classes for constructing gRPC channels for communicating with S2 services. */ +package s2.channel; diff --git a/s2-sdk/src/main/java/s2/client/BaseClient.java b/s2-sdk/src/main/java/s2/client/BaseClient.java index 12371c1..ee6d264 100644 --- a/s2-sdk/src/main/java/s2/client/BaseClient.java +++ b/s2-sdk/src/main/java/s2/client/BaseClient.java @@ -2,26 +2,53 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import io.grpc.ManagedChannel; import io.grpc.Status; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeoutException; import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import s2.channel.AutoClosableManagedChannel; import s2.config.Config; public abstract class BaseClient implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(BaseClient.class.getName()); final Config config; - final ManagedChannel channel; + final AutoClosableManagedChannel channel; final ScheduledExecutorService executor; + final boolean ownedChannel; + final boolean ownedExecutor; - BaseClient(Config config, ManagedChannel channel, ScheduledExecutorService executor) { + BaseClient( + Config config, + AutoClosableManagedChannel channel, + ScheduledExecutorService executor, + boolean ownedChannel, + boolean ownedExecutor) { this.config = config; this.channel = channel; this.executor = executor; + this.ownedChannel = ownedChannel; + this.ownedExecutor = ownedExecutor; + } + + static ScheduledExecutorService defaultExecutor(String name) { + return Executors.newScheduledThreadPool( + Runtime.getRuntime().availableProcessors(), + new ThreadFactory() { + private final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); + + @Override + public Thread newThread(Runnable r) { + Thread thread = defaultFactory.newThread(r); + thread.setDaemon(true); + thread.setName(String.format("S2-%s-%s", name, thread.getId())); + return thread; + } + }); } static boolean retryableStatus(Status status) { @@ -31,9 +58,13 @@ static boolean retryableStatus(Status status) { }; } - public void close() throws Exception { - channel.shutdown(); - executor.shutdown(); + public void close() { + if (this.ownedChannel) { + this.channel.close(); + } + if (this.ownedExecutor) { + this.executor.shutdown(); + } } ListenableFuture withTimeout(Supplier> op) { diff --git a/s2-sdk/src/main/java/s2/client/BasinClient.java b/s2-sdk/src/main/java/s2/client/BasinClient.java index 8f918ac..6d7551d 100644 --- a/s2-sdk/src/main/java/s2/client/BasinClient.java +++ b/s2-sdk/src/main/java/s2/client/BasinClient.java @@ -2,14 +2,15 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; import io.grpc.Metadata.Key; import io.grpc.stub.MetadataUtils; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; import s2.auth.BearerTokenCallCredentials; +import s2.channel.BasinCompatibleChannel; +import s2.channel.ManagedChannelFactory; import s2.config.Config; import s2.types.CreateStreamRequest; import s2.types.Paginated; @@ -27,49 +28,27 @@ public class BasinClient extends BaseClient { private final BasinServiceGrpc.BasinServiceFutureStub futureStub; - /** - * Instantiates a new Basin client. - * - *

Most users will prefer to use the {@link s2.client.Client#basinClient(String)} method for - * construction. - * - * @see s2.client.Client#basinClient - * @param basin the basin - * @param config the config - * @param executor the executor - */ - public BasinClient(String basin, Config config, ScheduledExecutorService executor) { - this( - basin, - config, - ManagedChannelBuilder.forTarget(config.endpoints.basin.toTarget(basin)).build(), - executor); - } - - /** - * Instantiates a new Basin client. - * - *

Most users will prefer to use the {@link s2.client.Client#basinClient(String)} method for - * construction. - * - * @see s2.client.Client#basinClient - * @param basin the basin - * @param config the config - * @param channel the channel - * @param executor the executor - */ - public BasinClient( - String basin, Config config, ManagedChannel channel, ScheduledExecutorService executor) { - super(config, channel, executor); + BasinClient( + Config config, + String basin, + BasinCompatibleChannel channel, + ScheduledExecutorService executor, + boolean ownedChannel, + boolean ownedExecutor) { + super(config, channel.getChannel(), executor, ownedChannel, ownedExecutor); var meta = new Metadata(); meta.put(Key.of("s2-basin", Metadata.ASCII_STRING_MARSHALLER), basin); this.basin = basin; this.futureStub = - BasinServiceGrpc.newFutureStub(channel) + BasinServiceGrpc.newFutureStub(channel.getChannel().managedChannel) .withCallCredentials(new BearerTokenCallCredentials(config.token)) .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(meta)); } + public static BasinClientBuilder newBuilder(Config config, String basin) { + return new BasinClientBuilder(config, basin); + } + /** * List streams within the basin. * @@ -172,13 +151,36 @@ public ListenableFuture reconfigureStream( executor)); } - /** - * Create a StreamClient for interacting with stream-level RPCs. - * - * @param streamName the stream name - * @return the stream client - */ - public StreamClient streamClient(String streamName) { - return new StreamClient(streamName, this.basin, this.config, this.channel, this.executor); + public static class BasinClientBuilder { + private final Config config; + private final String basin; + private Optional channel = Optional.empty(); + private Optional executor = Optional.empty(); + + BasinClientBuilder(Config config, String basin) { + this.config = config; + this.basin = basin; + } + + public BasinClientBuilder withChannel(BasinCompatibleChannel channel) { + this.channel = Optional.of(channel); + return this; + } + + public BasinClientBuilder withExecutor(ScheduledExecutorService executor) { + this.executor = Optional.of(executor); + return this; + } + + public BasinClient build() { + return new BasinClient( + this.config, + this.basin, + this.channel.orElseGet( + () -> ManagedChannelFactory.forBasinOrStreamService(this.config, this.basin)), + this.executor.orElseGet(() -> BaseClient.defaultExecutor("basinClient")), + this.channel.isEmpty(), + this.executor.isEmpty()); + } } } diff --git a/s2-sdk/src/main/java/s2/client/Client.java b/s2-sdk/src/main/java/s2/client/Client.java index 2fb5850..1e39934 100644 --- a/s2-sdk/src/main/java/s2/client/Client.java +++ b/s2-sdk/src/main/java/s2/client/Client.java @@ -2,18 +2,17 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; import io.grpc.Metadata.Key; import io.grpc.stub.MetadataUtils; +import java.util.Optional; import java.util.UUID; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import s2.auth.BearerTokenCallCredentials; +import s2.channel.AccountCompatibleChannel; +import s2.channel.ManagedChannelFactory; import s2.config.Config; import s2.types.BasinConfig; import s2.types.BasinInfo; @@ -31,54 +30,22 @@ public class Client extends BaseClient { private static final Logger logger = LoggerFactory.getLogger(Client.class.getName()); private final AccountServiceGrpc.AccountServiceFutureStub futureStub; - /** - * Instantiates a new Client, using default settings for creating a channel, as well as an - * executor service. - * - * @param config the config - */ - public Client(Config config) { - this( - config, - ManagedChannelBuilder.forAddress( - config.endpoints.account.host, config.endpoints.account.port) - .build(), - Executors.newScheduledThreadPool( - Runtime.getRuntime().availableProcessors(), - new ThreadFactory() { - private final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); - - @Override - public Thread newThread(Runnable r) { - Thread thread = defaultFactory.newThread(r); - thread.setDaemon(true); - thread.setName("S2Client-" + thread.getId()); - return thread; - } - })); - } - - /** - * Instantiates a new Client. - * - *

Note that the executor is not the same as that used internally by netty for grpc - * (which, for the moment, is not controllable by clients). This executor is used for other async - * calls initiated by the SDK, such as application-level retries, timeouts, and transformations. - * - *

The executor used by this Client class will be shared with any BasinClient constructed from - * it (and, similarly, will be used for any StreamClient constructed from the BasinClient). - * - * @param config the config - * @param channel the channel - * @param executor the executor - */ - public Client(Config config, ManagedChannel channel, ScheduledExecutorService executor) { - super(config, channel, executor); + private Client( + Config config, + AccountCompatibleChannel channel, + ScheduledExecutorService executor, + boolean ownedChannel, + boolean ownedClient) { + super(config, channel.getChannel(), executor, ownedChannel, ownedClient); this.futureStub = - AccountServiceGrpc.newFutureStub(channel) + AccountServiceGrpc.newFutureStub(channel.getChannel().managedChannel) .withCallCredentials(new BearerTokenCallCredentials(config.token)); } + public static ClientBuilder newBuilder(Config config) { + return new ClientBuilder(config); + } + /** * List basins. * @@ -182,24 +149,33 @@ public ListenableFuture getBasinConfig(String basin) { executor)); } - /** - * Create a BasinClient for interacting with basin-level RPCs. - * - *

The generated client will use the same channel if possible. - * - * @param basin the basin - * @return the basin client - */ - public BasinClient basinClient(String basin) { - // If the basin endpoint identical to account, reuse the connection. - if (this.config.endpoints.singleEndpoint()) { - return new BasinClient(basin, this.config, this.channel, this.executor); - } else { - return new BasinClient( - basin, + public static class ClientBuilder { + + private final Config config; + private Optional channel = Optional.empty(); + private Optional executor = Optional.empty(); + + public ClientBuilder(Config config) { + this.config = config; + } + + public ClientBuilder withChannel(AccountCompatibleChannel channel) { + this.channel = Optional.of(channel); + return this; + } + + public ClientBuilder withExecutor(ScheduledExecutorService executor) { + this.executor = Optional.of(executor); + return this; + } + + public Client build() { + return new Client( this.config, - ManagedChannelBuilder.forTarget(config.endpoints.basin.toTarget(basin)).build(), - this.executor); + this.channel.orElseGet(() -> ManagedChannelFactory.forAccountService(this.config)), + this.executor.orElseGet(() -> BaseClient.defaultExecutor("client")), + this.channel.isEmpty(), + this.executor.isEmpty()); } } } diff --git a/s2-sdk/src/main/java/s2/client/ManagedAppendSession.java b/s2-sdk/src/main/java/s2/client/ManagedAppendSession.java index 5c9c6fe..f506cc9 100644 --- a/s2-sdk/src/main/java/s2/client/ManagedAppendSession.java +++ b/s2-sdk/src/main/java/s2/client/ManagedAppendSession.java @@ -10,6 +10,7 @@ import java.time.Duration; import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -24,25 +25,29 @@ import s2.v1alpha.AppendSessionRequest; import s2.v1alpha.AppendSessionResponse; -public class ManagedAppendSession { +public class ManagedAppendSession implements AutoCloseable { static final int ACQUIRE_QUANTUM_MS = 50; + private static final Logger logger = LoggerFactory.getLogger(ManagedAppendSession.class.getName()); + final ListeningScheduledExecutorService executor; final StreamClient client; + final Integer bufferCapacityBytes; final Semaphore inflightBytes; - final ListenableFuture daemon; + final AtomicInteger remainingAttempts; final AtomicReference> nextDeadlineSystemNanos = new AtomicReference<>(Optional.empty()); final AtomicBoolean acceptingAppends = new AtomicBoolean(true); - // TODO can use theoretical max for smallest possible batch sizes given inflightBytes budget to - // bound queue sizes + final LinkedBlockingQueue inflightQueue = new LinkedBlockingQueue<>(); final LinkedBlockingQueue notificationQueue = new LinkedBlockingQueue<>(); + final ListenableFuture daemon; + ManagedAppendSession(StreamClient client) { this.executor = MoreExecutors.listeningDecorator(client.executor); this.client = client; @@ -115,12 +120,6 @@ private boolean acquirePermits(int permits, Duration maxWait) throws Interrupted return true; } - public ListenableFuture closeGracefully() throws InterruptedException { - this.acceptingAppends.set(false); - this.notificationQueue.put(new ClientClose(true)); - return daemon; - } - /// Note that this will NOT resolve any outstanding futures issued by this session. public ListenableFuture closeImmediately() throws InterruptedException { this.acceptingAppends.set(false); @@ -347,6 +346,23 @@ public void onCompleted() { return null; } + @Override + public void close() { + try { + this.closeGracefully().get(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + public ListenableFuture closeGracefully() { + this.acceptingAppends.set(false); + this.notificationQueue.add(new ClientClose(true)); + return daemon; + } + sealed interface Notification permits Ack, Batch, ClientClose, Error, ServerClose {} record InflightRecord( diff --git a/s2-sdk/src/main/java/s2/client/ManagedReadSession.java b/s2-sdk/src/main/java/s2/client/ManagedReadSession.java index 42e99ca..ef2e69e 100644 --- a/s2-sdk/src/main/java/s2/client/ManagedReadSession.java +++ b/s2-sdk/src/main/java/s2/client/ManagedReadSession.java @@ -10,7 +10,7 @@ import s2.types.ReadOutput; import s2.types.ReadSessionRequest; -public class ManagedReadSession { +public class ManagedReadSession implements AutoCloseable { private final Semaphore bufferAvailable; private final LinkedBlockingQueue queue; @@ -92,6 +92,12 @@ public Optional get(Duration maxWait) throws InterruptedException { return getInner(Optional.ofNullable(queue.poll(maxWait.toMillis(), TimeUnit.MILLISECONDS))); } + @Override + public void close() throws Exception { + this.closed.set(true); + this.readSession.close(); + } + private sealed interface ReadItem permits DataItem, ErrorItem, EndItem {} record DataItem(ReadOutput readOutput) implements ReadItem {} diff --git a/s2-sdk/src/main/java/s2/client/ReadSession.java b/s2-sdk/src/main/java/s2/client/ReadSession.java index f27cba1..61d6243 100644 --- a/s2-sdk/src/main/java/s2/client/ReadSession.java +++ b/s2-sdk/src/main/java/s2/client/ReadSession.java @@ -16,18 +16,22 @@ import s2.types.ReadSessionRequest; import s2.v1alpha.ReadSessionResponse; -public class ReadSession { +public class ReadSession implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(ReadSession.class.getName()); + final ScheduledExecutorService executor; final StreamClient client; + final AtomicLong nextStartSeqNum; final AtomicLong consumedRecords = new AtomicLong(); final AtomicLong consumedBytes = new AtomicLong(0); + final AtomicInteger remainingAttempts; + final Consumer onResponse; final Consumer onError; + final ReadSessionRequest request; - final AtomicInteger remainingAttempts; final ListenableFuture daemon; ReadSession( @@ -111,4 +115,9 @@ private ListenableFuture retrying() { public ListenableFuture awaitCompletion() { return this.daemon; } + + @Override + public void close() { + this.daemon.cancel(true); + } } diff --git a/s2-sdk/src/main/java/s2/client/StreamClient.java b/s2-sdk/src/main/java/s2/client/StreamClient.java index 3245839..1487605 100644 --- a/s2-sdk/src/main/java/s2/client/StreamClient.java +++ b/s2-sdk/src/main/java/s2/client/StreamClient.java @@ -2,16 +2,18 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.Metadata.Key; import io.grpc.stub.MetadataUtils; import io.grpc.stub.StreamObserver; +import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import s2.auth.BearerTokenCallCredentials; +import s2.channel.BasinCompatibleChannel; +import s2.channel.ManagedChannelFactory; import s2.config.Config; import s2.types.AppendInput; import s2.types.AppendOutput; @@ -31,45 +33,33 @@ public class StreamClient extends BasinClient { private static final Logger logger = LoggerFactory.getLogger(StreamClient.class.getName()); + private static final String compressionCodec = "gzip"; /** Name of stream associated with this client. */ final String streamName; - private static final String compressionCodec = "gzip"; - - private final StreamServiceFutureStub futureStub; final StreamServiceStub asyncStub; + private final StreamServiceFutureStub futureStub; - /** - * Instantiates a new Stream client. - * - *

Most users will prefer to use the {@link s2.client.BasinClient#streamClient(String)} method - * for construction. - * - * @see s2.client.BasinClient#streamClient - * @param streamName the stream name - * @param basin the basin - * @param config the config - * @param channel the channel - * @param executor the executor - */ - public StreamClient( - String streamName, - String basin, + private StreamClient( Config config, - ManagedChannel channel, - ScheduledExecutorService executor) { - super(basin, config, channel, executor); + String basin, + String streamName, + BasinCompatibleChannel channel, + ScheduledExecutorService executor, + boolean ownedChannel, + boolean ownedExecutor) { + super(config, basin, channel, executor, ownedChannel, ownedExecutor); var meta = new Metadata(); meta.put(Key.of("s2-basin", Metadata.ASCII_STRING_MARSHALLER), basin); this.streamName = streamName; StreamServiceFutureStub futureStub = - StreamServiceGrpc.newFutureStub(channel) + StreamServiceGrpc.newFutureStub(channel.getChannel().managedChannel) .withCallCredentials(new BearerTokenCallCredentials(config.token)) .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(meta)); StreamServiceStub asyncStub = - StreamServiceGrpc.newStub(channel) + StreamServiceGrpc.newStub(channel.getChannel().managedChannel) .withCallCredentials(new BearerTokenCallCredentials(config.token)) .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(meta)); @@ -82,6 +72,10 @@ public StreamClient( this.asyncStub = asyncStub; } + public static StreamClientBuilder newBuilder(Config config, String basinName, String streamName) { + return new StreamClientBuilder(config, basinName, streamName); + } + /** * Check the sequence number that will be assigned to the next record on a stream. * @@ -262,6 +256,43 @@ public ManagedAppendSession managedAppendSession() { return new ManagedAppendSession(this); } + public static class StreamClientBuilder { + + private final Config config; + private final String basinName; + private final String streamName; + private Optional channel = Optional.empty(); + private Optional executor = Optional.empty(); + + public StreamClientBuilder(Config config, String basinName, String streamName) { + this.config = config; + this.basinName = basinName; + this.streamName = streamName; + } + + public StreamClientBuilder withChannel(BasinCompatibleChannel channel) { + this.channel = Optional.of(channel); + return this; + } + + public StreamClientBuilder withExecutor(ScheduledExecutorService executor) { + this.executor = Optional.of(executor); + return this; + } + + public StreamClient build() { + return new StreamClient( + this.config, + this.basinName, + this.streamName, + this.channel.orElseGet( + () -> ManagedChannelFactory.forBasinOrStreamService(this.config, this.basinName)), + this.executor.orElseGet(() -> BaseClient.defaultExecutor("streamClient")), + this.channel.isEmpty(), + this.executor.isEmpty()); + } + } + public record AppendSessionRequestStream( Consumer onNext, Consumer onError, Runnable onComplete) {} } diff --git a/s2-sdk/src/test/java/s2/client/ReadSessionTest.java b/s2-sdk/src/test/java/s2/client/ReadSessionTest.java index db66bdf..49031be 100644 --- a/s2-sdk/src/test/java/s2/client/ReadSessionTest.java +++ b/s2-sdk/src/test/java/s2/client/ReadSessionTest.java @@ -8,10 +8,12 @@ import io.grpc.inprocess.InProcessServerBuilder; import java.util.ArrayList; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.stream.IntStream; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import s2.channel.AutoClosableManagedChannel; import s2.config.Config; import s2.types.Batch; import s2.types.ReadLimit; @@ -23,6 +25,7 @@ public class ReadSessionTest { private Server server; private ManagedChannel channel; private StreamClient client; + private ScheduledExecutorService executor; @BeforeEach public void setUp() throws Exception { @@ -35,21 +38,22 @@ public void setUp() throws Exception { .start(); channel = InProcessChannelBuilder.forName(serverName).directExecutor().build(); - - Config config = Config.newBuilder("fake-token").withMaxRetries(3).build(); + executor = Executors.newSingleThreadScheduledExecutor(); client = - new StreamClient( - "test-stream", - "test-basin", - config, - channel, - Executors.newSingleThreadScheduledExecutor()); + StreamClient.newBuilder( + Config.newBuilder("fake-token").withMaxRetries(3).build(), + "test-basin", + "test-stream") + .withChannel(() -> new AutoClosableManagedChannel(channel) {}) + .withExecutor(executor) + .build(); } @AfterEach public void tearDown() throws Exception { channel.shutdownNow(); server.shutdownNow(); + executor.shutdownNow(); } @Test