Skip to content

Commit d05081b

Browse files
committed
Add rpc_params_dup_stubs compat flag.
This flag changes the Worker RPC behavior to match Cap'n Web: When a stub is passed in the params of an RPC method, we should NOT transfer ownership of the stub. Instead, the stub is dup()ed. The comments explain in more detail why these semantics are superior (and thus why Cap'n Web took them). Additionally, in the case that the params contain an `RpcTarget`, if that target has a `dup()` method, we call it. This specifically fixes an interoperability bug with Cap'n Web: cloudflare/capnweb#110
1 parent d8f9149 commit d05081b

File tree

6 files changed

+370
-11
lines changed

6 files changed

+370
-11
lines changed

src/workerd/api/tests/BUILD.bazel

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,12 @@ wd_test(
275275
data = ["js-rpc-test.js"],
276276
)
277277

278+
wd_test(
279+
src = "js-rpc-params-ownership-test.wd-test",
280+
args = ["--experimental"],
281+
data = ["js-rpc-params-ownership-test.js"],
282+
)
283+
278284
wd_test(
279285
src = "memory-cache-test.wd-test",
280286
args = ["--experimental"],
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
import assert from 'node:assert';
2+
import { WorkerEntrypoint, RpcTarget, RpcStub } from 'cloudflare:workers';
3+
4+
class Counter extends RpcTarget {
5+
count = { value: 0 };
6+
dupCounts = { created: 0, disposed: 0 };
7+
disposeCount = 0;
8+
9+
increment(amount = 1) {
10+
this.count.value += amount;
11+
return this.count.value;
12+
}
13+
14+
[Symbol.dispose]() {
15+
++this.disposeCount;
16+
}
17+
}
18+
19+
class DupableCounter extends Counter {
20+
disposed = false;
21+
22+
dup() {
23+
let result = new DupableCounter();
24+
result.count = this.count;
25+
result.dupCounts = this.dupCounts;
26+
++this.dupCounts.created;
27+
return result;
28+
}
29+
30+
[Symbol.dispose]() {
31+
if (this.disposed) {
32+
throw new Error('duplicate disposal');
33+
}
34+
this.disposed = true;
35+
++this.dupCounts.disposed;
36+
++this.disposeCount;
37+
}
38+
}
39+
40+
export class TestService extends WorkerEntrypoint {
41+
async increment(stub, i) {
42+
await stub.increment(i);
43+
}
44+
45+
async roundTrip(stub) {
46+
return { stub: stub.dup() };
47+
}
48+
}
49+
50+
// Test that (with the rpc_params_dup_stubs compat flag) passing a stub in RPC params doesn't
51+
// transfer ownership of the stub.
52+
export let rpcParamsDontTransferOwnership = {
53+
async test(controller, env, ctx) {
54+
let counter = new Counter();
55+
56+
{
57+
using stub = new RpcStub(counter);
58+
59+
// Use the stub in params twice to prove that ownership isn't transferred away.
60+
await ctx.exports.TestService.increment(stub, 2);
61+
await ctx.exports.TestService.increment(stub, 3);
62+
63+
// Make extra-sure we can still call the stub.
64+
await stub.increment();
65+
66+
assert.strictEqual(counter.count.value, 6);
67+
68+
// RpcTarget disposer should not have been called at all.
69+
await scheduler.wait(0);
70+
assert.strictEqual(counter.disposeCount, 0);
71+
}
72+
73+
// Disposing a stub *asynchrconously* disposes the RpcTarget, so we have to spin the event
74+
// loop to observe the disposal.
75+
await scheduler.wait(0);
76+
assert.strictEqual(counter.disposeCount, 1);
77+
},
78+
};
79+
80+
// Test that placing a plain RpcTarget in RPC params DOES "take ownership", that is, the disposer
81+
// will be called.
82+
export let rpcParamsPlainTarget = {
83+
async test(controller, env, ctx) {
84+
let counter = new Counter();
85+
86+
await ctx.exports.TestService.increment(counter, 2);
87+
await ctx.exports.TestService.increment(counter, 3);
88+
89+
assert.strictEqual(counter.count.value, 5);
90+
91+
// Each RPC invocation will have called the disposer.
92+
await scheduler.wait(0);
93+
assert.strictEqual(counter.disposeCount, 2);
94+
},
95+
};
96+
97+
// Test that placing an RpcTarget with a dup() method in RPC params causes the dup() method to be
98+
// called, and then the duplicate is later disposed.
99+
export let rpcParamsDupTarget = {
100+
async test(controller, env, ctx) {
101+
let counter = new DupableCounter();
102+
103+
// If we directly pass `counter` to RPC params, it'll be dup()ed.
104+
await ctx.exports.TestService.increment(counter, 2);
105+
assert.strictEqual(counter.dupCounts.created, 1);
106+
await ctx.exports.TestService.increment(counter, 3);
107+
assert.strictEqual(counter.dupCounts.created, 2);
108+
109+
assert.strictEqual(counter.count.value, 5);
110+
111+
// Dups should have been disposed, but not original.
112+
await scheduler.wait(0);
113+
assert.strictEqual(counter.dupCounts.disposed, 2);
114+
assert.strictEqual(counter.disposed, false);
115+
},
116+
};
117+
118+
// Like rpcParamsDupTarget but the target is wrapped in a Proxy. (This takes a different code
119+
// path.)
120+
export let rpcParamsDupProxyTarget = {
121+
async test(controller, env, ctx) {
122+
let counter = new Proxy(new DupableCounter(), {});
123+
124+
// If we directly pass `counter` to RPC params, it'll be dup()ed.
125+
await ctx.exports.TestService.increment(counter, 2);
126+
assert.strictEqual(counter.dupCounts.created, 1);
127+
await ctx.exports.TestService.increment(counter, 3);
128+
assert.strictEqual(counter.dupCounts.created, 2);
129+
130+
assert.strictEqual(counter.count.value, 5);
131+
132+
// Dups should have been disposed, but not original.
133+
await scheduler.wait(0);
134+
assert.strictEqual(counter.dupCounts.disposed, 2);
135+
assert.strictEqual(counter.disposed, false);
136+
},
137+
};
138+
139+
// Like rpcParamsDupTarget but the target is a function.
140+
export let rpcParamsDupFunction = {
141+
async test(controller, env, ctx) {
142+
let count = 0;
143+
let dupCount = 0;
144+
let disposeCount = 0;
145+
146+
let increment = (i) => {
147+
return (count += i);
148+
};
149+
increment.dup = () => {
150+
++dupCount;
151+
return increment;
152+
};
153+
increment[Symbol.dispose] = function () {
154+
++disposeCount;
155+
};
156+
157+
let counter = { increment };
158+
159+
// If we directly pass `counter` to RPC params, it'll be dup()ed.
160+
await ctx.exports.TestService.increment(counter, 2);
161+
assert.strictEqual(dupCount, 1);
162+
await ctx.exports.TestService.increment(counter, 3);
163+
assert.strictEqual(dupCount, 2);
164+
165+
assert.strictEqual(count, 5);
166+
167+
await scheduler.wait(0);
168+
assert.strictEqual(disposeCount, 2);
169+
},
170+
};
171+
172+
// Test that returning a stub tansfers ownership of the stub, that is, the system later disposes
173+
// it.
174+
export let rpcReturnsTransferOwnership = {
175+
async test(controller, env, ctx) {
176+
let counter = new Counter();
177+
178+
{
179+
using stub = new RpcStub(counter);
180+
using stub2 = (await ctx.exports.TestService.roundTrip(stub)).stub;
181+
182+
await scheduler.wait(0);
183+
assert.strictEqual(counter.disposeCount, 0);
184+
}
185+
186+
await scheduler.wait(0);
187+
assert.strictEqual(counter.disposeCount, 1);
188+
},
189+
};
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
using Workerd = import "/workerd/workerd.capnp";
2+
3+
const unitTests :Workerd.Config = (
4+
services = [
5+
( name = "js-rpc-params-ownership-test",
6+
worker = (
7+
modules = [
8+
(name = "worker", esModule = embed "js-rpc-params-ownership-test.js")
9+
],
10+
compatibilityDate = "2025-12-01",
11+
compatibilityFlags = [
12+
"nodejs_compat",
13+
"rpc_params_dup_stubs",
14+
],
15+
)
16+
),
17+
],
18+
v8Flags = [ "--expose-gc" ],
19+
);

src/workerd/api/worker-rpc.c++

Lines changed: 122 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -492,8 +492,12 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js,
492492
}
493493
auto arr = v8::Array::New(js.v8Isolate, argv.data(), argv.size());
494494

