Skip to content

Commit aa7d85a

Browse files
committed
✅ Get not initialized test case passing
1 parent 410c5f7 commit aa7d85a

File tree

6 files changed

+157
-56
lines changed

6 files changed

+157
-56
lines changed

json-rpc-connection.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Readable, Writable } from "node:stream";
2-
import { type Operation, resource } from "effection";
2+
import { type Operation, resource, useAbortSignal } from "effection";
33
import * as rpc from "vscode-jsonrpc/node.js";
44

55
export type { MessageConnection } from "vscode-jsonrpc";
@@ -13,15 +13,24 @@ export function useConnection(
1313
options: JSONRPCConnectionOptions,
1414
): Operation<rpc.MessageConnection> {
1515
return resource(function* (provide) {
16-
let connection = rpc.createMessageConnection(
16+
let signal = yield* useAbortSignal();
17+
18+
let readable = new rpc.StreamMessageReader(
1719
//@ts-expect-error 🤷
18-
new rpc.StreamMessageReader(Readable.fromWeb(options.read)),
19-
new rpc.StreamMessageWriter(Writable.fromWeb(options.write)),
20+
Readable.fromWeb(options.read, { signal }),
21+
);
22+
let writable = new rpc.StreamMessageWriter(
23+
Writable.fromWeb(options.write, { signal }),
2024
);
25+
26+
let connection = rpc.createMessageConnection(readable, writable);
27+
2128
connection.listen();
2229
try {
2330
yield* provide(connection);
2431
} finally {
32+
readable.dispose();
33+
writable.dispose();
2534
connection.dispose();
2635
}
2736
});

middleware.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import type { Operation } from "effection";
2+
3+
export interface Middleware<In, Out> {
4+
(request: In, next: (input: In) => Operation<Out>): Operation<Out>;
5+
}
6+
7+
export function concat<A, B>(
8+
...middlewares: Middleware<A, B>[]
9+
): Middleware<A, B> {
10+
if (middlewares.length === 0) {
11+
return (request, next) => next(request);
12+
} else {
13+
return middlewares.reduceRight((rest, middleware) => {
14+
return (request, next) => middleware(request, (req) => rest(req, next));
15+
});
16+
}
17+
}

server.ts

Lines changed: 121 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
import {
22
call,
3+
createContext,
34
createSignal,
5+
ensure,
46
type Operation,
7+
race,
58
resource,
9+
suspend,
610
useScope,
11+
withResolvers,
712
} from "effection";
813
import type {
914
Disposable,
@@ -12,14 +17,15 @@ import type {
1217
RequestParams,
1318
} from "./types.ts";
1419

15-
import { useCommand, useDaemon } from "./use-command.ts";
20+
import { useDaemon } from "./use-command.ts";
1621
import { useConnection } from "./json-rpc-connection.ts";
1722
import {
1823
ErrorCodes,
1924
type MessageConnection,
2025
ResponseError,
2126
type StarRequestHandler,
2227
} from "vscode-jsonrpc";
28+
import { concat, type Middleware } from "./middleware.ts";
2329

2430
export interface LSPXOptions {
2531
interactive?: boolean;
@@ -34,69 +40,134 @@ export function start(opts: LSPXOptions): Operation<LSPXServer> {
3440
let connections: MessageConnection[] = [];
3541
let disposables: Disposable[] = [];
3642

37-
try {
38-
for (let command of opts.commands) {
39-
let [exe, ...args] = command.split(/\s/g);
40-
let process = yield* useDaemon(exe, {
41-
args,
42-
stdin: "piped",
43-
stdout: "piped",
44-
stderr: "piped",
45-
});
46-
let connection = yield* useConnection({
47-
read: process.stdout,
48-
write: process.stdin,
49-
});
50-
connections.push(connection);
51-
disposables.push(connection);
52-
disposables.push(
53-
connection.onNotification((method, params) =>
54-
notifications.send({ method, params })
55-
),
56-
);
57-
}
58-
59-
let client = yield* useConnection({
60-
read: opts.input ?? ReadableStream.from([]),
61-
write: opts.output ?? new WritableStream(),
43+
for (let command of opts.commands) {
44+
let [exe, ...args] = command.split(/\s/g);
45+
let process = yield* useDaemon(exe, {
46+
args,
47+
stdin: "piped",
48+
stdout: "piped",
49+
stderr: "piped",
50+
});
51+
let connection = yield* useConnection({
52+
read: process.stdout,
53+
write: process.stdin,
6254
});
55+
connections.push(connection);
56+
disposables.push(connection);
57+
disposables.push(
58+
connection.onNotification((method, params) =>
59+
notifications.send({ method, params })
60+
),
61+
);
62+
}
6363

64-
let scope = yield* useScope();
65-
let dispatch = createDispatch(connections);
64+
let client = yield* useConnection({
65+
read: opts.input ?? ReadableStream.from([]),
66+
write: opts.output ?? new WritableStream(),
67+
});
6668

67-
let handler: StarRequestHandler = (...params) =>
68-
scope.run(() => dispatch(...params));
69+
let scope = yield* useScope();
70+
let dispatch = createDispatch(connections);
6971

70-
disposables.push(client.onRequest(handler));
72+
let handler: StarRequestHandler = (...params) =>
73+
scope.run(() => dispatch(...params));
7174

72-
yield* provide({
73-
notifications,
74-
*request<T>(...params: RequestParams): Operation<T> {
75-
const result = yield* dispatch<T, unknown>(...params);
76-
if (result instanceof ResponseError) {
77-
throw result;
78-
}
79-
return result;
80-
},
81-
});
82-
} finally {
75+
disposables.push(client.onRequest(handler));
76+
77+
yield* ensure(() => {
8378
for (let disposable of disposables) {
8479
disposable.dispose();
8580
}
86-
}
81+
});
82+
83+
yield* provide({
84+
notifications,
85+
*request<T>(...params: RequestParams): Operation<T> {
86+
const result = yield* dispatch<T, unknown>(...params);
87+
if (result instanceof ResponseError) {
88+
throw result;
89+
}
90+
return result;
91+
},
92+
});
8793
});
8894
}
8995

96+
const ResponseErrorContext = createContext<
97+
<T>(error: ResponseError<T>) => void
98+
>("lspx.responseError");
99+
90100
function createDispatch(
91101
connections: MessageConnection[],
92102
): <T, E>(...params: RequestParams) => Operation<T | ResponseError<E>> {
93-
return function* dispatch(...params) {
94-
for (let connection of connections) {
95-
return (yield* call(() => connection.sendRequest(...params)));
103+
let middleware: Middleware<RequestParams, unknown> = concat(
104+
ensureInitialized(),
105+
multiplex(connections),
106+
);
107+
108+
return function* dispatch<T, E>(
109+
...params: RequestParams
110+
): Operation<T | ResponseError<E>> {
111+
let { operation: errored, resolve: raise } = withResolvers<
112+
ResponseError<unknown>
113+
>();
114+
yield* ResponseErrorContext.set(raise);
115+
116+
return yield* race([
117+
errored as Operation<ResponseError<E>>,
118+
middleware(
119+
params,
120+
() => responseError(ErrorCodes.InternalError, "unhandled request"),
121+
) as Operation<T>,
122+
]);
123+
};
124+
}
125+
126+
function* responseError<T = void>(
127+
...args: ConstructorParameters<typeof ResponseError<T>>
128+
// deno-lint-ignore no-explicit-any
129+
): Operation<any> {
130+
let raise = yield* ResponseErrorContext.expect();
131+
132+
raise<T>(new ResponseError(...args));
133+
134+
yield* suspend();
135+
}
136+
137+
function ensureInitialized(): Middleware<RequestParams, unknown> {
138+
let initialized = false;
139+
return function* (request, next) {
140+
let [method] = request;
141+
if (initialized && method === "initialize") {
142+
return yield* responseError(
143+
ErrorCodes.InvalidRequest,
144+
"initialize invoked twice",
145+
);
146+
} else if (!initialized && method !== "initialize") {
147+
return yield* responseError(
148+
ErrorCodes.ServerNotInitialized,
149+
"server not initialized",
150+
);
151+
} else {
152+
initialized = true;
153+
return yield* next(request);
154+
}
155+
};
156+
}
157+
158+
function multiplex(
159+
connections: MessageConnection[],
160+
): Middleware<RequestParams, unknown> {
161+
let [connection] = connections;
162+
163+
return function* (params) {
164+
if (!connection) {
165+
return yield* responseError(
166+
ErrorCodes.InternalError,
167+
"lspx is not connected to any language servers",
168+
);
169+
} else {
170+
return yield* call(() => connection.sendRequest(...params));
96171
}
97-
return new ResponseError(
98-
ErrorCodes.ServerNotInitialized,
99-
`no active server connections`,
100-
);
101172
};
102173
}

test/lspx.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ describe("lspx", function () {
2424
});
2525
});
2626

27-
it.only("returns an error code: -32002 if a request is made before initialize", function* () {
27+
it("returns an error code: -32002 if a request is made before initialize", function* () {
2828
let response = yield* request("textDocument/didOpen", {
2929
textDocument: {},
3030
});
31-
expect(response).toBeErr("not initialized");
31+
expect(response).toBeErr("server not initialized");
3232
});
3333

3434
it("does not allow initialize to be sent more than once");

test/sim/initialize.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ await main(function* () {
1313
write: Deno.stdout.writable,
1414
});
1515

16+
// deno-lint-ignore no-explicit-any
1617
const routes: Record<string, (...params: any[]) => Operation<any>> = {
1718
*initialize(_params: InitializeParams): Operation<InitializeResult> {
1819
return {

use-command.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ export function useCommand(
1717
} finally {
1818
controller.abort();
1919
yield* call(() => process.status);
20+
if (options?.stderr === "piped") {
21+
process.stderr.cancel();
22+
}
2023
}
2124
});
2225
}

0 commit comments

Comments
 (0)