diff --git a/src/workerd/server/container-client.c++ b/src/workerd/server/container-client.c++
index 19073c44795..52c5e24fa3d 100644
--- a/src/workerd/server/container-client.c++
+++ b/src/workerd/server/container-client.c++
@@ -107,16 +107,22 @@ ContainerClient::ContainerClient(capnp::ByteStreamFactory& byteStreamFactory,
     kj::String dockerPath,
     kj::String containerName,
     kj::String imageName,
-    kj::TaskSet& waitUntilTasks)
+    kj::TaskSet& waitUntilTasks,
+    kj::Function<void()> cleanupCallback)
     : byteStreamFactory(byteStreamFactory),
       timer(timer),
       network(network),
       dockerPath(kj::mv(dockerPath)),
       containerName(kj::encodeUriComponent(kj::mv(containerName))),
       imageName(kj::mv(imageName)),
-      waitUntilTasks(waitUntilTasks) {}
+      waitUntilTasks(waitUntilTasks),
+      cleanupCallback(kj::mv(cleanupCallback)) {}
 
 ContainerClient::~ContainerClient() noexcept(false) {
+  // Call the cleanup callback to remove this client from the ActorNamespace map
+  cleanupCallback();
+
+  // Destroy the Docker container
   waitUntilTasks.add(dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::DELETE,
       kj::str("/containers/", containerName, "?force=true"))
           .ignoreResult());
@@ -250,7 +256,11 @@ kj::Promise<ContainerClient::InspectResponse> ContainerClient::inspectContainer(
   auto state = jsonRoot.getState();
   JSG_REQUIRE(state.hasStatus(), Error, "Malformed ContainerInspect response");
   auto status = state.getStatus();
-  bool running = status == "running";
+  // Treat both "running" and "restarting" as running. The "restarting" state occurs when
+  // Docker is automatically restarting a container (due to restart policy). From the user's
+  // perspective, a restarting container is still "alive" and should be treated as running
+  // so that start() correctly refuses to start a duplicate and destroy() can clean it up.
+  bool running = status == "running" || status == "restarting";
   co_return InspectResponse{.isRunning = running, .ports = kj::mv(portMappings)};
 }
 
@@ -427,7 +437,19 @@ kj::Promise<void> ContainerClient::signal(SignalContext context) {
 }
 
 kj::Promise<void> ContainerClient::setInactivityTimeout(SetInactivityTimeoutContext context) {
-  // empty implementation on purpose
+  auto params = context.getParams();
+  auto durationMs = params.getDurationMs();
+
+  JSG_REQUIRE(
+      durationMs > 0, Error, "setInactivityTimeout() requires durationMs > 0, got ", durationMs);
+
+  auto timeout = durationMs * kj::MILLISECONDS;
+
+  // Add a timer task that holds a reference to this ContainerClient.
+  waitUntilTasks.add(timer.afterDelay(timeout).then([self = kj::addRef(*this)]() {
+    // This callback does nothing but drop the reference
+  }));
+
   co_return;
 }
 
@@ -444,4 +466,8 @@ kj::Promise<void> ContainerClient::listenTcp(ListenTcpContext context) {
   KJ_UNIMPLEMENTED("listenTcp not implemented for Docker containers - use port mapping instead");
 }
 
+kj::Own<ContainerClient> ContainerClient::addRef() {
+  return kj::addRef(*this);
+}
+
 }  // namespace workerd::server
diff --git a/src/workerd/server/container-client.h b/src/workerd/server/container-client.h
index f4e1839fba2..456c8911eb5 100644
--- a/src/workerd/server/container-client.h
+++ b/src/workerd/server/container-client.h
@@ -11,6 +11,7 @@
 #include
 #include
 #include
+#include
 #include
 
 namespace workerd::server {
@@ -19,7 +20,11 @@ namespace workerd::server {
 // so it can be used as a rpc::Container::Client via kj::heap().
 // This allows the Container JSG class to use Docker directly without knowing
 // it's talking to Docker instead of a real RPC service.
-class ContainerClient final: public rpc::Container::Server {
+//
+// ContainerClient is reference-counted to support actor reconnection with inactivity timeouts.
+// When setInactivityTimeout() is called, a timer holds a reference to prevent premature
+// destruction. The ContainerClient can be shared across multiple actor lifetimes.
+class ContainerClient final: public rpc::Container::Server, public kj::Refcounted {
 public:
   ContainerClient(capnp::ByteStreamFactory& byteStreamFactory,
       kj::Timer& timer,
@@ -27,7 +32,8 @@ class ContainerClient final: public rpc::Container::Server {
       kj::String dockerPath,
       kj::String containerName,
       kj::String imageName,
-      kj::TaskSet& waitUntilTasks);
+      kj::TaskSet& waitUntilTasks,
+      kj::Function<void()> cleanupCallback);
 
   ~ContainerClient() noexcept(false);
 
@@ -41,6 +47,8 @@ class ContainerClient final: public rpc::Container::Server {
   kj::Promise<void> listenTcp(ListenTcpContext context) override;
   kj::Promise<void> setInactivityTimeout(SetInactivityTimeoutContext context) override;
 
+  kj::Own<ContainerClient> addRef();
+
 private:
   capnp::ByteStreamFactory& byteStreamFactory;
   kj::Timer& timer;
@@ -82,6 +90,9 @@ class ContainerClient final: public rpc::Container::Server {
   kj::Promise<void> stopContainer();
   kj::Promise<void> killContainer(uint32_t signal);
   kj::Promise<void> destroyContainer();
+
+  // Cleanup callback to remove from ActorNamespace map when destroyed
+  kj::Function<void()> cleanupCallback;
 };
 
 }  // namespace workerd::server
diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++
index 14108fa7faa..2ff0cb33639 100644
--- a/src/workerd/server/server.c++
+++ b/src/workerd/server/server.c++
@@ -2313,6 +2313,11 @@ class Server::WorkerService final: public Service,
         auto reason = 0;
         a->shutdown(reason);
       }
+
+      // Drop the container client reference.
+      // If setInactivityTimeout() was called, there's still a timer holding a reference.
+      // If not, this may be the last reference and the ContainerClient destructor will run.
+      containerClient = kj::none;
     }
 
     void active() override {
@@ -2441,6 +2446,7 @@
      manager = kj::none;
      tracker->shutdown();
      actor = kj::none;
+     containerClient = kj::none;
 
      KJ_IF_SOME(r, reason) {
        brokenReason = kj::cp(r);
@@ -2512,6 +2518,9 @@
    kj::Maybe<kj::Promise<void>> onBrokenTask;
    kj::Maybe<kj::Exception> brokenReason;
 
+   // Reference to the ContainerClient (if container is enabled for this actor)
+   kj::Maybe<kj::Own<ContainerClient>> containerClient;
+
    // If this is a `ForkedPromise`, await the promise. When it has resolved, then
    // `classAndId` will have been replaced with the resolved `ClassAndId` value.
    kj::OneOf> classAndId;
@@ -2683,6 +2692,11 @@
      }
 
      // Destroy the last strong Worker::Actor reference.
      actor = kj::none;
+
+     // Drop our reference to the ContainerClient
+     // If setInactivityTimeout() was called, the timer still holds a reference
+     // so the container stays alive until the timeout expires
+     containerClient = kj::none;
    }
 
    void start(kj::Own& actorClass, Worker::Actor::Id& id) {
@@ -2767,10 +2781,8 @@
      auto loopback = kj::refcounted(*this);
 
-     kj::Maybe<rpc::Container::Client> containerClient = kj::none;
+     kj::Maybe<rpc::Container::Client> container = kj::none;
      KJ_IF_SOME(config, containerOptions) {
-       auto& dockerPathRef = KJ_ASSERT_NONNULL(ns.dockerPath,
-           "dockerPath needs to be defined in order enable containers on this durable object.");
        KJ_ASSERT(config.hasImageName(), "Image name is required");
        auto imageName = config.getImageName();
        kj::String containerId;
@@ -2782,15 +2794,14 @@
            containerId = kj::str(existingId);
          }
        }
-       containerClient = kj::heap<ContainerClient>(ns.byteStreamFactory, timer, ns.dockerNetwork,
-           kj::str(dockerPathRef),
-           kj::str("workerd-", KJ_ASSERT_NONNULL(uniqueKey), "-", containerId),
-           kj::str(imageName), ns.waitUntilTasks);
+
+       container = ns.getContainerClient(
+           kj::str("workerd-", KJ_ASSERT_NONNULL(uniqueKey), "-", containerId), imageName);
      }
      auto actor = actorClass->newActor(getTracker(), Worker::Actor::cloneId(id),
          kj::mv(makeActorCache), kj::mv(makeStorage), kj::mv(loopback), tryGetManagerRef(),
-         kj::mv(containerClient), *this);
+         kj::mv(container), *this);
      onBrokenTask = monitorOnBroken(*actor);
      this->actor = kj::mv(actor);
    }
@@ -2828,6 +2839,31 @@
      })->addRef();
    }
 
+   kj::Own<ContainerClient> getContainerClient(
+       kj::StringPtr containerId, kj::StringPtr imageName) {
+     KJ_IF_SOME(existingClient, containerClients.find(containerId)) {
+       return existingClient->addRef();
+     }
+
+     // No existing container in the map, create a new one
+     auto& dockerPathRef = KJ_ASSERT_NONNULL(
+         dockerPath, "dockerPath must be defined to enable containers on this Durable Object.");
+
+     // Remove from the map when the container is destroyed
+     kj::Function<void()> cleanupCallback = [this, containerId = kj::str(containerId)]() {
+       containerClients.erase(containerId);
+     };
+
+     auto client = kj::refcounted<ContainerClient>(byteStreamFactory, timer, dockerNetwork,
+         kj::str(dockerPathRef), kj::str(containerId), kj::str(imageName), waitUntilTasks,
+         kj::mv(cleanupCallback));
+
+     // Store raw pointer in map (does not own)
+     containerClients.insert(kj::str(containerId), client.get());
+
+     return kj::mv(client);
+   }
+
    void abortAll(kj::Maybe reason) {
      for (auto& actor: actors) {
        actor.value->abort(reason);
@@ -2856,6 +2892,12 @@
    // inactivity, we keep the ActorContainer in the map but drop the Own. When a new
    // request comes in, we recreate the Own.
    ActorMap actors;
+
+   // Map of container IDs to ContainerClients (for reconnection support with inactivity timeouts).
+   // The map holds raw pointers (not ownership); ContainerClients are owned by actors and timers.
+   // When the last reference is dropped, the destructor removes the entry from this map.
+   kj::HashMap<kj::String, ContainerClient*> containerClients;
+
    kj::Maybe> cleanupTask;
    kj::Timer& timer;
    capnp::ByteStreamFactory& byteStreamFactory;
diff --git a/src/workerd/server/tests/container-client/test.js b/src/workerd/server/tests/container-client/test.js
index 2a72953dc33..d1659d0f3a1 100644
--- a/src/workerd/server/tests/container-client/test.js
+++ b/src/workerd/server/tests/container-client/test.js
@@ -12,18 +12,37 @@ export class DurableObjectExample extends DurableObject {
     }
     assert.strictEqual(container.running, false);
 
-    // Start container with valid configuration
-    await container.start({
-      entrypoint: ['node', 'nonexistant.js'],
-    });
+    // Start container with an invalid entrypoint
+    {
+      container.start({
+        entrypoint: ['node', 'nonexistant.js'],
+      });
 
-    let exitCode = undefined;
-    await container.monitor().catch((err) => {
-      exitCode = err.exitCode;
-    });
+      let exitCode = undefined;
+      await container.monitor().catch((err) => {
+        exitCode = err.exitCode;
+      });
+
+      assert.strictEqual(typeof exitCode, 'number');
+      assert.notEqual(0, exitCode);
+    }
+
+    // Start container with a valid entrypoint and stop it
+    {
+      container.start();
+
+      await scheduler.wait(500);
 
-    assert.strictEqual(typeof exitCode, 'number');
-    assert.notEqual(0, exitCode);
+      let exitCode = undefined;
+      const monitor = container.monitor().catch((err) => {
+        exitCode = err.exitCode;
+      });
+      await container.destroy();
+      await monitor;
+
+      assert.strictEqual(typeof exitCode, 'number');
+      assert.equal(137, exitCode);
+    }
   }
 
   async testBasics() {
@@ -36,7 +55,7 @@
     assert.strictEqual(container.running, false);
 
     // Start container with valid configuration
-    await container.start({
+    container.start({
      env: { A: 'B', C: 'D', L: 'F' },
      enableInternet: true,
    });
@@ -68,7 +87,7 @@
        );
        throw e;
      }
-     await scheduler.wait(1000);
+     await scheduler.wait(500);
    }
  }
 
@@ -81,28 +100,50 @@ export class DurableObjectExample extends DurableObject {
     assert.strictEqual(container.running, false);
   }
 
-  async leaveRunning() {
-    // Start container and leave it running
+  async testSetInactivityTimeout(timeout) {
     const container = this.ctx.container;
-    if (!container.running) {
-      await container.start({
-        entrypoint: ['leave-running'],
-      });
+    if (container.running) {
+      let monitor = container.monitor().catch((_err) => {});
+      await container.destroy();
+      await monitor;
     }
+    assert.strictEqual(container.running, false);
+
+    container.start();
     assert.strictEqual(container.running, true);
-  }
 
-  async checkRunning() {
-    // Check container was started using leaveRunning()
-    const container = this.ctx.container;
+    // Wait for the container to be running
+    await scheduler.wait(500);
 
-    // Let's guard in case the test assumptions are wrong.
-    if (!container.running) {
-      return;
+    try {
+      await container.setInactivityTimeout(0);
+    } catch (err) {
+      assert.strictEqual(err.name, 'TypeError');
+      assert.match(
+        err.message,
+        /setInactivityTimeout\(\) cannot be called with a durationMs <= 0/
+      );
     }
 
-    await container.destroy();
+    if (timeout > 0) {
+      await container.setInactivityTimeout(timeout);
+    }
+  }
+
+  async start() {
+    assert.strictEqual(this.ctx.container.running, false);
+    this.ctx.container.start();
+    assert.strictEqual(this.ctx.container.running, true);
+
+    // Wait for the container to be running
+    await scheduler.wait(500);
+  }
+
+  // Assert the container's running state, then clean it up
+  async expectRunning(running) {
+    assert.strictEqual(this.ctx.container.running, running);
+    await this.ctx.container.destroy();
   }
 
   async abort() {
@@ -130,7 +171,7 @@
 
   async startAlarm(start, ms) {
     if (start && !this.ctx.container.running) {
-      await this.ctx.container.start();
+      this.ctx.container.start();
     }
     await this.ctx.storage.setAlarm(Date.now() + ms);
   }
@@ -148,7 +189,7 @@
     const { container } = this.ctx;
 
     if (!container.running) {
-      await container.start({
+      container.start({
        env: { WS_ENABLED: 'true' },
        enableInternet: true,
      });
@@ -256,25 +297,6 @@ export const testExitCode = {
   },
 };
 
-// Test container persistence across durable object instances
-export const testAlreadyRunning = {
-  async test(_ctrl, env) {
-    const id = env.MY_CONTAINER.idFromName('testAlreadyRunning');
-    let stub = env.MY_CONTAINER.get(id);
-
-    await stub.leaveRunning();
-
-    await assert.rejects(() => stub.abort(), {
-      name: 'Error',
-      message: 'Application called abort() to reset Durable Object.',
-    });
-
-    // Recreate stub to get a new instance
-    stub = env.MY_CONTAINER.get(id);
-    await stub.checkRunning();
-  },
-};
-
 // Test WebSocket functionality
 export const testWebSockets = {
   async test(_ctrl, env) {
@@ -284,7 +306,7 @@
   },
 };
 
-// // Test alarm functionality with containers
+// Test alarm functionality with containers
 export const testAlarm = {
   async test(_ctrl, env) {
     // Test that we can recover the use_containers flag correctly in setAlarm
@@ -302,6 +324,9 @@
       retries++;
     }
 
+    // Wait for the container to start
+    await scheduler.wait(500);
+
     // Set alarm for future and abort
     await stub.startAlarm(false, 1000);
 
@@ -318,3 +343,54 @@
     await stub.checkAlarmAbortConfirmation();
   },
 };
+
+export const testContainerShutdown = {
+  async test(_, env) {
+    {
+      const stub = env.MY_CONTAINER.getByName('testContainerShutdown');
+      await stub.start();
+      await assert.rejects(() => stub.abort(), {
+        name: 'Error',
+        message: 'Application called abort() to reset Durable Object.',
+      });
+    }
+
+    // Wait for the container to be shut down after the DO aborts
+    await scheduler.wait(500);
+
+    {
+      const stub = env.MY_CONTAINER.getByName('testContainerShutdown');
+
+      // Container should not be running after the DO exited
+      await stub.expectRunning(false);
+    }
+  },
+};
+
+export const testSetInactivityTimeout = {
+  async test(_ctrl, env) {
+    {
+      const stub = env.MY_CONTAINER.getByName('testSetInactivityTimeout');
+
+      await stub.testSetInactivityTimeout(3000);
+
+      await assert.rejects(() => stub.abort(), {
+        name: 'Error',
+        message: 'Application called abort() to reset Durable Object.',
+      });
+    }
+
+    // Here we wait to ensure that if setInactivityTimeout *doesn't* work, the
+    // container has enough time to shut down after the DO is aborted. If we
+    // don't wait, then ctx.container.running will always be true, even without
+    // setInactivityTimeout, because the container won't have stopped yet.
+    await scheduler.wait(500);
+
+    {
+      const stub = env.MY_CONTAINER.getByName('testSetInactivityTimeout');
+
+      // Container should still be running after the DO exited
+      await stub.expectRunning(true);
+    }
+  },
+};
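
A minimal usage sketch (illustrative only, not part of this patch) of how a Durable Object might combine the behavior exercised above: start the container without awaiting, then call setInactivityTimeout() so the shared ContainerClient (and its Docker container) survives a brief disconnect and is reused on reconnection. The class name and the 60_000 ms value are hypothetical; the ctx.container calls are the ones used in test.js.

import { DurableObject } from 'cloudflare:workers';

// Hypothetical example class, not part of the change above.
export class ReconnectableContainer extends DurableObject {
  async fetch(request) {
    const container = this.ctx.container;

    // Start the container on first use; start() is not awaited, matching the updated tests.
    if (!container.running) {
      container.start();
    }

    // Keep the underlying ContainerClient alive for 60s after this actor shuts down, so a
    // quick reconnect reuses the same container instead of creating a new one.
    await container.setInactivityTimeout(60_000);

    return new Response('container running: ' + container.running);
  }
}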