Skip to content

Commit 2d8e7b3

Browse files
committed
Fix worker logs not getting fliushed in Core log level set to ERROR
1 parent 7c88cc7 commit 2d8e7b3

File tree

8 files changed

+174
-25
lines changed

8 files changed

+174
-25
lines changed

package-lock.json

Lines changed: 9 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/test/src/helpers.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ export const bundlerOptions = {
141141
'timers',
142142
'timers/promises',
143143
require.resolve('./activities'),
144+
require.resolve('./mock-native-worker'),
144145
],
145146
};
146147

packages/test/src/test-runtime.ts

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@
22
* Test the lifecycle of the Runtime singleton.
33
* Tests run serially because Runtime is a singleton.
44
*/
5-
import test from 'ava';
65
import { v4 as uuid4 } from 'uuid';
76
import asyncRetry from 'async-retry';
8-
import { Runtime, DefaultLogger, LogEntry, makeTelemetryFilterString } from '@temporalio/worker';
7+
import { Runtime, DefaultLogger, LogEntry, makeTelemetryFilterString, NativeConnection } from '@temporalio/worker';
98
import { Client, WorkflowClient } from '@temporalio/client';
9+
import * as wf from '@temporalio/workflow';
1010
import { defaultOptions } from './mock-native-worker';
1111
import * as workflows from './workflows';
12-
import { RUN_INTEGRATION_TESTS, Worker } from './helpers';
12+
import { RUN_INTEGRATION_TESTS, Worker, test } from './helpers';
13+
import { createTestWorkflowBundle } from './helpers-integration';
1314

1415
if (RUN_INTEGRATION_TESTS) {
1516
test.serial('Runtime can be created and disposed', async (t) => {
@@ -113,4 +114,53 @@ if (RUN_INTEGRATION_TESTS) {
113114
await Runtime.instance().shutdown();
114115
}
115116
});
117+
118+
test.serial(`NativeLogCollector: Buffered logs are periodically flushed even if Core isn't flushing`, async (t) => {
119+
const logEntries: LogEntry[] = [];
120+
121+
const runtime = Runtime.install({
122+
logger: new DefaultLogger('DEBUG', (entry) => logEntries.push(entry)),
123+
telemetryOptions: {
124+
// Sets native logger to ERROR level, so that it never flushes
125+
logging: { forward: {}, filter: { core: 'ERROR', other: 'ERROR' } },
126+
},
127+
});
128+
const bufferedLogger = runtime.logger;
129+
130+
// Hold on to a connection to prevent implicit shutdown of the runtime before we print 'final log'
131+
const connection = await NativeConnection.connect();
132+
133+
try {
134+
const taskQueue = `runtime-native-log-collector-preriodically-flushed-${uuid4()}`;
135+
const worker = await Worker.create({
136+
...defaultOptions,
137+
connection,
138+
taskQueue,
139+
workflowBundle: await createTestWorkflowBundle({ workflowsPath: __filename }),
140+
});
141+
142+
await worker.runUntil(async () => {
143+
await new Client().workflow.execute(log5Times, { taskQueue, workflowId: uuid4() });
144+
});
145+
t.true(logEntries.some((x) => x.message.startsWith('workflow log ')));
146+
147+
// This one will get buffered
148+
bufferedLogger.info('final log');
149+
t.false(logEntries.some((x) => x.message.startsWith('final log')));
150+
} finally {
151+
await connection.close();
152+
await runtime.shutdown();
153+
}
154+
155+
// Assert all log messages have been flushed
156+
t.is(logEntries.filter((x) => x.message.startsWith('workflow log ')).length, 5);
157+
t.is(logEntries.filter((x) => x.message.startsWith('final log')).length, 1);
158+
});
159+
}
160+
161+
export async function log5Times(): Promise<void> {
162+
for (let i = 0; i < 5; i++) {
163+
wf.log.info(`workflow log ${i}`);
164+
await wf.sleep(1);
165+
}
116166
}

packages/worker/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
"@temporalio/proto": "file:../proto",
2424
"@temporalio/workflow": "file:../workflow",
2525
"abort-controller": "^3.0.0",
26-
"heap-js": "^2.3.0",
26+
"heap-js": "^2.6.0",
2727
"memfs": "^4.6.0",
2828
"nexus-rpc": "^0.0.1",
2929
"proto3-json-serializer": "^2.0.0",

