Skip to content

Commit a7f4233

Browse files
authored
feat: Add redirected actor logs (#769)
### Description - Add convenience methods and arguments for redirecting the streamed actor log to another log. - This changes the default behavior in non non-breaking way. By default, the logs will be redirected now. - Update log endpoints calls to accept `raw` parameter. - Parity with Python implementation apify/apify-client-python#403 ### Example usage: #### Actor.call - default This will redirect logs by default. Example default redirected line: > 2025-10-22T13:06:39.476Z redirect-log-tester runId:SoK1VRJxG61tVrgNz -> 2025-10-22T13:06:06.686Z ACTOR: Pulling container image of build Gh9yAJtjw0rHpc1NU from registry. ```javascript ... await client.actor('redirect-actor-id').call(); ``` #### Actor.call - custom Log This will redirect logs using custom log and logger. Example default redirected line: > 2025-10-22T13:06:39.476Z customPrefix 2025-10-22T13:06:06.686Z ACTOR: Pulling container image of build Gh9yAJtjw0rHpc1NU from registry. ```javascript ... await client.actor('redirect-actor-id').call(someInputs, { log: new Log({ level: LEVELS.DEBUG, prefix: 'customPrefix', logger: new LoggerActorRedirect() }), ``` #### Actor.call - no log redirection This will disable all log redirection (same as current behavior) ```javascript ... await client.actor('redirect-actor-id').call(someInputs, {log: null}), ``` #### Actor.run - attaching to already running actor and redirecting new logs A typical use case is redirecting logs from an Actor that runs in standby. We do not want all the logs; we only want the new logs generated from the moment of connection. ```javascript ... const streamedLog = await client.run('someActorRunId').getStreamedLog({ fromStart: false }); streamedLog.start(); // Do some stuff while also redirecting logs from another Actor await streamedLog.stop(); ``` #### Example actor with recursive redirection: https://console.apify.com/actors/IcfIKTIvbmujMmovj/source ### Issues - Partially implements: #632
1 parent c1dc896 commit a7f4233

23 files changed

+536
-71
lines changed

package-lock.json

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
"@apify/utilities": "^2.23.2",
6868
"@crawlee/types": "^3.3.0",
6969
"agentkeepalive": "^4.2.1",
70+
"ansi-colors": "^4.1.1",
7071
"async-retry": "^1.3.3",
7172
"axios": "^1.6.7",
7273
"content-type": "^1.0.5",

src/resource_clients/actor.ts

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import ow from 'ow';
33

44
import type { RUN_GENERAL_ACCESS } from '@apify/consts';
55
import { ACT_JOB_STATUSES, ACTOR_PERMISSION_LEVEL, META_ORIGINS } from '@apify/consts';
6+
import { Log } from '@apify/log';
67

78
import type { ApiClientSubResourceOptions } from '../base/api_client';
89
import { ResourceClient } from '../base/resource_client';
@@ -139,18 +140,28 @@ export class ActorClient extends ResourceClient {
139140
webhooks: ow.optional.array.ofType(ow.object),
140141
maxItems: ow.optional.number.not.negative,
141142
maxTotalChargeUsd: ow.optional.number.not.negative,
143+
log: ow.optional.any(ow.null, ow.object.instanceOf(Log), ow.string.equals('default')),
142144
restartOnError: ow.optional.boolean,
143145
forcePermissionLevel: ow.optional.string.oneOf(Object.values(ACTOR_PERMISSION_LEVEL)),
144146
}),
145147
);
146148

147-
const { waitSecs, ...startOptions } = options;
149+
const { waitSecs, log, ...startOptions } = options;
148150
const { id } = await this.start(input, startOptions);
149151

150152
// Calling root client because we need access to top level API.
151153
// Creating a new instance of RunClient here would only allow
152154
// setting it up as a nested route under actor API.
153-
return this.apifyClient.run(id).waitForFinish({ waitSecs });
155+
const newRunClient = this.apifyClient.run(id);
156+
157+
const streamedLog = await newRunClient.getStreamedLog({ toLog: options?.log });
158+
streamedLog?.start();
159+
return this.apifyClient
160+
.run(id)
161+
.waitForFinish({ waitSecs })
162+
.finally(async () => {
163+
await streamedLog?.stop();
164+
});
154165
}
155166

