Skip to content
1 change: 1 addition & 0 deletions controlplane/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
"stream-json": "^1.8.0",
"stripe": "^14.19.0",
"tiny-lru": "^11.2.11",
"tinypool": "^2.0.0",
"uid": "^2.0.2",
"uuid": "^10.0.0",
"zod": "^3.22.4",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ export function createContract(
chClient: opts.chClient!,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs: [{ ...contractGraph, contract }],
composeWorkerPool: opts.composeWorkerPool,
});

compositionErrors.push(...composition.compositionErrors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ export function updateContract(
},
labelMatchers: [],
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
});

Expand All @@ -155,6 +156,7 @@ export function updateContract(
},
blobStorage: opts.blobStorage,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs: [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ export function createFeatureFlag(
},
blobStorage: opts.blobStorage,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ export function deleteFeatureFlag(
},
blobStorage: opts.blobStorage,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ export function enableFeatureFlag(
},
blobStorage: opts.blobStorage,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ export function updateFeatureFlag(
},
blobStorage: opts.blobStorage,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs: allFederatedGraphsToCompose,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ export function createFederatedGraph(
},
blobStorage: opts.blobStorage,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs: [federatedGraph],
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ export function migrateFromApollo(
webhookJWTSecret: opts.admissionWebhookJWTSecret,
},
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: {
disableResolvabilityValidation: true,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ export function moveFederatedGraph(
jwtSecret: opts.admissionWebhookJWTSecret,
},
opts.chClient!,
opts.composeWorkerPool,
);

const allDeploymentErrors: PlainMessage<DeploymentError>[] = [];
Expand Down Expand Up @@ -163,6 +164,7 @@ export function moveFederatedGraph(
jwtSecret: opts.admissionWebhookJWTSecret,
},
opts.chClient!,
opts.composeWorkerPool,
);