packages/worker/src/logger.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ export function hasColorSupport(logger: Logger): boolean {
118118

119119
export interface FlushableLogger extends Logger {
120120
flush(): void;
121+
close?(): void;
121122
}
122123

123124
export function isFlushableLogger(logger: Logger): logger is FlushableLogger {

packages/worker/src/runtime-logger.ts

Lines changed: 97 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { Heap } from 'heap-js';
22
import { SdkComponent } from '@temporalio/common';
33
import { native } from '@temporalio/core-bridge';
4-
import { DefaultLogger, LogEntry, Logger, LogTimestamp } from './logger';
4+
import { DefaultLogger, FlushableLogger, LogEntry, Logger, LogTimestamp } from './logger';
55

66
/**
77
* A log collector that accepts log entries either through the TS `Logger` interface (e.g. used by
@@ -25,10 +25,60 @@ export class NativeLogCollector {
2525

2626
protected buffer = new Heap<LogEntry>((a, b) => Number(a.timestampNanos - b.timestampNanos));
2727

28+
/**
29+
* A timer that periodically flushes the buffer to the downstream logger.
30+
*/
31+
protected flushIntervalTimer: NodeJS.Timeout;
32+
33+
/**
34+
* The minimum time an entry should be buffered before getting flushed.
35+
*
36+
* Increasing this value allows the buffer to do a better job of correctly reordering messages
37+
* emitted from different sources (notably from Workflow executions through Sinks, and from Core)
38+
* based on their absolute timestamps, but also increases latency of logs.
39+
*
40+
* The minimum buffer time requirement only applies as long as the buffer is not full. Once the
41+
* buffer reaches its maximum size, older messages are unconditionally flushed, to prevent
42+
* unbounded growth of the buffer.
43+
*
44+
* TODO(JWH): Is 100ms a reasonable compromise? That might seem a little high on latency, but to
45+
* be useful, that value needs to exceed the time it typically takes to process
46+
* Workflow Activations, let's say above the expected P90, but that's highly variable
47+
* across our user base, and we don't really have field data anyway.
48+
* We can revisit depending on user feedback.
49+
*/
50+
protected readonly minBufferTimeMs = 100;
51+
52+
/**
53+
* Interval between flush passes checking for expired messages.
54+
*
55+
* This really is redundant, since Core itself is expected to flush its buffer every 10 ms, and
56+
* we're checking for expired messages when it does. However, Core will only flush if it has
57+
* accumulated at least one message; when Core's log level is set to WARN or higher, it may be
58+
* many seconds, and even minutes, between Core's log messages, resulting in very rare flush
59+
* from that end, which cause considerable delay on flushing log messages from other sources.
60+
*/
61+
protected readonly flushPassIntervalMs = 100;
62+
63+
/**
64+
* The maximum number of log messages to buffer before flushing.
65+
*
66+
* When the buffer reaches this limit, older messages are unconditionally flushed (i.e. without
67+
* regard to the minimum buffer time requirement), to prevent unbounded growth of the buffer.
68+
*/
69+
protected readonly maxBufferSize = 2000;
70+
2871
constructor(downstream: Logger) {
29-
this.logger = new DefaultLogger('TRACE', (entry) => this.buffer.add(entry));
72+
this.logger = new DefaultLogger('TRACE', this.appendOne.bind(this));
73+
(this.logger as FlushableLogger).flush = this.flush.bind(this);
74+
(this.logger as FlushableLogger).close = this.close.bind(this);
75+
3076
this.downstream = downstream;
3177
this.receive = this.receive.bind(this);
78+
79+
// Flush the buffer every so often.
80+
// Unref'ed so that it doesn't prevent the process from exiting.
81+
this.flushIntervalTimer = setInterval(this.flushExpired.bind(this), this.flushPassIntervalMs).unref();
3282
}
3383

3484
/**
@@ -44,14 +94,20 @@ export class NativeLogCollector {
4494
this.buffer.add(log);
4595
}
4696
}
47-
this.flush();
97+
this.flushUnconditionally();
98+
this.flushExpired();
4899
} catch (_e) {
49100
// We're not allowed to throw from here, and conversion errors have already been handled in
50101
// convertFromNativeLogEntry(), so an error at this point almost certainly indicates a problem
51102
// with the downstream logger. Just swallow it, there's really nothing else we can do.
52103
}
53104
}
54105

106+
private appendOne(entry: LogEntry): void {
107+
this.buffer.add(entry);
108+
this.flushUnconditionally();
109+
}
110+
55111
private convertFromNativeLogEntry(entry: native.JsonString<native.LogEntry>): LogEntry | undefined {
56112
try {
57113
const log = JSON.parse(entry) as native.LogEntry;
@@ -78,15 +134,49 @@ export class NativeLogCollector {
78134
}
79135

80136
/**
81-
* Flush all buffered logs into the logger supplied to the constructor/
137+
* Flush messages that have exceeded their required minimal buffering time.
82138
*/
83-
flush(): void {
84-
for (const entry of this.buffer) {
139+
private flushExpired(): void {
140+
const threadholdTimeNanos = BigInt(Date.now() - this.minBufferTimeMs) * 1_000_000n;
141+
for (;;) {
142+
const entry = this.buffer.peek();
143+
if (!entry || entry.timestampNanos > threadholdTimeNanos) break;
144+
this.buffer.pop();
145+
85146
this.downstream.log(entry.level, entry.message, {
86147
[LogTimestamp]: entry.timestampNanos,
87148
...entry.meta,
88149
});
89150
}
90-
this.buffer.clear();
151+
}
152+
153+
/**
154+
* Flush messages without regard to the time threshold, up to a given number of messages.
155+
*
156+
* If no limit is provided, flushes messages in excess to the maximum buffer size.
157+
*/
158+
private flushUnconditionally(maxFlushCount?: number): void {
159+
if (maxFlushCount === undefined) {
160+
maxFlushCount = this.buffer.size() - this.maxBufferSize;
161+
}
162+
163+
while (maxFlushCount-- > 0) {
164+
const entry = this.buffer.pop();
165+
if (!entry) break;
166+
167+
this.downstream.log(entry.level, entry.message, {
168+
[LogTimestamp]: entry.timestampNanos,
169+
...entry.meta,
170+
});
171+
}
172+
}
173+
174+
public flush(): void {
175+
this.flushUnconditionally(Number.MAX_SAFE_INTEGER);
176+
}
177+
178+
public close(): void {
179+
this.flush();
180+
clearInterval(this.flushIntervalTimer);
91181
}
92182
}

packages/worker/src/runtime-options.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ export interface TelemetryOptions {
6969
* ### Log Forwarding
7070
*
7171
* By default, logs emitted by the native side of the SDK are printed directly to the console,
72-
* _independently of `RuntimeOptions.logger`_. To enable forwarding of those logs messages to the
72+
* _independently of `RuntimeOptions.logger`_. To enable forwarding of those log messages to the
7373
* TS side logger, add the `forward` property to the `logging` object.
7474
*
7575
* For example:
@@ -86,10 +86,14 @@ export interface TelemetryOptions {
8686
* });
8787
* ```
8888
*
89-
* Note that forwarded log messages are internally throttled/buffered for a few milliseconds to
90-
* reduce overhead incurred by Rust-to-JS calls. In rare cases, this may result in log messages
91-
* appearing out of order by a few milliseconds. Users are discouraged from using log forwarding
92-
* with verboseness sets to `DEBUG` or `TRACE`.
89+
* Note that when log forwarding is enabled, all log messages sent to the runtime logger are
90+
* internally buffered for 100 ms, to allow global sorting of messages from different sources
91+
* based on their absolute timestamps. This helps reduce incoherencies in the order of messages,
92+
* notably those emitted through the Workflow Logging API vs those emitted through Core.
93+
*
94+
* However, in some situations, log messages may still appear out of order, e.g. when a Workflow
95+
* Activation takes longer than 100ms to complete or when log flow exceeds the buffer's capacity
96+
* (2000 messages).
9397
*/
9498
logging?: LogExporterConfig;
9599

packages/worker/src/runtime.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,9 @@ export class Runtime {
280280
this.teardownShutdownHook();
281281
// FIXME(JWH): I think we no longer need this, but will have to thoroughly validate.
282282
native.runtimeShutdown(this.native);
283-
this.flushLogs();
283+
if (isFlushableLogger(this.logger)) {
284+
this.logger.close?.();
285+
}
284286
} finally {
285287
delete (this as any).native;
286288
}

0 commit comments

Comments
 (0)