Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions packages/core/src/autoscaling/autoscaled_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { betterClearInterval, betterSetInterval } from '@apify/utilities';
import { Configuration } from '../configuration';
import { CriticalError } from '../errors';
import { log as defaultLog } from '../log';
import type { LoadSignal } from './load_signal';
import type { SnapshotterOptions } from './snapshotter';
import { Snapshotter } from './snapshotter';
import type { SystemInfo, SystemStatusOptions } from './system_status';
Expand Down Expand Up @@ -203,6 +204,10 @@ export class AutoscaledPool {
private resolve: ((val?: unknown) => void) | null = null;
private reject: ((reason?: unknown) => void) | null = null;
private snapshotter: Snapshotter;

/** Additional SystemStatus loadSignals - tracked here for initialization and cleanup */
private loadSignals: LoadSignal[];

private systemStatus: SystemStatus;
private autoscaleInterval!: BetterIntervalID;
private maybeRunInterval!: BetterIntervalID;
Expand Down Expand Up @@ -295,6 +300,7 @@ export class AutoscaledPool {
});
ssoCopy.config ??= this.config;
this.snapshotter = ssoCopy.snapshotter;
this.loadSignals = ssoCopy.loadSignals ?? [];
this.systemStatus = new SystemStatus(ssoCopy);
}

Expand Down Expand Up @@ -366,6 +372,7 @@ export class AutoscaledPool {
});

await this.snapshotter.start();
await Promise.all(this.loadSignals.map((s) => s.start()));

// This interval checks the system status and updates the desired concurrency accordingly.
this.autoscaleInterval = betterSetInterval(this._autoscale, this.autoscaleIntervalMillis);
Expand Down Expand Up @@ -699,6 +706,7 @@ export class AutoscaledPool {
betterClearInterval(this.maybeRunInterval);
if (this.tasksDonePerSecondInterval) betterClearInterval(this.tasksDonePerSecondInterval);
if (this.snapshotter) await this.snapshotter.stop();
await Promise.all(this.loadSignals.map((s) => s.stop()));
}

