Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion denops/fall/processor/collect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import { dispatch } from "../event.ts";

const THRESHOLD = 10000;
const CHUNK_SIZE = 1000;
const CHUNK_INTERVAL = 100;

export type CollectProcessorOptions = {
threshold?: number;
chunkSize?: number;
chunkInterval?: number;
};

export class CollectProcessor<T extends Detail> implements Disposable {
Expand All @@ -21,6 +23,7 @@ export class CollectProcessor<T extends Detail> implements Disposable {
readonly #source: Source<T>;
readonly #threshold: number;
readonly #chunkSize: number;
readonly #chunkInterval: number;
#processing?: Promise<void>;
#paused?: PromiseWithResolvers<void>;

Expand All @@ -31,6 +34,7 @@ export class CollectProcessor<T extends Detail> implements Disposable {
this.#source = source;
this.#threshold = options.threshold ?? THRESHOLD;
this.#chunkSize = options.chunkSize ?? CHUNK_SIZE;
this.#chunkInterval = options.chunkInterval ?? CHUNK_INTERVAL;
}

get items() {
Expand Down Expand Up @@ -72,10 +76,15 @@ export class CollectProcessor<T extends Detail> implements Disposable {
dispatch({ type: "collect-processor-updated" });
};
const chunker = new Chunker<IdItem<T>>(this.#chunkSize);
let lastChunkTime = performance.now();
for await (const item of iter) {
if (this.#paused) await this.#paused.promise;
signal.throwIfAborted();
if (chunker.put(item)) {
if (
chunker.put(item) ||
performance.now() - lastChunkTime > this.#chunkInterval
) {
lastChunkTime = performance.now();
update(chunker.consume());
}
}
Expand Down
11 changes: 10 additions & 1 deletion denops/fall/processor/match.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import { dispatch } from "../event.ts";
const INTERVAL = 0;
const THRESHOLD = 10000;
const CHUNK_SIZE = 1000;
const CHUNK_INTERVAL = 100;

export type MatchProcessorOptions = {
interval?: number;
threshold?: number;
chunkSize?: number;
chunkInterval?: number;
incremental?: boolean;
};

Expand All @@ -24,6 +26,7 @@ export class MatchProcessor<T extends Detail> implements Disposable {
readonly #interval: number;
readonly #threshold: number;
readonly #chunkSize: number;
readonly #chunkInterval: number;
readonly #incremental: boolean;
#controller: AbortController = new AbortController();
#processing?: Promise<void>;
Expand All @@ -38,6 +41,7 @@ export class MatchProcessor<T extends Detail> implements Disposable {
this.#interval = options.interval ?? INTERVAL;
this.#threshold = options.threshold ?? THRESHOLD;
this.#chunkSize = options.chunkSize ?? CHUNK_SIZE;
this.#chunkInterval = options.chunkInterval ?? CHUNK_INTERVAL;
this.#incremental = options.incremental ?? false;
}

Expand Down Expand Up @@ -109,9 +113,14 @@ export class MatchProcessor<T extends Detail> implements Disposable {
}
};
const chunker = new Chunker<IdItem<T>>(this.#chunkSize);
let lastChunkTime = performance.now();
for await (const item of iter) {
signal.throwIfAborted();
if (chunker.put(item)) {
if (
chunker.put(item) ||
performance.now() - lastChunkTime > this.#chunkInterval
) {
lastChunkTime = performance.now();
update(chunker.consume());
await delay(this.#interval, { signal });
}
Expand Down