Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@
"url": "https://github.com/apify/apify-client-js/issues"
},
"homepage": "https://docs.apify.com/api/client/js/",
"files": [
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for testing, revert before merging.

"dist",
"!dist/*.tsbuildinfo"
],
"scripts": {
"build": "npm run clean && npm run build:node && npm run build:browser",
"postbuild": "gen-esm-wrapper dist/index.js dist/index.mjs",
Expand Down
3 changes: 3 additions & 0 deletions src/resource_clients/actor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,15 @@ export class ActorClient extends ResourceClient {
const newRunClient = this.apifyClient.run(id);

const streamedLog = await newRunClient.getStreamedLog({ toLog: options?.log });
const statusMessageWatcher = await newRunClient.getStatusMessageWatcher({ toLog: options?.log });
streamedLog?.start();
statusMessageWatcher?.start();
return this.apifyClient
.run(id)
.waitForFinish({ waitSecs })
.finally(async () => {
await streamedLog?.stop();
await statusMessageWatcher?.stop();
});
}

Expand Down
78 changes: 78 additions & 0 deletions src/resource_clients/log.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type { ApiClientSubResourceOptions } from '../base/api_client';
import { ResourceClient } from '../base/resource_client';
import type { ApifyRequestConfig } from '../http_client';
import { cast, catchNotFoundOrThrow } from '../utils';
import type { RunClient } from './run';

export class LogClient extends ResourceClient {
/**
Expand Down Expand Up @@ -237,3 +238,80 @@ export interface StreamedLogOptions {
/** Whether to redirect all logs from Actor run start (even logs from the past). */
fromStart?: boolean;
}

/**
* Helper class for redirecting Actor run status and status message to log.
*/
export class StatusMessageWatcher {
private static finalSleepTimeMs = 6000;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two constants are for sure up for discussion.

private static defaultCheckPeriodMs = 5000;

protected toLog: Log;
protected checkPeriod: number;
protected lastStatusMessage = '';
private runClient: RunClient;
private loggingTask: Promise<void> | null = null;
private stopLogging = false;

constructor(options: StatusMessageWatcherOptions) {
const { toLog, runClient, checkPeriod = StatusMessageWatcher.defaultCheckPeriodMs } = options;
this.runClient = runClient;
this.toLog = toLog;
this.checkPeriod = checkPeriod;
}

/**
* Start Actor run status and status message redirection.
*/
start() {
if (this.loggingTask) {
throw new Error('Logging task already active');
}
this.stopLogging = false;
this.loggingTask = this._logChangedStatusMessage();
}

/**
* Stop Actor run status and status message redirection.
*/
async stop(): Promise<void> {
if (!this.loggingTask) {
throw new Error('Logging task is not active');
}
await new Promise((resolve) => {
// Wait for the final status and status message to be set
setTimeout(resolve, StatusMessageWatcher.finalSleepTimeMs);
});
this.stopLogging = true;
await this.loggingTask;
this.loggingTask = null;
}

async _logChangedStatusMessage(): Promise<void> {
while (!this.stopLogging) {
const runData = await this.runClient.get();
if (runData !== undefined) {
const status = runData.status ?? 'Unknown status';
const statusMessage = runData.statusMessage ?? '';
const newStatusMessage = `Status: ${status}, Message: ${statusMessage}`;
if (newStatusMessage !== this.lastStatusMessage) {
// Log only when status or status message changed
this.lastStatusMessage = newStatusMessage;
this.toLog.info(newStatusMessage);
}
await new Promise((resolve) => {
setTimeout(resolve, this.checkPeriod);
});
}
}
}
}

export interface StatusMessageWatcherOptions {
/** Run client used to communicate with the Apify API. */
runClient: RunClient;
/** Log to which the Actor run logs will be redirected. */
toLog: Log;
/** How often should change in status be checked. */
checkPeriod?: number;
}
49 changes: 37 additions & 12 deletions src/resource_clients/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { cast, isNode, parseDateFields, pluckData } from '../utils';
import type { ActorRun } from './actor';
import { DatasetClient } from './dataset';
import { KeyValueStoreClient } from './key_value_store';
import { LogClient, LoggerActorRedirect, StreamedLog } from './log';
import { LogClient, LoggerActorRedirect, StatusMessageWatcher, StreamedLog } from './log';
import { RequestQueueClient } from './request_queue';

const RUN_CHARGE_IDEMPOTENCY_HEADER = 'idempotency-key';
Expand Down Expand Up @@ -279,24 +279,49 @@ export class RunClient extends ResourceClient {
return undefined;
}
if (toLog === undefined || toLog === 'default') {
// Create default StreamedLog
// Get actor name and run id
const runData = await this.get();
const runId = runData?.id ?? '';

const actorId = runData?.actId ?? '';
const actorData = (await this.apifyClient.actor(actorId).get()) || { name: '' };
toLog = await this.getActorRedirectLog();
}

const actorName = actorData?.name ?? '';
const name = [actorName, `runId:${runId}`].filter(Boolean).join(' ');
return new StreamedLog({ logClient: this.log(), toLog, fromStart });
}