protected _incrementTasksDonePerSecond(intervalCallback: () => void) {
Expand Down
60 changes: 60 additions & 0 deletions packages/core/src/autoscaling/client_load_signal.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import type { StorageClient } from '@crawlee/types';

import type { LoadSnapshot } from './load_signal';
import { SnapshotStore } from './load_signal';

/**
 * Index into the storage client's `stats.rateLimitErrors` array that is sampled for every client snapshot.
 * NOTE(review): presumably the array is indexed by retry attempt, so this picks the errors seen on a later
 * retry rather than on the first attempt - confirm against the StorageClient implementation.
 */
const CLIENT_RATE_LIMIT_ERROR_RETRY_COUNT = 2;

/** A single reading of the storage client's rate-limit error counter. */
export interface ClientSnapshot extends LoadSnapshot {
/**
 * Value read from `client.stats.rateLimitErrors` when the snapshot was taken (0 when unavailable).
 * Overload is derived from the difference between two consecutive readings, so this is
 * presumably a running total rather than a per-interval count - TODO confirm.
 */
rateLimitErrorCount: number;
}

/** Options accepted by `createClientLoadSignal`. */
export interface ClientLoadSignalOptions {
/** Storage client whose `stats.rateLimitErrors` is polled on every interval. */
client: StorageClient;
/** Seconds between two snapshots. `createClientLoadSignal` falls back to 1 when omitted. */
clientSnapshotIntervalSecs?: number;
/**
 * A snapshot is marked overloaded when the error count grew by more than this value
 * since the previous snapshot. `createClientLoadSignal` falls back to 3 when omitted.
 */
maxClientErrors?: number;
/** Forwarded to the snapshot store as `overloadedRatio`. `createClientLoadSignal` falls back to 0.3 when omitted. */
overloadedRatio?: number;
/** Forwarded to the snapshot store as `snapshotHistoryMillis`; no default is applied here. */
snapshotHistoryMillis?: number;
}

/**
 * Builds an interval-driven load signal that watches the storage client
 * for rate-limit errors (HTTP 429).
 *
 * On every tick the current error count is read from `options.client.stats`
 * and compared with the count stored in the most recent snapshot. When the
 * increase is larger than `maxClientErrors` (default 3) the new snapshot is
 * flagged as overloaded. The very first snapshot is never overloaded because
 * there is nothing to compare it with.
 *
 * @param options Client to observe plus optional tuning knobs.
 * @returns The store produced by `SnapshotStore.fromInterval`.
 */
export function createClientLoadSignal(options: ClientLoadSignalOptions) {
    const errorDeltaLimit = options.maxClientErrors ?? 3;

    return SnapshotStore.fromInterval<ClientSnapshot>({
        name: 'clientInfo',
        overloadedRatio: options.overloadedRatio ?? 0.3,
        intervalMillis: (options.clientSnapshotIntervalSecs ?? 1) * 1000,
        snapshotHistoryMillis: options.snapshotHistoryMillis,
        handler(store, intervalCallback) {
            const takenAt = new Date();

            // The client is read lazily on each tick, stats may be missing entirely.
            const errorsPerRetry = options.client.stats?.rateLimitErrors ?? [];
            const rateLimitErrorCount = errorsPerRetry[CLIENT_RATE_LIMIT_ERROR_RETRY_COUNT] || 0;

            // Compare against the latest stored reading, if there is one.
            const history = store.getAll();
            const latest = history[history.length - 1];
            const isOverloaded = latest
                ? rateLimitErrorCount - latest.rateLimitErrorCount > errorDeltaLimit
                : false;

            const reading: ClientSnapshot = {
                createdAt: takenAt,
                isOverloaded,
                rateLimitErrorCount,
            };

            store.push(reading, takenAt);
            intervalCallback();
        },
    });
}

/**
 * @internal Return type for backward compat in Snapshotter facade.
 * Derived from `createClientLoadSignal`, so it follows whatever that factory returns.
 */
export type ClientLoadSignal = ReturnType<typeof createClientLoadSignal>;
45 changes: 45 additions & 0 deletions packages/core/src/autoscaling/cpu_load_signal.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import type { Configuration } from '../configuration';
import { EventType } from '../events/event_manager';
import type { LoadSnapshot } from './load_signal';
import { SnapshotStore } from './load_signal';
import type { SystemInfo } from './system_status';

/** A single CPU load reading. */
export interface CpuSnapshot extends LoadSnapshot {
/** CPU usage value; `createCpuLoadSignal` fills it with `Math.ceil(cpuCurrentUsage / 100)`. */
usedRatio: number;
/** Optional idle/total tick counters. Not populated by `createCpuLoadSignal`. */
ticks?: { idle: number; total: number };
}

/** Options accepted by `createCpuLoadSignal`. */
export interface CpuLoadSignalOptions {
/** Forwarded to the snapshot store as `overloadedRatio`. `createCpuLoadSignal` falls back to 0.4 when omitted. */
overloadedRatio?: number;
/** Forwarded to the snapshot store as `snapshotHistoryMillis`; no default is applied here. */
snapshotHistoryMillis?: number;
/** Configuration whose event manager (`config.getEventManager()`) delivers the `SYSTEM_INFO` events. */
config: Configuration;
}

/**
 * Builds an event-driven load signal for CPU usage.
 *
 * The signal subscribes to `EventType.SYSTEM_INFO` on the event manager taken
 * from `options.config` and stores one snapshot per received event. The
 * overloaded flag is copied verbatim from the event payload (`isCpuOverloaded`).
 *
 * @param options Configuration providing the event manager plus optional tuning knobs.
 * @returns The store produced by `SnapshotStore.fromEvent`.
 */
export function createCpuLoadSignal(options: CpuLoadSignalOptions) {
    const overloadedRatio = options.overloadedRatio ?? 0.4;
    const events = options.config.getEventManager();

    return SnapshotStore.fromEvent<CpuSnapshot, SystemInfo>({
        name: 'cpuInfo',
        overloadedRatio,
        events,
        event: EventType.SYSTEM_INFO,
        snapshotHistoryMillis: options.snapshotHistoryMillis,
        handler(store, systemInfo) {
            const { cpuCurrentUsage, isCpuOverloaded } = systemInfo;

            // Prefer the timestamp carried by the event, fall back to "now".
            let takenAt: Date;
            if (systemInfo.createdAt) {
                takenAt = new Date(systemInfo.createdAt);
            } else {
                takenAt = new Date();
            }

            // NOTE(review): Math.ceil turns the value into an integer. If `cpuCurrentUsage`
            // is a percentage (0-100), this yields 0 or 1 rather than a fractional ratio -
            // confirm whether the rounding is intentional before changing it.
            const usedRatio = Math.ceil(cpuCurrentUsage! / 100);

            store.push(
                {
                    createdAt: takenAt,
                    isOverloaded: isCpuOverloaded!,
                    usedRatio,
                },
                takenAt,
            );
        },
    });
}

/**
 * @internal Return type for backward compat in Snapshotter facade.
 * Derived from `createCpuLoadSignal`, so it follows whatever that factory returns.
 */
export type CpuLoadSignal = ReturnType<typeof createCpuLoadSignal>;
56 changes: 56 additions & 0 deletions packages/core/src/autoscaling/event_loop_load_signal.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import type { LoadSnapshot } from './load_signal';
import { SnapshotStore } from './load_signal';

/** A single event loop delay reading. */
export interface EventLoopSnapshot extends LoadSnapshot {
/**
 * Milliseconds by which the measured delay went over `maxBlockedMillis`.
 * Never negative; 0 for the first snapshot and whenever the delay stayed within the limit.
 */
exceededMillis: number;
}

/** Options accepted by `createEventLoopLoadSignal`. */
export interface EventLoopLoadSignalOptions {
/** Seconds between two snapshots. `createEventLoopLoadSignal` falls back to 0.5 when omitted. */
eventLoopSnapshotIntervalSecs?: number;
/**
 * Delay (in milliseconds, beyond the expected interval) above which a snapshot is marked overloaded.
 * `createEventLoopLoadSignal` falls back to 50 when omitted.
 */
maxBlockedMillis?: number;
/** Forwarded to the snapshot store as `overloadedRatio`. `createEventLoopLoadSignal` falls back to 0.6 when omitted. */
overloadedRatio?: number;
/** Forwarded to the snapshot store as `snapshotHistoryMillis`; no default is applied here. */
snapshotHistoryMillis?: number;
}

/**
 * Builds an interval-driven load signal that measures event loop delay.
 *
 * Every tick compares the time elapsed since the previous snapshot with the
 * configured interval. Whatever is left over is the delay; when it is larger
 * than `maxBlockedMillis` (default 50) the snapshot is flagged as overloaded
 * and the overshoot is recorded in `exceededMillis`. The first snapshot has
 * no predecessor, so it is stored as not overloaded with zero overshoot.
 *
 * @param options Optional tuning knobs; all of them have defaults.
 * @returns The store produced by `SnapshotStore.fromInterval`.
 */
export function createEventLoopLoadSignal(options: EventLoopLoadSignalOptions = {}) {
    const tickMillis = (options.eventLoopSnapshotIntervalSecs ?? 0.5) * 1000;
    const blockedLimitMillis = options.maxBlockedMillis ?? 50;

    return SnapshotStore.fromInterval<EventLoopSnapshot>({
        name: 'eventLoopInfo',
        overloadedRatio: options.overloadedRatio ?? 0.6,
        intervalMillis: tickMillis,
        snapshotHistoryMillis: options.snapshotHistoryMillis,
        handler(store, intervalCallback) {
            const now = new Date();

            const history = store.getAll();
            const latest = history[history.length - 1];

            let isOverloaded = false;
            let exceededMillis = 0;

            if (latest) {
                // How much later than scheduled this tick fired.
                const lagMillis = now.getTime() - +latest.createdAt - tickMillis;

                isOverloaded = lagMillis > blockedLimitMillis;
                exceededMillis = Math.max(lagMillis - blockedLimitMillis, 0);
            }

            const reading: EventLoopSnapshot = {
                createdAt: now,
                isOverloaded,
                exceededMillis,
            };

            store.push(reading, now);
            intervalCallback();
        },
    });
}

/**
 * @internal Return type for backward compat in Snapshotter facade.
 * Derived from `createEventLoopLoadSignal`, so it follows whatever that factory returns.
 */
export type EventLoopLoadSignal = ReturnType<typeof createEventLoopLoadSignal>;
5 changes: 5 additions & 0 deletions packages/core/src/autoscaling/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
export * from './autoscaled_pool';
export * from './client_load_signal';
export * from './cpu_load_signal';
export * from './event_loop_load_signal';
export * from './load_signal';
export * from './memory_load_signal';
export * from './snapshotter';
export * from './system_status';
Loading