Skip to content

Commit 0b12ed2

Browse files
authored
Extract RyukResourceReaper (#4959)
1 parent 53538f4 commit 0b12ed2

File tree

2 files changed

+192
-168
lines changed

2 files changed

+192
-168
lines changed

core/src/main/java/org/testcontainers/utility/ResourceReaper.java

Lines changed: 4 additions & 168 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,35 @@
11
package org.testcontainers.utility;
22

33
import com.github.dockerjava.api.DockerClient;
4-
import com.github.dockerjava.api.async.ResultCallback;
54
import com.github.dockerjava.api.command.InspectContainerResponse;
65
import com.github.dockerjava.api.exception.NotFoundException;
7-
import com.github.dockerjava.api.model.Bind;
86
import com.github.dockerjava.api.model.Container;
9-
import com.github.dockerjava.api.model.ExposedPort;
10-
import com.github.dockerjava.api.model.Frame;
11-
import com.github.dockerjava.api.model.HostConfig;
127
import com.github.dockerjava.api.model.Network;
13-
import com.github.dockerjava.api.model.PortBinding;
14-
import com.github.dockerjava.api.model.Ports;
158
import com.github.dockerjava.api.model.PruneType;
16-
import com.github.dockerjava.api.model.Volume;
179
import com.google.common.annotations.VisibleForTesting;
1810
import com.google.common.base.Throwables;
1911
import com.google.common.collect.Sets;
20-
import lombok.SneakyThrows;
2112
import lombok.extern.slf4j.Slf4j;
22-
import org.rnorth.ducttape.ratelimits.RateLimiter;
23-
import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
2413
import org.slf4j.Logger;
2514
import org.slf4j.LoggerFactory;
2615
import org.testcontainers.DockerClientFactory;
27-
import org.testcontainers.containers.ContainerState;
2816

2917
import java.io.BufferedReader;
3018
import java.io.IOException;
3119
import java.io.InputStream;
3220
import java.io.InputStreamReader;
3321
import java.io.OutputStream;
3422
import java.io.UnsupportedEncodingException;
35-
import java.net.InetSocketAddress;
36-
import java.net.Socket;
3723
import java.net.URLEncoder;
38-
import java.nio.charset.StandardCharsets;
3924
import java.util.AbstractMap.SimpleEntry;
4025
import java.util.ArrayList;
4126
import java.util.Arrays;
42-
import java.util.Collections;
4327
import java.util.List;
4428
import java.util.Map;
45-
import java.util.Objects;
4629
import java.util.Set;
4730
import java.util.concurrent.ConcurrentHashMap;
48-
import java.util.concurrent.CountDownLatch;
49-
import java.util.concurrent.TimeUnit;
5031
import java.util.concurrent.atomic.AtomicBoolean;
5132
import java.util.stream.Collectors;
52-
import java.util.stream.Stream;
53-
54-
import static org.awaitility.Awaitility.await;
5533

5634
/**
5735
* Component that responsible for container removal and automatic cleanup of dead containers at JVM shutdown.
@@ -61,20 +39,14 @@ public final class ResourceReaper {
6139

6240
private static final Logger LOGGER = LoggerFactory.getLogger(ResourceReaper.class);
6341

64-
private static final List<List<Map.Entry<String, String>>> DEATH_NOTE = new ArrayList<>(
42+
static final List<List<Map.Entry<String, String>>> DEATH_NOTE = new ArrayList<>(
6543
Arrays.asList(
6644
DockerClientFactory.DEFAULT_LABELS.entrySet().stream()
6745
.<Map.Entry<String, String>>map(it -> new SimpleEntry<>("label", it.getKey() + "=" + it.getValue()))
6846
.collect(Collectors.toList())
6947
)
7048
);
7149

72-
private static final RateLimiter RYUK_ACK_RATE_LIMITER = RateLimiterBuilder
73-
.newBuilder()
74-
.withRate(4, TimeUnit.SECONDS)
75-
.withConstantThroughput()
76-
.build();
77-
7850
private static ResourceReaper instance;
7951
private static AtomicBoolean ryukStarted = new AtomicBoolean(false);
8052
private final DockerClient dockerClient = DockerClientFactory.lazyClient();
@@ -98,147 +70,11 @@ public static String start(String hostIpAddress, DockerClient client) {
9870
* @deprecated internal API
9971
*/
10072
@Deprecated
101-
@SneakyThrows(InterruptedException.class)
10273
public static String start(DockerClient client) {
103-
String ryukImage = ImageNameSubstitutor.instance()
104-
.apply(DockerImageName.parse("testcontainers/ryuk:0.3.3"))
105-
.asCanonicalNameString();
106-
DockerClientFactory.instance().checkAndPullImage(client, ryukImage);
107-
108-
List<Bind> binds = new ArrayList<>();
109-
binds.add(new Bind(DockerClientFactory.instance().getRemoteDockerUnixSocketPath(), new Volume("/var/run/docker.sock")));
110-
111-
ExposedPort ryukExposedPort = ExposedPort.tcp(8080);
112-
String ryukContainerId = client.createContainerCmd(ryukImage)
113-
.withHostConfig(
114-
new HostConfig()
115-
.withAutoRemove(true)
116-
.withPortBindings(new PortBinding(Ports.Binding.empty(), ryukExposedPort))
117-
)
118-
.withExposedPorts(ryukExposedPort)
119-
.withName("testcontainers-ryuk-" + DockerClientFactory.SESSION_ID)
120-
.withLabels(Collections.singletonMap(DockerClientFactory.TESTCONTAINERS_LABEL, "true"))
121-
.withBinds(binds)
122-
.withPrivileged(TestcontainersConfiguration.getInstance().isRyukPrivileged())
123-
.exec()
124-
.getId();
125-
126-
client.startContainerCmd(ryukContainerId).exec();
127-
128-
StringBuilder ryukLog = new StringBuilder();
129-
130-
ResultCallback.Adapter<Frame> logCallback = client.logContainerCmd(ryukContainerId)
131-
.withSince(0)
132-
.withFollowStream(true)
133-
.withStdOut(true)
134-
.withStdErr(true)
135-
.exec(new ResultCallback.Adapter<Frame>() {
136-
@Override
137-
public void onNext(Frame frame) {
138-
ryukLog.append(new String(frame.getPayload(), StandardCharsets.UTF_8));
139-
}
140-
});
141-
142-
InspectContainerResponse inspectedContainer;
143-
try {
144-
// inspect container response might initially not contain the mapped port
145-
inspectedContainer = await()
146-
.atMost(5, TimeUnit.SECONDS)
147-
.pollInterval(DynamicPollInterval.ofMillis(50))
148-
.pollInSameThread()
149-
.until(
150-
() -> client.inspectContainerCmd(ryukContainerId).exec(),
151-
inspectContainerResponse -> {
152-
return inspectContainerResponse
153-
.getNetworkSettings()
154-
.getPorts()
155-
.getBindings()
156-
.values()
157-
.stream()
158-
.anyMatch(Objects::nonNull);
159-
}
160-
);
161-
} catch (Exception e) {
162-
log.warn("Ryuk container cannot be inspected and probably had a problem starting. Ryuk's logs:\n{}", ryukLog);
163-
throw new IllegalStateException("Ryuk failed to start", e);
164-
}
165-
166-
ContainerState containerState = new ContainerState() {
167-
168-
@Override
169-
public List<Integer> getExposedPorts() {
170-
return Stream.of(getContainerInfo().getConfig().getExposedPorts())
171-
.map(ExposedPort::getPort)
172-
.collect(Collectors.toList());
173-
}
174-
175-
@Override
176-
public InspectContainerResponse getContainerInfo() {
177-
return inspectedContainer;
178-
}
179-
};
180-
181-
CountDownLatch ryukScheduledLatch = new CountDownLatch(1);
182-
183-
String host = containerState.getHost();
184-
Integer ryukPort = containerState.getFirstMappedPort();
185-
Thread kiraThread = new Thread(
186-
DockerClientFactory.TESTCONTAINERS_THREAD_GROUP,
187-
() -> {
188-
while (true) {
189-
RYUK_ACK_RATE_LIMITER.doWhenReady(() -> {
190-
int index = 0;
191-
// not set the read timeout, as Ryuk would not send anything unless a new filter is submitted, meaning that we would get a timeout exception pretty quick
192-
try (Socket clientSocket = new Socket()) {
193-
clientSocket.connect(new InetSocketAddress(host, ryukPort), 5 * 1000);
194-
FilterRegistry registry = new FilterRegistry(clientSocket.getInputStream(), clientSocket.getOutputStream());
195-
196-
synchronized (DEATH_NOTE) {
197-
while (true) {
198-
if (DEATH_NOTE.size() <= index) {
199-
try {
200-
DEATH_NOTE.wait(1_000);
201-
continue;
202-
} catch (InterruptedException e) {
203-
throw new RuntimeException(e);
204-
}
205-
}
206-
List<Map.Entry<String, String>> filters = DEATH_NOTE.get(index);
207-
boolean isAcknowledged = registry.register(filters);
208-
if (isAcknowledged) {
209-
log.debug("Received 'ACK' from Ryuk");
210-
ryukScheduledLatch.countDown();
211-
index++;
212-
} else {
213-
log.debug("Didn't receive 'ACK' from Ryuk. Will retry to send filters.");
214-
}
215-
}
216-
}
217-
} catch (IOException e) {
218-
log.warn("Can not connect to Ryuk at {}:{}", host, ryukPort, e);
219-
}
220-
});
221-
}
222-
},
223-
"testcontainers-ryuk"
224-
);
225-
kiraThread.setDaemon(true);
226-
kiraThread.start();
227-
try {
228-
// We need to wait before we can start any containers to make sure that we delete them
229-
if (!ryukScheduledLatch.await(TestcontainersConfiguration.getInstance().getRyukTimeout(), TimeUnit.SECONDS)) {
230-
log.error("Timed out waiting for Ryuk container to start. Ryuk's logs:\n{}", ryukLog);
231-
throw new IllegalStateException(String.format("Could not connect to Ryuk at %s:%s", host, ryukPort));
232-
}
233-
} finally {
234-
try {
235-
logCallback.close();
236-
} catch (IOException ignored) {
237-
}
238-
}
239-
74+
RyukResourceReaper ryuk = new RyukResourceReaper(client);
75+
String containerId = ryuk.getContainerId();
24076
ryukStarted.set(true);
241-
return ryukContainerId;
77+
return containerId;
24278
}
24379

24480
public synchronized static ResourceReaper instance() {

0 commit comments

Comments
 (0)