Skip to content

Commit d8931d9

Browse files
committed
feat: introduce WORKERS_POOL_ENABLED config to optionally disable worker thread pool
Signed-off-by: Logan Nguyen <logan.nguyen@swirldslabs.com>
1 parent 1cfee96 commit d8931d9

File tree

10 files changed

+637
-160
lines changed

10 files changed

+637
-160
lines changed

docs/configuration.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ Unless you need to set a non-default value, it is recommended to only populate o
115115
| `LOCK_QUEUE_POLL_INTERVAL_MS` | "50" | Interval in milliseconds between queue position checks when waiting for a lock. Lower values provide faster lock acquisition but increase Redis load. Default is 50ms. |
116116
| `LOCK_HEARTBEAT_MISSED_COUNT` | "5" | Number of consecutive heartbeats that can be missed before a waiter is considered dead and pruned from the queue. Used in Redis locking strategy only. Heartbeat TTL is calculated as `LOCK_QUEUE_POLL_INTERVAL_MS * LOCK_HEARTBEAT_MISSED_COUNT`. Default is 5. |
117117
| `USE_MIRROR_NODE_MODULARIZED_SERVICES` | null | Controls routing of Mirror Node traffic through modularized services. When set to `true`, enables routing a percentage of traffic to modularized services. When set to `false`, ensures traffic follows the traditional non-modularized flow. When not set (i.e. `null` by default), no specific routing preference is applied. As Mirror Node gradually transitions to a fully modularized architecture across all networks, this setting will eventually default to `true`. |
118+
| `WORKERS_POOL_ENABLED` | true | Controls whether CPU-intensive tasks (such as log fetching and block processing) run in a separate worker thread pool. When set to `false`, these tasks run on the main event loop instead, which significantly reduces memory overhead. This setting is recommended for resource-constrained environments. |
118119
| `WORKERS_POOL_MAX_THREADS` | 4 | The maximum number of threads that are always running for this thread pool. |
119120
| `WORKERS_POOL_MIN_THREADS` | 2 | The minimum number of threads that are always running for this thread pool. |
120121

