Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ Unless you need to set a non-default value, it is recommended to only populate o
| `LOCK_QUEUE_POLL_INTERVAL_MS` | "50" | Interval in milliseconds between queue position checks when waiting for a lock. Lower values provide faster lock acquisition but increase Redis load. Default is 50ms. |
| `LOCK_HEARTBEAT_MISSED_COUNT` | "5" | Number of consecutive heartbeats that can be missed before a waiter is considered dead and pruned from the queue. Used in Redis locking strategy only. Heartbeat TTL is calculated as `LOCK_QUEUE_POLL_INTERVAL_MS * LOCK_HEARTBEAT_MISSED_COUNT`. Default is 5. |
| `USE_MIRROR_NODE_MODULARIZED_SERVICES` | null | Controls routing of Mirror Node traffic through modularized services. When set to `true`, enables routing a percentage of traffic to modularized services. When set to `false`, ensures traffic follows the traditional non-modularized flow. When not set (i.e. `null` by default), no specific routing preference is applied. As Mirror Node gradually transitions to a fully modularized architecture across all networks, this setting will eventually default to `true`. |
| `WORKERS_POOL_ENABLED` | true | Controls whether CPU-intensive tasks (such as log fetching and block processing) run in a separate worker thread pool. When set to `false`, these tasks run on the main event loop instead, which significantly reduces memory overhead. This setting is recommended for resource-constrained environments. |
| `WORKERS_POOL_MAX_THREADS` | 4 | The maximum number of threads that are always running for this thread pool. |
| `WORKERS_POOL_MIN_THREADS` | 2 | The minimum number of threads that are always running for this thread pool. |

