Skip to content

Commit f0eb284

Browse files
authored
Merge pull request #297 from vim-denops/batch-accumulate
👍 Add accumulate() function for automatic RPC batching with parallel execution support
2 parents d8bfd9d + 5d6645e commit f0eb284

File tree

5 files changed

+1607
-1
lines changed

5 files changed

+1607
-1
lines changed

batch/accumulate.ts

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
import { nextTick } from "node:process";
2+
import type { Call, Context, Denops, Dispatcher, Meta } from "@denops/core";
3+
import { BatchError } from "@denops/core";
4+
import { AccumulateCancelledError } from "./error.ts";
5+
6+
const errorProp = Symbol("AccumulateErrorResult");
7+
8+
type ErrorResult = {
9+
[errorProp]:
10+
| { type: "error"; message: string; cause: unknown }
11+
| { type: "cancel"; cause: unknown }
12+
| { type: "unknown"; message: string; cause: unknown };
13+
};
14+
15+
class AccumulateHelper implements Denops {
16+
readonly #denops: Denops;
17+
readonly #calls: Call[] = [];
18+
readonly #results: unknown[] = [];
19+
readonly #disposer = Promise.withResolvers<void>();
20+
#closed = false;
21+
#resolvedWaiter = Promise.withResolvers<void>();
22+
23+
constructor(denops: Denops) {
24+
this.#denops = denops;
25+
}
26+
27+
static close(helper: AccumulateHelper): void {
28+
helper.#closed = true;
29+
helper.#disposer.promise.catch(() => {/* prevent unhandled rejection */});
30+
helper.#disposer.reject();
31+
}
32+
33+
get name(): string {
34+
return this.#denops.name;
35+
}
36+
37+
get meta(): Meta {
38+
return this.#denops.meta;
39+
}
40+
41+
get interrupted(): AbortSignal | undefined {
42+
return this.#denops.interrupted;
43+
}
44+
45+
get context(): Record<string | number | symbol, unknown> {
46+
return this.#denops.context;
47+
}
48+
49+
get dispatcher(): Dispatcher {
50+
return this.#denops.dispatcher;
51+
}
52+
53+
set dispatcher(dispatcher: Dispatcher) {
54+
this.#denops.dispatcher = dispatcher;
55+
}
56+
57+
async redraw(force?: boolean): Promise<void> {
58+
return await this.#denops.redraw(force);
59+
}
60+
61+
async call(fn: string, ...args: unknown[]): Promise<unknown> {
62+
this.#ensureAvailable();
63+
const call: Call = [fn, ...args];
64+
const [result] = await this.#waitResolved([call]);
65+
66+
if (isErrorResult(result)) {
67+
const error = result[errorProp];
68+
if (error.type === "error") {
69+
throw new Error(error.message, { cause: error.cause });
70+
} else if (error.type === "cancel") {
71+
const repr = `['${fn}', ...]`;
72+
throw new AccumulateCancelledError(
73+
`Call was cancelled due to another error in parallel execution: ${repr}`,
74+
{ calls: [call], cause: error.cause },
75+
);
76+
} else {
77+
throw new Error(error.message, { cause: error.cause });
78+
}
79+
}
80+
81+
return result;
82+
}
83+
84+
async batch(...calls: Call[]): Promise<unknown[]> {
85+
this.#ensureAvailable();
86+
if (calls.length === 0) {
87+
return [];
88+
}
89+
const results = await this.#waitResolved(calls);
90+
91+
const errorIndex = results.findIndex(isErrorResult);
92+
if (errorIndex >= 0) {
93+
const { [errorProp]: error } = results[errorIndex] as ErrorResult;
94+
if (error.type === "error") {
95+
throw new BatchError(error.message, results.slice(0, errorIndex));
96+
} else if (error.type === "cancel") {
97+
const [[fn]] = calls;
98+
const repr = `[['${fn}', ...], ... total ${calls.length} calls]`;
99+
throw new AccumulateCancelledError(
100+
`Batch calls were cancelled due to another error in parallel execution: ${repr}`,
101+
{ calls, cause: error.cause },
102+
);
103+
} else {
104+
throw new Error(error.message, { cause: error.cause });
105+
}
106+
}
107+
108+
return results;
109+
}
110+
111+
async cmd(cmd: string, ctx: Context = {}): Promise<void> {
112+
await this.call("denops#api#cmd", cmd, ctx);
113+
}
114+
115+
async eval(expr: string, ctx: Context = {}): Promise<unknown> {
116+
return await this.call("denops#api#eval", expr, ctx);
117+
}
118+
119+
async dispatch(
120+
name: string,
121+
fn: string,
122+
...args: unknown[]
123+
): Promise<unknown> {
124+
return await this.#denops.dispatch(name, fn, ...args);
125+
}
126+
127+
#ensureAvailable(): void {
128+
if (this.#closed) {
129+
throw new TypeError(
130+
"AccumulateHelper instance is not available outside of 'accumulate' block",
131+
);
132+
}
133+
}
134+
135+
async #waitResolved(calls: Call[]): Promise<unknown[]> {
136+
const start = this.#calls.length;
137+
this.#calls.push(...calls);
138+
const end = this.#calls.length;
139+
nextTick(() => {
140+
if (end === this.#calls.length) {
141+
this.#resolvePendingCalls();
142+
}
143+
});
144+
try {
145+
await Promise.race([
146+
this.#disposer.promise,
147+
this.#resolvedWaiter.promise,
148+
]);
149+
} catch {
150+
// Rethrow the error if the disposer is rejected.
151+
this.#ensureAvailable();
152+
}
153+
return this.#results.slice(start, end);
154+
}
155+
156+
async #resolvePendingCalls(): Promise<void> {
157+
const resultIndex = this.#results.length;
158+
const calls = this.#calls.slice(resultIndex);
159+
this.#results.length = this.#calls.length;
160+
const { resolve } = this.#resolvedWaiter;
161+
this.#resolvedWaiter = Promise.withResolvers();
162+
if (!this.#closed) {
163+
const results = await this.#resolveCalls(calls);
164+
this.#results.splice(resultIndex, results.length, ...results);
165+
}
166+
resolve();
167+
}
168+
169+
async #resolveCalls(calls: Call[]): Promise<unknown[]> {
170+
try {
171+
return await this.#denops.batch(...calls);
172+
} catch (error: unknown) {
173+
if (isBatchError(error)) {
174+
const { results, message } = error;
175+
const errorResult = {
176+
[errorProp]: { type: "error", message, cause: error },
177+
};
178+
const cancelledResults = calls.slice(results.length + 1)
179+
.map(() => ({
180+
[errorProp]: { type: "cancel", cause: error },
181+
}));
182+
return [...results, errorResult, ...cancelledResults];
183+
} else {
184+
const message = error instanceof Error ? error.message : String(error);
185+
const unknownErrors = calls.map(() => ({
186+
[errorProp]: { type: "unknown", message, cause: error },
187+
}));
188+
return unknownErrors;
189+
}
190+
}
191+
}
192+
}
193+
194+
function isBatchError(obj: unknown): obj is BatchError {
195+
return obj instanceof Error && obj.name === "BatchError";
196+
}
197+
198+
function isErrorResult(obj: unknown): obj is ErrorResult {
199+
return obj != null && Object.hasOwn(obj, errorProp);
200+
}
201+
202+
/**
203+
* Runs an `executor` function while automatically batching multiple RPCs.
204+
*
205+
* `accumulate()` allows you to write normal async functions while automatically
206+
* batching multiple RPCs that occur at the same timing (during microtask
207+
* processing) into a single RPC call.
208+
*
209+
* Note that RPC calls with side effects should be avoided, and if you do, the
210+
* order in which you call them should be carefully considered.
211+
*
212+
* @example
213+
* ```typescript
214+
* import { assertType, IsExact } from "jsr:@std/testing/types";
215+
* import type { Entrypoint } from "jsr:@denops/std";
216+
* import * as fn from "jsr:@denops/std/function";
217+
* import { accumulate } from "jsr:@denops/std/batch";
218+
*
219+
* export const main: Entrypoint = async (denops) => {
220+
* const results = await accumulate(denops, async (denops) => {
221+
* const lines = await fn.getline(denops, 1, "$");
222+
* return await Promise.all(lines.map(async (line, index) => {
223+
* const keyword = await fn.matchstr(denops, line, "\\k\\+");
224+
* const len = await fn.len(denops, keyword);
225+
* return {
226+
* lnum: index + 1,
227+
* keyword,
228+
* len,
229+
* };
230+
* }));
231+
* });
232+
*
233+
* assertType<
234+
* IsExact<
235+
* typeof results,
236+
* { lnum: number; keyword: string; len: number; }[]
237+
* >
238+
* >(true);
239+
* }
240+
* ```
241+
*
242+
* In the case of the example, the following 3 RPCs are called.
243+
*
244+
* 1. RPC call to `getline`.
245+
* 2. Multiple `matchstr` calls in one RPC.
246+
* 3. Multiple `len` calls in one RPC.
247+
*
248+
* @remarks
249+
* The `denops` instance passed as the argument to the `executor` function is
250+
* only valid within the `accumulate()` block. Attempting to use it outside the
251+
* block will result in an error when calling `denops.call()`, `denops.batch()`,
252+
* `denops.cmd()`, or `denops.eval()`.
253+
*/
254+
export async function accumulate<T extends unknown>(
255+
denops: Denops,
256+
executor: (helper: Denops) => T,
257+
): Promise<Awaited<T>> {
258+
const helper = new AccumulateHelper(denops);
259+
try {
260+
return await executor(helper);
261+
} finally {
262+
AccumulateHelper.close(helper);
263+
}
264+
}

0 commit comments

Comments
 (0)