Skip to content

Commit 976e26c

Browse files
authored
CC-6219: Implement setInactivityTimeout in ContainerClient (#5491)
* Implement setInactivityTimeout in ContainerClient

  setInactivityTimeout allows a container to outlive its associated actor. The current implementation tightly couples the lifetime of the container client to the lifetime of the actor, so the ContainerClient is always destroyed whenever the actor is destroyed.

  To fix this, we make ContainerClient reference-counted and give the actor a reference to it. Whenever setInactivityTimeout is called, we add a reference to the client. If the actor shuts down, the remaining reference keeps the container alive until the timeout expires. If setInactivityTimeout was never called, there is no remaining reference, and the container shuts down when the actor does.

  We also implement the ability for an actor to reconnect to an already-running container when it restarts. We do this by maintaining a map of containerId -> ContainerClient in the ActorNamespace. This map does not hold a reference to the ContainerClient: only actors and the inactivity timeout promise hold references, which ensures we do not leak resources by holding a reference indefinitely. Whenever an actor starts, it consults the map to see if a ContainerClient already exists. If it doesn't, we create one and add it to the map.

  In order to clear entries from the map when a ContainerClient is destroyed, we provide a callback function (`cleanupCallback`) to the ContainerClient, which is run when the ContainerClient is destroyed.

* Add test for setInactivityTimeout
1 parent 2b71448 commit 976e26c

File tree

4 files changed

+217
-62
lines changed

4 files changed

+217
-62
lines changed

src/workerd/server/container-client.c++

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,16 +107,22 @@ ContainerClient::ContainerClient(capnp::ByteStreamFactory& byteStreamFactory,
107107
kj::String dockerPath,
108108
kj::String containerName,
109109
kj::String imageName,
110-
kj::TaskSet& waitUntilTasks)
110+
kj::TaskSet& waitUntilTasks,
111+
kj::Function<void()> cleanupCallback)
111112
: byteStreamFactory(byteStreamFactory),
112113
timer(timer),
113114
network(network),
114115
dockerPath(kj::mv(dockerPath)),
115116
containerName(kj::encodeUriComponent(kj::mv(containerName))),
116117
imageName(kj::mv(imageName)),
117-
waitUntilTasks(waitUntilTasks) {}
118+
waitUntilTasks(waitUntilTasks),
119+
cleanupCallback(kj::mv(cleanupCallback)) {}
118120

119121
ContainerClient::~ContainerClient() noexcept(false) {
122+
// Call the cleanup callback to remove this client from the ActorNamespace map
123+
cleanupCallback();
124+
125+
// Destroy the Docker container
120126
waitUntilTasks.add(dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::DELETE,
121127
kj::str("/containers/", containerName, "?force=true"))
122128
.ignoreResult());
@@ -250,7 +256,11 @@ kj::Promise<ContainerClient::InspectResponse> ContainerClient::inspectContainer(
250256
auto state = jsonRoot.getState();
251257
JSG_REQUIRE(state.hasStatus(), Error, "Malformed ContainerInspect response");
252258
auto status = state.getStatus();
253-
bool running = status == "running";
259+
// Treat both "running" and "restarting" as running. The "restarting" state occurs when
260+
// Docker is automatically restarting a container (due to restart policy). From the user's
261+
// perspective, a restarting container is still "alive" and should be treated as running
262+
// so that start() correctly refuses to start a duplicate and destroy() can clean it up.
263+
bool running = status == "running" || status == "restarting";
254264
co_return InspectResponse{.isRunning = running, .ports = kj::mv(portMappings)};
255265
}
256266

@@ -427,7 +437,19 @@ kj::Promise<void> ContainerClient::signal(SignalContext context) {
427437
}
428438

429439
kj::Promise<void> ContainerClient::setInactivityTimeout(SetInactivityTimeoutContext context) {
430-
// empty implementation on purpose
440+
auto params = context.getParams();
441+
auto durationMs = params.getDurationMs();
442+
443+
JSG_REQUIRE(
444+
durationMs > 0, Error, "setInactivityTimeout() requires durationMs > 0, got ", durationMs);
445+
446+
auto timeout = durationMs * kj::MILLISECONDS;
447+
448+
// Add a timer task that holds a reference to this ContainerClient.
449+
waitUntilTasks.add(timer.afterDelay(timeout).then([self = kj::addRef(*this)]() {
450+
// This callback does nothing but drop the reference
451+
}));
452+
431453
co_return;
432454
}
433455