allCompositionErrors.push(...contractErrors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ export function updateFederatedGraph(
admissionWebhookURL: req.admissionWebhookURL,
blobStorage: opts.blobStorage,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
labelMatchers: req.labelMatchers,
namespaceId: federatedGraph.namespaceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ export function setGraphRouterCompatibilityVersion(
},
blobStorage: opts.blobStorage,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs: [federatedGraph],
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ export function publishMonograph(
webhookJWTSecret: opts.admissionWebhookJWTSecret,
},
opts.chClient!,
opts.composeWorkerPool,
);

for (const graph of updatedFederatedGraphs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ export function updateMonograph(
admissionWebhookURL: req.admissionWebhookURL,
admissionWebhookSecret: req.admissionWebhookSecret,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
});

await subgraphRepo.update(
Expand All @@ -173,6 +174,7 @@ export function updateMonograph(
webhookJWTSecret: opts.admissionWebhookJWTSecret,
},
opts.chClient!,
opts.composeWorkerPool,
);

await auditLogRepo.addAuditLog({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ export function deleteFederatedSubgraph(
chClient: opts.chClient!,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs: affectedFederatedGraphs,
composeWorkerPool: opts.composeWorkerPool,
});

return { affectedFederatedGraphs, compositionErrors, deploymentErrors, compositionWarnings };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ export function moveSubgraph(
jwtSecret: opts.admissionWebhookJWTSecret,
},
opts.chClient!,
opts.composeWorkerPool,
newCompositionOptions(req.disableResolvabilityValidation),
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,7 @@ export function publishFederatedSubgraph(
webhookJWTSecret: opts.admissionWebhookJWTSecret,
},
opts.chClient!,
opts.composeWorkerPool,
newCompositionOptions(req.disableResolvabilityValidation),
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ export function updateSubgraph(
webhookJWTSecret: opts.admissionWebhookJWTSecret,
},
opts.chClient!,
opts.composeWorkerPool,
newCompositionOptions(req.disableResolvabilityValidation),
);

Expand Down
17 changes: 17 additions & 0 deletions controlplane/src/core/build-server.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { join } from 'node:path';
import Fastify, { FastifyBaseLogger } from 'fastify';
import { S3Client } from '@aws-sdk/client-s3';
import { fastifyConnectPlugin } from '@connectrpc/connect-fastify';
Expand All @@ -8,6 +9,7 @@ import { compressionBrotli, compressionGzip } from '@connectrpc/connect-node';
import fastifyGracefulShutdown from 'fastify-graceful-shutdown';
import { App } from 'octokit';
import { Worker } from 'bullmq';
import { Tinypool } from 'tinypool';
import routes from './routes.js';
Comment on lines +12 to 13
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

Harden composeWorkerPool initialization and shutdown handling

Two small robustness points around the worker-pool lifecycle:

  • getWorkerPool(opts.composeWorkers?.maxCount)! assumes it never returns undefined. If build() were ever called off the main thread, this would throw at startup. Guarding the result and either failing fast with a clearer error or falling back to inline composition would be safer.
  • During graceful shutdown, await composeWorkerPool.destroy() is unguarded. A try/catch with a warning log would avoid aborting the shutdown sequence if the pool destruction fails.

One possible shape (adjust as needed with corresponding type changes in RouterOptions/call sites):

-  const composeWorkerPool: Tinypool = getWorkerPool(opts.composeWorkers?.maxCount)!;
+  const composeWorkerPool = getWorkerPool(opts.composeWorkers?.maxCount);
+  if (!composeWorkerPool) {
+    logger.warn('compose worker pool could not be initialized; falling back to in-process composition');
+  }
@@
-      composeWorkerPool: composeWorkerPool,
+      composeWorkerPool,
@@
-  fastify.gracefulShutdown(async () => {
-    fastify.log.debug('Shutting down compose worker pool');
-
-    await composeWorkerPool.destroy();
-
-    fastify.log.debug('Compose worker pool shut down');
+  fastify.gracefulShutdown(async () => {
+    if (composeWorkerPool) {
+      fastify.log.debug('Shutting down compose worker pool');
+      try {
+        await composeWorkerPool.destroy();
+        fastify.log.debug('Compose worker pool shut down');
+      } catch (err) {
+        fastify.log.warn({ err }, 'Error while shutting down compose worker pool');
+      }
+    }

You could also change import { Tinypool } from 'tinypool'; to an import type if it’s only needed for typing.

Also applies to: 57-58, 477-477, 533-538


I'll verify the review comment by examining the codebase to understand the current implementation and whether the concerns raised are valid.


---

🏁 Script executed:

```shell
#!/bin/bash
# Find and examine the build-server.ts file
find . -name "build-server.ts" -type f 2>/dev/null | head -5

Length of output: 43



---

🏁 Script executed:

```shell
#!/bin/bash
# Examine the build-server.ts file, focusing on key lines
cat ./git/controlplane/src/core/build-server.ts

Length of output: 75



---

🏁 Script executed:

```shell
#!/bin/bash
# Examine the build-server.ts file
cat controlplane/src/core/build-server.ts | head -100

Length of output: 4297



---

🏁 Script executed:

```shell
#!/bin/bash
# Get more context around lines 57-58 and the initialization
sed -n '50,150p' controlplane/src/core/build-server.ts

Length of output: 2429



---

🏁 Script executed:

```shell
#!/bin/bash
# Search for build function and worker pool initialization
sed -n '400,550p' controlplane/src/core/build-server.ts

Length of output: 4234



---

🏁 Script executed:

```shell
#!/bin/bash
# Find and examine the getWorkerPool function
cat controlplane/src/workers/compose.js

Length of output: 67



---

🏁 Script executed:

```shell
#!/bin/bash
# Search for the compose file
find controlplane/src/workers -name "compose*" -type f

Length of output: 35



---

🏁 Script executed:

```shell
#!/bin/bash
# Examine the getWorkerPool function
cat controlplane/src/workers/compose.ts

Length of output: 2135


Harden composeWorkerPool initialization and shutdown handling

The function getWorkerPool() at line 27 in controlplane/src/workers/compose.ts has the signature export function getWorkerPool(maxCount?: number): Tinypool | undefined. It returns undefined when not on the main thread (line 32).

In controlplane/src/core/build-server.ts line 477, the code unsafely assumes a non-null result:

const composeWorkerPool: Tinypool = getWorkerPool(opts.composeWorkers?.maxCount)!;

If build() were ever called off the main thread, this assertion would fail at runtime. Guard the result and either fail fast with a clearer error or fall back to inline composition.

Additionally, the graceful shutdown at lines 533-538 calls await composeWorkerPool.destroy() without a try/catch. Wrap this in error handling to prevent shutdown sequence abortion if pool destruction fails.

One possible shape (adjust type annotations in RouterOptions/call sites as needed):

-  const composeWorkerPool: Tinypool = getWorkerPool(opts.composeWorkers?.maxCount)!;
+  const composeWorkerPool = getWorkerPool(opts.composeWorkers?.maxCount);
+  if (!composeWorkerPool) {
+    logger.warn('compose worker pool could not be initialized; falling back to in-process composition');
+  }
@@
-  fastify.gracefulShutdown(async () => {
-    fastify.log.debug('Shutting down compose worker pool');
-
-    await composeWorkerPool.destroy();
-
-    fastify.log.debug('Compose worker pool shut down');
+  fastify.gracefulShutdown(async () => {
+    if (composeWorkerPool) {
+      fastify.log.debug('Shutting down compose worker pool');
+      try {
+        await composeWorkerPool.destroy();
+        fastify.log.debug('Compose worker pool shut down');
+      } catch (err) {
+        fastify.log.warn({ err }, 'Error while shutting down compose worker pool');
+      }
+    }

Committable suggestion skipped: line range outside the PR's diff.

import fastifyHealth from './plugins/health.js';
import fastifyMetrics, { MetricsPluginOptions } from './plugins/metrics.js';
Expand Down Expand Up @@ -136,6 +138,9 @@ export interface BuildConfig {
key?: string; // e.g. string or '/path/to/my/client-key.pem'
};
};
composeWorkers?: {
maxCount: number;
};
}

export interface MetricsOptions {
Expand Down Expand Up @@ -469,6 +474,11 @@ export default async function build(opts: BuildConfig) {
keycloakRealm: opts.keycloak.realm,
});

const composeWorkerPool = new Tinypool({
filename: join(process.cwd(), 'dist/workers/compose.js'),
maxThreads: opts.composeWorkers?.maxCount,
});

// Must be registered after custom fastify routes
// Because it registers an all-catch route for connect handlers

Expand Down Expand Up @@ -502,6 +512,7 @@ export default async function build(opts: BuildConfig) {
stripeSecretKey: opts.stripe?.secret,
admissionWebhookJWTSecret: opts.admissionWebhook.secret,
cdnBaseUrl: opts.cdnBaseUrl,
composeWorkerPool,
}),
contextValues(req) {
return createContextValues().set<FastifyBaseLogger>({ id: fastifyLoggerId, defaultValue: req.log }, req.log);
Expand All @@ -522,6 +533,12 @@ export default async function build(opts: BuildConfig) {
});

fastify.gracefulShutdown(async () => {
fastify.log.debug('Shutting down compose worker pool');

await composeWorkerPool.destroy();

fastify.log.debug('Compose worker pool shut down');

fastify.log.debug('Shutting down bull workers');

await Promise.all(bullWorkers.map((worker) => worker.close()));
Expand Down
24 changes: 17 additions & 7 deletions controlplane/src/core/repositories/FederatedGraphRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { PostgresJsDatabase } from 'drizzle-orm/postgres-js';
import { FastifyBaseLogger } from 'fastify';
import { parse } from 'graphql';
import { generateKeyPair, importPKCS8, SignJWT } from 'jose';
import { Tinypool } from 'tinypool';
import { uid } from 'uid/secure';
import {
ContractTagOptions,
Expand Down Expand Up @@ -205,6 +206,7 @@ export class FederatedGraphRepository {
targetId: string;
updatedBy: string;
chClient: ClickHouseClient;
composeWorkerPool: Tinypool;
admissionWebhookSecret?: string;
admissionWebhookURL?: string;
compositionOptions?: CompositionOptions;
Expand Down Expand Up @@ -337,6 +339,7 @@ export class FederatedGraphRepository {
actorId: data.updatedBy,
chClient: data.chClient,
compositionOptions: data.compositionOptions,
composeWorkerPool: data.composeWorkerPool,
});

return {
Expand Down Expand Up @@ -369,6 +372,7 @@ export class FederatedGraphRepository {
cdnBaseUrl: string;
},
chClient: ClickHouseClient,
composeWorkerPool: Tinypool,
compositionOptions?: CompositionOptions,
): Promise<{
compositionErrors: PlainMessage<CompositionError>[];
Expand Down Expand Up @@ -430,6 +434,7 @@ export class FederatedGraphRepository {
chClient,
compositionOptions,
federatedGraphs: [movedContractGraph],
composeWorkerPool,
});

return {
Expand All @@ -449,6 +454,7 @@ export class FederatedGraphRepository {
},
chClient,
compositionOptions,
composeWorkerPool,
});

return {
Expand Down Expand Up @@ -1505,6 +1511,7 @@ export class FederatedGraphRepository {
chClient,
blobStorage,
federatedGraphs,
composeWorkerPool,
}: {
actorId: string;
admissionConfig: {
Expand All @@ -1515,6 +1522,7 @@ export class FederatedGraphRepository {
chClient: ClickHouseClient;
federatedGraphs: FederatedGraphDTO[];
compositionOptions?: CompositionOptions;
composeWorkerPool: Tinypool;
}) => {
return this.db.transaction(async (tx) => {
const subgraphRepo = new SubgraphRepository(this.logger, tx, this.organizationId);
Expand All @@ -1524,7 +1532,7 @@ export class FederatedGraphRepository {
const graphCompositionRepo = new GraphCompositionRepository(this.logger, tx);
const composer = new Composer(
this.logger,
this.db,
tx,
fedGraphRepo,
subgraphRepo,
contractRepo,
Expand Down Expand Up @@ -1577,12 +1585,14 @@ export class FederatedGraphRepository {
const contractBaseCompositionDataByContractId = new Map<string, ContractBaseCompositionData>();

for (const subgraphsToCompose of allSubgraphsToCompose) {
const result: FederationResult | FederationResultWithContracts = getFederationResultWithPotentialContracts(
federatedGraph,
subgraphsToCompose,
tagOptionsByContractName,
compositionOptions,
);
const result: FederationResult | FederationResultWithContracts =
await getFederationResultWithPotentialContracts(
composeWorkerPool,
federatedGraph,
subgraphsToCompose,
tagOptionsByContractName,
compositionOptions,
);

if (!result.success) {
// Collect all composition errors
Expand Down
5 changes: 5 additions & 0 deletions controlplane/src/core/repositories/SubgraphRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { and, asc, count, desc, eq, getTableName, gt, inArray, like, lt, notInAr
import { PostgresJsDatabase } from 'drizzle-orm/postgres-js';
import { FastifyBaseLogger } from 'fastify';
import { GraphQLSchema } from 'graphql';
import { Tinypool } from 'tinypool';
import { DBSubgraphType, WebsocketSubprotocol } from '../../db/models.js';
import * as schema from '../../db/schema.js';
import {
Expand Down Expand Up @@ -242,6 +243,7 @@ export class SubgraphRepository {
cdnBaseUrl: string;
},
chClient: ClickHouseClient,
composeWorkerPool: Tinypool,
compositionOptions?: CompositionOptions,
): Promise<{
compositionErrors: PlainMessage<CompositionError>[];
Expand Down Expand Up @@ -470,6 +472,7 @@ export class SubgraphRepository {
chClient,
compositionOptions,
federatedGraphs: updatedFederatedGraphs.filter((g) => !g.contract),
composeWorkerPool,
});

compositionErrors.push(...cErrors);
Expand Down Expand Up @@ -501,6 +504,7 @@ export class SubgraphRepository {
cdnBaseUrl: string;
},
chClient: ClickHouseClient,
composeWorkerPool: Tinypool,
compositionOptions?: CompositionOptions,
): Promise<{
compositionErrors: PlainMessage<CompositionError>[];
Expand Down Expand Up @@ -554,6 +558,7 @@ export class SubgraphRepository {
actorId: data.updatedBy,
chClient,
compositionOptions,
composeWorkerPool,
});

return { compositionErrors, updatedFederatedGraphs, deploymentErrors, compositionWarnings };
Expand Down
2 changes: 2 additions & 0 deletions controlplane/src/core/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { PlatformService } from '@wundergraph/cosmo-connect/dist/platform/v1/pla
import { PostgresJsDatabase } from 'drizzle-orm/postgres-js';
import pino from 'pino';
import { App } from 'octokit';
import { Tinypool } from 'tinypool';
import * as schema from '../db/schema.js';
import NodeServiceImpl from './bufservices/NodeService.js';
import PlatformServiceImpl from './bufservices/PlatformService.js';
Expand Down Expand Up @@ -51,6 +52,7 @@ export interface RouterOptions {
};
stripeSecretKey?: string;
cdnBaseUrl: string;
composeWorkerPool: Tinypool;
}
const handlerOptions: Partial<ConnectRouterOptions> = {
maxTimeoutMs: 80_000,
Expand Down
Loading
Loading