packages/config-service/src/services/globalConfig.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -776,6 +776,11 @@ const _CONFIG = {
776776
required: false,
777777
defaultValue: true,
778778
},
779+
WORKERS_POOL_ENABLED: {
780+
type: 'boolean',
781+
required: false,
782+
defaultValue: true,
783+
},
779784
WORKERS_POOL_MAX_THREADS: {
780785
type: 'number',
781786
required: false,

packages/relay/src/lib/services/ethService/blockService/blockWorker.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import {
2323
import { Block, Log, Transaction } from '../../../model';
2424
import { IContractResultsParams, ITransactionReceipt, MirrorNodeBlock, RequestDetails } from '../../../types';
2525
import { IReceiptRlpInput } from '../../../types/IReceiptRlpInput';
26-
import { WorkersPool } from '../../workersService/WorkersPool';
26+
import { wrapError } from '../../workersService/WorkersErrorUtils';
2727
import { CommonService } from '../ethCommonService/CommonService';
2828

2929
/**
@@ -375,7 +375,7 @@ export async function getBlock(
375375
receiptsRoot,
376376
});
377377
} catch (e: unknown) {
378-
throw WorkersPool.wrapError(e);
378+
throw wrapError(e);
379379
}
380380
}
381381

@@ -467,7 +467,7 @@ export async function getBlockReceipts(
467467

468468
return sortedReceipts as ITransactionReceipt[];
469469
} catch (e: unknown) {
470-
throw WorkersPool.wrapError(e);
470+
throw wrapError(e);
471471
}
472472
}
473473

@@ -482,8 +482,8 @@ export async function getBlockReceipts(
482482
* @param blockHashOrBlockNumber - Block hash (0x-prefixed) or block number string
483483
* @param requestDetails - The request details for logging and tracking
484484
* @returns Promise of an array of hex-encoded receipt strings (RLP), or empty array if
485-
* the block has no contract results and no logs. Re-throws errors wrapped with
486-
* {@link WorkersPool.wrapError}.
485+
* the block has no contract results and no logs. Re-throws errors via {@link wrapError}
486+
* when running inside a worker thread, or propagates natively on the main thread.
487487
*/
488488
export async function getRawReceipts(
489489
blockHashOrBlockNumber: string,
@@ -527,7 +527,7 @@ export async function getRawReceipts(
527527

528528
return encodedReceipts;
529529
} catch (e: unknown) {
530-
throw WorkersPool.wrapError(e);
530+
throw wrapError(e);
531531
}
532532
}
533533

packages/relay/src/lib/services/ethService/ethCommonService/commonWorker.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { MirrorNodeClient } from '../../../clients/mirrorNodeClient';
77
import { CacheClientFactory } from '../../../factories/cacheClientFactory';
88
import { RegistryFactory } from '../../../factories/registryFactory';
99
import { RequestDetails } from '../../../types';
10-
import { WorkersPool } from '../../workersService/WorkersPool';
10+
import { wrapError } from '../../workersService/WorkersErrorUtils';
1111
import { CommonService } from './CommonService';
1212

1313
const logger = pino({ level: ConfigService.get('LOG_LEVEL') || 'trace' });
@@ -57,6 +57,6 @@ export async function getLogs(
5757

5858
return await commonService.getLogsWithParams(address, params, requestDetails, sliceCountWrapper.value);
5959
} catch (e: unknown) {
60-
throw WorkersPool.wrapError(e);
60+
throw wrapError(e);
6161
}
6262
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
3+
import { parentPort } from 'worker_threads';
4+
5+
import { JsonRpcError, predefined } from '../../errors/JsonRpcError';
6+
import { MirrorNodeClientError } from '../../errors/MirrorNodeClientError';
7+
8+
/**
9+
* Plain JSON representation of a serializable error that can be safely transported
10+
* across worker thread boundaries using the Structured Clone algorithm.
11+
*
12+
* Piscina communicates task results and errors between threads via `postMessage`, which
13+
* uses Structured Clone. Only plain data survives this boundary — class instances, prototype
14+
* chains, and private fields are stripped. This envelope captures the fields needed to
15+
* reconstruct supported error types on the receiving thread.
16+
*/
17+
interface ErrorEnvelope {
18+
name: string;
19+
message: string;
20+
code?: number;
21+
statusCode?: number;
22+
data?: string;
23+
detail?: string;
24+
}
25+
26+
/**
27+
* Conditionally serializes an error for cross-thread transport.
28+
*
29+
* When invoked inside a Piscina worker thread (`parentPort` is non-null), serializes the
30+
* error into a standard {@link Error} whose `message` contains the JSON-encoded payload.
31+
* This ensures that rich error types (e.g. {@link JsonRpcError}, {@link MirrorNodeClientError})
32+
* survive the Structured Clone boundary used by `postMessage`, which otherwise strips
33+
* prototype chains and class-specific fields.
34+
*
35+
* When invoked on the main thread (`parentPort` is null), returns the original error
36+
* unchanged. This avoids the CPU/memory overhead of unnecessary serialization-deserialization
37+
* cycles during local execution on the main thread (e.g. when WORKERS_POOL_ENABLED is false)
38+
* and preserves the original object's identity.
39+
*
40+
* @param err - The error-like value to wrap if in a worker context.
41+
* @returns The original error when called outside a worker, or a JSON-encoded `Error` inside one.
42+
*/
43+
export function wrapError(err: unknown): unknown {
44+
if (!parentPort) {
45+
return err;
46+
}
47+
return new Error(JSON.stringify(err));
48+
}
49+
50+
/**
51+
* Reconstructs the original typed error from an error previously produced by {@link wrapError}.
52+
*
53+
* Parses the JSON payload embedded in `err.message` and attempts to reconstruct one of the
54+
* supported rich error types. Returns a predefined internal error if the input is not an
55+
* `Error`, the message is not valid JSON, or the error name is unrecognised.
56+
*
57+
* Supported error types:
58+
* - {@link JsonRpcError}
59+
* - {@link MirrorNodeClientError}
60+
*
61+
* @param err - An error whose `message` is expected to contain a JSON-encoded {@link ErrorEnvelope}.
62+
* @returns The reconstructed typed error, or a {@link predefined.INTERNAL_ERROR} if the
63+
* envelope cannot be parsed or the type is unsupported.
64+
*/
65+
export function unwrapError(err: unknown): Error {
66+
if (!(err instanceof Error)) {
67+
return predefined.INTERNAL_ERROR('Failed unwrapping piscina error: value is not an Error instance.');
68+
}
69+
70+
let parsedErr: ErrorEnvelope;
71+
try {
72+
parsedErr = JSON.parse(err.message) as ErrorEnvelope;
73+
} catch {
74+
return predefined.INTERNAL_ERROR('Failed parsing wrapped piscina error while unwrapping.');
75+
}
76+
77+
switch (parsedErr?.name) {
78+
case JsonRpcError.name: {
79+
if (typeof parsedErr.code !== 'number') {
80+
return predefined.INTERNAL_ERROR(
81+
'Failed unwrapping piscina error: missing numeric code in JsonRpcError envelope.',
82+
);
83+
}
84+
return new JsonRpcError({
85+
code: parsedErr.code,
86+
data: parsedErr.data,
87+
message: parsedErr.message,
88+
});
89+
}
90+
91+
case MirrorNodeClientError.name: {
92+
if (typeof parsedErr.statusCode !== 'number') {
93+
return predefined.INTERNAL_ERROR(
94+
'Failed unwrapping piscina error: missing numeric statusCode in MirrorNodeClientError envelope.',
95+
);
96+
}
97+
return MirrorNodeClientError.fromJSON(parsedErr.statusCode, parsedErr.message, parsedErr.data, parsedErr.detail);
98+
}
99+
100+
default:
101+
return predefined.INTERNAL_ERROR('Failed unwrapping piscina error.');
102+
}
103+
}

packages/relay/src/lib/services/workersService/WorkersPool.ts

Lines changed: 38 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,9 @@ import { parentPort } from 'worker_threads';
77

88
import { MeasurableCache, MirrorNodeClient } from '../../clients';
99
import { ICacheClient } from '../../clients/cache/ICacheClient';
10-
import { JsonRpcError, predefined } from '../../errors/JsonRpcError';
11-
import { MirrorNodeClientError } from '../../errors/MirrorNodeClientError';
1210
import { RegistryFactory } from '../../factories/registryFactory';
13-
14-
/**
15-
* Plain JSON representation of a serialized error that can be safely transferred across worker or process boundaries.
16-
*/
17-
interface ErrorEnvelope {
18-
name: string;
19-
message: string;
20-
code?: number;
21-
statusCode?: number;
22-
data?: string;
23-
detail?: string;
24-
}
11+
import type { WorkerTask } from './workers';
12+
import { unwrapError } from './WorkersErrorUtils';
2513

2614
/**
2715
* A wrapper around a shared Piscina worker thread pool.
@@ -80,6 +68,13 @@ export class WorkersPool {
8068
*/
8169
private static workerPoolQueueSizeGauge: Gauge;
8270

71+
/**
72+
* Cached reference to the local task handler function used when {@link WORKERS_POOL_ENABLED} is `false`.
73+
*
74+
* Populated on the first invocation of {@link run} to avoid repeating the dynamic import on every call.
75+
*/
76+
private static handleTaskFn: ((task: WorkerTask) => Promise<any>) | null = null;
77+
8378
/**
8479
* Updates a metric either by delegating the update to a worker thread
8580
* or by executing the update locally when no worker context is present.
@@ -219,14 +214,36 @@ export class WorkersPool {
219214
}
220215

221216
/**
222-
* Executes a worker task using the shared Piscina pool.
217+
* Executes a worker task using either the shared Piscina thread pool or the main thread,
218+
* depending on the {@link WORKERS_POOL_ENABLED} configuration flag.
219+
*
220+
* When the pool is enabled, the task is dispatched to a Piscina worker thread.
223221
*
224-
* @param options - The data passed to the worker.
225-
* @param mirrorNodeClient - The mirror node client instance.
226-
* @param cacheService - The cache service instance.
227-
* @returns A promise resolving to the worker's result.
222+
* When the pool is disabled, the task is executed locally on the main thread and entirely
223+
* bypasses the worker pool. In this mode `mirrorNodeClient` and `cacheService` are not
224+
* forwarded to the task handler — the worker modules maintain their own module-level
225+
* instances initialised on first use.
226+
*
227+
* @param options - The task descriptor forwarded to the worker handler.
228+
* @param mirrorNodeClient - Mirror node client instance used to forward inter-thread metrics.
229+
* Unused when {@link WORKERS_POOL_ENABLED} is `false`.
230+
* @param cacheService - Cache service instance used to forward inter-thread metrics.
231+
* Unused when {@link WORKERS_POOL_ENABLED} is `false`.
232+
* @returns A promise that resolves to the task handler's return value.
233+
* @throws The original error from the task handler; reconstructed from its serialized form
234+
* in worker mode, or native in local mode.
228235
*/
229-
static async run(options: unknown, mirrorNodeClient: MirrorNodeClient, cacheService: ICacheClient): Promise<any> {
236+
static async run(options: WorkerTask, mirrorNodeClient: MirrorNodeClient, cacheService: ICacheClient): Promise<any> {
237+
if (!ConfigService.get('WORKERS_POOL_ENABLED')) {
238+
if (!this.handleTaskFn) {
239+
// Dynamic import to defer loading worker modules and their dependencies until actually needed,
240+
// ensuring any module-level instances are created once and reused across all local task executions.
241+
const mod = await import('./workers');
242+
this.handleTaskFn = mod.default;
243+
}
244+
return this.handleTaskFn(options);
245+
}
246+
230247
this.mirrorNodeClient = mirrorNodeClient;
231248
this.cacheService = cacheService as MeasurableCache;
232249

@@ -240,7 +257,7 @@ export class WorkersPool {
240257
const result = await this.getInstance()
241258
.run(options)
242259
.catch((error: unknown) => {
243-
const unwrappedErr = WorkersPool.unwrapError(error);
260+
const unwrappedErr = unwrapError(error);
244261

245262
this.workerTaskDurationSecondsHistogram
246263
?.labels(taskType)
@@ -259,62 +276,4 @@ export class WorkersPool {
259276

260277
return result;
261278
}
262-
263-
/**
264-
* Wraps an error into a standard `Error` instance by serializing its plain JSON representation into
265-
* the error message. Intended for transporting rich error information (including custom error types) across
266-
* boundaries such as Piscina worker threads.
267-
*
268-
* @param err - An error-like object that implements `toJSON()`.
269-
* @returns A new `Error` whose `message` contains a JSON-encoded
270-
*/
271-
static wrapError(err: any): Error {
272-
return new Error(JSON.stringify(err));
273-
}
274-
275-
/**
276-
* Unwraps an error previously wrapped with {@link wrapError} and attempts to reconstruct the original error
277-
* instance. If parsing fails or the error type is unsupported, a predefined internal error is returned instead.
278-
*
279-
* Supported error types:
280-
* - {@link JsonRpcError}
281-
* - {@link MirrorNodeClientError}
282-
*
283-
* @param err - An error whose `message` is expected to contain a JSON-encoded {@link ErrorEnvelope}.
284-
* @returns The reconstructed error instance, or an internal error if unwrapping fails.
285-
*/
286-
static unwrapError(err: unknown): Error {
287-
if (!(err instanceof Error)) {
288-
return predefined.INTERNAL_ERROR(`Failed unwrapping piscina error: value is not an Error instance.`);
289-
}
290-
291-
let parsedErr: ErrorEnvelope;
292-
try {
293-
parsedErr = JSON.parse(err.message) as ErrorEnvelope;
294-
} catch {
295-
return predefined.INTERNAL_ERROR(`Failed parsing wrapped piscina error while unwrapping.`);
296-
}
297-
298-
switch (parsedErr?.name) {
299-
case JsonRpcError.name: {
300-
return new JsonRpcError({
301-
code: parsedErr.code!,
302-
data: parsedErr.data!,
303-
message: parsedErr.message,
304-
});
305-
}
306-
307-
case MirrorNodeClientError.name: {
308-
return MirrorNodeClientError.fromJSON(
309-
parsedErr.statusCode!,
310-
parsedErr.message,
311-
parsedErr.data,
312-
parsedErr.detail,
313-
);
314-
}
315-
316-
default:
317-
return predefined.INTERNAL_ERROR('Failed unwrapping piscina error.');
318-
}
319-
}
320279
}

packages/relay/src/lib/services/workersService/workers.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,17 @@ interface GetRawReceiptsTask {
3434
requestDetails: RequestDetails;
3535
}
3636

37-
type WorkerTask = GetLogsTask | GetBlockTask | GetBlockReceiptsTask | GetRawReceiptsTask;
37+
export type WorkerTask = GetLogsTask | GetBlockTask | GetBlockReceiptsTask | GetRawReceiptsTask;
3838

3939
/**
40-
* Main worker export - handles different task types.
41-
* This function is called by Piscina with the task data.
40+
* Dispatches a worker task to the appropriate handler function.
41+
*
42+
* Invoked either by Piscina on a dedicated worker thread, or directly on the main
43+
* thread when {@link WORKERS_POOL_ENABLED} is `false` (local execution mode).
44+
*
45+
* @param task - Discriminated-union descriptor for the task to execute.
46+
* @returns A promise that resolves to the handler's result.
47+
* @throws {Error} If `task.type` does not match any known task variant.
4248
*/
4349
export default async function handleTask(task: WorkerTask): Promise<any> {
4450
switch (task.type) {

0 commit comments

Comments
 (0)