Skip to content

Commit d0e0b65

Browse files
authored
feat(schema): run composition in worker threads (#6725)
1 parent 114f101 commit d0e0b65

File tree

25 files changed

+1360
-729
lines changed

25 files changed

+1360
-729
lines changed

configs/tsup/utils.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ export const watchEntryPlugin = () => {
5858
name: 'node-watch-entry',
5959
esbuildOptions(options) {
6060
const entries = (options.entryPoints as string[]) || [];
61-
const entry = entries[0];
61+
const entry = entries.find(entry => entry === 'src/dev.ts' || entry === 'test/root.ts');
6262

6363
if (!entry) {
6464
throw new Error('No entry point found');

deployment/services/schema.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ export function deploySchema({
5050
startupProbe: '/_health',
5151
exposesMetrics: true,
5252
replicas: environment.isProduction ? 3 : 1,
53-
memoryLimit: '1Gi',
53+
memoryLimit: '2Gi',
5454
pdb: true,
5555
},
5656
[redis.deployment, redis.service],

packages/services/api/src/modules/app-deployments/providers/persisted-document-scheduler.ts

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@ import path from 'node:path';
22
import { Worker } from 'node:worker_threads';
33
import { fileURLToPath } from 'url';
44
import { Injectable, Scope } from 'graphql-modules';
5-
import { LogLevel } from 'graphql-yoga';
6-
import { Logger } from '../../shared/providers/logger';
5+
import { Logger, registerWorkerLogging } from '../../shared/providers/logger';
76
import { BatchProcessedEvent, BatchProcessEvent } from './persisted-document-ingester';
87

98
const __dirname = path.dirname(fileURLToPath(import.meta.url));
@@ -48,32 +47,19 @@ export class PersistedDocumentScheduler {
4847
}
4948

5049
this.logger.debug('Re-Creating worker %s', index);
50+
this.workers[index] = this.createWorker(index);
5151

52+
this.logger.debug('Cancel pending tasks %s', index);
5253
for (const [, task] of tasks) {
5354
task.reject(new Error('Worker stopped.'));
5455
}
55-
56-
this.workers[index] = this.createWorker(index);
5756
});
5857

58+
registerWorkerLogging(this.logger, worker, name);
59+
5960
worker.on(
6061
'message',
61-
(
62-
data:
63-
| BatchProcessedEvent
64-
| { event: 'error'; id: string; err: Error }
65-
| {
66-
event: 'log';
67-
bindings: Record<string, unknown>;
68-
level: LogLevel;
69-
args: [string, ...unknown[]];
70-
},
71-
) => {
72-
if (data.event === 'log') {
73-
this.logger.child(data.bindings)[data.level](...data.args);
74-
return;
75-
}
76-
62+
(data: BatchProcessedEvent | { event: 'error'; id: string; err: Error }) => {
7763
if (data.event === 'error') {
7864
tasks.get(data.id)?.reject(data.err);
7965
}

packages/services/api/src/modules/schema/providers/orchestrators/single.ts

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { CONTEXT, Inject, Injectable, Scope } from 'graphql-modules';
2+
import { abortSignalAny } from '@graphql-hive/signal';
23
import type { SchemaBuilderApi } from '@hive/schema';
34
import { traceFn } from '@hive/service-common';
45
import { createTRPCProxyClient, httpLink } from '@trpc/client';
@@ -14,6 +15,7 @@ export class SingleOrchestrator implements Orchestrator {
1415
type = ProjectType.SINGLE;
1516
private logger: Logger;
1617
private schemaService;
18+
private incomingRequestAbortSignal: AbortSignal;
1719

1820
constructor(
1921
logger: Logger,
@@ -32,6 +34,7 @@ export class SingleOrchestrator implements Orchestrator {
3234
}),
3335
],
3436
});
37+
this.incomingRequestAbortSignal = context.request.signal;
3538
}
3639

3740
@traceFn('SingleOrchestrator.composeAndValidate', {
@@ -54,14 +57,42 @@ export class SingleOrchestrator implements Orchestrator {
5457
throw new Error('too many schemas');
5558
}
5659

57-
const result = await this.schemaService.composeAndValidate.mutate({
58-
type: 'single',
59-
schemas: schemas.map(s => ({
60-
raw: s.raw,
61-
source: s.source,
62-
})),
63-
});
60+
const timeoutAbortSignal = AbortSignal.timeout(30_000);
61+
62+
const onTimeout = () => {
63+
this.logger.debug('Composition HTTP request aborted due to timeout of 30 seconds.');
64+
};
65+
timeoutAbortSignal.addEventListener('abort', onTimeout);
6466

65-
return result;
67+
const onIncomingRequestAbort = () => {
68+
this.logger.debug('Composition HTTP request aborted due to incoming request being canceled.');
69+
};
70+
this.incomingRequestAbortSignal.addEventListener('abort', onIncomingRequestAbort);
71+
72+
try {
73+
const result = await this.schemaService.composeAndValidate.mutate(
74+
{
75+
type: 'single',
76+
schemas: schemas.map(s => ({
77+
raw: s.raw,
78+
source: s.source,
79+
})),
80+
},
81+
{
82+
// We want to abort composition if the request that does the composition is aborted
83+
// We also limit the maximum time allowed for composition requests to 30 seconds to avoid requests hanging indefinitely.
84+
//
85+
// The reason for these is a potential dead-lock.
86+
//
87+
// Note: We are using `abortSignalAny` over `AbortSignal.any` because of leak issues.
88+
// @source https://github.com/nodejs/node/issues/57584
89+
signal: abortSignalAny([this.incomingRequestAbortSignal, timeoutAbortSignal]),
90+
},
91+
);
92+
return result;
93+
} finally {
94+
timeoutAbortSignal.removeEventListener('abort', onTimeout);
95+
this.incomingRequestAbortSignal.removeEventListener('abort', onIncomingRequestAbort);
96+
}
6697
}
6798
}

packages/services/api/src/modules/schema/providers/orchestrators/stitching.ts

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { CONTEXT, Inject, Injectable, Scope } from 'graphql-modules';
2+
import { abortSignalAny } from '@graphql-hive/signal';
23
import type { SchemaBuilderApi } from '@hive/schema';
34
import { traceFn } from '@hive/service-common';
45
import { createTRPCProxyClient, httpLink } from '@trpc/client';
@@ -14,6 +15,7 @@ export class StitchingOrchestrator implements Orchestrator {
1415
type = ProjectType.STITCHING;
1516
private logger: Logger;
1617
private schemaService;
18+
private incomingRequestAbortSignal: AbortSignal;
1719

1820
constructor(
1921
logger: Logger,
@@ -32,6 +34,7 @@ export class StitchingOrchestrator implements Orchestrator {
3234
}),
3335
],
3436
});
37+
this.incomingRequestAbortSignal = context.request.signal;
3538
}
3639

