Skip to content

Commit 301d926

Browse files
committed
feat(@angular-devkit/architect): support async generator based builders
"Watch" type builders (builders with more than one result) can now be implemented as async generator functions. This allows such a builder to be more easily implemented in cases where an upstream API is async iterator/generator based or when async generators are the preferred method of implementation.
1 parent 2010023 commit 301d926

File tree

4 files changed

+78
-5
lines changed

4 files changed

+78
-5
lines changed

etc/api/angular_devkit/architect/src/index.d.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ export declare type BuilderInput = json.JsonObject & RealBuilderInput;
4040

4141
export declare type BuilderOutput = json.JsonObject & RealBuilderOutput;
4242

43-
export declare type BuilderOutputLike = SubscribableOrPromise<BuilderOutput> | BuilderOutput;
43+
export declare type BuilderOutputLike = AsyncIterable<BuilderOutput> | SubscribableOrPromise<BuilderOutput> | BuilderOutput;
4444

4545
export declare type BuilderProgress = json.JsonObject & RealBuilderProgress & TypedBuilderProgress;
4646

@@ -62,6 +62,8 @@ export interface BuilderRun {
6262

6363
export declare function createBuilder<OptT extends json.JsonObject, OutT extends BuilderOutput = BuilderOutput>(fn: BuilderHandlerFn<OptT>): Builder<OptT>;
6464

65+
export declare function fromAsyncIterable<T>(iterable: AsyncIterable<T>): Observable<T>;
66+
6567
export declare function isBuilderOutput(obj: any): obj is BuilderOutput;
6668

6769
export interface ScheduleOptions {

packages/angular_devkit/architect/src/api.ts

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
* found in the LICENSE file at https://angular.io/license
77
*/
88
import { analytics, experimental, json, logging } from '@angular-devkit/core';
9-
import { Observable, SubscribableOrPromise, from } from 'rxjs';
9+
import { Observable, SubscribableOrPromise, Subscriber, from } from 'rxjs';
1010
import { switchMap } from 'rxjs/operators';
1111
import { Schema as RealBuilderInput, Target as RealTarget } from './input-schema';
1212
import { Schema as RealBuilderOutput } from './output-schema';
@@ -258,17 +258,50 @@ export interface BuilderContext {
258258
/**
259259
* An accepted return value from a builder. Can be either an Observable, a Promise or a vector.
260260
*/
261-
export type BuilderOutputLike = SubscribableOrPromise<BuilderOutput> | BuilderOutput;
261+
export type BuilderOutputLike = AsyncIterable<BuilderOutput> | SubscribableOrPromise<BuilderOutput> | BuilderOutput;
262262

263263
// tslint:disable-next-line:no-any
264264
export function isBuilderOutput(obj: any): obj is BuilderOutput {
265265
if (!obj || typeof obj.then === 'function' || typeof obj.subscribe === 'function') {
266266
return false;
267267
}
268268

269+
if (typeof obj[Symbol.asyncIterator] === 'function') {
270+
return false;
271+
}
272+
269273
return typeof obj.success === 'boolean';
270274
}
271275

276+
export function fromAsyncIterable<T>(iterable: AsyncIterable<T>): Observable<T> {
277+
return new Observable((subscriber) => {
278+
handleAsyncIterator(subscriber, iterable[Symbol.asyncIterator]()).then(
279+
() => subscriber.complete(),
280+
(error) => subscriber.error(error),
281+
);
282+
});
283+
}
284+
285+
async function handleAsyncIterator<T>(
286+
subscriber: Subscriber<T>,
287+
iterator: AsyncIterator<T>,
288+
): Promise<void> {
289+
const teardown = new Promise<void>((resolve) => subscriber.add(() => resolve()));
290+
291+
try {
292+
while (!subscriber.closed) {
293+
const result = await Promise.race([teardown, iterator.next()]);
294+
if (!result || result.done) {
295+
break;
296+
}
297+
298+
subscriber.next(result.value);
299+
}
300+
} finally {
301+
await iterator.return?.();
302+
}
303+
}
304+
272305
/**
273306
* A builder handler function. The function signature passed to `createBuilder()`.
274307
*/

packages/angular_devkit/architect/src/create-builder.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
* found in the LICENSE file at https://angular.io/license
77
*/
88
import { analytics, experimental, json, logging } from '@angular-devkit/core';
9-
import { Observable, Subscription, from, of, throwError } from 'rxjs';
9+
import { Observable, Subscription, from, isObservable, of, throwError } from 'rxjs';
1010
import { tap } from 'rxjs/operators';
1111
import {
1212
BuilderContext,
@@ -18,6 +18,7 @@ import {
1818
ScheduleOptions,
1919
Target,
2020
TypedBuilderProgress,
21+
fromAsyncIterable,
2122
isBuilderOutput,
2223
targetStringFromTarget,
2324
} from './api';
@@ -200,6 +201,8 @@ export function createBuilder<
200201
result = fn(i.options as OptT, context);
201202
if (isBuilderOutput(result)) {
202203
result = of(result);
204+
} else if (!isObservable(result) && isAsyncIterable(result)) {
205+
result = fromAsyncIterable(result);
203206
} else {
204207
result = from(result);
205208
}
@@ -234,3 +237,7 @@ export function createBuilder<
234237
[BuilderVersionSymbol]: require('../package.json').version,
235238
};
236239
}
240+
241+
function isAsyncIterable<T>(obj: unknown): obj is AsyncIterable<T> {
242+
return !!obj && typeof (obj as AsyncIterable<T>)[Symbol.asyncIterator] === 'function';
243+
}

packages/angular_devkit/architect/src/index_spec.ts

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,14 @@ describe('architect', () => {
7070
await run.stop();
7171
});
7272

73+
it('supports async generator builders', async () => {
74+
testArchitectHost.addBuilder('package:test', createBuilder(async function*() { yield { success: true }; }));
75+
76+
const run = await architect.scheduleBuilder('package:test', {});
77+
expect(await run.result).toEqual(jasmine.objectContaining({ success: true }));
78+
await run.stop();
79+
});
80+
7381
it('runs builders parallel', async () => {
7482
const run = await architect.scheduleBuilder('package:test', {});
7583
const run2 = await architect.scheduleBuilder('package:test', {});
@@ -123,7 +131,7 @@ describe('architect', () => {
123131
}
124132
});
125133

126-
it('works with watching builders', async () => {
134+
it('works with watching observable builders', async () => {
127135
let results = 0;
128136
testArchitectHost.addBuilder('package:test-watch', createBuilder((_, context) => {
129137
called++;
@@ -150,6 +158,29 @@ describe('architect', () => {
150158
expect(all.length).toBe(10);
151159
});
152160

161+
it('works with watching async generator builders', async () => {
162+
let results = 0;
163+
testArchitectHost.addBuilder('package:test-watch-gen', createBuilder(async function*(_, context) {
164+
called++;
165+
166+
for (let x = 0; x < 10; x++) {
167+
context.reportRunning();
168+
yield { success: true };
169+
results++;
170+
}
171+
}));
172+
173+
const run = await architect.scheduleBuilder('package:test-watch-gen', {});
174+
await run.result;
175+
expect(called).toBe(1);
176+
expect(results).toBe(1);
177+
178+
const all = await run.output.pipe(toArray()).toPromise();
179+
expect(called).toBe(1);
180+
expect(results).toBe(10);
181+
expect(all.length).toBe(10);
182+
});
183+
153184
it('reports errors in the builder', async () => {
154185
testArchitectHost.addBuilder('package:error', createBuilder(() => {
155186
throw new Error('Error in the builder.');

0 commit comments

Comments
 (0)