@@ -444,4 +466,8 @@ kj::Promise<void> ContainerClient::listenTcp(ListenTcpContext context) {
444466
KJ_UNIMPLEMENTED("listenTcp not implemented for Docker containers - use port mapping instead");
445467
}
446468

469+
kj::Own<ContainerClient> ContainerClient::addRef() {
470+
return kj::addRef(*this);
471+
}
472+
447473
} // namespace workerd::server

src/workerd/server/container-client.h

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <kj/async.h>
1212
#include <kj/compat/http.h>
1313
#include <kj/map.h>
14+
#include <kj/refcount.h>
1415
#include <kj/string.h>
1516

1617
namespace workerd::server {
@@ -19,15 +20,20 @@ namespace workerd::server {
1920
// so it can be used as a rpc::Container::Client via kj::heap<ContainerClient>().
2021
// This allows the Container JSG class to use Docker directly without knowing
2122
// it's talking to Docker instead of a real RPC service.
22-
class ContainerClient final: public rpc::Container::Server {
23+
//
24+
// ContainerClient is reference-counted to support actor reconnection with inactivity timeouts.
25+
// When setInactivityTimeout() is called, a timer holds a reference to prevent premature
26+
// destruction. The ContainerClient can be shared across multiple actor lifetimes.
27+
class ContainerClient final: public rpc::Container::Server, public kj::Refcounted {
2328
public:
2429
ContainerClient(capnp::ByteStreamFactory& byteStreamFactory,
2530
kj::Timer& timer,
2631
kj::Network& network,
2732
kj::String dockerPath,
2833
kj::String containerName,
2934
kj::String imageName,
30-
kj::TaskSet& waitUntilTasks);
35+
kj::TaskSet& waitUntilTasks,
36+
kj::Function<void()> cleanupCallback);
3137

3238
~ContainerClient() noexcept(false);
3339

@@ -41,6 +47,8 @@ class ContainerClient final: public rpc::Container::Server {
4147
kj::Promise<void> listenTcp(ListenTcpContext context) override;
4248
kj::Promise<void> setInactivityTimeout(SetInactivityTimeoutContext context) override;
4349

50+
kj::Own<ContainerClient> addRef();
51+
4452
private:
4553
capnp::ByteStreamFactory& byteStreamFactory;
4654
kj::Timer& timer;
@@ -82,6 +90,9 @@ class ContainerClient final: public rpc::Container::Server {
8290
kj::Promise<void> stopContainer();
8391
kj::Promise<void> killContainer(uint32_t signal);
8492
kj::Promise<void> destroyContainer();
93+
94+
// Cleanup callback to remove from ActorNamespace map when destroyed
95+
kj::Function<void()> cleanupCallback;
8596
};
8697

8798
} // namespace workerd::server

src/workerd/server/server.c++

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2313,6 +2313,11 @@ class Server::WorkerService final: public Service,
23132313
auto reason = 0;
23142314
a->shutdown(reason);
23152315
}
2316+
2317+
// Drop the container client reference
2318+
// If setInactivityTimeout() was called, there's still a timer holding a reference
2319+
// If not, this may be the last reference and the ContainerClient destructor will run
2320+
containerClient = kj::none;
23162321
}
23172322

23182323
void active() override {
@@ -2441,6 +2446,7 @@ class Server::WorkerService final: public Service,
24412446
manager = kj::none;
24422447
tracker->shutdown();
24432448
actor = kj::none;
2449+
containerClient = kj::none;
24442450

24452451
KJ_IF_SOME(r, reason) {
24462452
brokenReason = kj::cp(r);
@@ -2512,6 +2518,9 @@ class Server::WorkerService final: public Service,
25122518
kj::Maybe<kj::Promise<void>> onBrokenTask;
25132519
kj::Maybe<kj::Exception> brokenReason;
25142520

2521+
// Reference to the ContainerClient (if container is enabled for this actor)
2522+
kj::Maybe<kj::Own<ContainerClient>> containerClient;
2523+
25152524
// If this is a `ForkedPromise<void>`, await the promise. When it has resolved, then
25162525
// `classAndId` will have been replaced with the resolved `ClassAndId` value.
25172526
kj::OneOf<ClassAndId, kj::ForkedPromise<void>> classAndId;
@@ -2683,6 +2692,11 @@ class Server::WorkerService final: public Service,
26832692
}
26842693
// Destroy the last strong Worker::Actor reference.
26852694
actor = kj::none;
2695+
2696+
// Drop our reference to the ContainerClient
2697+
// If setInactivityTimeout() was called, the timer still holds a reference
2698+
// so the container stays alive until the timeout expires
2699+
containerClient = kj::none;
26862700
}
26872701

26882702
void start(kj::Own<ActorClass>& actorClass, Worker::Actor::Id& id) {
@@ -2767,10 +2781,8 @@ class Server::WorkerService final: public Service,
27672781

27682782
auto loopback = kj::refcounted<Loopback>(*this);
27692783

2770-
kj::Maybe<rpc::Container::Client> containerClient = kj::none;
2784+
kj::Maybe<rpc::Container::Client> container = kj::none;
27712785
KJ_IF_SOME(config, containerOptions) {
2772-
auto& dockerPathRef = KJ_ASSERT_NONNULL(ns.dockerPath,
2773-
"dockerPath needs to be defined in order enable containers on this durable object.");
27742786
KJ_ASSERT(config.hasImageName(), "Image name is required");
27752787
auto imageName = config.getImageName();
27762788
kj::String containerId;
@@ -2782,15 +2794,14 @@ class Server::WorkerService final: public Service,
27822794
containerId = kj::str(existingId);
27832795
}
27842796
}
2785-
containerClient = kj::heap<ContainerClient>(ns.byteStreamFactory, timer, ns.dockerNetwork,
2786-
kj::str(dockerPathRef),
2787-
kj::str("workerd-", KJ_ASSERT_NONNULL(uniqueKey), "-", containerId),
2788-
kj::str(imageName), ns.waitUntilTasks);
2797+
2798+
container = ns.getContainerClient(
2799+
kj::str("workerd-", KJ_ASSERT_NONNULL(uniqueKey), "-", containerId), imageName);
27892800
}
27902801

