Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { DeployOptions, HotswapProperties } from '..';
import { Deployments, EcsHotswapProperties, HotswapPropertyOverrides, WorkGraph } from '../../../api/aws-cdk';
import { Deployments, EcsHotswapProperties, HotswapPropertyOverrides, type WorkGraph } from '../../../api/aws-cdk';

export function buildParameterMap(parameters?: Map<string, string | undefined>): { [name: string]: { [name: string]: string | undefined } } {
const parameterMap: {
Expand Down
2 changes: 1 addition & 1 deletion packages/@aws-cdk/toolkit-lib/lib/api/aws-cdk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export { DEFAULT_TOOLKIT_STACK_NAME } from '../../../../aws-cdk/lib/api/toolkit-
export { ResourceMigrator } from '../../../../aws-cdk/lib/api/resource-import';
export { StackActivityProgress } from '../../../../aws-cdk/lib/api/stack-events';
export { CloudWatchLogEventMonitor, findCloudWatchLogGroups } from '../../../../aws-cdk/lib/api/logs';
export { WorkGraph, WorkGraphBuilder, AssetBuildNode, AssetPublishNode, StackNode, Concurrency } from '../../../../aws-cdk/lib/api/work-graph';
export { type WorkGraph, WorkGraphBuilder, AssetBuildNode, AssetPublishNode, StackNode, Concurrency } from '../../../../aws-cdk/lib/api/work-graph';

// Context Providers
export * as contextproviders from '../../../../aws-cdk/lib/context-providers';
Expand Down
2 changes: 1 addition & 1 deletion packages/@aws-cdk/toolkit-lib/lib/toolkit/toolkit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ export class Toolkit extends CloudAssemblySourceBuilder implements AsyncDisposab
stack,
...stack.dependencies.filter(cxapi.AssetManifestArtifact.isAssetManifestArtifact),
]);
const workGraph = new WorkGraphBuilder(prebuildAssets).build(stacksAndTheirAssetManifests);
const workGraph = new WorkGraphBuilder({ ioHost, action }, prebuildAssets).build(stacksAndTheirAssetManifests);

