Skip to content

Commit bc05d00

Browse files
committed
Implement setInactivityTimeout in ContainerClient
setInactivityTimeout allows a container to outlive its associated actor. The current implementation tightly couples the lifetime of the container client to the lifetime of the actor, so that the ContainerClient is always destroyed whenever the actor is destroyed. To fix this we make ContainerClient reference counted and provide a reference to the actor. Whenever setInactivityTimeout is called we add a reference to the client. If the actor shuts down, the remaining reference keeps the container alive until the timeout expires. If setInactivityTimeout was never called then there is no remaining reference, and the container shuts down when the actor does. We also implement the ability for an actor to reconnect to an already running container when it restarts. We do this by maintaining a map of containerId->ContainerClient in the ActorNamespace. This map does not hold a reference to the ContainerClient: only actors and the inactivity timeout promise hold references to ensure we do not leak resources by holding a reference indefinitely. Whenever an actor starts it consults the map to see if a ContainerClient already exists. If it doesn't we create one and add it to the map. In order to clear entries from the map when a ContainerClient is destroyed we provide a callback function (`cleanupCallback`) to the ContainerClient which is run when ContainerClient is destroyed.
1 parent 276ef19 commit bc05d00

File tree

3 files changed

+111
-17
lines changed

3 files changed

+111
-17
lines changed

src/workerd/server/container-client.c++

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,16 +107,22 @@ ContainerClient::ContainerClient(capnp::ByteStreamFactory& byteStreamFactory,
107107
kj::String dockerPath,
108108
kj::String containerName,
109109
kj::String imageName,
110-
kj::TaskSet& waitUntilTasks)
110+
kj::TaskSet& waitUntilTasks,
111+
kj::Function<void()> cleanupCallback)
111112
: byteStreamFactory(byteStreamFactory),
112113
timer(timer),
113114
network(network),
114115
dockerPath(kj::mv(dockerPath)),
115-
containerName(kj::encodeUriComponent(kj::mv(containerName))),
116+
containerName(kj::encodeUriComponent(containerName)),
116117
imageName(kj::mv(imageName)),
117-
waitUntilTasks(waitUntilTasks) {}
118+
waitUntilTasks(waitUntilTasks),
119+
cleanupCallback(kj::mv(cleanupCallback)) {}
118120

119121
ContainerClient::~ContainerClient() noexcept(false) {
122+
// Call the cleanup callback to remove this client from the ActorNamespace map
123+
cleanupCallback();
124+
125+
// Destroy the Docker container
120126
waitUntilTasks.add(dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::DELETE,
121127
kj::str("/containers/", containerName, "?force=true"))
122128
.ignoreResult());
@@ -427,7 +433,19 @@ kj::Promise<void> ContainerClient::signal(SignalContext context) {
427433
}
428434

429435
kj::Promise<void> ContainerClient::setInactivityTimeout(SetInactivityTimeoutContext context) {
430-
// empty implementation on purpose
436+
auto params = context.getParams();
437+
auto durationMs = params.getDurationMs();
438+
439+
JSG_REQUIRE(
440+
durationMs > 0, Error, "setInactivityTimeout() requires durationMs > 0, got ", durationMs);
441+
442+
auto timeout = durationMs * kj::MILLISECONDS;
443+
444+
// Add a timer task that holds a reference to this ContainerClient.
445+
waitUntilTasks.add(timer.afterDelay(timeout).then([self = kj::addRef(*this)]() {
446+
// This callback does nothing but drop the reference
447+
}));
448+
431449
co_return;
432450
}
433451

@@ -444,4 +462,8 @@ kj::Promise<void> ContainerClient::listenTcp(ListenTcpContext context) {
444462
KJ_UNIMPLEMENTED("listenTcp not implemented for Docker containers - use port mapping instead");
445463
}
446464

465+
kj::Own<ContainerClient> ContainerClient::addRef() {
466+
return kj::addRef(*this);
467+
}
468+
447469
} // namespace workerd::server

src/workerd/server/container-client.h

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <kj/async.h>
1212
#include <kj/compat/http.h>
1313
#include <kj/map.h>
14+
#include <kj/refcount.h>
1415
#include <kj/string.h>
1516

