Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/test/src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ export const bundlerOptions = {
'timers',
'timers/promises',
require.resolve('./activities'),
require.resolve('./mock-native-worker'),
],
};

Expand Down
56 changes: 53 additions & 3 deletions packages/test/src/test-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
* Test the lifecycle of the Runtime singleton.
* Tests run serially because Runtime is a singleton.
*/
import test from 'ava';
import { v4 as uuid4 } from 'uuid';
import asyncRetry from 'async-retry';
import { Runtime, DefaultLogger, LogEntry, makeTelemetryFilterString } from '@temporalio/worker';
import { Runtime, DefaultLogger, LogEntry, makeTelemetryFilterString, NativeConnection } from '@temporalio/worker';
import { Client, WorkflowClient } from '@temporalio/client';
import * as wf from '@temporalio/workflow';
import { defaultOptions } from './mock-native-worker';
import * as workflows from './workflows';
import { RUN_INTEGRATION_TESTS, Worker } from './helpers';
import { RUN_INTEGRATION_TESTS, Worker, test } from './helpers';
import { createTestWorkflowBundle } from './helpers-integration';

if (RUN_INTEGRATION_TESTS) {
test.serial('Runtime can be created and disposed', async (t) => {
Expand Down Expand Up @@ -113,4 +114,53 @@ if (RUN_INTEGRATION_TESTS) {
await Runtime.instance().shutdown();
}
});

test.serial(`NativeLogCollector: Buffered logs are periodically flushed even if Core isn't flushing`, async (t) => {
const logEntries: LogEntry[] = [];

const runtime = Runtime.install({
logger: new DefaultLogger('DEBUG', (entry) => logEntries.push(entry)),
telemetryOptions: {
// Sets native logger to ERROR level, so that it never flushes
logging: { forward: {}, filter: { core: 'ERROR', other: 'ERROR' } },
},
});
const bufferedLogger = runtime.logger;

// Hold on to a connection to prevent implicit shutdown of the runtime before we print 'final log'
const connection = await NativeConnection.connect();

try {
const taskQueue = `runtime-native-log-collector-preriodically-flushed-${uuid4()}`;
const worker = await Worker.create({
...defaultOptions,
connection,
taskQueue,
workflowBundle: await createTestWorkflowBundle({ workflowsPath: __filename }),
});

await worker.runUntil(async () => {
await new Client().workflow.execute(log5Times, { taskQueue, workflowId: uuid4() });
});
t.true(logEntries.some((x) => x.message.startsWith('workflow log ')));

// This one will get buffered
bufferedLogger.info('final log');
t.false(logEntries.some((x) => x.message.startsWith('final log')));
} finally {
await connection.close();
await runtime.shutdown();
}

// Assert all log messages have been flushed
t.is(logEntries.filter((x) => x.message.startsWith('workflow log ')).length, 5);
t.is(logEntries.filter((x) => x.message.startsWith('final log')).length, 1);
});
}