toLog = new Log({ level: LEVELS.DEBUG, prefix: `${name} -> `, logger: new LoggerActorRedirect() });
/**
* Get StatusMessageWatcher for convenient streaming of the Actor run status message and its redirection.
*/
async getStatusMessageWatcher(
options: getStatusMessageWatcherOptions = {},
): Promise<StatusMessageWatcher | undefined> {
let { toLog } = options;
if (toLog === null || !isNode()) {
// Explicitly no logging or not in Node.js
return undefined;
}
if (toLog === undefined || toLog === 'default') {
toLog = await this.getActorRedirectLog();
}
return new StatusMessageWatcher({ toLog, runClient: this, checkPeriod: options.checkPeriod });
}

return new StreamedLog({ logClient: this.log(), toLog, fromStart });
private async getActorRedirectLog(): Promise<Log> {
// Get actor name and run id
const runData = await this.get();
const runId = runData ? `${runData.id ?? ''}` : '';

const actorId = runData?.actId ?? '';
const actorData = (await this.apifyClient.actor(actorId).get()) || { name: '' };

const actorName = runData ? (actorData.name ?? '') : '';
const name = [actorName, `runId:${runId}`].filter(Boolean).join(' ');

return new Log({ level: LEVELS.DEBUG, prefix: `${name} -> `, logger: new LoggerActorRedirect() });
}
}

export interface getStatusMessageWatcherOptions {
toLog?: Log | null | 'default';
checkPeriod?: number;
}

