Skip to content

Commit f973c69

Browse files
committed
fix: continue with setup-server, remote
1 parent dabc438 commit f973c69

20 files changed

+766
-53
lines changed

src/browser/setup-worker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ export function setupWorker(
7272
* Service Workers do not intercept WebSocket connections.
7373
*/
7474
new InterceptorSource({
75-
interceptors: [new WebSocketInterceptor()],
75+
interceptors: [new WebSocketInterceptor() as any],
7676
}),
7777
],
7878
handlers,

src/core/new/sources/remote-process-source.ts

Lines changed: 0 additions & 5 deletions
This file was deleted.

src/core/rpc/events.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import type {
2+
SerializedRequest,
3+
SerializedResponse,
4+
} from './packets/http-packet'
5+
6+
export type StreamEventMap = {
7+
'stream:chunk': (chunk: Uint8Array | undefined) => void
8+
'stream:error': (reason?: unknown) => void
9+
'stream:end': () => void
10+
}
11+
12+
export type NetworkSessionEventMap = StreamEventMap & {
13+
request: (request: SerializedRequest) => void
14+
}
15+
16+
export type RpcServerEventMap = StreamEventMap & {
17+
response: (response: SerializedResponse | undefined) => void
18+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import {
2+
RequestHandler,
3+
type RequestHandlerDefaultInfo,
4+
} from '../../handlers/RequestHandler'
5+
import type { ResponseResolutionContext } from '../../utils/executeHandlers'
6+
import { NetworkHttpTransport } from '../transports/http-transport'
7+
8+
interface RemoteRequestHandlerParsedResult {
9+
response: Response | undefined
10+
}
11+
12+
type RemoteRequestHandlerResolverExtras = {
13+
response: Response | undefined
14+
}
15+
16+
export class RemoteRequestHandler extends RequestHandler<
17+
RequestHandlerDefaultInfo,
18+
RemoteRequestHandlerParsedResult,
19+
RemoteRequestHandlerResolverExtras
20+
> {
21+
#transport: NetworkHttpTransport
22+
23+
constructor(args: { port: number }) {
24+
super({
25+
info: {
26+
header: 'RemoteRequestHandler',
27+
},
28+
resolver({ response }: RemoteRequestHandlerResolverExtras) {
29+
return response
30+
},
31+
})
32+
33+
this.#transport = new NetworkHttpTransport({
34+
port: args.port,
35+
})
36+
}
37+
38+
async parse(args: {
39+
request: Request
40+
resolutionContext?: ResponseResolutionContext
41+
}): Promise<any> {
42+
const response = await this.#transport
43+
.handleRequest({ request: args.request })
44+
.catch(() => undefined)
45+
46+
const parsedResult = await super.parse(args)
47+
48+
if (response != null) {
49+
parsedResult.response = response
50+
}
51+
52+
return parsedResult
53+
}
54+
55+
predicate(args: {
56+
request: Request
57+
parsedResult: RemoteRequestHandlerParsedResult
58+
resolutionContext?: ResponseResolutionContext
59+
}): boolean | Promise<boolean> {
60+
// This handler is considered matching if the remote process
61+
// decided to handle the intercepted request.
62+
return args.parsedResult.response != null
63+
}
64+
65+
protected extendResolverArgs(args: {
66+
request: Request
67+
parsedResult: RemoteRequestHandlerParsedResult
68+
}): RemoteRequestHandlerResolverExtras {
69+
const resolverArgs = super.extendResolverArgs(args)
70+
resolverArgs.response = args.parsedResult.response
71+
return resolverArgs
72+
}
73+
74+
log(_args: {
75+
request: Request
76+
response: Response
77+
parsedResult: RemoteRequestHandlerParsedResult
78+
}): void {
79+
/**
80+
* @note Skip logging. This is an internal request handler.
81+
*/
82+
return
83+
}
84+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import { until } from 'until-async'
2+
import {
3+
WebSocketHandler,
4+
type WebSocketHandlerConnection,
5+
type WebSocketResolutionContext,
6+
} from '../../handlers/WebSocketHandler'
7+
import { NetworkSession } from '../session'
8+
import { NetworkWebSocketTransport } from '../transports/websocket-transport'
9+
10+
export class RemoteWebSocketHandler extends WebSocketHandler {
11+
#transport: NetworkWebSocketTransport
12+
13+
constructor(args: { session: NetworkSession }) {
14+
super(/.*/)
15+
16+
this.#transport = new NetworkWebSocketTransport({
17+
session: args.session,
18+
})
19+
}
20+
21+
public async run(
22+
connection: Omit<WebSocketHandlerConnection, 'params'>,
23+
resolutionContext?: WebSocketResolutionContext,
24+
): Promise<boolean> {
25+
const [error, remoteConnection] = await until(() => {
26+
return this.#transport.send({
27+
clientUrl: connection.client.url,
28+
resolutionContext,
29+
})
30+
})
31+
32+
/**
33+
* @todo Check if rejecting with no reason still falls through this check.
34+
*/
35+
if (error) {
36+
return false
37+
}
38+
39+
/**
40+
* @todo How will this work if THIS process has a WS client emitting events
41+
* and THAT process has event listeners? Should we serialize those events now?
42+
*/
43+
return this.connect(remoteConnection)
44+
}
45+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import type { Socket } from 'socket.io-client'
2+
import { DeferredPromise } from '@open-draft/deferred-promise'
3+
import type { NetworkPacket } from '.'
4+
import type { SessionSocket } from '../session'
5+
import { emitReadableStream, WebSocketReadableStreamSource } from '../utils'
6+
import type { StreamEventMap } from '../events'
7+
8+
export interface SerializedRequest {
9+
method: string
10+
url: string
11+
headers: Array<[string, string]>
12+
hasBodyStream: boolean
13+
}
14+
15+
export interface SerializedResponse {
16+
status: number
17+
statusText: string
18+
headers: Array<[string, string]>
19+
hasBodyStream: boolean
20+
}
21+
22+
export class HttpPacket implements NetworkPacket {
23+
constructor(private readonly request: Request) {}
24+
25+
async send(socket: SessionSocket): Promise<Response | undefined> {
26+
const intentionPromise = new DeferredPromise<Response | undefined>()
27+
const serializedRequest = serializeHttpRequest(this.request)
28+
29+
socket.emit('request', serializedRequest)
30+
31+
if (this.request.body != null) {
32+
emitReadableStream(this.request.body, socket)
33+
}
34+
35+
socket.once('response', (serializedResponse) => {
36+
const response =
37+
serializedResponse != null
38+
? deserializeHttpResponse(serializedResponse, socket)
39+
: undefined
40+
41+
intentionPromise.resolve(response)
42+
})
43+
44+
return intentionPromise
45+
}
46+
}
47+
48+
export function serializeHttpRequest(request: Request): SerializedRequest {
49+
return {
50+
method: request.method,
51+
url: request.url,
52+
headers: Array.from(request.headers),
53+
hasBodyStream: request.body != null,
54+
}
55+
}
56+
57+
export function deserializeHttpRequest(
58+
serializedRequest: SerializedRequest,
59+
socket: Socket<StreamEventMap, any>,
60+
): Request {
61+
const { method, url, headers, hasBodyStream } = serializedRequest
62+
63+
return new Request(url, {
64+
method,
65+
headers,
66+
body: hasBodyStream
67+
? new ReadableStream(new WebSocketReadableStreamSource(socket))
68+
: null,
69+
})
70+
}
71+
72+
export function serializeHttpResponse(response: Response): SerializedResponse {
73+
return {
74+
status: response.status,
75+
statusText: response.statusText,
76+
headers: Array.from(response.headers),
77+
hasBodyStream: response.body != null,
78+
}
79+
}
80+
81+
export function deserializeHttpResponse(
82+
serializedResponse: SerializedResponse,
83+
socket: SessionSocket,
84+
): Response {
85+
const { status, statusText, headers, hasBodyStream } = serializedResponse
86+
87+
return new Response(
88+
hasBodyStream
89+
? new ReadableStream(new WebSocketReadableStreamSource(socket))
90+
: null,
91+
{
92+
status,
93+
statusText,
94+
headers,
95+
},
96+
)
97+
}