3740
@traceFn('StitchingOrchestrator.composeAndValidate', {
@@ -47,15 +50,43 @@ export class StitchingOrchestrator implements Orchestrator {
4750
async composeAndValidate(schemas: SchemaObject[]) {
4851
this.logger.debug('Composing and Validating Stitched Schemas');
4952

50-
const result = await this.schemaService.composeAndValidate.mutate({
51-
type: 'stitching',
52-
schemas: schemas.map(s => ({
53-
raw: s.raw,
54-
source: s.source,
55-
url: s.url ?? null,
56-
})),
57-
});
53+
const timeoutAbortSignal = AbortSignal.timeout(30_000);
54+
55+
const onTimeout = () => {
56+
this.logger.debug('Composition HTTP request aborted due to timeout of 30 seconds.');
57+
};
58+
timeoutAbortSignal.addEventListener('abort', onTimeout);
59+
60+
const onIncomingRequestAbort = () => {
61+
this.logger.debug('Composition HTTP request aborted due to incoming request being canceled.');
62+
};
63+
this.incomingRequestAbortSignal.addEventListener('abort', onIncomingRequestAbort);
5864

59-
return result;
65+
try {
66+
const result = await this.schemaService.composeAndValidate.mutate(
67+
{
68+
type: 'stitching',
69+
schemas: schemas.map(s => ({
70+
raw: s.raw,
71+
source: s.source,
72+
url: s.url ?? null,
73+
})),
74+
},
75+
{
76+
// We want to abort composition if the request that does the composition is aborted
77+
// We also limit the maximum time allowed for composition requests to 30 seconds to avoid requests hanging indefinitely.
78+
//
79+
// The reason for these is a potential dead-lock.
80+
//
81+
// Note: We are using `abortSignalAny` over `AbortSignal.any` because of leak issues.
82+
// @source https://github.com/nodejs/node/issues/57584
83+
signal: abortSignalAny([this.incomingRequestAbortSignal, timeoutAbortSignal]),
84+
},
85+
);
86+
return result;
87+
} finally {
88+
timeoutAbortSignal.removeEventListener('abort', onTimeout);
89+
this.incomingRequestAbortSignal.removeEventListener('abort', onIncomingRequestAbort);
90+
}
6091
}
6192
}

packages/services/api/src/modules/shared/providers/logger.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import type { MessagePort, Worker } from 'node:worker_threads';
2+
import type { LogLevel } from 'fastify';
13
import { Injectable } from 'graphql-modules';
24

35
export type LogFn = (msg: string, ...args: unknown[]) => void;
@@ -30,3 +32,59 @@ export class NoopLogger extends Logger {
3032
debug = noop;
3133
child = () => this;
3234
}
35+
36+
export type MessagePortLog = {
37+
event: 'log';
38+
bindings: Record<string, unknown>;
39+
level: Exclude<LogLevel, 'silent'>;
40+
args: [string, ...unknown[]];
41+
};
42+
43+
/**
44+
* Create a logger that posts the logs to the message port.
45+
* The main use-case of this is to forward logs from a worker thread to the main thread.
46+
*/
47+
export function createMessagePortLogger(
48+
port: MessagePort,
49+
bindings: Record<string, unknown> = {},
50+
): Logger {
51+
return {
52+
child(newBindings) {
53+
return createMessagePortLogger(port, { ...bindings, ...newBindings });
54+
},
55+
debug(...args) {
56+
port.postMessage({ event: 'log', level: 'debug', args, bindings });
57+
},
58+
error(...args) {
59+
port.postMessage({ event: 'log', level: 'error', args, bindings });
60+
},
61+
fatal(...args) {
62+
port.postMessage({ event: 'log', level: 'fatal', args, bindings });
63+
},
64+
info(...args) {
65+
port.postMessage({ event: 'log', level: 'info', args, bindings });
66+
},
67+
trace(...args) {
68+
port.postMessage({ event: 'log', level: 'trace', args, bindings });
69+
},
70+
warn(...args) {
71+
port.postMessage({ event: 'log', level: 'warn', args, bindings });
72+
},
73+
};
74+
}
75+
76+
/**
77+
* Forward `log` messages posted by the worker to this thread's logger, tagged with the worker id.
78+
*/
79+
export function registerWorkerLogging(logger: Logger, worker: Worker, workerId: string) {
80+
worker.on('message', (data: MessagePortLog) => {
81+
if ('event' in data && data.event === 'log') {
82+
logger
83+
.child({
84+
...data.bindings,
85+
workerId,
86+
})
87+
[data.level](...data.args);
88+
}
89+
});
90+
}

packages/services/schema/README.md

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# `@hive/schema`
22

33
Service for validating schemas or verifying whether a composite GraphQL schema can be composed out
4-
of subschemas.
4+
of subschemas. Supports Federation, Schema Stitching and Monolithic Schemas.
55

66
## Configuration
77

@@ -26,3 +26,53 @@ of subschemas.
2626
| `REQUEST_LOGGING` | No | Log HTTP requests | `1` (enabled) or `0` (disabled) |
2727
| `LOG_LEVEL` | No | The verbosity of the service logs. One of `trace`, `debug`, `info`, `warn`, `error`, `fatal` or `silent` | `info` (default) |
2828
| `OPENTELEMETRY_COLLECTOR_ENDPOINT` | No | OpenTelemetry Collector endpoint. The expected traces transport is HTTP (port `4318`). | `http://localhost:4318/v1/traces` |
29+
30+
## Documentation
31+
32+
### Composition Request Handling
33+
34+
The following diagram outlines how the service handles incoming composition requests via HTTP
35+
(tRPC). It details the decision-making process around caching with Redis, reuse of in-progress
36+
tasks, and task execution using a limited pool of worker threads.
37+
38+
Each composition task runs in an isolated worker thread with memory limits to prevent a single
39+
malfunctioning task from affecting the stability of the entire service. This setup ensures robust
40+
and efficient processing by avoiding redundant computation, serving cached results when possible,
41+
and queuing tasks when resources are saturated.
42+
43+
```mermaid
44+
sequenceDiagram
45+
participant Client
46+
participant Service
47+
participant Redis
48+
participant TaskManager
49+
participant WorkerPool
50+
51+
Client->>Service: Composition HTTP request (tRPC)
52+
Service->>Redis: Check for cached result
53+
alt Cached result found
54+
Redis-->>Service: Return result
55+
Service-->>Client: Send cached result
56+
else Not cached
57+
Service->>TaskManager: Check if task in progress
58+
alt Task in progress
59+
TaskManager-->>Service: Return existing task
60+
Service->>TaskManager: Wait for task completion
61+
TaskManager-->>Service: Return result
62+
Service-->>Client: Send result
63+
else No task in progress
64+
TaskManager->>WorkerPool: Check for available worker
65+
alt Worker available
66+
WorkerPool-->>TaskManager: Assign task
67+
else No workers available
68+
TaskManager->>TaskManager: Enqueue task in memory
69+
TaskManager->>WorkerPool: Wait for available worker
70+
WorkerPool-->>TaskManager: Assign task when ready
71+
end
72+
WorkerPool->>TaskManager: Task completed
73+
TaskManager->>Redis: Cache result
74+
TaskManager-->>Service: Return result to pending requests
75+
Service-->>Client: Send result
76+
end
77+
end
78+
```

packages/services/schema/__tests__/cache.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ test('run action again when the action expires', async ({ expect }) => {
258258

259259
const actionId = randomString();
260260
async function actionFn() {
261-
await waitFor(timeoutMs);
261+
await waitFor(timeoutMs - 1);
262262
return 'foo';
263263
}
264264

packages/services/schema/package.json

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
"license": "MIT",
55
"private": true,
66
"scripts": {
7-
"build": "tsx ../../../scripts/runify.ts",
8-
"dev": "tsup-node --config ../../../configs/tsup/dev.config.node.ts src/dev.ts",
7+
"build": "tsx ../../../scripts/runify.ts src/index.ts src/composition-worker-main.ts",
8+
"dev": "tsup-node --config ../../../configs/tsup/dev.config.node.ts src/dev.ts src/composition-worker-main.ts ",
99
"typecheck": "tsc --noEmit"
1010
},
1111
"devDependencies": {
@@ -22,12 +22,14 @@
2222
"dotenv": "16.4.7",
2323
"fast-json-stable-stringify": "2.1.0",
2424
"fastify": "4.29.0",
25+
"fastq": "1.19.1",
2526
"got": "14.4.7",
2627
"graphql": "16.9.0",
2728
"ioredis": "5.4.2",
2829
"ioredis-mock": "8.9.0",
2930
"p-timeout": "6.1.4",
3031
"pino-pretty": "11.3.0",
32+
"reflect-metadata": "0.2.2",
3133
"zod": "3.24.1"
3234
}
3335
}

0 commit comments

Comments
 (0)