34 changes: 30 additions & 4 deletions src/workerd/server/container-client.c++
@@ -107,16 +107,22 @@ ContainerClient::ContainerClient(capnp::ByteStreamFactory& byteStreamFactory,
     kj::String dockerPath,
     kj::String containerName,
     kj::String imageName,
-    kj::TaskSet& waitUntilTasks)
+    kj::TaskSet& waitUntilTasks,
+    kj::Function<void()> cleanupCallback)
     : byteStreamFactory(byteStreamFactory),
       timer(timer),
       network(network),
       dockerPath(kj::mv(dockerPath)),
       containerName(kj::encodeUriComponent(kj::mv(containerName))),
       imageName(kj::mv(imageName)),
-      waitUntilTasks(waitUntilTasks) {}
+      waitUntilTasks(waitUntilTasks),
+      cleanupCallback(kj::mv(cleanupCallback)) {}
 
 ContainerClient::~ContainerClient() noexcept(false) {
+  // Call the cleanup callback to remove this client from the ActorNamespace map
+  cleanupCallback();
+
   // Destroy the Docker container
   waitUntilTasks.add(dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::DELETE,
       kj::str("/containers/", containerName, "?force=true"))
           .ignoreResult());
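Note: the request above maps to the Docker Engine API's remove-container endpoint (DELETE /containers/{name}); the force=true query parameter kills a still-running container before removing it, so destruction also works while the container is up.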
@@ -250,7 +256,11 @@ kj::Promise<ContainerClient::InspectResponse> ContainerClient::inspectContainer(
   auto state = jsonRoot.getState();
   JSG_REQUIRE(state.hasStatus(), Error, "Malformed ContainerInspect response");
   auto status = state.getStatus();
-  bool running = status == "running";
+  // Treat both "running" and "restarting" as running. The "restarting" state occurs when
+  // Docker is automatically restarting a container (due to restart policy). From the user's
+  // perspective, a restarting container is still "alive" and should be treated as running
+  // so that start() correctly refuses to start a duplicate and destroy() can clean it up.
+  bool running = status == "running" || status == "restarting";
   co_return InspectResponse{.isRunning = running, .ports = kj::mv(portMappings)};
 }
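Note: Docker's ContainerState.Status is a small enum (created, running, paused, restarting, removing, exited, dead), so the inspect logic above collapses it to a single isRunning boolean and now counts two of the seven states as alive.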

@@ -427,7 +437,19 @@ kj::Promise<void> ContainerClient::signal(SignalContext context) {
 }
 
 kj::Promise<void> ContainerClient::setInactivityTimeout(SetInactivityTimeoutContext context) {
-  // empty implementation on purpose
+  auto params = context.getParams();
+  auto durationMs = params.getDurationMs();
+
+  JSG_REQUIRE(
+      durationMs > 0, Error, "setInactivityTimeout() requires durationMs > 0, got ", durationMs);
+
+  auto timeout = durationMs * kj::MILLISECONDS;
+
+  // Add a timer task that holds a reference to this ContainerClient.
+  waitUntilTasks.add(timer.afterDelay(timeout).then([self = kj::addRef(*this)]() {
+    // This callback does nothing but drop the reference
+  }));
+
   co_return;
 }
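The keep-alive trick here is standard KJ refcounting: the continuation captures a strong reference via kj::addRef(*this), so the ContainerClient (and therefore its Docker container) cannot be destroyed before the timer fires. A minimal self-contained sketch of the same pattern, with Session and keepAliveFor as hypothetical names, not part of this PR:

#include <kj/async.h>     // kj::TaskSet, kj::Promise
#include <kj/refcount.h>  // kj::Refcounted, kj::addRef
#include <kj/timer.h>     // kj::Timer, kj::Duration

class Session: public kj::Refcounted {
 public:
  // Teardown runs only once the last strong reference is dropped.
  ~Session() noexcept(false) {}
};

void keepAliveFor(kj::Timer& timer, kj::TaskSet& tasks, Session& session, kj::Duration d) {
  // The lambda capture owns a strong reference, so even if every other owner
  // drops theirs, the Session survives until the delay elapses.
  tasks.add(timer.afterDelay(d).then([self = kj::addRef(session)]() {
    // Nothing else to do: dropping `self` here may run ~Session().
  }));
}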

@@ -444,4 +466,8 @@ kj::Promise<void> ContainerClient::listenTcp(ListenTcpContext context) {
   KJ_UNIMPLEMENTED("listenTcp not implemented for Docker containers - use port mapping instead");
 }
 
+kj::Own<ContainerClient> ContainerClient::addRef() {
+  return kj::addRef(*this);
+}
+
 } // namespace workerd::server
15 changes: 13 additions & 2 deletions src/workerd/server/container-client.h
@@ -11,6 +11,7 @@
 #include <kj/async.h>
 #include <kj/compat/http.h>
 #include <kj/map.h>
+#include <kj/refcount.h>
 #include <kj/string.h>
 
 namespace workerd::server {
@@ -19,15 +20,20 @@ namespace workerd::server {
 // so it can be used as a rpc::Container::Client via kj::heap<ContainerClient>().
 // This allows the Container JSG class to use Docker directly without knowing
 // it's talking to Docker instead of a real RPC service.
-class ContainerClient final: public rpc::Container::Server {
+//
+// ContainerClient is reference-counted to support actor reconnection with inactivity timeouts.
+// When setInactivityTimeout() is called, a timer holds a reference to prevent premature
+// destruction. The ContainerClient can be shared across multiple actor lifetimes.
+class ContainerClient final: public rpc::Container::Server, public kj::Refcounted {
  public:
   ContainerClient(capnp::ByteStreamFactory& byteStreamFactory,
       kj::Timer& timer,
       kj::Network& network,
       kj::String dockerPath,
       kj::String containerName,
       kj::String imageName,
-      kj::TaskSet& waitUntilTasks);
+      kj::TaskSet& waitUntilTasks,
+      kj::Function<void()> cleanupCallback);
 
   ~ContainerClient() noexcept(false);
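One consequence of inheriting kj::Refcounted: instances must now be allocated with kj::refcounted<ContainerClient>(...) rather than kj::heap<ContainerClient>() (the class comment above still says kj::heap; the server.c++ change below does switch the call site). The ownership semantics in a nutshell, using a hypothetical Thing:

#include <kj/refcount.h>

struct Thing: kj::Refcounted {};

void demo() {
  kj::Own<Thing> a = kj::refcounted<Thing>();  // refcount-aware allocation, not kj::heap
  kj::Own<Thing> b = kj::addRef(*a);           // second strong reference
  a = nullptr;                                 // object still alive through b
  b = nullptr;                                 // last reference dropped; ~Thing() runs here
}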

@@ -41,6 +47,8 @@ class ContainerClient final: public rpc::Container::Server {
   kj::Promise<void> listenTcp(ListenTcpContext context) override;
   kj::Promise<void> setInactivityTimeout(SetInactivityTimeoutContext context) override;
 
+  kj::Own<ContainerClient> addRef();
+
  private:
   capnp::ByteStreamFactory& byteStreamFactory;
   kj::Timer& timer;
@@ -82,6 +90,9 @@
   kj::Promise<void> stopContainer();
   kj::Promise<void> killContainer(uint32_t signal);
   kj::Promise<void> destroyContainer();
+
+  // Cleanup callback to remove from ActorNamespace map when destroyed
+  kj::Function<void()> cleanupCallback;
 };
 
 } // namespace workerd::server
58 changes: 50 additions & 8 deletions src/workerd/server/server.c++
@@ -2313,6 +2313,11 @@ class Server::WorkerService final: public Service,
         auto reason = 0;
         a->shutdown(reason);
       }
+
+      // Drop the container client reference
+      // If setInactivityTimeout() was called, there's still a timer holding a reference
+      // If not, this may be the last reference and the ContainerClient destructor will run
+      containerClient = kj::none;
     }
 
     void active() override {
@@ -2441,6 +2446,7 @@
       manager = kj::none;
       tracker->shutdown();
       actor = kj::none;
+      containerClient = kj::none;
 
       KJ_IF_SOME(r, reason) {
         brokenReason = kj::cp(r);
@@ -2512,6 +2518,9 @@
     kj::Maybe<kj::Promise<void>> onBrokenTask;
     kj::Maybe<kj::Exception> brokenReason;
 
+    // Reference to the ContainerClient (if container is enabled for this actor)
+    kj::Maybe<kj::Own<ContainerClient>> containerClient;
+
     // If this is a `ForkedPromise<void>`, await the promise. When it has resolved, then
     // `classAndId` will have been replaced with the resolved `ClassAndId` value.
     kj::OneOf<ClassAndId, kj::ForkedPromise<void>> classAndId;
@@ -2683,6 +2692,11 @@
       }
       // Destroy the last strong Worker::Actor reference.
       actor = kj::none;
+
+      // Drop our reference to the ContainerClient
+      // If setInactivityTimeout() was called, the timer still holds a reference
+      // so the container stays alive until the timeout expires
+      containerClient = kj::none;
     }
 
     void start(kj::Own<ActorClass>& actorClass, Worker::Actor::Id& id) {
@@ -2767,10 +2781,8 @@

       auto loopback = kj::refcounted<Loopback>(*this);
 
-      kj::Maybe<rpc::Container::Client> containerClient = kj::none;
+      kj::Maybe<rpc::Container::Client> container = kj::none;
       KJ_IF_SOME(config, containerOptions) {
-        auto& dockerPathRef = KJ_ASSERT_NONNULL(ns.dockerPath,
-            "dockerPath needs to be defined in order enable containers on this durable object.");
         KJ_ASSERT(config.hasImageName(), "Image name is required");
         auto imageName = config.getImageName();
         kj::String containerId;
@@ -2782,15 +2794,14 @@
           containerId = kj::str(existingId);
         }
       }
-      containerClient = kj::heap<ContainerClient>(ns.byteStreamFactory, timer, ns.dockerNetwork,
-          kj::str(dockerPathRef),
-          kj::str("workerd-", KJ_ASSERT_NONNULL(uniqueKey), "-", containerId),
-          kj::str(imageName), ns.waitUntilTasks);
+
+      container = ns.getContainerClient(
+          kj::str("workerd-", KJ_ASSERT_NONNULL(uniqueKey), "-", containerId), imageName);
       }
 
       auto actor = actorClass->newActor(getTracker(), Worker::Actor::cloneId(id),
           kj::mv(makeActorCache), kj::mv(makeStorage), kj::mv(loopback), tryGetManagerRef(),
-          kj::mv(containerClient), *this);
+          kj::mv(container), *this);
       onBrokenTask = monitorOnBroken(*actor);
       this->actor = kj::mv(actor);
     }
@@ -2828,6 +2839,31 @@
     })->addRef();
   }
 
+  kj::Own<ContainerClient> getContainerClient(
+      kj::StringPtr containerId, kj::StringPtr imageName) {
+    KJ_IF_SOME(existingClient, containerClients.find(containerId)) {
+      return existingClient->addRef();
+    }
+
+    // No existing container in the map, create a new one
+    auto& dockerPathRef = KJ_ASSERT_NONNULL(
+        dockerPath, "dockerPath must be defined to enable containers on this Durable Object.");
+
+    // Remove from the map when the container is destroyed
+    kj::Function<void()> cleanupCallback = [this, containerId = kj::str(containerId)]() {
+      containerClients.erase(containerId);
+    };
+
+    auto client = kj::refcounted<ContainerClient>(byteStreamFactory, timer, dockerNetwork,
+        kj::str(dockerPathRef), kj::str(containerId), kj::str(imageName), waitUntilTasks,
+        kj::mv(cleanupCallback));
+
+    // Store raw pointer in map (does not own)
+    containerClients.insert(kj::str(containerId), client.get());
+
+    return kj::mv(client);
+  }
+
   void abortAll(kj::Maybe<const kj::Exception&> reason) {
     for (auto& actor: actors) {
       actor.value->abort(reason);
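getContainerClient() implements a share-or-create "weak map": the map stores non-owning raw pointers, and each client erases its own entry from its destructor, so an entry never outlives its client as long as the namespace outlives them all. Stripped of workerd specifics, the pattern looks roughly like this (Widget and Registry are hypothetical names):

#include <kj/function.h>
#include <kj/map.h>
#include <kj/refcount.h>
#include <kj/string.h>

// Hypothetical stand-in for ContainerClient.
class Widget: public kj::Refcounted {
 public:
  explicit Widget(kj::Function<void()> onDestroy): onDestroy(kj::mv(onDestroy)) {}
  ~Widget() noexcept(false) { onDestroy(); }  // unregisters itself on destruction
  kj::Own<Widget> addRef() { return kj::addRef(*this); }
 private:
  kj::Function<void()> onDestroy;
};

// Assumes the Registry outlives every Widget, as ActorNamespace does here.
class Registry {
 public:
  kj::Own<Widget> getOrCreate(kj::StringPtr id) {
    KJ_IF_SOME(existing, widgets.find(id)) {
      return existing->addRef();  // share the live instance
    }
    auto widget = kj::refcounted<Widget>([this, id = kj::str(id)]() {
      widgets.erase(id);  // runs from ~Widget(), before the entry can dangle
    });
    widgets.insert(kj::str(id), widget.get());  // non-owning entry
    return kj::mv(widget);
  }
 private:
  kj::HashMap<kj::String, Widget*> widgets;
};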
@@ -2856,6 +2892,12 @@
     // inactivity, we keep the ActorContainer in the map but drop the Own<Worker::Actor>. When a new
     // request comes in, we recreate the Own<Worker::Actor>.
     ActorMap actors;
+
+    // Map of container IDs to ContainerClients (for reconnection support with inactivity timeouts).
+    // The map holds raw pointers (not ownership) - ContainerClients are owned by actors and timers.
+    // When the last reference is dropped, the destructor removes the entry from this map.
+    kj::HashMap<kj::String, ContainerClient*> containerClients;
+
     kj::Maybe<kj::Promise<void>> cleanupTask;
     kj::Timer& timer;
     capnp::ByteStreamFactory& byteStreamFactory;
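Note: the raw pointers stay valid without locking because, as is the KJ norm, these objects are confined to a single event-loop thread; the cleanup callback erases the entry synchronously from the ContainerClient destructor, so a later getContainerClient() can never observe a dangling entry.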