Skip to content

Commit 62129d0

Browse files
authored
Code cleanup (microsoft#256671)
1 parent 580d122 commit 62129d0

File tree

4 files changed

+686
-361
lines changed

4 files changed

+686
-361
lines changed

src/vs/base/common/async.ts

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2423,3 +2423,108 @@ export class AsyncIterableProducer<T> implements AsyncIterable<T> {
24232423
}
24242424

24252425
//#endregion
2426+
2427+
export const AsyncReaderEndOfStream = Symbol('AsyncReaderEndOfStream');
2428+
2429+
export class AsyncReader<T> {
2430+
private _buffer: T[] = [];
2431+
private _atEnd = false;
2432+
2433+
public get endOfStream(): boolean { return this._buffer.length === 0 && this._atEnd; }
2434+
private _extendBufferPromise: Promise<void> | undefined;
2435+
2436+
constructor(
2437+
private readonly _source: AsyncIterator<T>
2438+
) {
2439+
}
2440+
2441+
public async read(): Promise<T | typeof AsyncReaderEndOfStream> {
2442+
if (this._buffer.length === 0 && !this._atEnd) {
2443+
await this._extendBuffer();
2444+
}
2445+
if (this._buffer.length === 0) {
2446+
return AsyncReaderEndOfStream;
2447+
}
2448+
return this._buffer.shift()!;
2449+
}
2450+
2451+
public async readWhile(predicate: (value: T) => boolean, callback: (element: T) => unknown): Promise<void> {
2452+
do {
2453+
const piece = await this.peek();
2454+
if (piece === AsyncReaderEndOfStream) {
2455+
break;
2456+
}
2457+
if (!predicate(piece)) {
2458+
break;
2459+
}
2460+
await this.read(); // consume
2461+
await callback(piece);
2462+
} while (true);
2463+
}
2464+
2465+
public readBufferedOrThrow(): T | typeof AsyncReaderEndOfStream {
2466+
const value = this.peekBufferedOrThrow();
2467+
this._buffer.shift();
2468+
return value;
2469+
}
2470+
2471+
public async consumeToEnd(): Promise<void> {
2472+
while (!this.endOfStream) {
2473+
await this.read();
2474+
}
2475+
}
2476+
2477+
public async peek(): Promise<T | typeof AsyncReaderEndOfStream> {
2478+
if (this._buffer.length === 0 && !this._atEnd) {
2479+
await this._extendBuffer();
2480+
}
2481+
if (this._buffer.length === 0) {
2482+
return AsyncReaderEndOfStream;
2483+
}
2484+
return this._buffer[0];
2485+
}
2486+
2487+
public peekBufferedOrThrow(): T | typeof AsyncReaderEndOfStream {
2488+
if (this._buffer.length === 0) {
2489+
if (this._atEnd) {
2490+
return AsyncReaderEndOfStream;
2491+
}
2492+
throw new BugIndicatingError('No buffered elements');
2493+
}
2494+
2495+
return this._buffer[0];
2496+
}
2497+
2498+
public async peekTimeout(timeoutMs: number): Promise<T | typeof AsyncReaderEndOfStream | undefined> {
2499+
if (this._buffer.length === 0 && !this._atEnd) {
2500+
await raceTimeout(this._extendBuffer(), timeoutMs);
2501+
}
2502+
if (this._atEnd) {
2503+
return AsyncReaderEndOfStream;
2504+
}
2505+
if (this._buffer.length === 0) {
2506+
return undefined;
2507+
}
2508+
return this._buffer[0];
2509+
}
2510+
2511+
private _extendBuffer(): Promise<void> {
2512+
if (this._atEnd) {
2513+
return Promise.resolve();
2514+
}
2515+
2516+
if (!this._extendBufferPromise) {
2517+
this._extendBufferPromise = (async () => {
2518+
const { value, done } = await this._source.next();
2519+
this._extendBufferPromise = undefined;
2520+
if (done) {
2521+
this._atEnd = true;
2522+
} else {
2523+
this._buffer.push(value);
2524+
}
2525+
})();
2526+
}
2527+
2528+
return this._extendBufferPromise;
2529+
}
2530+
}

0 commit comments

Comments
 (0)