Skip to content

Commit a352918

Browse files
authored
feat: better approach to client resource management (#11)
1 parent 26d1825 commit a352918

23 files changed

+478
-276
lines changed

app/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,6 @@ tasks.test {
5252
java {
5353
withSourcesJar()
5454
toolchain {
55-
languageVersion.set(JavaLanguageVersion.of(17))
55+
languageVersion.set(JavaLanguageVersion.of(19))
5656
}
5757
}

app/src/main/java/org/example/app/AccountDemo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public static void main(String[] args) throws Exception {
2323
.withEndpoints(Endpoints.fromEnvironment())
2424
.build();
2525

26-
try (var client = new Client(config)) {
26+
try (var client = Client.newBuilder(config).build()) {
2727

2828
var basins = client.listBasins(ListBasinsRequest.newBuilder().build()).get();
2929
basins.elems().forEach(basin -> logger.info("basin={}", basin));

app/src/main/java/org/example/app/BasinDemo.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import org.slf4j.Logger;
44
import org.slf4j.LoggerFactory;
5-
import s2.client.Client;
5+
import s2.client.BasinClient;
66
import s2.config.Config;
77
import s2.config.Endpoints;
88
import s2.types.CreateStreamRequest;
@@ -14,15 +14,14 @@ public class BasinDemo {
1414
private static final Logger logger = LoggerFactory.getLogger(BasinDemo.class.getName());
1515

1616
public static void main(String[] args) throws Exception {
17-
var config =
17+
final var config =
1818
Config.newBuilder(System.getenv("S2_AUTH_TOKEN"))
1919
.withEndpoints(Endpoints.fromEnvironment())
2020
.build();
2121

22-
try (var client = new Client(config)) {
23-
24-
var basinClient = client.basinClient("my-first-basin");
25-
var streams = basinClient.listStreams(ListStreamsRequest.newBuilder().build()).get();
22+
try (final var basinClient =
23+
BasinClient.newBuilder(config, System.getenv("S2_BASIN")).build()) {
24+
final var streams = basinClient.listStreams(ListStreamsRequest.newBuilder().build()).get();
2625
streams
2726
.elems()
2827
.forEach(

app/src/main/java/org/example/app/ManagedAppendSessionDemo.java

Lines changed: 76 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
import java.util.concurrent.LinkedBlockingQueue;
1212
import org.slf4j.Logger;
1313
import org.slf4j.LoggerFactory;
14-
import s2.client.Client;
14+
import s2.channel.ManagedChannelFactory;
15+
import s2.client.StreamClient;
1516
import s2.config.AppendRetryPolicy;
1617
import s2.config.Config;
1718
import s2.config.Endpoints;
@@ -21,27 +22,6 @@
2122

2223
public class ManagedAppendSessionDemo {
2324

24-
static class RandomASCIIStringGenerator {
25-
private static final String ASCII_PRINTABLE_CHARACTERS =
26-
"ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789";
27-
28-
private static final Random RANDOM = new Random();
29-
30-
public static String generateRandomASCIIString(String prefix, int length) {
31-
if (length < 0) {
32-
throw new IllegalArgumentException("Length cannot be negative.");
33-
}
34-
35-
StringBuilder sb = new StringBuilder(length);
36-
sb.append(prefix);
37-
for (int i = 0; i < length - prefix.length(); i++) {
38-
int index = RANDOM.nextInt(ASCII_PRINTABLE_CHARACTERS.length());
39-
sb.append(ASCII_PRINTABLE_CHARACTERS.charAt(index));
40-
}
41-
return sb.toString();
42-
}
43-
}
44-
4525
private static final Logger logger =
4626
LoggerFactory.getLogger(ManagedAppendSessionDemo.class.getName());
4727

@@ -69,69 +49,88 @@ public static void main(String[] args) throws Exception {
6949
final LinkedBlockingQueue<ListenableFuture<AppendOutput>> pendingAppends =
7050
new LinkedBlockingQueue<>();
7151

72-
var executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor());
73-
var consumer =
74-
executor.submit(
75-
() -> {
76-
try {
77-
while (true) {
78-
var output = pendingAppends.take().get();
79-
if (output == null) {
80-
logger.info("consumer closing");
81-
break;
52+
try (final var executor =
53+
MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(4));
54+
final var channel = ManagedChannelFactory.forBasinOrStreamService(config, basinName)) {
55+
56+
final var consumer =
57+
executor.submit(
58+
() -> {
59+
try {
60+
while (true) {
61+
var output = pendingAppends.take().get();
62+
if (output == null) {
63+
logger.info("consumer closing");
64+
break;
65+
}
66+
logger.info("consumer got: {}", output);
8267
}
83-
logger.info("consumer got: {}", output);
68+
} catch (Exception e) {
69+
logger.error("consumer failed", e);
8470
}
85-
} catch (Exception e) {
86-
logger.error("consumer failed", e);
87-
}
88-
});
89-
90-
try (var client = new Client(config)) {
91-
92-
final var streamClient = client.basinClient(basinName).streamClient(streamName);
93-
final var futureAppendSession = streamClient.managedAppendSession();
94-
95-
for (var i = 0; i < 50_000; i++) {
96-
try {
97-
// Generate a record with approximately 10KiB of random text.
98-
var payload = RandomASCIIStringGenerator.generateRandomASCIIString(i + " - ", 1024 * 10);
99-
var append =
100-
futureAppendSession.submit(
101-
AppendInput.newBuilder()
102-
.withRecords(
103-
List.of(
104-
AppendRecord.newBuilder()
105-
.withBody(payload.getBytes(StandardCharsets.UTF_8))
106-
.build()))
107-
.build(),
108-
// Duration is how long we are willing to wait to receive a future.
109-
Duration.ofSeconds(1));
110-
111-
pendingAppends.add(append);
112-
} catch (RuntimeException e) {
113-
logger.error("producer failed", e);
114-
pendingAppends.add(Futures.immediateFailedFuture(e));
115-
break;
71+
});
72+
73+
final var streamClient =
74+
StreamClient.newBuilder(config, basinName, streamName)
75+
.withExecutor(executor)
76+
.withChannel(channel)
77+
.build();
78+
79+
try (final var futureAppendSession = streamClient.managedAppendSession()) {
80+
81+
for (var i = 0; i < 50_000; i++) {
82+
try {
83+
// Generate a record with approximately 10KiB of random text.
84+
var payload =
85+
RandomASCIIStringGenerator.generateRandomASCIIString(i + " - ", 1024 * 10);
86+
var append =
87+
futureAppendSession.submit(
88+
AppendInput.newBuilder()
89+
.withRecords(
90+
List.of(
91+
AppendRecord.newBuilder()
92+
.withBody(payload.getBytes(StandardCharsets.UTF_8))
93+
.build()))
94+
.build(),
95+
// Duration is how long we are willing to wait to receive a future.
96+
Duration.ofSeconds(10));
97+
98+
pendingAppends.add(append);
99+
} catch (RuntimeException e) {
100+
logger.error("producer failed", e);
101+
pendingAppends.add(Futures.immediateFailedFuture(e));
102+
break;
103+
}
116104
}
105+
106+
logger.info("finished submitting all appends");
107+
108+
// Signal to the consumer that no further appends are happening.
109+
pendingAppends.add(Futures.immediateFuture(null));
117110
}
118111

119-
logger.info("finished submitting all appends");
112+
consumer.get();
113+
}
114+
}
120115

121-
// Signal to the consumer that no further appends are happening.
122-
pendingAppends.add(Futures.immediateFuture(null));
116+
static class RandomASCIIStringGenerator {
117+
private static final String ASCII_PRINTABLE_CHARACTERS =
118+
"ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789";
119+
120+
private static final Random RANDOM = new Random();
123121

124-
logger.info("starting graceful close");
125-
try {
126-
futureAppendSession.closeGracefully().get();
127-
} catch (Exception e) {
128-
logger.error("caught exception during close", e);
122+
public static String generateRandomASCIIString(String prefix, int length) {
123+
if (length < 0) {
124+
throw new IllegalArgumentException("Length cannot be negative.");
129125
}
130-
logger.info("finished closing");
131-
}
132126

133-
// Await the consumer future.
134-
consumer.get();
135-
executor.shutdown();
127+
StringBuilder sb = new StringBuilder(length);
128+
sb.append(prefix);
129+
for (int i = 0; i < length - prefix.length(); i++) {
130+
int index = RANDOM.nextInt(ASCII_PRINTABLE_CHARACTERS.length());
131+
sb.append(ASCII_PRINTABLE_CHARACTERS.charAt(index));
132+
}
133+
return sb.toString();
134+
}
136135
}
137136
}
Lines changed: 35 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package org.example.app;
22

33
import java.time.Duration;
4+
import java.util.concurrent.ScheduledThreadPoolExecutor;
45
import java.util.concurrent.atomic.AtomicLong;
56
import org.slf4j.Logger;
67
import org.slf4j.LoggerFactory;
7-
import s2.client.Client;
8+
import s2.channel.ManagedChannelFactory;
9+
import s2.client.StreamClient;
810
import s2.config.Config;
911
import s2.config.Endpoints;
1012
import s2.types.Batch;
@@ -33,38 +35,44 @@ public static void main(String[] args) throws Exception {
3335

3436
var config = Config.newBuilder(authToken).withEndpoints(Endpoints.fromEnvironment()).build();
3537

36-
try (var client = new Client(config)) {
38+
try (final var executor = new ScheduledThreadPoolExecutor(1);
39+
final var channel = ManagedChannelFactory.forBasinOrStreamService(config, basinName)) {
3740

38-
var streamClient = client.basinClient(basinName).streamClient(streamName);
41+
final var streamClient =
42+
StreamClient.newBuilder(config, basinName, streamName)
43+
.withExecutor(executor)
44+
.withChannel(channel)
45+
.build();
3946

40-
var managedSession =
47+
try (final var managedSession =
4148
streamClient.managedReadSession(
4249
ReadSessionRequest.newBuilder().withReadLimit(ReadLimit.count(100_000)).build(),
43-
1024 * 1024 * 1024);
50+
1024 * 1024 * 1024)) {
4451

45-
AtomicLong receivedBytes = new AtomicLong();
46-
while (!managedSession.isClosed()) {
47-
// Poll for up to 1 minute.
48-
var resp = managedSession.get(Duration.ofSeconds(60));
49-
resp.ifPresentOrElse(
50-
elem -> {
51-
if (elem instanceof Batch batch) {
52-
var size = batch.meteredBytes();
53-
logger.info(
54-
"batch of {} bytes, seqnums {}..={}",
55-
size,
56-
batch.firstSeqNum(),
57-
batch.lastSeqNum());
58-
receivedBytes.addAndGet(size);
59-
} else {
60-
logger.info("non batch received: {}", elem);
61-
}
62-
},
63-
() -> {
64-
logger.info("no batch");
65-
});
52+
AtomicLong receivedBytes = new AtomicLong();
53+
while (!managedSession.isClosed()) {
54+
// Poll for up to 1 minute.
55+
var resp = managedSession.get(Duration.ofSeconds(60));
56+
resp.ifPresentOrElse(
57+
elem -> {
58+
if (elem instanceof Batch batch) {
59+
var size = batch.meteredBytes();
60+
logger.info(
61+
"batch of {} bytes, seqnums {}..={}",
62+
size,
63+
batch.firstSeqNum(),
64+
batch.lastSeqNum());
65+
receivedBytes.addAndGet(size);
66+
} else {
67+
logger.info("non batch received: {}", elem);
68+
}
69+
},
70+
() -> {
71+
logger.info("no batch");
72+
});
73+
}
74+
logger.info("finished, received {} bytes in total", receivedBytes.get());
6675
}
67-
logger.info("finished, received {} bytes in total", receivedBytes.get());
6876
}
6977
}
7078
}

app/src/main/resources/logback.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
</encoder>
88
</appender>
99

10-
<logger level="info" name="s2" additivity="false">
10+
<logger additivity="false" level="info" name="s2">
1111
<appender-ref ref="console"/>
1212
</logger>
1313

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.0.12
1+
version=0.0.13-SNAPSHOT
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package s2.channel;
2+
3+
import io.grpc.ManagedChannel;
4+
5+
public final class AccountChannel extends AutoClosableManagedChannel
6+
implements AccountCompatibleChannel {
7+
8+
AccountChannel(ManagedChannel managedChannel) {
9+
super(managedChannel);
10+
}
11+
12+
@Override
13+
public AutoClosableManagedChannel getChannel() {
14+
return this;
15+
}
16+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package s2.channel;
2+
3+
public interface AccountCompatibleChannel {
4+
AutoClosableManagedChannel getChannel();
5+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package s2.channel;
2+
3+
import io.grpc.ManagedChannel;
4+
import java.util.concurrent.TimeUnit;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
8+
public class AutoClosableManagedChannel implements AutoCloseable {
9+
10+
private static final Logger logger = LoggerFactory.getLogger(AutoClosableManagedChannel.class);
11+
public final ManagedChannel managedChannel;
12+
13+
public AutoClosableManagedChannel(ManagedChannel managedChannel) {
14+
this.managedChannel = managedChannel;
15+
}
16+
17+
@Override
18+
public void close() {
19+
managedChannel.shutdown();
20+
try {
21+
if (!managedChannel.awaitTermination(5, TimeUnit.SECONDS)) {
22+
managedChannel.shutdownNow();
23+
if (!managedChannel.awaitTermination(5, TimeUnit.SECONDS)) {
24+
logger.warn("Channel did not terminate within 10s total");
25+
}
26+
}
27+
} catch (InterruptedException e) {
28+
Thread.currentThread().interrupt();
29+
managedChannel.shutdownNow();
30+
}
31+
}
32+
}

0 commit comments

Comments
 (0)