156167
/**
@@ -425,7 +436,16 @@ export interface ActorStartOptions {
425436
}
426437

427438
export interface ActorCallOptions extends Omit<ActorStartOptions, 'waitForFinish'> {
439+
/**
440+
* Wait time in seconds for the actor run to finish.
441+
*/
428442
waitSecs?: number;
443+
/**
444+
* `Log` instance that should be used to redirect actor run logs to.
445+
* If `undefined` or `'default'` the pre-defined `Log` will be created and used.
446+
* If `null`, no log redirection will occur.
447+
*/
448+
log?: Log | null | 'default';
429449
}
430450

431451
export interface ActorRunListItem {

src/resource_clients/log.ts

Lines changed: 177 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
1+
// eslint-disable-next-line max-classes-per-file
12
import type { Readable } from 'node:stream';
23

4+
import c from 'ansi-colors';
5+
6+
import type { Log } from '@apify/log';
7+
import { Logger, LogLevel } from '@apify/log';
8+
39
import type { ApifyApiError } from '../apify_api_error';
410
import type { ApiClientSubResourceOptions } from '../base/api_client';
511
import { ResourceClient } from '../base/resource_client';
@@ -20,11 +26,11 @@ export class LogClient extends ResourceClient {
2026
/**
2127
* https://docs.apify.com/api/v2#/reference/logs/log/get-log
2228
*/
23-
async get(): Promise<string | undefined> {
29+
async get(options: LogOptions = {}): Promise<string | undefined> {
2430
const requestOpts: ApifyRequestConfig = {
2531
url: this._url(),
2632
method: 'GET',
27-
params: this._params(),
33+
params: this._params(options),
2834
};
2935

3036
try {
@@ -41,9 +47,10 @@ export class LogClient extends ResourceClient {
4147
* Gets the log in a Readable stream format. Only works in Node.js.
4248
* https://docs.apify.com/api/v2#/reference/logs/log/get-log
4349
*/
44-
async stream(): Promise<Readable | undefined> {
50+
async stream(options: LogOptions = {}): Promise<Readable | undefined> {
4551
const params = {
4652
stream: true,
53+
raw: options.raw,
4754
};
4855

4956
const requestOpts: ApifyRequestConfig = {
@@ -63,3 +70,170 @@ export class LogClient extends ResourceClient {
6370
return undefined;
6471
}
6572
}
73+
74+
export interface LogOptions {
75+
/** @default false */
76+
raw?: boolean;
77+
}
78+
79+
/**
80+
* Logger for redirected actor logs.
81+
*/
82+
export class LoggerActorRedirect extends Logger {
83+
constructor(options = {}) {
84+
super({ skipTime: true, level: LogLevel.DEBUG, ...options });
85+
}
86+
87+
override _log(level: LogLevel, message: string, data?: any, exception?: unknown, opts: Record<string, any> = {}) {
88+
if (level > this.options.level) {
89+
return;
90+
}
91+
if (data || exception) {
92+
throw new Error('Redirect logger does not use other arguments than level and message');
93+
}
94+
let { prefix } = opts;
95+
prefix = prefix ? `${prefix}` : '';
96+
97+
let maybeDate = '';
98+
if (!this.options.skipTime) {
99+
maybeDate = `${new Date().toISOString().replace('Z', '').replace('T', ' ')} `;
100+
}
101+
102+
const line = `${c.gray(maybeDate)}${c.cyan(prefix)}${message || ''}`;
103+
104+
// All redirected logs are logged at info level to avid any console specific formating for non-info levels,
105+
// which have already been applied once to the original log. (For example error stack traces etc.)
106+
this._outputWithConsole(LogLevel.INFO, line);
107+
return line;
108+
}
109+
}
110+
111+
/**
112+
* Helper class for redirecting streamed Actor logs to another log.
113+
*/
114+
export class StreamedLog {
115+
private destinationLog: Log;
116+
private streamBuffer: Buffer[] = [];
117+
private splitMarker = /(?:\n|^)(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)/g;
118+
private relevancyTimeLimit: Date | null;
119+
120+
private logClient: LogClient;
121+
private streamingTask: Promise<void> | null = null;
122+
private stopLogging = false;
123+
124+
constructor(options: StreamedLogOptions) {
125+
const { toLog, logClient, fromStart = true } = options;
126+
this.destinationLog = toLog;
127+
this.logClient = logClient;
128+
this.relevancyTimeLimit = fromStart ? null : new Date();
129+
}
130+
131+
/**
132+
* Start log redirection.
133+
*/
134+
public start(): void {
135+
if (this.streamingTask) {
136+
throw new Error('Streaming task already active');
137+
}
138+
this.stopLogging = false;
139+
this.streamingTask = this.streamLog();
140+
}
141+
142+
/**
143+
* Stop log redirection.
144+
*/
145+
public async stop(): Promise<void> {
146+
if (!this.streamingTask) {
147+
throw new Error('Streaming task is not active');
148+
}
149+
this.stopLogging = true;
150+
try {
151+
await this.streamingTask;
152+
} catch (err) {
153+
if (!(err instanceof Error && err.name === 'AbortError')) {
154+
throw err;
155+
}
156+
} finally {
157+
this.streamingTask = null;
158+
}
159+
}
160+
161+
/**
162+
* Get log stream from response and redirect it to another log.
163+
*/
164+
private async streamLog(): Promise<void> {
165+
const logStream = await this.logClient.stream({ raw: true });
166+
if (!logStream) {
167+
return;
168+
}
169+
const lastChunkRemainder = await this.logStreamChunks(logStream);
170+
// Process whatever is left when exiting. Maybe it is incomplete, maybe it is last log without EOL.
171+
const lastMessage = Buffer.from(lastChunkRemainder).toString().trim();
172+
if (lastMessage.length) {
173+
this.destinationLog.info(lastMessage);
174+
}
175+
}
176+
177+
private async logStreamChunks(logStream: Readable): Promise<Uint8Array> {
178+
// Chunk may be incomplete. Keep remainder for next chunk.
179+
let previousChunkRemainder: Uint8Array = new Uint8Array();
180+
181+
for await (const chunk of logStream) {
182+
// Handle possible leftover incomplete line from previous chunk.
183+
// Everything before last end of line is complete.
184+
const chunkWithPreviousRemainder = new Uint8Array(previousChunkRemainder.length + chunk.length);
185+
chunkWithPreviousRemainder.set(previousChunkRemainder, 0);
186+
chunkWithPreviousRemainder.set(chunk, previousChunkRemainder.length);
187+
188+
const lastCompleteMessageIndex = chunkWithPreviousRemainder.lastIndexOf(0x0a);
189+
previousChunkRemainder = chunkWithPreviousRemainder.slice(lastCompleteMessageIndex);
190+
191+
// Push complete part of the chunk to the buffer
192+
this.streamBuffer.push(Buffer.from(chunkWithPreviousRemainder.slice(0, lastCompleteMessageIndex)));
193+
this.logBufferContent();
194+
195+
// Keep processing the new data until stopped
196+
if (this.stopLogging) {
197+
break;
198+
}
199+
}
200+
return previousChunkRemainder;
201+
}
202+
203+
/**
204+
* Parse the buffer and log complete messages.
205+
*/
206+
private logBufferContent(): void {
207+
const allParts = Buffer.concat(this.streamBuffer).toString().split(this.splitMarker).slice(1);
208+
// Parse the buffer parts into complete messages
209+
const messageMarkers = allParts.filter((_, i) => i % 2 === 0);
210+
const messageContents = allParts.filter((_, i) => i % 2 !== 0);
211+
this.streamBuffer = [];
212+
213+
messageMarkers.forEach((marker, index) => {
214+
const decodedMarker = marker;
215+
const decodedContent = messageContents[index];
216+
if (this.relevancyTimeLimit) {
217+
// Log only relevant messages. Ignore too old log messages.
218+
const logTime = new Date(decodedMarker);
219+
if (logTime < this.relevancyTimeLimit) {
220+
return;
221+
}
222+
}
223+
const message = decodedMarker + decodedContent;
224+
225+
// Original log level information is not available. Log all on info level. Log level could be guessed for
226+
// some logs, but for any multiline logs such guess would be probably correct only for the first line.
227+
this.destinationLog.info(message.trim());
228+
});
229+
}
230+
}
231+
232+
export interface StreamedLogOptions {
233+
/** Log client used to communicate with the Apify API. */
234+
logClient: LogClient;
235+
/** Log to which the Actor run logs will be redirected. */
236+
toLog: Log;
237+
/** Whether to redirect all logs from Actor run start (even logs from the past). */
238+
fromStart?: boolean;
239+
}

src/resource_clients/run.ts

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,16 @@ import type { AxiosRequestConfig } from 'axios';
22
import ow from 'ow';
33

44
import type { RUN_GENERAL_ACCESS } from '@apify/consts';
5+
import { LEVELS, Log } from '@apify/log';
56

67
import type { ApiClientOptionsWithOptionalResourcePath } from '../base/api_client';
78
import { ResourceClient } from '../base/resource_client';
89
import type { ApifyResponse } from '../http_client';
9-
import { cast, parseDateFields, pluckData } from '../utils';
10+
import { cast, isNode, parseDateFields, pluckData } from '../utils';
1011
import type { ActorRun } from './actor';
1112
import { DatasetClient } from './dataset';
1213
import { KeyValueStoreClient } from './key_value_store';
13-
import { LogClient } from './log';
14+
import { LogClient, LoggerActorRedirect, StreamedLog } from './log';
1415
import { RequestQueueClient } from './request_queue';
1516

1617
const RUN_CHARGE_IDEMPOTENCY_HEADER = 'idempotency-key';
@@ -266,6 +267,39 @@ export class RunClient extends ResourceClient {
266267
}),
267268
);
268269
}
270+
271+
/**
272+
* Get StreamedLog for convenient streaming of the run log and their redirection.
273+
*/
274+
async getStreamedLog(options: GetStreamedLogOptions = {}): Promise<StreamedLog | undefined> {
275+
const { fromStart = true } = options;
276+
let { toLog } = options;
277+
if (toLog === null || !isNode()) {
278+
// Explicitly no logging or not in Node.js
279+
return undefined;
280+
}
281+
if (toLog === undefined || toLog === 'default') {
282+
// Create default StreamedLog
283+
// Get actor name and run id
284+
const runData = await this.get();
285+
const runId = runData?.id ?? '';
286+
287+
const actorId = runData?.actId ?? '';
288+
const actorData = (await this.apifyClient.actor(actorId).get()) || { name: '' };
289+
290+
const actorName = actorData?.name ?? '';
291+
const name = [actorName, `runId:${runId}`].filter(Boolean).join(' ');
292+
293+
toLog = new Log({ level: LEVELS.DEBUG, prefix: `${name} -> `, logger: new LoggerActorRedirect() });
294+
}
295+
296+
return new StreamedLog({ logClient: this.log(), toLog, fromStart });
297+
}
298+
}
299+
300+
export interface GetStreamedLogOptions {
301+
toLog?: Log | null | 'default';
302+
fromStart?: boolean;
269303
}
270304

271305
export interface RunGetOptions {

test/_helper.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
const { launchPuppeteer, puppeteerUtils } = require('@crawlee/puppeteer');
22

3-
const mockServer = require('./mock_server/server');
3+
const { mockServer } = require('./mock_server/server');
44

55
class Browser {
66
async start() {

0 commit comments

Comments
 (0)