Skip to content

Commit 9aa33bf

Browse files
authored
Merge pull request #5733 from cloudflare/kenton/rpc_params_dup_stubs
Add rpc_params_dup_stubs compat flag.
2 parents ddf2243 + d05081b commit 9aa33bf

File tree

6 files changed

+370
-11
lines changed

6 files changed

+370
-11
lines changed

src/workerd/api/tests/BUILD.bazel

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,12 @@ wd_test(
275275
data = ["js-rpc-test.js"],
276276
)
277277

278+
wd_test(
279+
src = "js-rpc-params-ownership-test.wd-test",
280+
args = ["--experimental"],
281+
data = ["js-rpc-params-ownership-test.js"],
282+
)
283+
278284
wd_test(
279285
src = "memory-cache-test.wd-test",
280286
args = ["--experimental"],
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
import assert from 'node:assert';
2+
import { WorkerEntrypoint, RpcTarget, RpcStub } from 'cloudflare:workers';
3+
4+
class Counter extends RpcTarget {
5+
count = { value: 0 };
6+
dupCounts = { created: 0, disposed: 0 };
7+
disposeCount = 0;
8+
9+
increment(amount = 1) {
10+
this.count.value += amount;
11+
return this.count.value;
12+
}
13+
14+
[Symbol.dispose]() {
15+
++this.disposeCount;
16+
}
17+
}
18+
19+
class DupableCounter extends Counter {
20+
disposed = false;
21+
22+
dup() {
23+
let result = new DupableCounter();
24+
result.count = this.count;
25+
result.dupCounts = this.dupCounts;
26+
++this.dupCounts.created;
27+
return result;
28+
}
29+
30+
[Symbol.dispose]() {
31+
if (this.disposed) {
32+
throw new Error('duplicate disposal');
33+
}
34+
this.disposed = true;
35+
++this.dupCounts.disposed;
36+
++this.disposeCount;
37+
}
38+
}
39+
40+
export class TestService extends WorkerEntrypoint {
41+
async increment(stub, i) {
42+
await stub.increment(i);
43+
}
44+
45+
async roundTrip(stub) {
46+
return { stub: stub.dup() };
47+
}
48+
}
49+
50+
// Test that (with the rpc_params_dup_stubs compat flag) passing a stub in RPC params doesn't
51+
// transfer ownership of the stub.
52+
export let rpcParamsDontTransferOwnership = {
53+
async test(controller, env, ctx) {
54+
let counter = new Counter();
55+
56+
{
57+
using stub = new RpcStub(counter);
58+
59+
// Use the stub in params twice to prove that ownership isn't transferred away.
60+
await ctx.exports.TestService.increment(stub, 2);
61+
await ctx.exports.TestService.increment(stub, 3);
62+
63+
// Make extra-sure we can still call the stub.
64+
await stub.increment();
65+
66+
assert.strictEqual(counter.count.value, 6);
67+
68+
// RpcTarget disposer should not have been called at all.
69+
await scheduler.wait(0);
70+
assert.strictEqual(counter.disposeCount, 0);
71+
}
72+
73+
// Disposing a stub *asynchrconously* disposes the RpcTarget, so we have to spin the event
74+
// loop to observe the disposal.
75+
await scheduler.wait(0);
76+
assert.strictEqual(counter.disposeCount, 1);
77+
},
78+
};
79+
80+
// Test that placing a plain RpcTarget in RPC params DOES "take ownership", that is, the disposer
81+
// will be called.
82+
export let rpcParamsPlainTarget = {
83+
async test(controller, env, ctx) {
84+
let counter = new Counter();
85+
86+
await ctx.exports.TestService.increment(counter, 2);
87+
await ctx.exports.TestService.increment(counter, 3);
88+
89+
assert.strictEqual(counter.count.value, 5);
90+
91+
// Each RPC invocation will have called the disposer.
92+
await scheduler.wait(0);
93+
assert.strictEqual(counter.disposeCount, 2);
94+
},
95+
};
96+
97+
// Test that placing an RpcTarget with a dup() method in RPC params causes the dup() method to be
98+
// called, and then the duplicate is later disposed.
99+
export let rpcParamsDupTarget = {
100+
async test(controller, env, ctx) {
101+
let counter = new DupableCounter();
102+
103+
// If we directly pass `counter` to RPC params, it'll be dup()ed.
104+
await ctx.exports.TestService.increment(counter, 2);
105+
assert.strictEqual(counter.dupCounts.created, 1);
106+
await ctx.exports.TestService.increment(counter, 3);
107+
assert.strictEqual(counter.dupCounts.created, 2);
108+
109+
assert.strictEqual(counter.count.value, 5);
110+
111+
// Dups should have been disposed, but not original.
112+
await scheduler.wait(0);
113+
assert.strictEqual(counter.dupCounts.disposed, 2);
114+
assert.strictEqual(counter.disposed, false);
115+
},
116+
};
117+
118+
// Like rpcParamsDupTarget but the target is wrapped in a Proxy. (This takes a different code
119+
// path.)
120+
export let rpcParamsDupProxyTarget = {
121+
async test(controller, env, ctx) {
122+
let counter = new Proxy(new DupableCounter(), {});
123+
124+
// If we directly pass `counter` to RPC params, it'll be dup()ed.
125+
await ctx.exports.TestService.increment(counter, 2);
126+
assert.strictEqual(counter.dupCounts.created, 1);
127+
await ctx.exports.TestService.increment(counter, 3);
128+
assert.strictEqual(counter.dupCounts.created, 2);
129+
130+
assert.strictEqual(counter.count.value, 5);
131+
132+
// Dups should have been disposed, but not original.
133+
await scheduler.wait(0);
134+
assert.strictEqual(counter.dupCounts.disposed, 2);
135+
assert.strictEqual(counter.disposed, false);
136+
},
137+
};
138+
139+
// Like rpcParamsDupTarget but the target is a function.
140+
export let rpcParamsDupFunction = {
141+
async test(controller, env, ctx) {
142+
let count = 0;
143+
let dupCount = 0;
144+
let disposeCount = 0;
145+
146+
let increment = (i) => {
147+
return (count += i);
148+
};
149+
increment.dup = () => {
150+
++dupCount;
151+
return increment;
152+
};
153+
increment[Symbol.dispose] = function () {
154+
++disposeCount;
155+
};
156+
157+
let counter = { increment };
158+
159+
// If we directly pass `counter` to RPC params, it'll be dup()ed.
160+
await ctx.exports.TestService.increment(counter, 2);
161+
assert.strictEqual(dupCount, 1);
162+
await ctx.exports.TestService.increment(counter, 3);
163+
assert.strictEqual(dupCount, 2);
164+
165+
assert.strictEqual(count, 5);
166+
167+
await scheduler.wait(0);
168+
assert.strictEqual(disposeCount, 2);
169+
},
170+
};
171+
172+
// Test that returning a stub tansfers ownership of the stub, that is, the system later disposes
173+
// it.
174+
export let rpcReturnsTransferOwnership = {
175+
async test(controller, env, ctx) {
176+
let counter = new Counter();
177+
178+
{
179+
using stub = new RpcStub(counter);
180+
using stub2 = (await ctx.exports.TestService.roundTrip(stub)).stub;
181+
182+
await scheduler.wait(0);
183+
assert.strictEqual(counter.disposeCount, 0);
184+
}
185+
186+
await scheduler.wait(0);
187+
assert.strictEqual(counter.disposeCount, 1);
188+
},
189+
};
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
using Workerd = import "/workerd/workerd.capnp";
2+
3+
const unitTests :Workerd.Config = (
4+
services = [
5+
( name = "js-rpc-params-ownership-test",
6+
worker = (
7+
modules = [
8+
(name = "worker", esModule = embed "js-rpc-params-ownership-test.js")
9+
],
10+
compatibilityDate = "2025-12-01",
11+
compatibilityFlags = [
12+
"nodejs_compat",
13+
"rpc_params_dup_stubs",
14+
],
15+
)
16+
),
17+
],
18+
v8Flags = [ "--expose-gc" ],
19+
);

src/workerd/api/worker-rpc.c++

Lines changed: 122 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -492,8 +492,12 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js,
492492
}
493493
auto arr = v8::Array::New(js.v8Isolate, argv.data(), argv.size());
494494