1617
namespace workerd::server {
@@ -19,15 +20,20 @@ namespace workerd::server {
1920
// so it can be used as a rpc::Container::Client via kj::heap<ContainerClient>().
2021
// This allows the Container JSG class to use Docker directly without knowing
2122
// it's talking to Docker instead of a real RPC service.
22-
class ContainerClient final: public rpc::Container::Server {
23+
//
24+
// ContainerClient is reference-counted to support actor reconnection with inactivity timeouts.
25+
// When setInactivityTimeout() is called, a timer holds a reference to prevent premature
26+
// destruction. The ContainerClient can be shared across multiple actor lifetimes.
27+
class ContainerClient final: public rpc::Container::Server, public kj::Refcounted {
2328
public:
2429
ContainerClient(capnp::ByteStreamFactory& byteStreamFactory,
2530
kj::Timer& timer,
2631
kj::Network& network,
2732
kj::String dockerPath,
2833
kj::String containerName,
2934
kj::String imageName,
30-
kj::TaskSet& waitUntilTasks);
35+
kj::TaskSet& waitUntilTasks,
36+
kj::Function<void()> cleanupCallback);
3137

3238
~ContainerClient() noexcept(false);
3339

@@ -41,6 +47,9 @@ class ContainerClient final: public rpc::Container::Server {
4147
kj::Promise<void> listenTcp(ListenTcpContext context) override;
4248
kj::Promise<void> setInactivityTimeout(SetInactivityTimeoutContext context) override;
4349

50+
// Reference counting support
51+
kj::Own<ContainerClient> addRef();
52+
4453
private:
4554
capnp::ByteStreamFactory& byteStreamFactory;
4655
kj::Timer& timer;
@@ -82,6 +91,9 @@ class ContainerClient final: public rpc::Container::Server {
8291
kj::Promise<void> stopContainer();
8392
kj::Promise<void> killContainer(uint32_t signal);
8493
kj::Promise<void> destroyContainer();
94+
95+
// Cleanup callback to remove from ActorNamespace map when destroyed
96+
kj::Function<void()> cleanupCallback;
8597
};
8698

8799
} // namespace workerd::server

src/workerd/server/server.c++

Lines changed: 71 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2347,6 +2347,11 @@ class Server::WorkerService final: public Service,
23472347
auto reason = 0;
23482348
a->shutdown(reason);
23492349
}
2350+
2351+
// Drop the container client reference
2352+
// If setInactivityTimeout() was called, there's still a timer holding a reference
2353+
// If not, this may be the last reference and the ContainerClient destructor will run
2354+
containerClient = kj::none;
23502355
}
23512356

23522357
void active() override {
@@ -2475,6 +2480,7 @@ class Server::WorkerService final: public Service,
24752480
manager = kj::none;
24762481
tracker->shutdown();
24772482
actor = kj::none;
2483+
containerClient = kj::none;
24782484

24792485
KJ_IF_SOME(r, reason) {
24802486
brokenReason = kj::cp(r);
@@ -2546,6 +2552,9 @@ class Server::WorkerService final: public Service,
25462552
kj::Maybe<kj::Promise<void>> onBrokenTask;
25472553
kj::Maybe<kj::Exception> brokenReason;
25482554

2555+
// Reference to the ContainerClient (if container is enabled for this actor)
2556+
kj::Maybe<kj::Own<ContainerClient>> containerClient;
2557+
25492558
// If this is a `ForkedPromise<void>`, await the promise. When it has resolved, then
25502559
// `classAndId` will have been replaced with the resolved `ClassAndId` value.
25512560
kj::OneOf<ClassAndId, kj::ForkedPromise<void>> classAndId;
@@ -2717,6 +2726,11 @@ class Server::WorkerService final: public Service,
27172726
}
27182727
// Destroy the last strong Worker::Actor reference.
27192728
actor = kj::none;
2729+
2730+
// Drop our reference to the ContainerClient
2731+
// If setInactivityTimeout() was called, the timer still holds a reference
2732+
// so the container stays alive until the timeout expires
2733+
containerClient = kj::none;
27202734
}
27212735

27222736
void start(kj::Own<ActorClass>& actorClass, Worker::Actor::Id& id) {
@@ -2801,30 +2815,35 @@ class Server::WorkerService final: public Service,
28012815

28022816
auto loopback = kj::refcounted<Loopback>(*this);
28032817

2804-
kj::Maybe<rpc::Container::Client> containerClient = kj::none;
2818+
kj::Maybe<rpc::Container::Client> containerClientCapability = kj::none;
28052819
KJ_IF_SOME(config, containerOptions) {
2806-
auto& dockerPathRef = KJ_ASSERT_NONNULL(ns.dockerPath,
2807-
"dockerPath needs to be defined in order enable containers on this durable object.");
28082820
KJ_ASSERT(config.hasImageName(), "Image name is required");
28092821
auto imageName = config.getImageName();
2810-
kj::String containerId;
2822+
kj::String containerIdStr;
28112823
KJ_SWITCH_ONEOF(id) {
28122824
KJ_CASE_ONEOF(globalId, kj::Own<ActorIdFactory::ActorId>) {
2813-
containerId = globalId->toString();
2825+
containerIdStr = globalId->toString();
28142826
}
28152827
KJ_CASE_ONEOF(existingId, kj::String) {
2816-
containerId = kj::str(existingId);
2828+
containerIdStr = kj::str(existingId);
28172829
}
28182830
}
2819-
containerClient = kj::heap<ContainerClient>(ns.byteStreamFactory, timer, ns.dockerNetwork,
2820-
kj::str(dockerPathRef),
2821-
kj::str("workerd-", KJ_ASSERT_NONNULL(uniqueKey), "-", containerId),
2822-
kj::str(imageName), ns.waitUntilTasks);
2831+
2832+
// Create the full container name
2833+
auto fullContainerId =
2834+
kj::str("workerd-", KJ_ASSERT_NONNULL(uniqueKey), "-", containerIdStr);
2835+
2836+
// Get or create the ContainerClient (may reuse existing one from previous actor instance)
2837+
containerClient = ns.getOrCreateContainerClient(fullContainerId, imageName);
2838+
2839+
// Get a capability to pass to the Actor
2840+
// This creates an additional reference that the Actor's IoContext holds
2841+
containerClientCapability = KJ_ASSERT_NONNULL(containerClient)->addRef();
28232842
}
28242843

28252844
auto actor = actorClass->newActor(getTracker(), Worker::Actor::cloneId(id),
28262845
kj::mv(makeActorCache), kj::mv(makeStorage), kj::mv(loopback), tryGetManagerRef(),
2827-
kj::mv(containerClient), *this);
2846+
kj::mv(containerClientCapability), *this);
28282847
onBrokenTask = monitorOnBroken(*actor);
28292848
this->actor = kj::mv(actor);
28302849
}
@@ -2862,11 +2881,46 @@ class Server::WorkerService final: public Service,
28622881
})->addRef();
28632882
}
28642883

