Skip to content
1 change: 1 addition & 0 deletions controlplane/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
"stream-json": "^1.8.0",
"stripe": "^14.19.0",
"tiny-lru": "^11.2.11",
"tinypool": "^2.0.0",
"uid": "^2.0.2",
"uuid": "^10.0.0",
"zod": "^3.22.4",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ export function createContract(
chClient: opts.chClient!,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs: [{ ...contractGraph, contract }],
composeWorkerPool: opts.composeWorkerPool,
});

compositionErrors.push(...composition.compositionErrors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ export function updateContract(
},
labelMatchers: [],
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
});

Expand All @@ -155,6 +156,7 @@ export function updateContract(
},
blobStorage: opts.blobStorage,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs: [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ export function createFeatureFlag(
},
blobStorage: opts.blobStorage,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ export function deleteFeatureFlag(
},
blobStorage: opts.blobStorage,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ export function enableFeatureFlag(
},
blobStorage: opts.blobStorage,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ export function updateFeatureFlag(
},
blobStorage: opts.blobStorage,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs: allFederatedGraphsToCompose,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ export function createFederatedGraph(
},
blobStorage: opts.blobStorage,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs: [federatedGraph],
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ export function migrateFromApollo(
webhookJWTSecret: opts.admissionWebhookJWTSecret,
},
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: {
disableResolvabilityValidation: true,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ export function moveFederatedGraph(
jwtSecret: opts.admissionWebhookJWTSecret,
},
opts.chClient!,
opts.composeWorkerPool,
);

const allDeploymentErrors: PlainMessage<DeploymentError>[] = [];
Expand Down Expand Up @@ -163,6 +164,7 @@ export function moveFederatedGraph(
jwtSecret: opts.admissionWebhookJWTSecret,
},
opts.chClient!,
opts.composeWorkerPool,
);

allCompositionErrors.push(...contractErrors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ export function updateFederatedGraph(
admissionWebhookURL: req.admissionWebhookURL,
blobStorage: opts.blobStorage,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
labelMatchers: req.labelMatchers,
namespaceId: federatedGraph.namespaceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ export function setGraphRouterCompatibilityVersion(
},
blobStorage: opts.blobStorage,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs: [federatedGraph],
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ export function publishMonograph(
webhookJWTSecret: opts.admissionWebhookJWTSecret,
},
opts.chClient!,
opts.composeWorkerPool,
);

for (const graph of updatedFederatedGraphs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ export function updateMonograph(
admissionWebhookURL: req.admissionWebhookURL,
admissionWebhookSecret: req.admissionWebhookSecret,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
});

await subgraphRepo.update(
Expand All @@ -173,6 +174,7 @@ export function updateMonograph(
webhookJWTSecret: opts.admissionWebhookJWTSecret,
},
opts.chClient!,
opts.composeWorkerPool,
);

await auditLogRepo.addAuditLog({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ export function deleteFederatedSubgraph(
chClient: opts.chClient!,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs: affectedFederatedGraphs,
composeWorkerPool: opts.composeWorkerPool,
});

return { affectedFederatedGraphs, compositionErrors, deploymentErrors, compositionWarnings };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ export function moveSubgraph(
jwtSecret: opts.admissionWebhookJWTSecret,
},
opts.chClient!,
opts.composeWorkerPool,
newCompositionOptions(req.disableResolvabilityValidation),
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,7 @@ export function publishFederatedSubgraph(
webhookJWTSecret: opts.admissionWebhookJWTSecret,
},
opts.chClient!,
opts.composeWorkerPool,
newCompositionOptions(req.disableResolvabilityValidation),
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ export function updateSubgraph(
webhookJWTSecret: opts.admissionWebhookJWTSecret,
},
opts.chClient!,
opts.composeWorkerPool,
newCompositionOptions(req.disableResolvabilityValidation),
);

Expand Down
14 changes: 14 additions & 0 deletions controlplane/src/core/build-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import { compressionBrotli, compressionGzip } from '@connectrpc/connect-node';
import fastifyGracefulShutdown from 'fastify-graceful-shutdown';
import { App } from 'octokit';
import { Worker } from 'bullmq';
import { Tinypool } from 'tinypool';
import { getWorkerPool } from '../workers/compose.js';
import routes from './routes.js';
import fastifyHealth from './plugins/health.js';
import fastifyMetrics, { MetricsPluginOptions } from './plugins/metrics.js';
Expand Down Expand Up @@ -136,6 +138,9 @@ export interface BuildConfig {
key?: string; // e.g. string or '/path/to/my/client-key.pem'
};
};
composeWorkers?: {
maxCount: number;
};
}