src/core/rpc/packets/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
import type { SessionSocket } from '../session'
2+
3+
export interface NetworkPacket {
4+
send(socket: SessionSocket): Promise<any>
5+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import type { NetworkPacket } from '.'
2+
import type { WebSocketResolutionContext } from '../../handlers/WebSocketHandler'
3+
import type { SessionSocket } from '../session'
4+
5+
export class WebSocketPacket implements NetworkPacket {
6+
constructor(
7+
private readonly args: {
8+
url: string
9+
resolutionContext?: WebSocketResolutionContext
10+
},
11+
) {}
12+
13+
async send(socket: SessionSocket): Promise<any> {
14+
// 1. Create a frame that describe this WS connection.
15+
// 2. Send it over the `ws`.
16+
// 3. (?) Return the response?
17+
socket.send('...TODO...')
18+
}
19+
}

src/core/rpc/server.ts

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import * as http from 'node:http'
2+
import { DeferredPromise } from '@open-draft/deferred-promise'
3+
import { Server } from 'socket.io'
4+
import type { Socket } from 'socket.io-client'
5+
import { Emitter, TypedEvent } from 'rettime'
6+
import type { NetworkSessionEventMap, StreamEventMap } from './events'
7+
import {
8+
deserializeHttpRequest,
9+
serializeHttpResponse,
10+
} from './packets/http-packet'
11+
import { emitReadableStream } from './utils'
12+
13+
type RpcServerPublicEventMap = {
14+
request: TypedEvent<{ request: Request }, Response>
15+
}
16+
17+
export class RpcServer extends Emitter<RpcServerPublicEventMap> {
18+
#server: Server<NetworkSessionEventMap, RpcServerPublicEventMap>
19+
20+
constructor() {
21+
super()
22+
23+
const httpServer = http.createServer()
24+
25+
this.#server = new Server()
26+
this.#server.attach(httpServer)
27+
28+
this.#server.on('connection', (client) => {
29+
client.on('request', async (serializedRequest) => {
30+
const request = deserializeHttpRequest(
31+
serializedRequest,
32+
client as unknown as Socket<any, StreamEventMap>,
33+
)
34+
35+
/** @todo Notify the consumer there's been a request! */
36+
const results = await this.emitAsPromise(
37+
new TypedEvent('request', {
38+
data: {
39+
request,
40+
},
41+
}),
42+
)
43+
44+
const response = new Response('hello world')
45+
46+
// client.emit('response', serializeHttpResponse(response))
47+
48+
if (response.body != null) {
49+
emitReadableStream(
50+
response.body,
51+
client as unknown as Socket<any, StreamEventMap>,
52+
)
53+
}
54+
})
55+
})
56+
}
57+
58+
public async listen(port: number): Promise<void> {
59+
const listenPromise = new DeferredPromise<void>()
60+
const { httpServer } = this.#server
61+
62+
httpServer
63+
.listen(port, () => listenPromise.resolve())
64+
.once('error', (error) => listenPromise.reject(error))
65+
66+
return listenPromise
67+
}
68+
69+
public async close(): Promise<void> {
70+
const closePromise = new DeferredPromise<void>()
71+
72+
this.#server.disconnectSockets()
73+
this.#server.close((error) => {
74+
if (error) {
75+
closePromise.reject(error)
76+
} else {
77+
closePromise.resolve()
78+
}
79+
})
80+
81+
return closePromise
82+
}
83+
}

0 commit comments

Comments
 (0)