@@ -76,9 +76,10 @@ export class NativeLogCollector {
7676 this . downstream = downstream ;
7777 this . receive = this . receive . bind ( this ) ;
7878
79- // Flush the buffer every so often.
80- // Unref'ed so that it doesn't prevent the process from exiting.
81- this . flushIntervalTimer = setInterval ( this . flushExpired . bind ( this ) , this . flushPassIntervalMs ) . unref ( ) ;
79+ // Flush matured messages from the buffer every so often.
80+ // Unref'ed so that it doesn't prevent the process from exiting if ever the
81+ // runtime doesn't close the logger properly for whatever reason.
82+ this . flushIntervalTimer = setInterval ( this . flushMatured . bind ( this ) , this . flushPassIntervalMs ) . unref ( ) ;
8283 }
8384
8485 /**
@@ -94,8 +95,8 @@ export class NativeLogCollector {
9495 this . buffer . add ( log ) ;
9596 }
9697 }
97- this . flushUnconditionally ( ) ;
98- this . flushExpired ( ) ;
98+ this . flushExcess ( ) ;
99+ this . flushMatured ( ) ;
99100 } catch ( _e ) {
100101 // We're not allowed to throw from here, and conversion errors have already been handled in
101102 // convertFromNativeLogEntry(), so an error at this point almost certainly indicates a problem
@@ -105,7 +106,7 @@ export class NativeLogCollector {
105106
106107 private appendOne ( entry : LogEntry ) : void {
107108 this . buffer . add ( entry ) ;
108- this . flushUnconditionally ( ) ;
109+ this . flushExcess ( ) ;
109110 }
110111
111112 private convertFromNativeLogEntry ( entry : native . JsonString < native . LogEntry > ) : LogEntry | undefined {
@@ -136,7 +137,7 @@ export class NativeLogCollector {
136137 /**
137138 * Flush messages that have exceeded their required minimal buffering time.
138139 */
139- private flushExpired ( ) : void {
140+ private flushMatured ( ) : void {
140141 const threadholdTimeNanos = BigInt ( Date . now ( ) - this . minBufferTimeMs ) * 1_000_000n ;
141142 for ( ; ; ) {
142143 const entry = this . buffer . peek ( ) ;
@@ -151,16 +152,19 @@ export class NativeLogCollector {
151152 }
152153
153154 /**
154- * Flush messages without regard to the time threshold, up to a given number of messages.
155+ * Flush messages in excess of the buffer size limit, starting with oldest ones, without regard
156+ * to the `minBufferTimeMs` requirement. This is called every time messages are appended to the
157+ * buffer, to prevent unbounded growth of the buffer when messages are being emitted at high rate.
155158 *
156- * If no limit is provided, flushes messages in excess to the maximum buffer size.
159+ * The only downside of flushing messages before their time is that it increases the probability
160+ * that messages from different sources might end up being passed down to the downstream logger
161+ * in the wrong order; e.g. if an "older" message emitted by the Workflow Logger is received by
162+ * the Collector after we've already flushed a "newer" message emitted by Core. This is totally
163+ * acceptable, and definitely better than a memory leak caused by unbounded growth of the buffer.
157164 */
158- private flushUnconditionally ( maxFlushCount ?: number ) : void {
159- if ( maxFlushCount === undefined ) {
160- maxFlushCount = this . buffer . size ( ) - this . maxBufferSize ;
161- }
162-
163- while ( maxFlushCount -- > 0 ) {
165+ private flushExcess ( ) : void {
166+ let excess = this . buffer . size ( ) - this . maxBufferSize ;
167+ while ( excess -- > 0 ) {
164168 const entry = this . buffer . pop ( ) ;
165169 if ( ! entry ) break ;
166170
@@ -171,8 +175,18 @@ export class NativeLogCollector {
171175 }
172176 }
173177
178+ /**
179+ * Flush all messages contained in the buffer, without regard to the `minBufferTimeMs` requirement.
180+ *
181+ * This is called on Runtime and on Worker shutdown.
182+ */
174183 public flush ( ) : void {
175- this . flushUnconditionally ( Number . MAX_SAFE_INTEGER ) ;
184+ for ( const entry of this . buffer ) {
185+ this . downstream . log ( entry . level , entry . message , {
186+ [ LogTimestamp ] : entry . timestampNanos ,
187+ ...entry . meta ,
188+ } ) ;
189+ }
176190 }
177191
178192 public close ( ) : void {
0 commit comments