export async function log5Times(): Promise<void> {
for (let i = 0; i < 5; i++) {
wf.log.info(`workflow log ${i}`);
await wf.sleep(1);
}
}
2 changes: 1 addition & 1 deletion packages/worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"@temporalio/proto": "file:../proto",
"@temporalio/workflow": "file:../workflow",
"abort-controller": "^3.0.0",
"heap-js": "^2.3.0",
"heap-js": "^2.6.0",
"memfs": "^4.6.0",
"nexus-rpc": "^0.0.1",
"proto3-json-serializer": "^2.0.0",
Expand Down
1 change: 1 addition & 0 deletions packages/worker/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ export function hasColorSupport(logger: Logger): boolean {

export interface FlushableLogger extends Logger {
flush(): void;
close?(): void;
}

export function isFlushableLogger(logger: Logger): logger is FlushableLogger {
Expand Down
116 changes: 110 additions & 6 deletions packages/worker/src/runtime-logger.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Heap } from 'heap-js';
import { SdkComponent } from '@temporalio/common';
import { native } from '@temporalio/core-bridge';
import { DefaultLogger, LogEntry, Logger, LogTimestamp } from './logger';
import { DefaultLogger, FlushableLogger, LogEntry, Logger, LogTimestamp } from './logger';

/**
* A log collector that accepts log entries either through the TS `Logger` interface (e.g. used by
Expand All @@ -25,10 +25,61 @@ export class NativeLogCollector {

protected buffer = new Heap<LogEntry>((a, b) => Number(a.timestampNanos - b.timestampNanos));

/**
* A timer that periodically flushes the buffer to the downstream logger.
*/
protected flushIntervalTimer: NodeJS.Timeout;

/**
* The minimum time an entry should be buffered before getting flushed.
*
* Increasing this value allows the buffer to do a better job of correctly reordering messages
* emitted from different sources (notably from Workflow executions through Sinks, and from Core)
* based on their absolute timestamps, but also increases latency of logs.
*
* The minimum buffer time requirement only applies as long as the buffer is not full. Once the
* buffer reaches its maximum size, older messages are unconditionally flushed, to prevent
* unbounded growth of the buffer.
*
* TODO(JWH): Is 100ms a reasonable compromise? That might seem a little high on latency, but to
* be useful, that value needs to exceed the time it typically takes to process
* Workflow Activations, let's say above the expected P90, but that's highly variable
* across our user base, and we don't really have field data anyway.
* We can revisit depending on user feedback.
*/
protected readonly minBufferTimeMs = 100;

/**
* Interval between flush passes checking for expired messages.
*
* This really is redundant, since Core itself is expected to flush its buffer every 10 ms, and
* we're checking for expired messages when it does. However, Core will only flush if it has
* accumulated at least one message; when Core's log level is set to WARN or higher, it may be
* many seconds, and even minutes, between Core's log messages, resulting in very rare flush
* from that end, which cause considerable delay on flushing log messages from other sources.
*/
protected readonly flushPassIntervalMs = 100;

/**
* The maximum number of log messages to buffer before flushing.
*
* When the buffer reaches this limit, older messages are unconditionally flushed (i.e. without
* regard to the minimum buffer time requirement), to prevent unbounded growth of the buffer.
*/
protected readonly maxBufferSize = 2000;

constructor(downstream: Logger) {
this.logger = new DefaultLogger('TRACE', (entry) => this.buffer.add(entry));
this.logger = new DefaultLogger('TRACE', this.appendOne.bind(this));
(this.logger as FlushableLogger).flush = this.flush.bind(this);
(this.logger as FlushableLogger).close = this.close.bind(this);

this.downstream = downstream;
this.receive = this.receive.bind(this);

// Flush matured messages from the buffer every so often.
// Unref'ed so that it doesn't prevent the process from exiting if ever the
// runtime doesn't close the logger properly for whatever reason.
this.flushIntervalTimer = setInterval(this.flushMatured.bind(this), this.flushPassIntervalMs).unref();
}

/**
Expand All @@ -44,14 +95,20 @@ export class NativeLogCollector {
this.buffer.add(log);
}
}
this.flush();
this.flushExcess();
this.flushMatured();
} catch (_e) {
// We're not allowed to throw from here, and conversion errors have already been handled in
// convertFromNativeLogEntry(), so an error at this point almost certainly indicates a problem
// with the downstream logger. Just swallow it, there's really nothing else we can do.
}
}

private appendOne(entry: LogEntry): void {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bit of a nit, but would prefer a better name here (i.e. appendOneAndFlush)
it's not clear unless you read the method that you are also flushing

this.buffer.add(entry);
this.flushExcess();
}

private convertFromNativeLogEntry(entry: native.JsonString<native.LogEntry>): LogEntry | undefined {
try {
const log = JSON.parse(entry) as native.LogEntry;
Expand All @@ -78,15 +135,62 @@ export class NativeLogCollector {
}

/**
* Flush all buffered logs into the logger supplied to the constructor/
* Flush messages that have exceeded their required minimal buffering time.
*/
flush(): void {
private flushMatured(): void {
const threadholdTimeNanos = BigInt(Date.now() - this.minBufferTimeMs) * 1_000_000n;
for (;;) {
const entry = this.buffer.peek();
if (!entry || entry.timestampNanos > threadholdTimeNanos) break;
this.buffer.pop();

this.downstream.log(entry.level, entry.message, {
[LogTimestamp]: entry.timestampNanos,
...entry.meta,
});
}
}

/**
* Flush messages in excess of the buffer size limit, starting with oldest ones, without regard
* to the `minBufferTimeMs` requirement. This is called every time messages are appended to the
* buffer, to prevent unbounded growth of the buffer when messages are being emitted at high rate.
*
* The only downside of flushing messages before their time is that it increases the probability
* that messages from different sources might end up being passed down to the downstream logger
* in the wrong order; e.g. if an "older" message emitted by the Workflow Logger is received by
* the Collector after we've already flushed a "newer" message emitted by Core. This is totally
* acceptable, and definitely better than a memory leak caused by unbounded growth of the buffer.
*/
private flushExcess(): void {
let excess = this.buffer.size() - this.maxBufferSize;
while (excess-- > 0) {
const entry = this.buffer.pop();
if (!entry) break;

this.downstream.log(entry.level, entry.message, {
[LogTimestamp]: entry.timestampNanos,
...entry.meta,
});
}
}

/**
* Flush all messages contained in the buffer, without regard to the `minBufferTimeMs` requirement.
*
* This is called on Runtime and on Worker shutdown.
*/
public flush(): void {
for (const entry of this.buffer) {
this.downstream.log(entry.level, entry.message, {
[LogTimestamp]: entry.timestampNanos,
...entry.meta,
});
}
this.buffer.clear();
}

public close(): void {
this.flush();
clearInterval(this.flushIntervalTimer);
}
}
14 changes: 9 additions & 5 deletions packages/worker/src/runtime-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ export interface TelemetryOptions {
* ### Log Forwarding
*
* By default, logs emitted by the native side of the SDK are printed directly to the console,
* _independently of `RuntimeOptions.logger`_. To enable forwarding of those logs messages to the
* _independently of `RuntimeOptions.logger`_. To enable forwarding of those log messages to the
* TS side logger, add the `forward` property to the `logging` object.
*
* For example:
Expand All @@ -86,10 +86,14 @@ export interface TelemetryOptions {
* });
* ```
*
* Note that forwarded log messages are internally throttled/buffered for a few milliseconds to
* reduce overhead incurred by Rust-to-JS calls. In rare cases, this may result in log messages
* appearing out of order by a few milliseconds. Users are discouraged from using log forwarding
* with verboseness sets to `DEBUG` or `TRACE`.
* Note that when log forwarding is enabled, all log messages sent to the runtime logger are
* internally buffered for 100 ms, to allow global sorting of messages from different sources
* based on their absolute timestamps. This helps reduce incoherencies in the order of messages,
* notably those emitted through the Workflow Logging API vs those emitted through Core.
*
* However, in some situations, log messages may still appear out of order, e.g. when a Workflow
* Activation takes longer than 100ms to complete or when log flow exceeds the buffer's capacity
* (2000 messages).
*/
logging?: LogExporterConfig;

Expand Down
4 changes: 3 additions & 1 deletion packages/worker/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,9 @@ export class Runtime {
this.teardownShutdownHook();
// FIXME(JWH): I think we no longer need this, but will have to thoroughly validate.
native.runtimeShutdown(this.native);
this.flushLogs();
if (isFlushableLogger(this.logger)) {
this.logger.close?.();
}
} finally {
delete (this as any).native;
}
Expand Down
Loading