495-
auto externalHandler =
496-
RpcSerializerExternalHandler([&]() -> rpc::JsValue::StreamSink::Client {
495+
auto stubOwnership = FeatureFlags::get(js).getRpcParamsDupStubs()
496+
? RpcSerializerExternalHandler::DUPLICATE
497+
: RpcSerializerExternalHandler::TRANSFER;
498+
499+
RpcSerializerExternalHandler externalHandler(
500+
stubOwnership, [&]() -> rpc::JsValue::StreamSink::Client {
497501
// A stream was encountered in the params, so we must expect the response to contain
498502
// paramsStreamSink. But we don't have the response yet. So, we need to set up a
499503
// temporary promise client, which we hook to the response a little bit later.
@@ -842,11 +846,13 @@ void JsRpcStub::serialize(jsg::Lock& js, jsg::Serializer& serializer) {
842846
builder.setRpcTarget(kj::mv(cap));
843847
});
844848

845-
// Instead of disposing the stub immediately, we add a disposer to the serializer
846-
// that will be executed when the pipeline is finished. This ensures the stub
847-
// remains valid for the duration of any pipelined operations.
848-
externalHandler->addStubDisposer(
849-
kj::heap(kj::defer([self = JSG_THIS]() mutable { self->dispose(); })));
849+
if (externalHandler->getStubOwnership() == RpcSerializerExternalHandler::TRANSFER) {
850+
// Instead of disposing the stub immediately, we add a disposer to the serializer
851+
// that will be executed when the pipeline is finished. This ensures the stub
852+
// remains valid for the duration of any pipelined operations.
853+
externalHandler->addStubDisposer(
854+
kj::heap(kj::defer([self = JSG_THIS]() mutable { self->dispose(); })));
855+
}
850856
}
851857

852858
jsg::Ref<JsRpcStub> JsRpcStub::deserialize(
@@ -1592,7 +1598,8 @@ MakeCallPipeline::Result serializeJsValueWithPipeline(jsg::Lock& js,
15921598
auto hasDispose = maybeDispose != kj::none;
15931599

15941600
// Now that we've extracted our dispose function, we can serialize our value.
1595-
auto externalHandler = RpcSerializerExternalHandler(kj::mv(getStreamSinkFunc));
1601+
RpcSerializerExternalHandler externalHandler(
1602+
RpcSerializerExternalHandler::TRANSFER, kj::mv(getStreamSinkFunc));
15961603
serializeJsValue(js, value, externalHandler, kj::mv(makeBuilder));
15971604

15981605
auto stubDisposers = externalHandler.releaseStubDisposers();
@@ -1721,6 +1728,62 @@ void JsRpcTarget::serialize(jsg::Lock& js, jsg::Serializer& serializer) {
17211728
// Handle can't possibly be missing during serialization, it's how we got here.
17221729
auto handle = jsg::JsObject(KJ_ASSERT_NONNULL(JSG_THIS.tryGetHandle(js)));
17231730

1731+
if (externalHandler->getStubOwnership() == RpcSerializerExternalHandler::DUPLICATE) {
1732+
// This message isn't supposed to take ownership of stubs. What does that mean for an
1733+
// RpcTarget? You might argue that it means we should never call the disposer. But that's not
1734+
// really enough: what if the real owner *does* call the disposer, before our stub is done
1735+
// with it? How do we make sure the RpcTarget stays alive?
1736+
//
1737+
// Things get clearer if we look at a real use case: pure-JS Cap'n Web stubs. We don't see
1738+
// them as stubs (since they are not instances of JsRpcStub). Instead, we see them as
1739+
// RpcTargets. But we need the semantics to come out the same: when passed as a parameter
1740+
// to a native RPC call, we need to duplicate the stub, because the original copy might very
1741+
// well be disposed before we use it.
1742+
//
1743+
// How do we duplicate this non-native stub? Well... proper way to duplicate a pure-JS Cap'n
1744+
// Web stub is, of course, to call its `dup()` method.
1745+
//
1746+
// So how about we just do that? If the target has a `dup()` method, we call it, and we take
1747+
// ownership of the result, instead of taking ownership of the original object.
1748+
auto dup = handle.get(js, "dup");
1749+
KJ_IF_SOME(dupFunc, dup.tryCast<jsg::JsFunction>()) {
1750+
auto replacement = dupFunc.call(js, handle);
1751+
bool replaced = false;
1752+
1753+
// We got a duplicate. Is it still an RpcTarget?
1754+
KJ_IF_SOME(replacementObj, replacement.tryCast<jsg::JsObject>()) {
1755+
if (replacementObj.isInstanceOf<JsRpcTarget>(js)) {
1756+
// It is! Let's replace our handle with the duplicate!
1757+
handle = replacementObj;
1758+
replaced = true;
1759+
}
1760+
}
1761+
1762+
JSG_REQUIRE(replaced, DOMDataCloneError,
1763+
"Couldn't create a stub for the RcpTarget because it has a dup() method which did not "
1764+
"return another RpcTarget. Either remove the dup() method or make sure it returns an "
1765+
"RpcTarget.");
1766+
} else {
1767+
// If no dup() method was present, then what?
1768+
//
1769+
// The pedantic argument would say: we need to throw an exception. But that would lead to a
1770+
// pretty poor development experience as people would have to fiddle with adding dup()
1771+
// methods to all their RpcTargets.
1772+
//
1773+
// Another argument might say: we should just use the RpcTarget but never call the disposer
1774+
// since we don't own it. But that would probably be confusing. People would wonder why their
1775+
// disposers are never called.
1776+
//
1777+
// If someone passes an RpcTarget with no dup() method, but which does have a disposer, as
1778+
// the argument to an RPC method, *probably* they just want the disposer to be called when
1779+
// the callee is done with the object. That is, they want us to take ownership after all. If
1780+
// that is *not* what they want, then they can always implement a dup() method to make it
1781+
// clear.
1782+
//
1783+
// So, we will just "take ownership" of the target after all, and call its disposer.
1784+
}
1785+
}
1786+
17241787
rpc::JsRpcTarget::Client cap = kj::heap<TransientJsRpcTarget>(js, IoContext::current(), handle);
17251788

17261789
externalHandler->write([cap = kj::mv(cap)](rpc::JsValue::External::Builder builder) mutable {
@@ -1732,8 +1795,33 @@ void RpcSerializerExternalHandler::serializeFunction(
17321795
jsg::Lock& js, jsg::Serializer& serializer, v8::Local<v8::Function> func) {
17331796
serializer.writeRawUint32(static_cast<uint>(rpc::SerializationTag::JS_RPC_STUB));
17341797

1798+
auto handle = jsg::JsObject(func);
1799+
1800+
// Similar to JsRpcTarget::serialize(), we may need to dup() the function.
1801+
if (stubOwnership == RpcSerializerExternalHandler::DUPLICATE) {
1802+
auto dup = handle.get(js, "dup");
1803+
KJ_IF_SOME(dupFunc, dup.tryCast<jsg::JsFunction>()) {
1804+
auto replacement = dupFunc.call(js, handle);
1805+
bool replaced = false;
1806+
1807+
// We got a duplicate. Is it still a Function?
1808+
KJ_IF_SOME(replacementObj, replacement.tryCast<jsg::JsObject>()) {
1809+
if (isFunctionForRpc(js, replacementObj)) {
1810+
// It is! Let's replace our handle with the duplicate!
1811+
handle = replacementObj;
1812+
replaced = true;
1813+
}
1814+
}
1815+
1816+
JSG_REQUIRE(replaced, DOMDataCloneError,
1817+
"Couldn't create a stub for the function because it has a dup() method which did not "
1818+
"return another function. Either remove the dup() method or make sure it returns a "
1819+
"function.");
1820+
}
1821+
}
1822+
17351823
rpc::JsRpcTarget::Client cap =
1736-
kj::heap<TransientJsRpcTarget>(js, IoContext::current(), jsg::JsObject(func), true);
1824+
kj::heap<TransientJsRpcTarget>(js, IoContext::current(), handle, true);
17371825
write([cap = kj::mv(cap)](rpc::JsValue::External::Builder builder) mutable {
17381826
builder.setRpcTarget(kj::mv(cap));
17391827
});
@@ -1758,6 +1846,31 @@ void RpcSerializerExternalHandler::serializeProxy(
17581846
"Proxy must emulate either a plain object or an RpcTarget, as indicated by the "
17591847
"Proxy's prototype chain.");
17601848

1849+
// Similar to JsRpcTarget::serialize(), we may need to dup() the proxy.
1850+
if (stubOwnership == RpcSerializerExternalHandler::DUPLICATE) {
1851+
auto dup = handle.get(js, "dup");
1852+
KJ_IF_SOME(dupFunc, dup.tryCast<jsg::JsFunction>()) {
1853+
auto replacement = dupFunc.call(js, handle);
1854+
bool replaced = false;
1855+
1856+
// We got a duplicate. Is it still the same type?
1857+
KJ_IF_SOME(replacementObj, replacement.tryCast<jsg::JsObject>()) {
1858+
KJ_IF_SOME(stubType, checkStubType(js, replacementObj)) {
1859+
if (stubType == allowInstanceProperties) {
1860+
// It is! Let's replace our handle with the duplicate!
1861+
handle = replacementObj;
1862+
replaced = true;
1863+
}
1864+
}
1865+
}
1866+
1867+
JSG_REQUIRE(replaced, DOMDataCloneError,
1868+
"Couldn't create a stub for the Proxy because it has a dup() method which did not "
1869+
"return the same underlying type (RpcTarget or Function) as the Proxy itself represents. "
1870+
"Either remove the dup() method or make sure it returns an RpcTarget.");
1871+
}
1872+
}
1873+
17611874
// Great, we've concluded we can indeed point a stub at this proxy.
17621875
serializer.writeRawUint32(static_cast<uint>(rpc::SerializationTag::JS_RPC_STUB));
17631876

0 commit comments

Comments
 (0)