27912802
auto actor = actorClass->newActor(getTracker(), Worker::Actor::cloneId(id),
27922803
kj::mv(makeActorCache), kj::mv(makeStorage), kj::mv(loopback), tryGetManagerRef(),
2793-
kj::mv(containerClient), *this);
2804+
kj::mv(container), *this);
27942805
onBrokenTask = monitorOnBroken(*actor);
27952806
this->actor = kj::mv(actor);
27962807
}
@@ -2828,6 +2839,31 @@ class Server::WorkerService final: public Service,
28282839
})->addRef();
28292840
}
28302841

2842+
kj::Own<ContainerClient> getContainerClient(
2843+
kj::StringPtr containerId, kj::StringPtr imageName) {
2844+
KJ_IF_SOME(existingClient, containerClients.find(containerId)) {
2845+
return existingClient->addRef();
2846+
}
2847+
2848+
// No existing container in the map, create a new one
2849+
auto& dockerPathRef = KJ_ASSERT_NONNULL(
2850+
dockerPath, "dockerPath must be defined to enable containers on this Durable Object.");
2851+
2852+
// Remove from the map when the container is destroyed
2853+
kj::Function<void()> cleanupCallback = [this, containerId = kj::str(containerId)]() {
2854+
containerClients.erase(containerId);
2855+
};
2856+
2857+
auto client = kj::refcounted<ContainerClient>(byteStreamFactory, timer, dockerNetwork,
2858+
kj::str(dockerPathRef), kj::str(containerId), kj::str(imageName), waitUntilTasks,
2859+
kj::mv(cleanupCallback));
2860+
2861+
// Store raw pointer in map (does not own)
2862+
containerClients.insert(kj::str(containerId), client.get());
2863+
2864+
return kj::mv(client);
2865+
}
2866+
28312867
void abortAll(kj::Maybe<const kj::Exception&> reason) {
28322868
for (auto& actor: actors) {
28332869
actor.value->abort(reason);
@@ -2856,6 +2892,12 @@ class Server::WorkerService final: public Service,
28562892
// inactivity, we keep the ActorContainer in the map but drop the Own<Worker::Actor>. When a new
28572893
// request comes in, we recreate the Own<Worker::Actor>.
28582894
ActorMap actors;
2895+
2896+
// Map of container IDs to ContainerClients (for reconnection support with inactivity timeouts).
2897+
// The map holds raw pointers (not ownership) - ContainerClients are owned by actors and timers.
2898+
// When the last reference is dropped, the destructor removes the entry from this map.
2899+
kj::HashMap<kj::String, ContainerClient*> containerClients;
2900+
28592901
kj::Maybe<kj::Promise<void>> cleanupTask;
28602902
kj::Timer& timer;
28612903
capnp::ByteStreamFactory& byteStreamFactory;

0 commit comments

Comments
 (0)