Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,6 @@ tasks.test {
java {
withSourcesJar()
toolchain {
languageVersion.set(JavaLanguageVersion.of(17))
languageVersion.set(JavaLanguageVersion.of(19))
Copy link
Member Author

@sgbalogh sgbalogh Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scheduled executor only impl's AutoClosable starting in 19, so the bump is required (just for the demo, not SDK)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL

}
}
2 changes: 1 addition & 1 deletion app/src/main/java/org/example/app/AccountDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public static void main(String[] args) throws Exception {
.withEndpoints(Endpoints.fromEnvironment())
.build();

try (var client = new Client(config)) {
try (var client = Client.newBuilder(config).build()) {

var basins = client.listBasins(ListBasinsRequest.newBuilder().build()).get();
basins.elems().forEach(basin -> logger.info("basin={}", basin));
Expand Down
11 changes: 5 additions & 6 deletions app/src/main/java/org/example/app/BasinDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import s2.client.Client;
import s2.client.BasinClient;
import s2.config.Config;
import s2.config.Endpoints;
import s2.types.CreateStreamRequest;
Expand All @@ -14,15 +14,14 @@ public class BasinDemo {
private static final Logger logger = LoggerFactory.getLogger(BasinDemo.class.getName());

public static void main(String[] args) throws Exception {
var config =
final var config =
Config.newBuilder(System.getenv("S2_AUTH_TOKEN"))
.withEndpoints(Endpoints.fromEnvironment())
.build();

try (var client = new Client(config)) {

var basinClient = client.basinClient("my-first-basin");
var streams = basinClient.listStreams(ListStreamsRequest.newBuilder().build()).get();
try (final var basinClient =
BasinClient.newBuilder(config, System.getenv("S2_BASIN")).build()) {
final var streams = basinClient.listStreams(ListStreamsRequest.newBuilder().build()).get();
streams
.elems()
.forEach(
Expand Down
153 changes: 76 additions & 77 deletions app/src/main/java/org/example/app/ManagedAppendSessionDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import s2.client.Client;
import s2.channel.ManagedChannelFactory;
import s2.client.StreamClient;
import s2.config.AppendRetryPolicy;
import s2.config.Config;
import s2.config.Endpoints;
Expand All @@ -21,27 +22,6 @@

public class ManagedAppendSessionDemo {

static class RandomASCIIStringGenerator {
private static final String ASCII_PRINTABLE_CHARACTERS =
"ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789";

private static final Random RANDOM = new Random();

public static String generateRandomASCIIString(String prefix, int length) {
if (length < 0) {
throw new IllegalArgumentException("Length cannot be negative.");
}

StringBuilder sb = new StringBuilder(length);
sb.append(prefix);
for (int i = 0; i < length - prefix.length(); i++) {
int index = RANDOM.nextInt(ASCII_PRINTABLE_CHARACTERS.length());
sb.append(ASCII_PRINTABLE_CHARACTERS.charAt(index));
}
return sb.toString();
}
}

private static final Logger logger =
LoggerFactory.getLogger(ManagedAppendSessionDemo.class.getName());

Expand Down Expand Up @@ -69,69 +49,88 @@ public static void main(String[] args) throws Exception {
final LinkedBlockingQueue<ListenableFuture<AppendOutput>> pendingAppends =
new LinkedBlockingQueue<>();

var executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor());
var consumer =
executor.submit(
() -> {
try {
while (true) {
var output = pendingAppends.take().get();
if (output == null) {
logger.info("consumer closing");
break;
try (final var executor =
MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(4));
final var channel = ManagedChannelFactory.forBasinOrStreamService(config, basinName)) {

final var consumer =
executor.submit(
() -> {
try {
while (true) {
var output = pendingAppends.take().get();
if (output == null) {
logger.info("consumer closing");
break;
}
logger.info("consumer got: {}", output);
}
logger.info("consumer got: {}", output);
} catch (Exception e) {
logger.error("consumer failed", e);
}
} catch (Exception e) {
logger.error("consumer failed", e);
}
});

try (var client = new Client(config)) {

final var streamClient = client.basinClient(basinName).streamClient(streamName);
final var futureAppendSession = streamClient.managedAppendSession();

for (var i = 0; i < 50_000; i++) {
try {
// Generate a record with approximately 10KiB of random text.
var payload = RandomASCIIStringGenerator.generateRandomASCIIString(i + " - ", 1024 * 10);
var append =
futureAppendSession.submit(
AppendInput.newBuilder()
.withRecords(
List.of(
AppendRecord.newBuilder()
.withBody(payload.getBytes(StandardCharsets.UTF_8))
.build()))
.build(),
// Duration is how long we are willing to wait to receive a future.
Duration.ofSeconds(1));

pendingAppends.add(append);
} catch (RuntimeException e) {
logger.error("producer failed", e);
pendingAppends.add(Futures.immediateFailedFuture(e));
break;
});

final var streamClient =
StreamClient.newBuilder(config, basinName, streamName)
.withExecutor(executor)
.withChannel(channel)
.build();

try (final var futureAppendSession = streamClient.managedAppendSession()) {

for (var i = 0; i < 50_000; i++) {
try {
// Generate a record with approximately 10KiB of random text.
var payload =
RandomASCIIStringGenerator.generateRandomASCIIString(i + " - ", 1024 * 10);
var append =
futureAppendSession.submit(
AppendInput.newBuilder()
.withRecords(
List.of(
AppendRecord.newBuilder()
.withBody(payload.getBytes(StandardCharsets.UTF_8))
.build()))
.build(),
// Duration is how long we are willing to wait to receive a future.
Duration.ofSeconds(10));

pendingAppends.add(append);
} catch (RuntimeException e) {
logger.error("producer failed", e);
pendingAppends.add(Futures.immediateFailedFuture(e));
break;
}
}

logger.info("finished submitting all appends");

// Signal to the consumer that no further appends are happening.
pendingAppends.add(Futures.immediateFuture(null));
}

logger.info("finished submitting all appends");
consumer.get();
}
}

// Signal to the consumer that no further appends are happening.
pendingAppends.add(Futures.immediateFuture(null));
static class RandomASCIIStringGenerator {
private static final String ASCII_PRINTABLE_CHARACTERS =
"ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789";

private static final Random RANDOM = new Random();

logger.info("starting graceful close");
try {
futureAppendSession.closeGracefully().get();
} catch (Exception e) {
logger.error("caught exception during close", e);
public static String generateRandomASCIIString(String prefix, int length) {
if (length < 0) {
throw new IllegalArgumentException("Length cannot be negative.");
}
logger.info("finished closing");
}

// Await the consumer future.
consumer.get();
executor.shutdown();
StringBuilder sb = new StringBuilder(length);
sb.append(prefix);
for (int i = 0; i < length - prefix.length(); i++) {
int index = RANDOM.nextInt(ASCII_PRINTABLE_CHARACTERS.length());
sb.append(ASCII_PRINTABLE_CHARACTERS.charAt(index));
}
return sb.toString();
}
}
}
62 changes: 35 additions & 27 deletions app/src/main/java/org/example/app/ManagedReadSessionDemo.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package org.example.app;

import java.time.Duration;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import s2.client.Client;
import s2.channel.ManagedChannelFactory;
import s2.client.StreamClient;
import s2.config.Config;
import s2.config.Endpoints;
import s2.types.Batch;
Expand Down Expand Up @@ -33,38 +35,44 @@ public static void main(String[] args) throws Exception {

var config = Config.newBuilder(authToken).withEndpoints(Endpoints.fromEnvironment()).build();

try (var client = new Client(config)) {
try (final var executor = new ScheduledThreadPoolExecutor(1);
final var channel = ManagedChannelFactory.forBasinOrStreamService(config, basinName)) {

var streamClient = client.basinClient(basinName).streamClient(streamName);
final var streamClient =
StreamClient.newBuilder(config, basinName, streamName)
.withExecutor(executor)
.withChannel(channel)
.build();

var managedSession =
try (final var managedSession =
streamClient.managedReadSession(
ReadSessionRequest.newBuilder().withReadLimit(ReadLimit.count(100_000)).build(),
1024 * 1024 * 1024);
1024 * 1024 * 1024)) {

AtomicLong receivedBytes = new AtomicLong();
while (!managedSession.isClosed()) {
// Poll for up to 1 minute.
var resp = managedSession.get(Duration.ofSeconds(60));
resp.ifPresentOrElse(
elem -> {
if (elem instanceof Batch batch) {
var size = batch.meteredBytes();
logger.info(
"batch of {} bytes, seqnums {}..={}",
size,
batch.firstSeqNum(),
batch.lastSeqNum());
receivedBytes.addAndGet(size);
} else {
logger.info("non batch received: {}", elem);
}
},
() -> {
logger.info("no batch");
});
AtomicLong receivedBytes = new AtomicLong();
while (!managedSession.isClosed()) {
// Poll for up to 1 minute.
var resp = managedSession.get(Duration.ofSeconds(60));
resp.ifPresentOrElse(
elem -> {
if (elem instanceof Batch batch) {
var size = batch.meteredBytes();
logger.info(
"batch of {} bytes, seqnums {}..={}",
size,
batch.firstSeqNum(),
batch.lastSeqNum());
receivedBytes.addAndGet(size);
} else {
logger.info("non batch received: {}", elem);
}
},
() -> {
logger.info("no batch");
});
}
logger.info("finished, received {} bytes in total", receivedBytes.get());
}
logger.info("finished, received {} bytes in total", receivedBytes.get());
}
}
}
2 changes: 1 addition & 1 deletion app/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
</encoder>
</appender>

<logger level="info" name="s2" additivity="false">
<logger additivity="false" level="info" name="s2">
<appender-ref ref="console"/>
</logger>

Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.0.12
version=0.0.13-SNAPSHOT
16 changes: 16 additions & 0 deletions s2-sdk/src/main/java/s2/channel/AccountChannel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package s2.channel;

import io.grpc.ManagedChannel;

public final class AccountChannel extends AutoClosableManagedChannel
implements AccountCompatibleChannel {

AccountChannel(ManagedChannel managedChannel) {
super(managedChannel);
}

@Override
public AutoClosableManagedChannel getChannel() {
return this;
}
}
5 changes: 5 additions & 0 deletions s2-sdk/src/main/java/s2/channel/AccountCompatibleChannel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package s2.channel;

public interface AccountCompatibleChannel {
AutoClosableManagedChannel getChannel();
}
32 changes: 32 additions & 0 deletions s2-sdk/src/main/java/s2/channel/AutoClosableManagedChannel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package s2.channel;

import io.grpc.ManagedChannel;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AutoClosableManagedChannel implements AutoCloseable {

private static final Logger logger = LoggerFactory.getLogger(AutoClosableManagedChannel.class);
public final ManagedChannel managedChannel;

public AutoClosableManagedChannel(ManagedChannel managedChannel) {
this.managedChannel = managedChannel;
}

@Override
public void close() {
managedChannel.shutdown();
try {
if (!managedChannel.awaitTermination(5, TimeUnit.SECONDS)) {
managedChannel.shutdownNow();
if (!managedChannel.awaitTermination(5, TimeUnit.SECONDS)) {
logger.warn("Channel did not terminate within 10s total");
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
managedChannel.shutdownNow();
}
}
}
16 changes: 16 additions & 0 deletions s2-sdk/src/main/java/s2/channel/BasinChannel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package s2.channel;

import io.grpc.ManagedChannel;

public final class BasinChannel extends AutoClosableManagedChannel
implements BasinCompatibleChannel {

BasinChannel(ManagedChannel channel) {
super(channel);
}

@Override
public AutoClosableManagedChannel getChannel() {
return this;
}
}
Loading
Loading