Expand Down
5 changes: 5 additions & 0 deletions packages/config-service/src/services/globalConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,11 @@ const _CONFIG = {
required: false,
defaultValue: true,
},
WORKERS_POOL_ENABLED: {
type: 'boolean',
required: false,
defaultValue: true,
},
WORKERS_POOL_MAX_THREADS: {
type: 'number',
required: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import {
import { Block, Log, Transaction } from '../../../model';
import { IContractResultsParams, ITransactionReceipt, MirrorNodeBlock, RequestDetails } from '../../../types';
import { IReceiptRlpInput } from '../../../types/IReceiptRlpInput';
import { WorkersPool } from '../../workersService/WorkersPool';
import { wrapError } from '../../workersService/WorkersErrorUtils';
import { CommonService } from '../ethCommonService/CommonService';

/**
Expand Down Expand Up @@ -375,7 +375,7 @@ export async function getBlock(
receiptsRoot,
});
} catch (e: unknown) {
throw WorkersPool.wrapError(e);
throw wrapError(e);
}
}

Expand Down Expand Up @@ -467,7 +467,7 @@ export async function getBlockReceipts(

return sortedReceipts as ITransactionReceipt[];
} catch (e: unknown) {
throw WorkersPool.wrapError(e);
throw wrapError(e);
}
}

Expand All @@ -482,8 +482,8 @@ export async function getBlockReceipts(
* @param blockHashOrBlockNumber - Block hash (0x-prefixed) or block number string
* @param requestDetails - The request details for logging and tracking
* @returns Promise of an array of hex-encoded receipt strings (RLP), or empty array if
* the block has no contract results and no logs. Re-throws errors wrapped with
* {@link WorkersPool.wrapError}.
* the block has no contract results and no logs. Re-throws errors via {@link wrapError}
* when running inside a worker thread, or propagates natively on the main thread.
*/
export async function getRawReceipts(
blockHashOrBlockNumber: string,
Expand Down Expand Up @@ -527,7 +527,7 @@ export async function getRawReceipts(

return encodedReceipts;
} catch (e: unknown) {
throw WorkersPool.wrapError(e);
throw wrapError(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { MirrorNodeClient } from '../../../clients/mirrorNodeClient';
import { CacheClientFactory } from '../../../factories/cacheClientFactory';
import { RegistryFactory } from '../../../factories/registryFactory';
import { RequestDetails } from '../../../types';
import { WorkersPool } from '../../workersService/WorkersPool';
import { wrapError } from '../../workersService/WorkersErrorUtils';
import { CommonService } from './CommonService';

const logger = pino({ level: ConfigService.get('LOG_LEVEL') || 'trace' });
Expand Down Expand Up @@ -57,6 +57,6 @@ export async function getLogs(

return await commonService.getLogsWithParams(address, params, requestDetails, sliceCountWrapper.value);
} catch (e: unknown) {
throw WorkersPool.wrapError(e);
throw wrapError(e);
}
}
103 changes: 103 additions & 0 deletions packages/relay/src/lib/services/workersService/WorkersErrorUtils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// SPDX-License-Identifier: Apache-2.0

import { parentPort } from 'worker_threads';

import { JsonRpcError, predefined } from '../../errors/JsonRpcError';
import { MirrorNodeClientError } from '../../errors/MirrorNodeClientError';

/**
* Plain JSON representation of a serializable error that can be safely transported
* across worker thread boundaries using the Structured Clone algorithm.
*
* Piscina communicates task results and errors between threads via `postMessage`, which
* uses Structured Clone. Only plain data survives this boundary — class instances, prototype
* chains, and private fields are stripped. This envelope captures the fields needed to
* reconstruct supported error types on the receiving thread.
*/
interface ErrorEnvelope {
name: string;
message: string;
code?: number;
statusCode?: number;
data?: string;
detail?: string;
}

/**
* Conditionally serializes an error for cross-thread transport.
*
* When invoked inside a Piscina worker thread (`parentPort` is non-null), serializes the
* error into a standard {@link Error} whose `message` contains the JSON-encoded payload.
* This ensures that rich error types (e.g. {@link JsonRpcError}, {@link MirrorNodeClientError})
* survive the Structured Clone boundary used by `postMessage`, which otherwise strips
* prototype chains and class-specific fields.
*
* When invoked on the main thread (`parentPort` is null), returns the original error
* unchanged. This avoids the CPU/memory overhead of unnecessary serialization-deserialization
* cycles during local execution on the main thread (e.g. when WORKERS_POOL_ENABLED is false)
* and preserves the original object's identity.
*
* @param err - The error-like value to wrap if in a worker context.
* @returns The original error when called outside a worker, or a JSON-encoded `Error` inside one.
*/
export function wrapError(err: unknown): unknown {
if (!parentPort) {
return err;
}
return new Error(JSON.stringify(err));
}

/**
* Reconstructs the original typed error from an error previously produced by {@link wrapError}.
*
* Parses the JSON payload embedded in `err.message` and attempts to reconstruct one of the
* supported rich error types. Returns a predefined internal error if the input is not an
* `Error`, the message is not valid JSON, or the error name is unrecognised.
*
* Supported error types:
* - {@link JsonRpcError}
* - {@link MirrorNodeClientError}
*
* @param err - An error whose `message` is expected to contain a JSON-encoded {@link ErrorEnvelope}.
* @returns The reconstructed typed error, or a {@link predefined.INTERNAL_ERROR} if the
* envelope cannot be parsed or the type is unsupported.
*/
export function unwrapError(err: unknown): Error {
if (!(err instanceof Error)) {
return predefined.INTERNAL_ERROR('Failed unwrapping piscina error: value is not an Error instance.');
}

let parsedErr: ErrorEnvelope;
try {
parsedErr = JSON.parse(err.message) as ErrorEnvelope;
} catch {
return predefined.INTERNAL_ERROR('Failed parsing wrapped piscina error while unwrapping.');
}

switch (parsedErr?.name) {
case JsonRpcError.name: {
if (typeof parsedErr.code !== 'number') {
return predefined.INTERNAL_ERROR(
'Failed unwrapping piscina error: missing numeric code in JsonRpcError envelope.',
);
}
return new JsonRpcError({
code: parsedErr.code,
data: parsedErr.data,
message: parsedErr.message,
});
}

case MirrorNodeClientError.name: {
if (typeof parsedErr.statusCode !== 'number') {
return predefined.INTERNAL_ERROR(
'Failed unwrapping piscina error: missing numeric statusCode in MirrorNodeClientError envelope.',
);
}
return MirrorNodeClientError.fromJSON(parsedErr.statusCode, parsedErr.message, parsedErr.data, parsedErr.detail);
}

default:
return predefined.INTERNAL_ERROR('Failed unwrapping piscina error.');
}
}
117 changes: 38 additions & 79 deletions packages/relay/src/lib/services/workersService/WorkersPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,9 @@ import { parentPort } from 'worker_threads';

import { MeasurableCache, MirrorNodeClient } from '../../clients';
import { ICacheClient } from '../../clients/cache/ICacheClient';
import { JsonRpcError, predefined } from '../../errors/JsonRpcError';
import { MirrorNodeClientError } from '../../errors/MirrorNodeClientError';
import { RegistryFactory } from '../../factories/registryFactory';

/**
* Plain JSON representation of a serialized error that can be safely transferred across worker or process boundaries.
*/
interface ErrorEnvelope {
name: string;
message: string;
code?: number;
statusCode?: number;
data?: string;
detail?: string;
}
import type { WorkerTask } from './workers';
import { unwrapError } from './WorkersErrorUtils';

/**
* A wrapper around a shared Piscina worker thread pool.
Expand Down Expand Up @@ -80,6 +68,13 @@ export class WorkersPool {
*/
private static workerPoolQueueSizeGauge: Gauge;

/**
* Cached reference to the local task handler function used when {@link WORKERS_POOL_ENABLED} is `false`.
*
* Populated on the first invocation of {@link run} to avoid repeating the dynamic import on every call.
*/
private static handleTaskFn: ((task: WorkerTask) => Promise<any>) | null = null;

/**
* Updates a metric either by delegating the update to a worker thread
* or by executing the update locally when no worker context is present.
Expand Down Expand Up @@ -219,14 +214,36 @@ export class WorkersPool {
}

/**
* Executes a worker task using the shared Piscina pool.
* Executes a worker task using either the shared Piscina thread pool or the main thread,
* depending on the {@link WORKERS_POOL_ENABLED} configuration flag.
*
* When the pool is enabled, the task is dispatched to a Piscina worker thread.
*
* @param options - The data passed to the worker.
* @param mirrorNodeClient - The mirror node client instance.
* @param cacheService - The cache service instance.
* @returns A promise resolving to the worker's result.
* When the pool is disabled, the task is executed locally on the main thread and entirely
* bypasses the worker pool. In this mode `mirrorNodeClient` and `cacheService` are not
* forwarded to the task handler — the worker modules maintain their own module-level
* instances initialised on first use.
*
* @param options - The task descriptor forwarded to the worker handler.
* @param mirrorNodeClient - Mirror node client instance used to forward inter-thread metrics.
* Unused when {@link WORKERS_POOL_ENABLED} is `false`.
* @param cacheService - Cache service instance used to forward inter-thread metrics.
* Unused when {@link WORKERS_POOL_ENABLED} is `false`.
* @returns A promise that resolves to the task handler's return value.
* @throws The original error from the task handler; reconstructed from its serialized form
* in worker mode, or native in local mode.
*/
static async run(options: unknown, mirrorNodeClient: MirrorNodeClient, cacheService: ICacheClient): Promise<any> {
static async run(options: WorkerTask, mirrorNodeClient: MirrorNodeClient, cacheService: ICacheClient): Promise<any> {
if (!ConfigService.get('WORKERS_POOL_ENABLED')) {
if (!this.handleTaskFn) {
// Dynamic import to defer loading worker modules and their dependencies until actually needed,
// ensuring any module-level instances are created once and reused across all local task executions.
const mod = await import('./workers');
this.handleTaskFn = mod.default;
}
return this.handleTaskFn(options);
}

this.mirrorNodeClient = mirrorNodeClient;
this.cacheService = cacheService as MeasurableCache;

Expand All @@ -240,7 +257,7 @@ export class WorkersPool {
const result = await this.getInstance()
.run(options)
.catch((error: unknown) => {
const unwrappedErr = WorkersPool.unwrapError(error);
const unwrappedErr = unwrapError(error);

this.workerTaskDurationSecondsHistogram
?.labels(taskType)
Expand All @@ -259,62 +276,4 @@ export class WorkersPool {

return result;
}

/**
* Wraps an error into a standard `Error` instance by serializing its plain JSON representation into
* the error message. Intended for transporting rich error information (including custom error types) across
* boundaries such as Piscina worker threads.
*
* @param err - An error-like object that implements `toJSON()`.
* @returns A new `Error` whose `message` contains a JSON-encoded
*/
static wrapError(err: any): Error {
return new Error(JSON.stringify(err));
}

/**
* Unwraps an error previously wrapped with {@link wrapError} and attempts to reconstruct the original error
* instance. If parsing fails or the error type is unsupported, a predefined internal error is returned instead.
*
* Supported error types:
* - {@link JsonRpcError}
* - {@link MirrorNodeClientError}
*
* @param err - An error whose `message` is expected to contain a JSON-encoded {@link ErrorEnvelope}.
* @returns The reconstructed error instance, or an internal error if unwrapping fails.
*/
static unwrapError(err: unknown): Error {
if (!(err instanceof Error)) {
return predefined.INTERNAL_ERROR(`Failed unwrapping piscina error: value is not an Error instance.`);
}

let parsedErr: ErrorEnvelope;
try {
parsedErr = JSON.parse(err.message) as ErrorEnvelope;
} catch {
return predefined.INTERNAL_ERROR(`Failed parsing wrapped piscina error while unwrapping.`);
}

switch (parsedErr?.name) {
case JsonRpcError.name: {
return new JsonRpcError({
code: parsedErr.code!,
data: parsedErr.data!,
message: parsedErr.message,
});
}

case MirrorNodeClientError.name: {
return MirrorNodeClientError.fromJSON(
parsedErr.statusCode!,
parsedErr.message,
parsedErr.data,
parsedErr.detail,
);
}

default:
return predefined.INTERNAL_ERROR('Failed unwrapping piscina error.');
}
}
}
12 changes: 9 additions & 3 deletions packages/relay/src/lib/services/workersService/workers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,17 @@ interface GetRawReceiptsTask {
requestDetails: RequestDetails;
}

type WorkerTask = GetLogsTask | GetBlockTask | GetBlockReceiptsTask | GetRawReceiptsTask;
export type WorkerTask = GetLogsTask | GetBlockTask | GetBlockReceiptsTask | GetRawReceiptsTask;

/**
* Main worker export - handles different task types.
* This function is called by Piscina with the task data.
* Dispatches a worker task to the appropriate handler function.
*
* Invoked either by Piscina on a dedicated worker thread, or directly on the main
* thread when {@link WORKERS_POOL_ENABLED} is `false` (local execution mode).
*
* @param task - Discriminated-union descriptor for the task to execute.
* @returns A promise that resolves to the handler's result.
* @throws {Error} If `task.type` does not match any known task variant.
*/
export default async function handleTask(task: WorkerTask): Promise<any> {
switch (task.type) {
Expand Down
Loading
Loading