Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
300 changes: 237 additions & 63 deletions README.md

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions src/batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,16 @@ class BatchServerTransport implements RpcTransport {
}
}

/**
* Implements the server end of an HTTP batch session, using standard Fetch API types to represent
* HTTP requests and responses.
*
* @param request The request received from the client initiating the session.
* @param localMain The main stub or RpcTarget which the server wishes to expose to the client.
* @param options Optional RPC sesison options.
* @returns The HTTP response to return to the client. Note that the returned object has mutable
* headers, so you can modify them using e.g. `response.headers.set("Foo", "bar")`.
*/
export async function newHttpBatchRpcResponse(
request: Request, localMain: any, options?: RpcSessionOptions): Promise<Response> {
if (request.method !== "POST") {
Expand Down Expand Up @@ -149,6 +159,14 @@ export async function newHttpBatchRpcResponse(
return new Response(transport.getResponseBody());
}

/**
* Implements the server end of an HTTP batch session using traditional Node.js HTTP APIs.
*
* @param request The request received from the client initiating the session.
* @param response The response object, to which the response should be written.
* @param localMain The main stub or RpcTarget which the server wishes to expose to the client.
* @param options Optional RPC sesison options. You can also pass headers to set on the response.
*/
export async function nodeHttpBatchRpcResponse(
request: IncomingMessage, response: ServerResponse,
localMain: any,
Expand Down
166 changes: 60 additions & 106 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1289,34 +1289,14 @@ function followPath(value: unknown, parent: object | undefined,
};
}

// StubHook wrapping an RpcPayload in local memory.
//
// This is used for:
// - Resolution of a promise.
// - Initially on the server side, where it can be pull()ed and used in pipelining.
// - On the client side, after pull() has transmitted the payload.
// - Implementing RpcTargets, on the server side.
// - Since the payload's root is an RpcTarget, pull()ing it will just duplicate the stub.
export class PayloadStubHook extends StubHook {
constructor(payload: RpcPayload) {
super();
this.payload = payload;
}

private payload?: RpcPayload; // cleared when disposed

private getPayload(): RpcPayload {
if (this.payload) {
return this.payload;
} else {
throw new Error("Attempted to use an RPC StubHook after it was disposed.");
}
}
// Shared base class for PayloadStubHook and TargetStubHook.
abstract class ValueStubHook extends StubHook {
protected abstract getValue(): {value: unknown, owner: RpcPayload | null};

call(path: PropertyPath, args: RpcPayload): StubHook {
try {
let payload = this.getPayload();
let followResult = followPath(payload.value, undefined, path, payload);
let {value, owner} = this.getValue();
let followResult = followPath(value, undefined, path, owner);

if (followResult.hook) {
return followResult.hook.call(followResult.remainingPath, args);
Expand All @@ -1339,8 +1319,8 @@ export class PayloadStubHook extends StubHook {
try {
let followResult: FollowPathResult;
try {
let payload = this.getPayload();
followResult = followPath(payload.value, undefined, path, payload);
let {value, owner} = this.getValue();
followResult = followPath(value, undefined, path, owner);;
} catch (err) {
// Oops, we need to dispose the captures of which we took ownership.
for (let cap of captures) {
Expand All @@ -1362,19 +1342,67 @@ export class PayloadStubHook extends StubHook {

get(path: PropertyPath): StubHook {
try {
let payload = this.getPayload();
let followResult = followPath(payload.value, undefined, path, payload);
let {value, owner} = this.getValue();

if (path.length === 0 && owner === null) {
// The only way this happens is if someone sends "pipeline" and references a
// TargetStubHook, but they shouldn't do that, because TargetStubHook never backs a
// promise, and a non-promise cannot be converted to a promise.
// TODO: Is this still correct for rpc-thenable?
throw new Error("Can't dup an RpcTarget stub as a promise.");
}

let followResult = followPath(value, undefined, path, owner);

if (followResult.hook) {
return followResult.hook.get(followResult.remainingPath);
}

// Note that if `followResult.owner` is null, then we've descended into the contents of an
// RpcTarget. In that case, if this deep copy discovers an RpcTarget embedded in the result,
// it will create a new stub for it. If that RpcTarget has a disposer, it'll be disposed when
// that stub is disposed. If the same RpcTarget is returned in *another* get(), it create
// *another* stub, which calls the disposer *another* time. This can be quite weird -- the
// disposer may be called any number of times, including zero if the property is never read
// at all. Unfortunately, that's just the way it is. The application can avoid this problem by
// wrapping the RpcTarget in an RpcStub itself, proactively, and using that as the property --
// then, each time the property is get()ed, a dup() of that stub is returned.
return new PayloadStubHook(RpcPayload.deepCopyFrom(
followResult.value, followResult.parent, followResult.owner));
} catch (err) {
return new ErrorStubHook(err);
}
}
}

// StubHook wrapping an RpcPayload in local memory.
//
// This is used for:
// - Resolution of a promise.
// - Initially on the server side, where it can be pull()ed and used in pipelining.
// - On the client side, after pull() has transmitted the payload.
// - Implementing RpcTargets, on the server side.
// - Since the payload's root is an RpcTarget, pull()ing it will just duplicate the stub.
export class PayloadStubHook extends ValueStubHook {
constructor(payload: RpcPayload) {
super();
this.payload = payload;
}

private payload?: RpcPayload; // cleared when disposed

private getPayload(): RpcPayload {
if (this.payload) {
return this.payload;
} else {
throw new Error("Attempted to use an RPC StubHook after it was disposed.");
}
}

protected getValue() {
let payload = this.getPayload();
return {value: payload.value, owner: payload};
}

dup(): StubHook {
// Although dup() is documented as not copying the payload, what this really means is that
Expand Down Expand Up @@ -1448,7 +1476,7 @@ type BoxedRefcount = { count: number };
// the root of the payload happens to be an RpcTarget), but there can only be one RpcPayload
// pointing at an RpcTarget whereas there can be several TargetStubHooks pointing at it. Also,
// TargetStubHook cannot be pull()ed, because it always backs an RpcStub, not an RpcPromise.
class TargetStubHook extends StubHook {
class TargetStubHook extends ValueStubHook {
// Constructs a TargetStubHook that is not duplicated from an existing hook.
//
// If `value` is a function, `parent` is bound as its "this".
Expand Down Expand Up @@ -1491,82 +1519,8 @@ class TargetStubHook extends StubHook {
}
}

call(path: PropertyPath, args: RpcPayload): StubHook {
try {
let target = this.getTarget();
let followResult = followPath(target, this.parent, path, null);

if (followResult.hook) {
return followResult.hook.call(followResult.remainingPath, args);
}

// It's a local function.
if (typeof followResult.value != "function") {
throw new TypeError(`'${path.join('.')}' is not a function.`);
}
let promise = args.deliverCall(<Function>followResult.value, followResult.parent);
return new PromiseStubHook(promise.then(payload => {
return new PayloadStubHook(payload);
}));
} catch (err) {
return new ErrorStubHook(err);
}
}

map(path: PropertyPath, captures: StubHook[], instructions: unknown[]): StubHook {
try {
let followResult: FollowPathResult;
try {
let target = this.getTarget();
followResult = followPath(target, this.parent, path, null);
} catch (err) {
// Oops, we need to dispose the captures of which we took ownership.
for (let cap of captures) {
cap.dispose();
}
throw err;
}

if (followResult.hook) {
return followResult.hook.map(followResult.remainingPath, captures, instructions);
}

return mapImpl.applyMap(
followResult.value, followResult.parent, followResult.owner, captures, instructions);
} catch (err) {
return new ErrorStubHook(err);
}
}

get(path: PropertyPath): StubHook {
try {
if (path.length == 0) {
// The only way this happens is if someone sends "pipeline" and references a
// TargetStubHook, but they shouldn't do that, because TargetStubHook never backs a
// promise, and a non-promise cannot be converted to a promise.
throw new Error("Can't dup an RpcTarget stub as a promise.");
}

let target = this.getTarget();
let followResult = followPath(target, this.parent, path, null);

if (followResult.hook) {
return followResult.hook.get(followResult.remainingPath);
}

// Note that this deep copy, if it discovers an RpcTarget embedded in the result, will create
// a new stub for it. If the RpcTarget has a disposer, it'll be disposed when that stub is
// disposed. If the same RpcTarget is returned in *another* get(), it create *another* stub,
// which calls the disposer *another* time. This can be quite weird -- the disposer may be
// called any number of times, including zero if the property is never read at all.
// Unfortunately, that's just the way it is. The application can avoid this problem by
// wrapping the RpcTarget in an RpcStub itself, proactively, and using that as the property --
// then, each time the property is get()ed, a dup() of that stub is returned.
return new PayloadStubHook(RpcPayload.deepCopyFrom(
followResult.value, followResult.parent, followResult.owner));
} catch (err) {
return new ErrorStubHook(err);
}
protected getValue() {
return {value: this.getTarget(), owner: null};
}

dup(): StubHook {
Expand Down
102 changes: 90 additions & 12 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,49 @@ export { serialize, deserialize, newWorkersWebSocketRpcResponse, newHttpBatchRpc
export type { RpcTransport, RpcSessionOptions };

// Hack the type system to make RpcStub's types work nicely!
/**
* Represents a reference to a remote object, on which methods may be remotely invoked via RPC.
*
* `RpcStub` can represent any interface (when using TypeScript, you pass the specific interface
* type as `T`, but this isn't known at runtime). The way this works is, `RpcStub` is actually a
* `Proxy`. It makes itself appear as if every possible method / property name is defined. You can
* invoke any method name, and the invocation will be sent to the server. If it turns out that no
* such method exists on the remote object, an exception is thrown back. But the client does not
* actually know, until that point, what methods exist.
*/
export type RpcStub<T extends Serializable<T>> = Stub<T>;
export const RpcStub: {
new <T extends Serializable<T>>(value: T): RpcStub<T>;
} = <any>RpcStubImpl;

/**
* Represents the result of an RPC call.
*
* Also used to represent propreties. That is, `stub.foo` evaluates to an `RpcPromise` for the
* value of `foo`.
*
* This isn't actually a JavaScript `Promise`. It does, however, have `then()`, `catch()`, and
* `finally()` methods, like `Promise` does, and because it has a `then()` method, JavaScript will
* allow you to treat it like a promise, e.g. you can `await` it.
*
* An `RpcPromise` is also a proxy, just like `RpcStub`, where calling methods or awaiting
* properties will make a pipelined network request.
*
* Note that and `RpcPromise` is "lazy": the actual final result is not requested from the server
* until you actually `await` the promise (or call `then()`, etc. on it). This is an optimization:
* if you only intend to use the promise for pipelining and you never await it, then there's no
* need to transmit the resolution!
*/
export type RpcPromise<T extends Serializable<T>> = Stub<T> & Promise<Stubify<T>>;
export const RpcPromise: {
// Note: Cannot construct directly!
} = <any>RpcPromiseImpl;

/**
* Use to construct an `RpcSession` on top of a custom `RpcTransport`.
*
* Most people won't use this. You only need it if you've implemented your own `RpcTransport`.
*/
export interface RpcSession<T extends Serializable<T> = undefined> {
getRemoteMain(): RpcStub<T>;
getStats(): {imports: number, exports: number};
Expand All @@ -42,32 +75,77 @@ export const RpcSession: {

// RpcTarget needs some hackage too to brand it properly and account for the implementation
// conditionally being imported from "cloudflare:workers".
/**
* Classes which are intended to be passed by reference and called over RPC must extend
* `RpcTarget`. A class which does not extend `RpcTarget` (and which dosen't have built-in support
* from the RPC system) cannot be passed in an RPC message at all; an exception will be thrown.
*
* Note that on Cloudflare Workers, this `RpcTarget` is an alias for the one exported from the
* "cloudflare:workers" module, so they can be used interchangably.
*/
export interface RpcTarget extends RpcTargetBranded {};
export const RpcTarget: {
new(): RpcTarget;
} = RpcTargetImpl;

/**
* Empty interface used as default type parameter for sessions where the other side doesn't
* necessarily export a main interface.
*/
interface Empty {}

export let newWebSocketRpcSession:
<T extends Serializable<T> = Empty>
(webSocket: WebSocket | string, localMain?: any, options?: RpcSessionOptions) => Stubify<T> =
/**
* Start a WebSocket session given either an already-open WebSocket or a URL.
*
* @param webSocket Either the `wss://` URL to connect to, or an already-open WebSocket object to
* use.
* @param localMain The main RPC interface to expose to the peer. Returns a stub for the main
* interface exposed from the peer.
*/
export let newWebSocketRpcSession:<T extends Serializable<T> = Empty>
(webSocket: WebSocket | string, localMain?: any, options?: RpcSessionOptions) => RpcStub<T> =
<any>newWebSocketRpcSessionImpl;

export let newHttpBatchRpcSession:
<T extends Serializable<T> = Empty>
(urlOrRequest: string | Request, init?: RequestInit) => Stubify<T> =
/**
* Initiate an HTTP batch session from the client side.
*
* The parameters to this method have exactly the same signature as `fetch()`, but the return
* value is an RpcStub. You can customize anything about the request except for the method
* (it will always be set to POST) and the body (which the RPC system will fill in).
*/
export let newHttpBatchRpcSession:<T extends Serializable<T>>
(urlOrRequest: string | Request, init?: RequestInit) => RpcStub<T> =
<any>newHttpBatchRpcSessionImpl;

export let newMessagePortRpcSession:
<T extends Serializable<T> = Empty>
(port: MessagePort, localMain?: any, options?: RpcSessionOptions) => Stubify<T> =
/**
* Initiate an RPC session over a MessagePort, which is particularly useful for communicating
* between an iframe and its parent frame in a browser context. Each side should call this function
* on its own end of the MessageChannel.
*/
export let newMessagePortRpcSession:<T extends Serializable<T> = Empty>
(port: MessagePort, localMain?: any, options?: RpcSessionOptions) => RpcStub<T> =
<any>newMessagePortRpcSessionImpl;

// Implements inified handling of HTTP-batch and WebSocket responses for the Workers Runtime.
export function newWorkersRpcResponse(request: Request, localMain: any) {
/**
* Implements unified handling of HTTP-batch and WebSocket responses for the Cloudflare Workers
* Runtime.
*
* SECURITY WARNING: This function accepts cross-origin requests. If you do not want this, you
* should validate the `Origin` header before calling this, or use `newHttpBatchRpcSession()` and
* `newWebSocketRpcSession()` directly with appropriate security measures for each type of request.
* But if your API uses in-band authorization (i.e. it has an RPC method that takes the user's
* credentials as parameters and returns the authorized API), then cross-origin requests should
* be safe.
*/
export async function newWorkersRpcResponse(request: Request, localMain: any) {
if (request.method === "POST") {
return newHttpBatchRpcResponse(request, localMain);
let response = await newHttpBatchRpcResponse(request, localMain);
// Since we're exposing the same API over WebSocket, too, and WebScoket always allows
// cross-origin requests, the API necessarily must be safe for cross-origin use (e.g. because
// it uses in-band authorization, as recommended in the readme). So, we might as well allow
// batch requests to be made cross-origin as well.
response.headers.set("Access-Control-Allow-Origin", "*");
return response;
} else if (request.headers.get("Upgrade")?.toLowerCase() === "websocket") {
return newWorkersWebSocketRpcResponse(request, localMain);
} else {
Expand Down
Loading