diff --git a/package-lock.json b/package-lock.json index f127a4684..37111fa7e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9300,9 +9300,10 @@ } }, "node_modules/heap-js": { - "version": "2.3.0", - "resolved": "https://registry.npmjs.org/heap-js/-/heap-js-2.3.0.tgz", - "integrity": "sha512-E5303mzwQ+4j/n2J0rDvEPBN7GKjhis10oHiYOgjxsmxYgqG++hz9NyLLOXttzH8as/DyiBHYpUrJTZWYaMo8Q==", + "version": "2.6.0", + "resolved": "https://registry.npmjs.org/heap-js/-/heap-js-2.6.0.tgz", + "integrity": "sha512-trFMIq3PATiFRiQmNNeHtsrkwYRByIXUbYNbotiY9RLVfMkdwZdd2eQ38mGt7BRiCKBaj1DyBAIHmm7mmXPuuw==", + "license": "BSD-3-Clause", "engines": { "node": ">=10.0.0" } @@ -18604,7 +18605,7 @@ "@temporalio/proto": "file:../proto", "@temporalio/workflow": "file:../workflow", "abort-controller": "^3.0.0", - "heap-js": "^2.3.0", + "heap-js": "^2.6.0", "memfs": "^4.6.0", "nexus-rpc": "^0.0.1", "proto3-json-serializer": "^2.0.0", @@ -20730,7 +20731,7 @@ "@temporalio/workflow": "file:../workflow", "@types/supports-color": "^8.1.3", "abort-controller": "^3.0.0", - "heap-js": "^2.3.0", + "heap-js": "^2.6.0", "memfs": "^4.6.0", "nexus-rpc": "^0.0.1", "proto3-json-serializer": "^2.0.0", @@ -25368,9 +25369,9 @@ } }, "heap-js": { - "version": "2.3.0", - "resolved": "https://registry.npmjs.org/heap-js/-/heap-js-2.3.0.tgz", - "integrity": "sha512-E5303mzwQ+4j/n2J0rDvEPBN7GKjhis10oHiYOgjxsmxYgqG++hz9NyLLOXttzH8as/DyiBHYpUrJTZWYaMo8Q==" + "version": "2.6.0", + "resolved": "https://registry.npmjs.org/heap-js/-/heap-js-2.6.0.tgz", + "integrity": "sha512-trFMIq3PATiFRiQmNNeHtsrkwYRByIXUbYNbotiY9RLVfMkdwZdd2eQ38mGt7BRiCKBaj1DyBAIHmm7mmXPuuw==" }, "hosted-git-info": { "version": "7.0.2", diff --git a/packages/test/src/helpers.ts b/packages/test/src/helpers.ts index fd724a36b..ead48b903 100644 --- a/packages/test/src/helpers.ts +++ b/packages/test/src/helpers.ts @@ -141,6 +141,7 @@ export const bundlerOptions = { 'timers', 'timers/promises', require.resolve('./activities'), + 
require.resolve('./mock-native-worker'), ], }; diff --git a/packages/test/src/test-runtime.ts b/packages/test/src/test-runtime.ts index 8c030795c..405e0a90b 100644 --- a/packages/test/src/test-runtime.ts +++ b/packages/test/src/test-runtime.ts @@ -2,14 +2,15 @@ * Test the lifecycle of the Runtime singleton. * Tests run serially because Runtime is a singleton. */ -import test from 'ava'; import { v4 as uuid4 } from 'uuid'; import asyncRetry from 'async-retry'; -import { Runtime, DefaultLogger, LogEntry, makeTelemetryFilterString } from '@temporalio/worker'; +import { Runtime, DefaultLogger, LogEntry, makeTelemetryFilterString, NativeConnection } from '@temporalio/worker'; import { Client, WorkflowClient } from '@temporalio/client'; +import * as wf from '@temporalio/workflow'; import { defaultOptions } from './mock-native-worker'; import * as workflows from './workflows'; -import { RUN_INTEGRATION_TESTS, Worker } from './helpers'; +import { RUN_INTEGRATION_TESTS, Worker, test } from './helpers'; +import { createTestWorkflowBundle } from './helpers-integration'; if (RUN_INTEGRATION_TESTS) { test.serial('Runtime can be created and disposed', async (t) => { @@ -113,4 +114,53 @@ if (RUN_INTEGRATION_TESTS) { await Runtime.instance().shutdown(); } }); + + test.serial(`NativeLogCollector: Buffered logs are periodically flushed even if Core isn't flushing`, async (t) => { + const logEntries: LogEntry[] = []; + + const runtime = Runtime.install({ + logger: new DefaultLogger('DEBUG', (entry) => logEntries.push(entry)), + telemetryOptions: { + // Sets native logger to ERROR level, so that it never flushes + logging: { forward: {}, filter: { core: 'ERROR', other: 'ERROR' } }, + }, + }); + const bufferedLogger = runtime.logger; + + // Hold on to a connection to prevent implicit shutdown of the runtime before we print 'final log' + const connection = await NativeConnection.connect(); + + try { + const taskQueue = `runtime-native-log-collector-preriodically-flushed-${uuid4()}`; + 
const worker = await Worker.create({ + ...defaultOptions, + connection, + taskQueue, + workflowBundle: await createTestWorkflowBundle({ workflowsPath: __filename }), + }); + + await worker.runUntil(async () => { + await new Client().workflow.execute(log5Times, { taskQueue, workflowId: uuid4() }); + }); + t.true(logEntries.some((x) => x.message.startsWith('workflow log '))); + + // This one will get buffered + bufferedLogger.info('final log'); + t.false(logEntries.some((x) => x.message.startsWith('final log'))); + } finally { + await connection.close(); + await runtime.shutdown(); + } + + // Assert all log messages have been flushed + t.is(logEntries.filter((x) => x.message.startsWith('workflow log ')).length, 5); + t.is(logEntries.filter((x) => x.message.startsWith('final log')).length, 1); + }); +} + +export async function log5Times(): Promise { + for (let i = 0; i < 5; i++) { + wf.log.info(`workflow log ${i}`); + await wf.sleep(1); + } } diff --git a/packages/worker/package.json b/packages/worker/package.json index 5a220c06b..9482c08c0 100644 --- a/packages/worker/package.json +++ b/packages/worker/package.json @@ -23,7 +23,7 @@ "@temporalio/proto": "file:../proto", "@temporalio/workflow": "file:../workflow", "abort-controller": "^3.0.0", - "heap-js": "^2.3.0", + "heap-js": "^2.6.0", "memfs": "^4.6.0", "nexus-rpc": "^0.0.1", "proto3-json-serializer": "^2.0.0", diff --git a/packages/worker/src/logger.ts b/packages/worker/src/logger.ts index 40288e4a5..affa1a2c9 100644 --- a/packages/worker/src/logger.ts +++ b/packages/worker/src/logger.ts @@ -118,6 +118,7 @@ export function hasColorSupport(logger: Logger): boolean { export interface FlushableLogger extends Logger { flush(): void; + close?(): void; } export function isFlushableLogger(logger: Logger): logger is FlushableLogger { diff --git a/packages/worker/src/runtime-logger.ts b/packages/worker/src/runtime-logger.ts index 66c5eeb53..f8fd60a44 100644 --- a/packages/worker/src/runtime-logger.ts +++ 
b/packages/worker/src/runtime-logger.ts @@ -1,7 +1,7 @@ import { Heap } from 'heap-js'; import { SdkComponent } from '@temporalio/common'; import { native } from '@temporalio/core-bridge'; -import { DefaultLogger, LogEntry, Logger, LogTimestamp } from './logger'; +import { DefaultLogger, FlushableLogger, LogEntry, Logger, LogTimestamp } from './logger'; /** * A log collector that accepts log entries either through the TS `Logger` interface (e.g. used by @@ -25,10 +25,61 @@ export class NativeLogCollector { protected buffer = new Heap((a, b) => Number(a.timestampNanos - b.timestampNanos)); + /** + * A timer that periodically flushes the buffer to the downstream logger. + */ + protected flushIntervalTimer: NodeJS.Timeout; + + /** + * The minimum time an entry should be buffered before getting flushed. + * + * Increasing this value allows the buffer to do a better job of correctly reordering messages + * emitted from different sources (notably from Workflow executions through Sinks, and from Core) + * based on their absolute timestamps, but also increases latency of logs. + * + * The minimum buffer time requirement only applies as long as the buffer is not full. Once the + * buffer reaches its maximum size, older messages are unconditionally flushed, to prevent + * unbounded growth of the buffer. + * + * TODO(JWH): Is 100ms a reasonable compromise? That might seem a little high on latency, but to + * be useful, that value needs to exceed the time it typically takes to process + * Workflow Activations, let's say above the expected P90, but that's highly variable + * across our user base, and we don't really have field data anyway. + * We can revisit depending on user feedback. + */ + protected readonly minBufferTimeMs = 100; + + /** + * Interval between flush passes checking for expired messages. + * + * This really is redundant, since Core itself is expected to flush its buffer every 10 ms, and + * we're checking for expired messages when it does. 
However, Core will only flush if it has + * accumulated at least one message; when Core's log level is set to WARN or higher, it may be + * many seconds, and even minutes, between Core's log messages, resulting in very infrequent flushes + * from that end, which causes considerable delay in flushing log messages from other sources. + */ + protected readonly flushPassIntervalMs = 100; + + /** + * The maximum number of log messages to buffer before flushing. + * + * When the buffer reaches this limit, older messages are unconditionally flushed (i.e. without + * regard to the minimum buffer time requirement), to prevent unbounded growth of the buffer. + */ + protected readonly maxBufferSize = 2000; + constructor(downstream: Logger) { - this.logger = new DefaultLogger('TRACE', (entry) => this.buffer.add(entry)); + this.logger = new DefaultLogger('TRACE', this.appendOne.bind(this)); + (this.logger as FlushableLogger).flush = this.flush.bind(this); + (this.logger as FlushableLogger).close = this.close.bind(this); + this.downstream = downstream; this.receive = this.receive.bind(this); + + // Flush matured messages from the buffer every so often. + // Unref'ed so that it doesn't prevent the process from exiting if ever the + // runtime doesn't close the logger properly for whatever reason. 
+ this.flushIntervalTimer = setInterval(this.flushMatured.bind(this), this.flushPassIntervalMs).unref(); } /** @@ -44,7 +95,8 @@ export class NativeLogCollector { this.buffer.add(log); } } - this.flush(); + this.flushExcess(); + this.flushMatured(); } catch (_e) { // We're not allowed to throw from here, and conversion errors have already been handled in // convertFromNativeLogEntry(), so an error at this point almost certainly indicates a problem @@ -52,6 +104,11 @@ export class NativeLogCollector { } } + private appendOne(entry: LogEntry): void { + this.buffer.add(entry); + this.flushExcess(); + } + private convertFromNativeLogEntry(entry: native.JsonString): LogEntry | undefined { try { const log = JSON.parse(entry) as native.LogEntry; @@ -78,15 +135,62 @@ export class NativeLogCollector { } /** - * Flush all buffered logs into the logger supplied to the constructor/ + * Flush messages that have exceeded their required minimal buffering time. */ - flush(): void { + private flushMatured(): void { + const threadholdTimeNanos = BigInt(Date.now() - this.minBufferTimeMs) * 1_000_000n; + for (;;) { + const entry = this.buffer.peek(); + if (!entry || entry.timestampNanos > threadholdTimeNanos) break; + this.buffer.pop(); + + this.downstream.log(entry.level, entry.message, { + [LogTimestamp]: entry.timestampNanos, + ...entry.meta, + }); + } + } + + /** + * Flush messages in excess of the buffer size limit, starting with oldest ones, without regard + * to the `minBufferTimeMs` requirement. This is called every time messages are appended to the + * buffer, to prevent unbounded growth of the buffer when messages are being emitted at high rate. + * + * The only downside of flushing messages before their time is that it increases the probability + * that messages from different sources might end up being passed down to the downstream logger + * in the wrong order; e.g. 
if an "older" message emitted by the Workflow Logger is received by + * the Collector after we've already flushed a "newer" message emitted by Core. This is totally + * acceptable, and definitely better than a memory leak caused by unbounded growth of the buffer. + */ + private flushExcess(): void { + let excess = this.buffer.size() - this.maxBufferSize; + while (excess-- > 0) { + const entry = this.buffer.pop(); + if (!entry) break; + + this.downstream.log(entry.level, entry.message, { + [LogTimestamp]: entry.timestampNanos, + ...entry.meta, + }); + } + } + + /** + * Flush all messages contained in the buffer, without regard to the `minBufferTimeMs` requirement. + * + * This is called on Runtime and on Worker shutdown. + */ + public flush(): void { for (const entry of this.buffer) { this.downstream.log(entry.level, entry.message, { [LogTimestamp]: entry.timestampNanos, ...entry.meta, }); } - this.buffer.clear(); + } + + public close(): void { + this.flush(); + clearInterval(this.flushIntervalTimer); } } diff --git a/packages/worker/src/runtime-options.ts b/packages/worker/src/runtime-options.ts index 5597c8268..b92ad017e 100644 --- a/packages/worker/src/runtime-options.ts +++ b/packages/worker/src/runtime-options.ts @@ -69,7 +69,7 @@ export interface TelemetryOptions { * ### Log Forwarding * * By default, logs emitted by the native side of the SDK are printed directly to the console, - * _independently of `RuntimeOptions.logger`_. To enable forwarding of those logs messages to the + * _independently of `RuntimeOptions.logger`_. To enable forwarding of those log messages to the * TS side logger, add the `forward` property to the `logging` object. * * For example: @@ -86,10 +86,14 @@ export interface TelemetryOptions { * }); * ``` * - * Note that forwarded log messages are internally throttled/buffered for a few milliseconds to - * reduce overhead incurred by Rust-to-JS calls. 
In rare cases, this may result in log messages - * appearing out of order by a few milliseconds. Users are discouraged from using log forwarding - * with verboseness sets to `DEBUG` or `TRACE`. + * Note that when log forwarding is enabled, all log messages sent to the runtime logger are + * internally buffered for 100 ms, to allow global sorting of messages from different sources + * based on their absolute timestamps. This helps reduce incoherencies in the order of messages, + * notably those emitted through the Workflow Logging API vs those emitted through Core. + * + * However, in some situations, log messages may still appear out of order, e.g. when a Workflow + * Activation takes longer than 100ms to complete or when log flow exceeds the buffer's capacity + * (2000 messages). */ logging?: LogExporterConfig; diff --git a/packages/worker/src/runtime.ts b/packages/worker/src/runtime.ts index c70c7ab8e..49bfc0f5a 100644 --- a/packages/worker/src/runtime.ts +++ b/packages/worker/src/runtime.ts @@ -280,7 +280,9 @@ export class Runtime { this.teardownShutdownHook(); // FIXME(JWH): I think we no longer need this, but will have to thoroughly validate. native.runtimeShutdown(this.native); - this.flushLogs(); + if (isFlushableLogger(this.logger)) { + this.logger.close?.(); + } } finally { delete (this as any).native; }