export interface GetStreamedLogOptions {
toLog?: Log | null | 'default';
fromStart?: boolean;
Expand Down
91 changes: 89 additions & 2 deletions test/actors.test.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
const { Browser, validateRequest, DEFAULT_OPTIONS } = require('./_helper');
const { ActorListSortBy, ApifyClient, LoggerActorRedirect } = require('apify-client');
const { ActorListSortBy, ApifyClient, LoggerActorRedirect, RunClient, StatusMessageWatcher } = require('apify-client');
const { stringifyWebhooksToBase64 } = require('../src/utils');
const { mockServer, createDefaultApp } = require('./mock_server/server');
const c = require('ansi-colors');
const { MOCKED_ACTOR_LOGS_PROCESSED, StatusGenerator } = require('./mock_server/test_utils');
const { MOCKED_ACTOR_LOGS_PROCESSED, StatusGenerator, MOCKED_ACTOR_STATUSES } = require('./mock_server/test_utils');
const { Log, LEVELS } = require('@apify/log');
const express = require('express');
const { setTimeout } = require('node:timers/promises');
Expand Down Expand Up @@ -727,12 +727,16 @@ describe('Run actor with redirected logs', () => {

describe('actor.call - redirected logs', () => {
test.each(testCases)('logOptions:$logOptions', async ({ expectedPrefix, logOptions }) => {
const mockedGetStatusMessageWatcher = jest
.spyOn(RunClient.prototype, 'getStatusMessageWatcher')
.mockImplementation(() => {});
const logSpy = jest.spyOn(console, 'log').mockImplementation(() => {});

await client.actor('redirect-actor-id').call(undefined, logOptions);

expect(logSpy.mock.calls).toEqual(MOCKED_ACTOR_LOGS_PROCESSED.map((item) => [expectedPrefix + item]));
logSpy.mockRestore();
mockedGetStatusMessageWatcher.mockRestore();
});

test('logOptions:{ "log": null }', async () => {
Expand All @@ -745,3 +749,86 @@ describe('Run actor with redirected logs', () => {
});
});
});

describe('Run actor with status message watcher', () => {
let baseUrl;
let client;
const originalCheckPeriodMs = StatusMessageWatcher.defaultCheckPeriodMs;
const originalFinalSleepTimeMs = StatusMessageWatcher.finalSleepTimeMs;
const statusGenerator = new StatusGenerator();

beforeAll(async () => {
// Use custom router for the tests
const router = express.Router();
// Set up a status generator to simulate run status changes. It will be reset for each test.
router.get('/actor-runs/redirect-run-id', async (req, res) => {
// Delay the response to give the actor time to run and produce expected logs
await setTimeout(10);
let [status, statusMessage] = ['', ''];
[status, statusMessage] = statusGenerator.next().value;
res.json({ data: { id: 'redirect-run-id', actId: 'redirect-actor-id', status, statusMessage } });
});
const app = createDefaultApp(router);
const server = await mockServer.start(undefined, app);
baseUrl = `http://localhost:${server.address().port}`;

StatusMessageWatcher.defaultCheckPeriodMs = 1; // speed up tests
StatusMessageWatcher.finalSleepTimeMs = 1; // speed up tests
});

afterAll(async () => {
await Promise.all([mockServer.close()]);
StatusMessageWatcher.defaultCheckPeriodMs = originalCheckPeriodMs;
StatusMessageWatcher.finalSleepTimeMs = originalFinalSleepTimeMs;
});

beforeEach(async () => {
client = new ApifyClient({
baseUrl,
maxRetries: 0,
...DEFAULT_OPTIONS,
});
});
afterEach(async () => {
// Reset the generator to so that the next test starts fresh
statusGenerator.reset();
client = null;
});

const testCases = [
{ expectedPrefix: c.cyan('redirect-actor-name runId:redirect-run-id -> '), logOptions: {} },
{ expectedPrefix: c.cyan('redirect-actor-name runId:redirect-run-id -> '), logOptions: { log: 'default' } },
{
expectedPrefix: c.cyan('custom prefix...'),
logOptions: {
log: new Log({ level: LEVELS.DEBUG, prefix: 'custom prefix...', logger: new LoggerActorRedirect() }),
},
},
];

describe('actor.call - status watcher', () => {
test.each(testCases)('logOptions:$logOptions', async ({ expectedPrefix, logOptions }) => {
const logSpy = jest.spyOn(console, 'log').mockImplementation(() => {});
// Ignore StreamedLog to not clutter the status message watcher test
const mockedGetStreamedLog = jest.spyOn(RunClient.prototype, 'getStreamedLog').mockImplementation(() => {});

await client.actor('redirect-actor-id').call(undefined, logOptions);

const uniqueStatuses = Array.from(new Set(MOCKED_ACTOR_STATUSES.map(JSON.stringify))).map(JSON.parse);
expect(logSpy.mock.calls).toEqual(
uniqueStatuses.map((item) => [`${expectedPrefix}Status: ${item[0]}, Message: ${item[1]}`]),
);
logSpy.mockRestore();
mockedGetStreamedLog.mockRestore();
});

test('no log', async () => {
const logSpy = jest.spyOn(console, 'log').mockImplementation(() => {});

await client.actor('redirect-actor-id').call(undefined, { log: null });

expect(logSpy.mock.calls).toEqual([]);
logSpy.mockRestore();
});
});
});
8 changes: 3 additions & 5 deletions test/mock_server/test_utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,13 @@ const MOCKED_ACTOR_LOGS_PROCESSED = [
];

const MOCKED_ACTOR_STATUSES = [
['RUNNING', 'Actor Started'],
['RUNNING', 'Actor Started'],
['RUNNING', 'Doing some stuff'],
['RUNNING', 'Doing some stuff'],
['RUNNING', 'Doing some stuff'],
['RUNNING', 'Doing some stuff'],
['RUNNING', 'Doing some stuff'],
['RUNNING', 'Doing some stuff'],
['RUNNING', 'Doing some stuff'],
['RUNNING', 'Doing some stuff'],
['SUCCEEDED', 'Actor Finished'],
['SUCCEEDED', 'Actor Finished'],
['SUCCEEDED', 'Actor Finished'],
];

Expand Down
Loading