// Unless we are running with '--force', skip already published assets
if (!options.force) {
Expand Down
23 changes: 18 additions & 5 deletions packages/aws-cdk/lib/api/work-graph/work-graph-builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import * as cxapi from '@aws-cdk/cx-api';
import { AssetManifest, type IManifestEntry } from 'cdk-assets';
import { WorkGraph } from './work-graph';
import { DeploymentState, AssetBuildNode, WorkNode } from './work-graph-types';
import { IoMessaging } from '../../toolkit/cli-io-host';
import { ToolkitError } from '../../toolkit/error';
import { contentHashAny } from '../../util/content-hash';

Expand All @@ -20,10 +21,18 @@ export class WorkGraphBuilder {
'asset-publish': 0,
'stack': 5,
};
private readonly graph = new WorkGraph();

constructor(private readonly prebuildAssets: boolean, private readonly idPrefix = '') {

private readonly graph: WorkGraph;
private readonly ioHost: IoMessaging['ioHost'];
private readonly action: IoMessaging['action'];

constructor(
{ ioHost, action }: IoMessaging,
private readonly prebuildAssets: boolean,
private readonly idPrefix = '',
) {
this.graph = new WorkGraph({}, { ioHost, action });
this.ioHost = ioHost;
this.action = action;
}

private addStack(artifact: cxapi.CloudFormationStackArtifact) {
Expand Down Expand Up @@ -120,7 +129,11 @@ export class WorkGraphBuilder {
}
} else if (cxapi.NestedCloudAssemblyArtifact.isNestedCloudAssemblyArtifact(artifact)) {
const assembly = new cxapi.CloudAssembly(artifact.fullPath, { topoSort: false });
const nestedGraph = new WorkGraphBuilder(this.prebuildAssets, `${this.idPrefix}${artifact.id}.`).build(assembly.artifacts);
const nestedGraph = new WorkGraphBuilder(
{ ioHost: this.ioHost, action: this.action },
this.prebuildAssets,
`${this.idPrefix}${artifact.id}.`,
).build(assembly.artifacts);
this.graph.absorb(nestedGraph);
} else {
// Ignore whatever else
Expand Down
68 changes: 41 additions & 27 deletions packages/aws-cdk/lib/api/work-graph/work-graph.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,30 @@
import { WorkNode, DeploymentState, StackNode, AssetBuildNode, AssetPublishNode } from './work-graph-types';
import { debug, trace } from '../../logging';
import { debug, trace } from '../../cli/messages';
import { IoMessaging } from '../../toolkit/cli-io-host';
import { ToolkitError } from '../../toolkit/error';
import { parallelPromises } from '../../util/parallel';

export type Concurrency = number | Record<WorkNode['type'], number>;

export interface WorkGraphProps {
ioHost: IoMessaging['ioHost'];
action: IoMessaging['action'];
}

export class WorkGraph {
public readonly nodes: Record<string, WorkNode>;
private readonly readyPool: Array<WorkNode> = [];
private readonly lazyDependencies = new Map<string, string[]>();
private readonly ioHost: IoMessaging['ioHost'];
private readonly action: IoMessaging['action'];

public error?: Error;

public constructor(nodes: Record<string, WorkNode> = {}) {
public constructor(nodes: Record<string, WorkNode>, props: WorkGraphProps) {
this.nodes = { ...nodes };

this.ioHost = props.ioHost;
this.action = props.action;
}

public addNodes(...nodes: WorkNode[]) {
Expand Down Expand Up @@ -118,8 +130,8 @@ export class WorkGraph {
/**
* Return the set of unblocked nodes
*/
public ready(): ReadonlyArray<WorkNode> {
this.updateReadyPool();
public async ready(): Promise<ReadonlyArray<WorkNode>> {
await this.updateReadyPool();
return this.readyPool;
}

Expand Down Expand Up @@ -149,28 +161,30 @@ export class WorkGraph {
start();

function start() {
graph.updateReadyPool();

for (let i = 0; i < graph.readyPool.length; ) {
const node = graph.readyPool[i];

if (active[node.type] < max[node.type] && totalActive() < totalMax) {
graph.readyPool.splice(i, 1);
startOne(node);
} else {
i += 1;
graph.updateReadyPool().then(() => {
for (let i = 0; i < graph.readyPool.length; ) {
const node = graph.readyPool[i];

if (active[node.type] < max[node.type] && totalActive() < totalMax) {
graph.readyPool.splice(i, 1);
startOne(node);
} else {
i += 1;
}
}
}

if (totalActive() === 0) {
if (graph.done()) {
ok();
if (totalActive() === 0) {
if (graph.done()) {
ok();
}
// wait for other active deploys to finish before failing
if (graph.hasFailed()) {
fail(graph.error);
}
}
// wait for other active deploys to finish before failing
if (graph.hasFailed()) {
fail(graph.error);
}
}
}).catch((e) => {
fail(e);
});
}

function startOne(x: WorkNode) {
Expand Down Expand Up @@ -252,7 +266,7 @@ export class WorkGraph {
* Do this in parallel, because there may be a lot of assets in an application (seen in practice: >100 assets)
*/
public async removeUnnecessaryAssets(isUnnecessary: (x: AssetPublishNode) => Promise<boolean>) {
debug('Checking for previously published assets');
await this.ioHost.notify(debug(this.action, 'Checking for previously published assets'));

const publishes = this.nodesOfType('asset-publish');

Expand All @@ -265,7 +279,7 @@ export class WorkGraph {
this.removeNode(assetNode);
}

debug(`${publishes.length} total assets, ${publishes.length - alreadyPublished.length} still need to be published`);
await this.ioHost.notify(debug(this.action, `${publishes.length} total assets, ${publishes.length - alreadyPublished.length} still need to be published`));

// Now also remove any asset build steps that don't have any dependencies on them anymore
const unusedBuilds = this.nodesOfType('asset-build').filter(build => this.dependees(build).length === 0);
Expand All @@ -274,7 +288,7 @@ export class WorkGraph {
}
}

private updateReadyPool() {
private async updateReadyPool() {
const activeCount = Object.values(this.nodes).filter((x) => x.deploymentState === DeploymentState.DEPLOYING).length;
const pendingCount = Object.values(this.nodes).filter((x) => x.deploymentState === DeploymentState.PENDING).length;

Expand All @@ -296,7 +310,7 @@ export class WorkGraph {

if (this.readyPool.length === 0 && activeCount === 0 && pendingCount > 0) {
const cycle = this.findCycle() ?? ['No cycle found!'];
trace(`Cycle ${cycle.join(' -> ')} in graph ${this}`);
await this.ioHost.notify(trace(this.action, `Cycle ${cycle.join(' -> ')} in graph ${this}`));
throw new ToolkitError(`Unable to make progress anymore, dependency cycle between remaining artifacts: ${cycle.join(' -> ')} (run with -vv for full graph)`);
}
}
Expand Down
7 changes: 5 additions & 2 deletions packages/aws-cdk/lib/cli/cdk-toolkit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import { CloudWatchLogEventMonitor } from '../api/logs/logs-monitor';
import { ResourceImporter, removeNonImportResources, ResourceMigrator } from '../api/resource-import';
import { StackActivityProgress } from '../api/stack-events';
import { tagsForStack, type Tag } from '../api/tags';
import { type AssetBuildNode, type AssetPublishNode, type Concurrency, type StackNode, WorkGraph } from '../api/work-graph';
import { type AssetBuildNode, type AssetPublishNode, type Concurrency, type StackNode, type WorkGraph } from '../api/work-graph';
import { WorkGraphBuilder } from '../api/work-graph/work-graph-builder';
import {
generateCdkApp,
Expand Down Expand Up @@ -583,7 +583,10 @@ export class CdkToolkit {
stack,
...stack.dependencies.filter(cxapi.AssetManifestArtifact.isAssetManifestArtifact),
]);
const workGraph = new WorkGraphBuilder(prebuildAssets).build(stacksAndTheirAssetManifests);
const workGraph = new WorkGraphBuilder({
ioHost: this.ioHost,
action: 'deploy',
}, prebuildAssets).build(stacksAndTheirAssetManifests);

// Unless we are running with '--force', skip already published assets
if (!options.force) {
Expand Down
32 changes: 19 additions & 13 deletions packages/aws-cdk/test/api/work-graph/work-graph-builder.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@ import { CloudAssemblyBuilder } from '@aws-cdk/cx-api';
import { expect } from '@jest/globals';
import { WorkGraph, WorkGraphBuilder } from '../../../lib/api/work-graph';
import type { AssetBuildNode, AssetPublishNode, StackNode, WorkNode } from '../../../lib/api/work-graph';
import { CliIoHost, IoMessaging } from '../../../lib/toolkit/cli-io-host';

let rootBuilder: CloudAssemblyBuilder;
let mockMsg: IoMessaging = {
ioHost: CliIoHost.instance(),
action: 'deploy'
};

beforeEach(() => {
rootBuilder = new CloudAssemblyBuilder();
});
Expand Down Expand Up @@ -44,7 +50,7 @@ describe('with some stacks and assets', () => {
});

test('stack depends on the asset publishing step', () => {
const graph = new WorkGraphBuilder(true).build(assembly.artifacts);
const graph = new WorkGraphBuilder(mockMsg, true).build(assembly.artifacts);

expect(assertableNode(graph.node('stack2'))).toEqual(expect.objectContaining({
type: 'stack',
Expand All @@ -53,7 +59,7 @@ describe('with some stacks and assets', () => {
});

test('asset publishing step depends on asset building step', () => {
const graph = new WorkGraphBuilder(true).build(assembly.artifacts);
const graph = new WorkGraphBuilder(mockMsg, true).build(assembly.artifacts);

expect(graph.node('publish-F1-add54bdbcb')).toEqual(expect.objectContaining({
type: 'asset-publish',
Expand All @@ -62,7 +68,7 @@ describe('with some stacks and assets', () => {
});

test('with prebuild off, asset building inherits dependencies from their parent stack', () => {
const graph = new WorkGraphBuilder(false).build(assembly.artifacts);
const graph = new WorkGraphBuilder(mockMsg, false).build(assembly.artifacts);

expect(graph.node('build-F1-a533139934')).toEqual(expect.objectContaining({
type: 'asset-build',
Expand All @@ -71,7 +77,7 @@ describe('with some stacks and assets', () => {
});

test('with prebuild on, assets only have their own dependencies', () => {
const graph = new WorkGraphBuilder(true).build(assembly.artifacts);
const graph = new WorkGraphBuilder(mockMsg, true).build(assembly.artifacts);

expect(graph.node('build-F1-a533139934')).toEqual(expect.objectContaining({
type: 'asset-build',
Expand All @@ -90,8 +96,8 @@ test('tree metadata is ignored', async () => {

const assembly = rootBuilder.buildAssembly();

const graph = new WorkGraphBuilder(true).build(assembly.artifacts);
expect(graph.ready().length).toEqual(0);
const graph = new WorkGraphBuilder(mockMsg, true).build(assembly.artifacts);
expect((await graph.ready()).length).toEqual(0);
});

test('can handle nested assemblies', async () => {
Expand All @@ -103,7 +109,7 @@ test('can handle nested assemblies', async () => {
const assembly = rootBuilder.buildAssembly();

let workDone = 0;
const graph = new WorkGraphBuilder(true).build(assembly.artifacts);
const graph = new WorkGraphBuilder(mockMsg, true).build(assembly.artifacts);

await graph.doParallel(10, {
deployStack: async () => { workDone += 1; },
Expand All @@ -126,8 +132,8 @@ test('dependencies on unselected artifacts are silently ignored', async () => {
});

const asm = rootBuilder.buildAssembly();
const graph = new WorkGraphBuilder(true).build([asm.getStackArtifact('stackB')]);
expect(graph.ready()[0]).toEqual(expect.objectContaining({
const graph = new WorkGraphBuilder(mockMsg, true).build([asm.getStackArtifact('stackB')]);
expect((await graph.ready())[0]).toEqual(expect.objectContaining({
id: 'stackB',
dependencies: new Set(),
}));
Expand Down Expand Up @@ -162,7 +168,7 @@ describe('tests that use assets', () => {

const assembly = rootBuilder.buildAssembly();

const graph = new WorkGraphBuilder(true).build(assembly.artifacts);
const graph = new WorkGraphBuilder(mockMsg, true).build(assembly.artifacts);
const traversal = await traverseAndRecord(graph);

expect(traversal).toEqual([
Expand All @@ -184,7 +190,7 @@ describe('tests that use assets', () => {
addAssets(rootBuilder, 'StackC.assets', { files });

const assembly = rootBuilder.buildAssembly();
const graph = new WorkGraphBuilder(true).build(assembly.artifacts);
const graph = new WorkGraphBuilder(mockMsg, true).build(assembly.artifacts);

// THEN
expect(graph.findCycle()).toBeUndefined();
Expand Down Expand Up @@ -224,7 +230,7 @@ describe('tests that use assets', () => {

const assembly = rootBuilder.buildAssembly();

const graph = new WorkGraphBuilder(true).build(assembly.artifacts);
const graph = new WorkGraphBuilder(mockMsg, true).build(assembly.artifacts);
const traversal = await traverseAndRecord(graph);

expect(traversal).toEqual([
Expand Down Expand Up @@ -270,7 +276,7 @@ describe('tests that use assets', () => {

const assembly = rootBuilder.buildAssembly();

const graph = new WorkGraphBuilder(true).build(assembly.artifacts);
const graph = new WorkGraphBuilder(mockMsg, true).build(assembly.artifacts);
const traversal = await traverseAndRecord(graph);

expect(traversal).toEqual([
Expand Down
14 changes: 10 additions & 4 deletions packages/aws-cdk/test/api/work-graph/work-graph.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { WorkGraph, DeploymentState } from '../../../lib/api/work-graph';
import type { AssetBuildNode, AssetPublishNode, StackNode } from '../../../lib/api/work-graph';
import { CliIoHost, IoMessaging } from '../../../lib/toolkit/cli-io-host';

const DUMMY: any = 'DUMMY';

Expand All @@ -9,6 +10,11 @@ const sleep = async (duration: number) => new Promise<void>((resolve) => setTime
// a chance to start new tasks.
const SLOW = 200;

let mockMsg: IoMessaging = {
ioHost: CliIoHost.instance(),
action: 'deploy'
}

/**
* Repurposing unused stack attributes to create specific test scenarios
* - stack.name = deployment duration
Expand Down Expand Up @@ -243,7 +249,7 @@ describe('WorkGraph', () => {
expected: ['c-build', 'c-publish', 'A', 'b-build', 'b-publish', 'B'],
},
])('Success - Concurrency: $concurrency - $scenario', async ({ concurrency, expected, toDeploy }) => {
const graph = new WorkGraph();
const graph = new WorkGraph({}, mockMsg);
addTestArtifactsToGraph(toDeploy, graph);

await graph.doParallel(concurrency, callbacks);
Expand All @@ -252,7 +258,7 @@ describe('WorkGraph', () => {
});

test('can remove unnecessary assets', async () => {
const graph = new WorkGraph();
const graph = new WorkGraph({}, mockMsg);
addTestArtifactsToGraph([
{ id: 'a', type: 'asset' },
{ id: 'b', type: 'asset' },
Expand Down Expand Up @@ -375,7 +381,7 @@ describe('WorkGraph', () => {
expected: ['b-build', 'C'],
},
])('Failure - Concurrency: $concurrency - $scenario', async ({ concurrency, expectedError, toDeploy, expected }) => {
const graph = new WorkGraph();
const graph = new WorkGraph({}, mockMsg);
addTestArtifactsToGraph(toDeploy, graph);

await expect(graph.doParallel(concurrency, callbacks)).rejects.toThrow(expectedError);
Expand Down Expand Up @@ -411,7 +417,7 @@ describe('WorkGraph', () => {
expectedError: 'B -> C -> D -> B',
},
])('Failure - Graph Circular Dependencies - $scenario', async ({ toDeploy, expectedError }) => {
const graph = new WorkGraph();
const graph = new WorkGraph({}, mockMsg);
addTestArtifactsToGraph(toDeploy, graph);

await expect(graph.doParallel(1, callbacks)).rejects.toThrow(new RegExp(`Unable to make progress.*${expectedError}`));
Expand Down
Loading