export interface MetricsOptions {
Expand Down Expand Up @@ -469,6 +474,8 @@ export default async function build(opts: BuildConfig) {
keycloakRealm: opts.keycloak.realm,
});

const composeWorkerPool: Tinypool = getWorkerPool(opts.composeWorkers?.maxCount)!;

// Must be registered after custom fastify routes
// Because it registers an all-catch route for connect handlers

Expand Down Expand Up @@ -502,6 +509,7 @@ export default async function build(opts: BuildConfig) {
stripeSecretKey: opts.stripe?.secret,
admissionWebhookJWTSecret: opts.admissionWebhook.secret,
cdnBaseUrl: opts.cdnBaseUrl,
composeWorkerPool,
}),
contextValues(req) {
return createContextValues().set<FastifyBaseLogger>({ id: fastifyLoggerId, defaultValue: req.log }, req.log);
Expand All @@ -522,6 +530,12 @@ export default async function build(opts: BuildConfig) {
});

fastify.gracefulShutdown(async () => {
fastify.log.debug('Shutting down compose worker pool');

await composeWorkerPool.destroy();

fastify.log.debug('Compose worker pool shut down');

fastify.log.debug('Shutting down bull workers');

await Promise.all(bullWorkers.map((worker) => worker.close()));
Expand Down
24 changes: 17 additions & 7 deletions controlplane/src/core/repositories/FederatedGraphRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { PostgresJsDatabase } from 'drizzle-orm/postgres-js';
import { FastifyBaseLogger } from 'fastify';
import { parse } from 'graphql';
import { generateKeyPair, importPKCS8, SignJWT } from 'jose';
import { Tinypool } from 'tinypool';
import { uid } from 'uid/secure';
import {
ContractTagOptions,
Expand Down Expand Up @@ -205,6 +206,7 @@ export class FederatedGraphRepository {
targetId: string;
updatedBy: string;
chClient: ClickHouseClient;
composeWorkerPool: Tinypool;
admissionWebhookSecret?: string;
admissionWebhookURL?: string;
compositionOptions?: CompositionOptions;
Expand Down Expand Up @@ -337,6 +339,7 @@ export class FederatedGraphRepository {
actorId: data.updatedBy,
chClient: data.chClient,
compositionOptions: data.compositionOptions,
composeWorkerPool: data.composeWorkerPool,
});

return {
Expand Down Expand Up @@ -369,6 +372,7 @@ export class FederatedGraphRepository {
cdnBaseUrl: string;
},
chClient: ClickHouseClient,
composeWorkerPool: Tinypool,
compositionOptions?: CompositionOptions,
): Promise<{
compositionErrors: PlainMessage<CompositionError>[];
Expand Down Expand Up @@ -430,6 +434,7 @@ export class FederatedGraphRepository {
chClient,
compositionOptions,
federatedGraphs: [movedContractGraph],
composeWorkerPool,
});

return {
Expand All @@ -449,6 +454,7 @@ export class FederatedGraphRepository {
},
chClient,
compositionOptions,
composeWorkerPool,
});

return {
Expand Down Expand Up @@ -1505,6 +1511,7 @@ export class FederatedGraphRepository {
chClient,
blobStorage,
federatedGraphs,
composeWorkerPool,
}: {
actorId: string;
admissionConfig: {
Expand All @@ -1515,6 +1522,7 @@ export class FederatedGraphRepository {
chClient: ClickHouseClient;
federatedGraphs: FederatedGraphDTO[];
compositionOptions?: CompositionOptions;
composeWorkerPool: Tinypool;
}) => {
return this.db.transaction(async (tx) => {
const subgraphRepo = new SubgraphRepository(this.logger, tx, this.organizationId);
Expand All @@ -1524,7 +1532,7 @@ export class FederatedGraphRepository {
const graphCompositionRepo = new GraphCompositionRepository(this.logger, tx);
const composer = new Composer(
this.logger,
this.db,
tx,
fedGraphRepo,
subgraphRepo,
contractRepo,
Expand Down Expand Up @@ -1577,12 +1585,14 @@ export class FederatedGraphRepository {
const contractBaseCompositionDataByContractId = new Map<string, ContractBaseCompositionData>();

for (const subgraphsToCompose of allSubgraphsToCompose) {
const result: FederationResult | FederationResultWithContracts = getFederationResultWithPotentialContracts(
federatedGraph,
subgraphsToCompose,
tagOptionsByContractName,
compositionOptions,
);
const result: FederationResult | FederationResultWithContracts =
await getFederationResultWithPotentialContracts(
composeWorkerPool,
federatedGraph,
subgraphsToCompose,
tagOptionsByContractName,
compositionOptions,
);

if (!result.success) {
// Collect all composition errors
Expand Down
5 changes: 5 additions & 0 deletions controlplane/src/core/repositories/SubgraphRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { and, asc, count, desc, eq, getTableName, gt, inArray, like, lt, notInAr
import { PostgresJsDatabase } from 'drizzle-orm/postgres-js';
import { FastifyBaseLogger } from 'fastify';
import { GraphQLSchema } from 'graphql';
import { Tinypool } from 'tinypool';
import { DBSubgraphType, WebsocketSubprotocol } from '../../db/models.js';
import * as schema from '../../db/schema.js';
import {
Expand Down Expand Up @@ -242,6 +243,7 @@ export class SubgraphRepository {
cdnBaseUrl: string;
},
chClient: ClickHouseClient,
composeWorkerPool: Tinypool,
compositionOptions?: CompositionOptions,
): Promise<{
compositionErrors: PlainMessage<CompositionError>[];
Expand Down Expand Up @@ -470,6 +472,7 @@ export class SubgraphRepository {
chClient,
compositionOptions,
federatedGraphs: updatedFederatedGraphs.filter((g) => !g.contract),
composeWorkerPool,
});

compositionErrors.push(...cErrors);
Expand Down Expand Up @@ -501,6 +504,7 @@ export class SubgraphRepository {
cdnBaseUrl: string;
},
chClient: ClickHouseClient,
composeWorkerPool: Tinypool,
compositionOptions?: CompositionOptions,
): Promise<{
compositionErrors: PlainMessage<CompositionError>[];
Expand Down Expand Up @@ -554,6 +558,7 @@ export class SubgraphRepository {
actorId: data.updatedBy,
chClient,
compositionOptions,
composeWorkerPool,
});

return { compositionErrors, updatedFederatedGraphs, deploymentErrors, compositionWarnings };
Expand Down
2 changes: 2 additions & 0 deletions controlplane/src/core/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { PlatformService } from '@wundergraph/cosmo-connect/dist/platform/v1/pla
import { PostgresJsDatabase } from 'drizzle-orm/postgres-js';
import pino from 'pino';
import { App } from 'octokit';
import { Tinypool } from 'tinypool';
import * as schema from '../db/schema.js';
import NodeServiceImpl from './bufservices/NodeService.js';
import PlatformServiceImpl from './bufservices/PlatformService.js';
Expand Down Expand Up @@ -51,6 +52,7 @@ export interface RouterOptions {
};
stripeSecretKey?: string;
cdnBaseUrl: string;
composeWorkerPool: Tinypool;
}
const handlerOptions: Partial<ConnectRouterOptions> = {
maxTimeoutMs: 80_000,
Expand Down
Loading
Loading