2884+
kj::Own<ContainerClient> getOrCreateContainerClient(
2885+
kj::StringPtr containerId, kj::StringPtr imageName) {
2886+
// Check if ContainerClient already exists in map
2887+
KJ_IF_SOME(existingClient, containerClients.find(containerId)) {
2888+
// Found existing client, return a new reference to it
2889+
return existingClient->addRef();
2890+
}
2891+
2892+
// Not found, create new ContainerClient
2893+
auto& dockerPathRef = KJ_ASSERT_NONNULL(
2894+
dockerPath, "dockerPath must be defined to enable containers on this Durable Object.");
2895+
2896+
// Create cleanup callback to remove from map when destroyed
2897+
kj::Function<void()> cleanupCallback = [this, containerId]() {
2898+
// Remove raw pointer from map when destructor runs
2899+
containerClients.erase(containerId);
2900+
};
2901+
2902+
// Create new ContainerClient
2903+
auto client = kj::refcounted<ContainerClient>(byteStreamFactory, timer, dockerNetwork,
2904+
kj::str(dockerPathRef), kj::str(containerId), kj::str(imageName), waitUntilTasks,
2905+
kj::mv(cleanupCallback));
2906+
2907+
// Store raw pointer in map (does not own)
2908+
containerClients.insert(kj::str(containerId), client.get());
2909+
2910+
// Return owned reference
2911+
return kj::mv(client);
2912+
}
2913+
28652914
void abortAll(kj::Maybe<const kj::Exception&> reason) {
28662915
for (auto& actor: actors) {
28672916
actor.value->abort(reason);
28682917
}
28692918
actors.clear();
2919+
2920+
// Clean up all container clients
2921+
// This drops the ActorNamespace's reference to each ContainerClient
2922+
// If any have active timers, they'll still be kept alive until timer expires
2923+
containerClients.clear();
28702924
}
28712925

28722926
private:
@@ -2890,6 +2944,12 @@ class Server::WorkerService final: public Service,
28902944
// inactivity, we keep the ActorContainer in the map but drop the Own<Worker::Actor>. When a new
28912945
// request comes in, we recreate the Own<Worker::Actor>.
28922946
ActorMap actors;
2947+
2948+
// Map of container IDs to ContainerClients (for reconnection support with inactivity timeouts).
2949+
// The map holds raw pointers (not ownership) - ContainerClients are owned by actors and timers.
2950+
// When the last reference is dropped, the destructor removes the entry from this map.
2951+
kj::HashMap<kj::StringPtr, ContainerClient*> containerClients;
2952+
28932953
kj::Maybe<kj::Promise<void>> cleanupTask;
28942954
kj::Timer& timer;
28952955
capnp::ByteStreamFactory& byteStreamFactory;

0 commit comments

Comments
 (0)