495-
auto externalHandler =
496-
RpcSerializerExternalHandler([&]() -> rpc::JsValue::StreamSink::Client {
495+
auto stubOwnership = FeatureFlags::get(js).getRpcParamsDupStubs()
496+
? RpcSerializerExternalHandler::DUPLICATE
497+
: RpcSerializerExternalHandler::TRANSFER;
498+
499+
RpcSerializerExternalHandler externalHandler(
500+
stubOwnership, [&]() -> rpc::JsValue::StreamSink::Client {
497501
// A stream was encountered in the params, so we must expect the response to contain
498502
// paramsStreamSink. But we don't have the response yet. So, we need to set up a
499503
// temporary promise client, which we hook to the response a little bit later.
@@ -842,11 +846,13 @@ void JsRpcStub::serialize(jsg::Lock& js, jsg::Serializer& serializer) {
842846
builder.setRpcTarget(kj::mv(cap));
843847
});
844848

845-
// Instead of disposing the stub immediately, we add a disposer to the serializer
846-
// that will be executed when the pipeline is finished. This ensures the stub
847-
// remains valid for the duration of any pipelined operations.
848-
externalHandler->addStubDisposer(
849-
kj::heap(kj::defer([self = JSG_THIS]() mutable { self->dispose(); })));
849+
if (externalHandler->getStubOwnership() == RpcSerializerExternalHandler::TRANSFER) {
850+
// Instead of disposing the stub immediately, we add a disposer to the serializer
851+
// that will be executed when the pipeline is finished. This ensures the stub
852+
// remains valid for the duration of any pipelined operations.
853+
externalHandler->addStubDisposer(
854+
kj::heap(kj::defer([self = JSG_THIS]() mutable { self->dispose(); })));
855+
}
850856
}
851857

852858
jsg::Ref<JsRpcStub> JsRpcStub::deserialize(
@@ -1592,7 +1598,8 @@ MakeCallPipeline::Result serializeJsValueWithPipeline(jsg::Lock& js,
15921598
auto hasDispose = maybeDispose != kj::none;
15931599

15941600
// Now that we've extracted our dispose function, we can serialize our value.
1595-
auto externalHandler = RpcSerializerExternalHandler(kj::mv(getStreamSinkFunc));
1601+
RpcSerializerExternalHandler externalHandler(
1602+
RpcSerializerExternalHandler::TRANSFER, kj::mv(getStreamSinkFunc));
15961603
serializeJsValue(js, value, externalHandler, kj::mv(makeBuilder));
15971604

15981605
auto stubDisposers = externalHandler.releaseStubDisposers();
@@ -1721,6 +1728,62 @@ void JsRpcTarget::serialize(jsg::Lock& js, jsg::Serializer& serializer) {
17211728
// Handle can't possibly be missing during serialization, it's how we got here.
17221729
auto handle = jsg::JsObject(KJ_ASSERT_NONNULL(JSG_THIS.tryGetHandle(js)));
17231730

1731+
if (externalHandler->getStubOwnership() == RpcSerializerExternalHandler::DUPLICATE) {
1732+
// This message isn't supposed to take ownership of stubs. What does that mean for an
1733+
// RpcTarget? You might argue that it means we should never call the disposer. But that's not
1734+
// really enough: what if the real owner *does* call the disposer, before our stub is done
1735+
// with it? How do we make sure the RpcTarget stays alive?
1736+
//
1737+
// Things get clearer if we look at a real use case: pure-JS Cap'n Web stubs. We don't see
1738+
// them as stubs (since they are not instances of JsRpcStub). Instead, we see them as
1739+
// RpcTargets. But we need the semantics to come out the same: when passed as a parameter
1740+
// to a native RPC call, we need to duplicate the stub, because the original copy might very
1741+
// well be disposed before we use it.
1742+
//
1743+
// How do we duplicate this non-native stub? Well... proper way to duplicate a pure-JS Cap'n
1744+
// Web stub is, of course, to call its `dup()` method.
1745+
//
1746+
// So how about we just do that? If the target has a `dup()` method, we call it, and we take
1747+
// ownership of the result, instead of taking ownership of the original object.
1748+
auto dup = handle.get(js, "dup");
1749+
KJ_IF_SOME(dupFunc, dup.tryCast<jsg::JsFunction>()) {
1750+
auto replacement = dupFunc.call(js, handle);
1751+
bool replaced = false;
1752+
1753+
// We got a duplicate. Is it still an RpcTarget?
1754+
KJ_IF_SOME(replacementObj, replacement.tryCast<jsg::JsObject>()) {
1755+
if (replacementObj.isInstanceOf<JsRpcTarget>(js)) {
1756+
// It is! Let's replace our handle with the duplicate!
1757+
handle = replacementObj;
1758+
replaced = true;
1759+
}
1760+
}
1761+
1762+
JSG_REQUIRE(replaced, DOMDataCloneError,
1763+
"Couldn't create a stub for the RcpTarget because it has a dup() method which did not "
1764+
"return another RpcTarget. Either remove the dup() method or make sure it returns an "
1765+
"RpcTarget.");
1766+
} else {
1767+
// If no dup() method was present, then what?
1768+
//
1769+
// The pedantic argument would say: we need to throw an exception. But that would lead to a
1770+
// pretty poor development experience as people would have to fiddle with adding dup()
1771+
// methods to all their RpcTargets.
1772+
//
1773+
// Another argument might say: we should just use the RpcTarget but never call the disposer
1774+
// since we don't own it. But that would probably be confusing. People would wonder why their
1775+
// disposers are never called.
1776+
//
1777+
// If someone passes an RpcTarget with no dup() method, but which does have a disposer, as
1778+
// the argument to an RPC method, *probably* they just want the disposer to be called when
1779+
// the callee is done with the object. That is, they want us to take ownership after all. If
1780+
// that is *not* what they want, then they can always implement a dup() method to make it
1781+
// clear.
1782+
//
1783+
// So, we will just "take ownership" of the target after all, and call its disposer.
1784+
}
1785+
}
1786+
17241787
rpc::JsRpcTarget::Client cap = kj::heap<TransientJsRpcTarget>(js, IoContext::current(), handle);
17251788

17261789
externalHandler->write([cap = kj::mv(cap)](rpc::JsValue::External::Builder builder) mutable {
@@ -1732,8 +1795,33 @@ void RpcSerializerExternalHandler::serializeFunction(
17321795
jsg::Lock& js, jsg::Serializer& serializer, v8::Local<v8::Function> func) {
17331796
serializer.writeRawUint32(static_cast<uint>(rpc::SerializationTag::JS_RPC_STUB));
17341797

1798+
auto handle = jsg::JsObject(func);
1799+
1800+
// Similar to JsRpcTarget::serialize(), we may need to dup() the function.
1801+
if (stubOwnership == RpcSerializerExternalHandler::DUPLICATE) {
1802+
auto dup = handle.get(js, "dup");
1803+
KJ_IF_SOME(dupFunc, dup.tryCast<jsg::JsFunction>()) {
1804+
auto replacement = dupFunc.call(js, handle);
1805+
bool replaced = false;
1806+
1807+
// We got a duplicate. Is it still a Function?
1808+
KJ_IF_SOME(replacementObj, replacement.tryCast<jsg::JsObject>()) {
1809+
if (isFunctionForRpc(js, replacementObj)) {
1810+
// It is! Let's replace our handle with the duplicate!
1811+
handle = replacementObj;
1812+
replaced = true;
1813+
}
1814+
}
1815+
1816+
JSG_REQUIRE(replaced, DOMDataCloneError,
1817+
"Couldn't create a stub for the function because it has a dup() method which did not "
1818+
"return another function. Either remove the dup() method or make sure it returns a "
1819+
"function.");
1820+
}
1821+
}
1822+
17351823
rpc::JsRpcTarget::Client cap =
1736-
kj::heap<TransientJsRpcTarget>(js, IoContext::current(), jsg::JsObject(func), true);
1824+
kj::heap<TransientJsRpcTarget>(js, IoContext::current(), handle, true);
17371825
write([cap = kj::mv(cap)](rpc::JsValue::External::Builder builder) mutable {
17381826
builder.setRpcTarget(kj::mv(cap));
17391827
});
@@ -1758,6 +1846,31 @@ void RpcSerializerExternalHandler::serializeProxy(
17581846
"Proxy must emulate either a plain object or an RpcTarget, as indicated by the "
17591847
"Proxy's prototype chain.");
17601848

1849+
// Similar to JsRpcTarget::serialize(), we may need to dup() the proxy.
1850+
if (stubOwnership == RpcSerializerExternalHandler::DUPLICATE) {
1851+
auto dup = handle.get(js, "dup");
1852+
KJ_IF_SOME(dupFunc, dup.tryCast<jsg::JsFunction>()) {
1853+
auto replacement = dupFunc.call(js, handle);
1854+
bool replaced = false;
1855+
1856+
// We got a duplicate. Is it still the same type?
1857+
KJ_IF_SOME(replacementObj, replacement.tryCast<jsg::JsObject>()) {
1858+
KJ_IF_SOME(stubType, checkStubType(js, replacementObj)) {
1859+
if (stubType == allowInstanceProperties) {
1860+
// It is! Let's replace our handle with the duplicate!
1861+
handle = replacementObj;
1862+
replaced = true;
1863+
}
1864+
}
1865+
}
1866+
1867+
JSG_REQUIRE(replaced, DOMDataCloneError,
1868+
"Couldn't create a stub for the Proxy because it has a dup() method which did not "
1869+
"return the same underlying type (RpcTarget or Function) as the Proxy itself represents. "
1870+
"Either remove the dup() method or make sure it returns an RpcTarget.");
1871+
}
1872+
}
1873+
17611874
// Great, we've concluded we can indeed point a stub at this proxy.
17621875
serializer.writeRawUint32(static_cast<uint>(rpc::SerializationTag::JS_RPC_STUB));
17631876

0 commit comments

Comments
 (0)