33
44using System ;
55using System . Text ;
6+ using System . Threading ;
67using System . Threading . Channels ;
78using System . Threading . Tasks ;
89
@@ -15,11 +16,20 @@ internal class ConsoleWriter : IDisposable
1516 // So in the extreme case, this is about 1 second of buffer and should be less than 3MB
1617 private const int DefaultBufferSize = 8000 ;
1718
19+ // Because we read the log lines in batches from the buffer and write them to the console in one go,
20+ // we can influence the latency distribution by controlling how much of the buffer we will process in one pass.
21+ // If we set this to 1, the P50 latency will be low, but the P99 latency will be high.
22+ // If we set this to a large value, it keeps the P99 latency under control but the P50 degrades.
23+ // In local testing with a console attached, processing 1/10th of the buffer size per iteration yields single digit P50 while keeping P99 under 100ms.
24+ private const int SingleWriteBufferDenominator = 10 ;
25+
1826 private static readonly TimeSpan DisposeTimeout = TimeSpan . FromSeconds ( 5 ) ;
1927 private static readonly TimeSpan DefaultConsoleBufferTimeout = TimeSpan . FromSeconds ( 1 ) ;
28+ private readonly ManualResetEvent _writeResetEvent ;
2029 private readonly Channel < string > _consoleBuffer ;
2130 private readonly TimeSpan _consoleBufferTimeout ;
2231 private readonly Action < Exception > _exceptionhandler ;
32+ private readonly int _maxLinesPerWrite ;
2333 private Task _consoleBufferReadLoop ;
2434 private Action < string > _writeEvent ;
2535 private bool _disposed ;
@@ -35,32 +45,29 @@ internal ConsoleWriter(IEnvironment environment, Action<Exception> exceptionHand
3545
3646 if ( consoleEnabled )
3747 {
38- // We are going to used stdout, but do we write directly or use a buffer?
39- _consoleBuffer = environment . GetEnvironmentVariable ( EnvironmentSettingNames . ConsoleLoggingBufferSize ) switch
48+ int maxBufferSize = environment . GetEnvironmentVariable ( EnvironmentSettingNames . ConsoleLoggingBufferSize ) switch
4049 {
41- "-1" => Channel . CreateUnbounded < string > ( new UnboundedChannelOptions ( ) { SingleReader = true , SingleWriter = false } ) , // buffer size of -1 indicates that buffer should be enabled but unbounded
42- "0" => null , // buffer size of 0 indicates that buffer should be disabled
43- var s when int . TryParse ( s , out int i ) && i > 0 => Channel . CreateBounded < string > ( i ) ,
44- _ => Channel . CreateBounded < string > ( new BoundedChannelOptions ( DefaultBufferSize ) { SingleReader = true , SingleWriter = false } ) , // default behavior is to use buffer with default size
50+ var s when int . TryParse ( s , out int i ) && i >= 0 => i ,
51+ var s when int . TryParse ( s , out int i ) && i < 0 => throw new ArgumentOutOfRangeException ( nameof ( EnvironmentSettingNames . ConsoleLoggingBufferSize ) , "Console buffer size cannot be negative" ) ,
52+ _ => DefaultBufferSize ,
4553 } ;
4654
47- if ( _consoleBuffer == null )
55+ if ( maxBufferSize == 0 )
4856 {
57+ // buffer size was set to zero - disable it
4958 _writeEvent = Console . WriteLine ;
5059 }
5160 else
5261 {
62+ _consoleBuffer = Channel . CreateBounded < string > ( new BoundedChannelOptions ( maxBufferSize ) { SingleReader = true , SingleWriter = false } ) ;
5363 _writeEvent = WriteToConsoleBuffer ;
5464 _consoleBufferTimeout = consoleBufferTimeout ;
65+ _writeResetEvent = new ManualResetEvent ( true ) ;
66+ _maxLinesPerWrite = maxBufferSize / SingleWriteBufferDenominator ;
67+
5568 if ( autoStart )
5669 {
57- bool batched = environment . GetEnvironmentVariable ( EnvironmentSettingNames . ConsoleLoggingBufferBatched ) switch
58- {
59- "0" => false , // disable batching by setting to 0
60- _ => true , // default behavior is batched
61- } ;
62-
63- StartProcessingBuffer ( batched ) ;
70+ StartProcessingBuffer ( ) ;
6471 }
6572 }
6673 }
@@ -81,30 +88,60 @@ private void WriteToConsoleBuffer(string evt)
8188 {
8289 if ( _consoleBuffer . Writer . TryWrite ( evt ) == false )
8390 {
84- Console . WriteLine ( evt ) ;
91+ _writeResetEvent . Reset ( ) ;
92+ if ( _writeResetEvent . WaitOne ( _consoleBufferTimeout ) == false || _consoleBuffer . Writer . TryWrite ( evt ) == false )
93+ {
94+ // We have either timed out or the buffer was full again, so just write directly to console
95+ Console . WriteLine ( evt ) ;
96+ }
8597 }
8698 }
8799
88- internal void StartProcessingBuffer ( bool batched )
100+ internal void StartProcessingBuffer ( )
89101 {
90102 // intentional no-op if the task is already running
91103 if ( _consoleBufferReadLoop == null || _consoleBufferReadLoop . IsCompleted )
92104 {
93- _consoleBufferReadLoop = ProcessConsoleBufferAsync ( batched ) ;
105+ _consoleBufferReadLoop = ProcessConsoleBufferAsync ( ) ;
94106 }
95107 }
96108
97- private async Task ProcessConsoleBufferAsync ( bool batched )
109+ private async Task ProcessConsoleBufferAsync ( )
98110 {
99111 try
100112 {
101- if ( batched )
102- {
103- await ProcessConsoleBufferBatchedAsync ( ) ;
104- }
105- else
113+ var builder = new StringBuilder ( ) ;
114+
115+ while ( await _consoleBuffer . Reader . WaitToReadAsync ( ) )
106116 {
107- await ProcessConsoleBufferNonBatchedAsync ( ) ;
117+ if ( _consoleBuffer . Reader . TryRead ( out string line1 ) )
118+ {
119+ _writeResetEvent . Set ( ) ;
120+
121+ // Can we synchronously read multiple lines?
122+ // If yes, use the string builder to batch them together into a single write
123+ // If no, just write the single line without using the builder;
124+ if ( _consoleBuffer . Reader . TryRead ( out string line2 ) )
125+ {
126+ builder . AppendLine ( line1 ) ;
127+ builder . AppendLine ( line2 ) ;
128+ int lines = 2 ;
129+
130+ while ( lines < _maxLinesPerWrite && _consoleBuffer . Reader . TryRead ( out string nextLine ) )
131+ {
132+ builder . AppendLine ( nextLine ) ;
133+ lines ++ ;
134+ }
135+
136+ _writeResetEvent . Set ( ) ;
137+ Console . Write ( builder . ToString ( ) ) ;
138+ builder . Clear ( ) ;
139+ }
140+ else
141+ {
142+ Console . WriteLine ( line1 ) ;
143+ }
144+ }
108145 }
109146 }
110147 catch ( Exception ex )
@@ -120,48 +157,6 @@ private async Task ProcessConsoleBufferAsync(bool batched)
120157 }
121158 }
122159
123- private async Task ProcessConsoleBufferNonBatchedAsync ( )
124- {
125- await foreach ( var line in _consoleBuffer . Reader . ReadAllAsync ( ) )
126- {
127- Console . WriteLine ( line ) ;
128- }
129- }
130-
131- private async Task ProcessConsoleBufferBatchedAsync ( )
132- {
133- var builder = new StringBuilder ( ) ;
134-
135- while ( await _consoleBuffer . Reader . WaitToReadAsync ( ) )
136- {
137- if ( _consoleBuffer . Reader . TryRead ( out string line1 ) )
138- {
139- // Can we synchronously read multiple lines?
140- // If yes, use the string builder to batch them together into a single write
141- // If no, just write the single line without using the builder;
142- if ( _consoleBuffer . Reader . TryRead ( out string line2 ) )
143- {
144- builder . AppendLine ( line1 ) ;
145- builder . AppendLine ( line2 ) ;
146- int lines = 2 ;
147-
148- while ( lines < DefaultBufferSize && _consoleBuffer . Reader . TryRead ( out string nextLine ) )
149- {
150- builder . AppendLine ( nextLine ) ;
151- lines ++ ;
152- }
153-
154- Console . Write ( builder . ToString ( ) ) ;
155- builder . Clear ( ) ;
156- }
157- else
158- {
159- Console . WriteLine ( line1 ) ;
160- }
161- }
162- }
163- }
164-
165160 protected virtual void Dispose ( bool disposing )
166161 {
167162 if ( ! _disposed )
@@ -173,6 +168,8 @@ protected virtual void Dispose(bool disposing)
173168 _consoleBuffer . Writer . TryComplete ( ) ;
174169 _consoleBufferReadLoop . Wait ( DisposeTimeout ) ;
175170 }
171+
172+ _writeResetEvent ? . Dispose ( ) ;
176173 }
177174
178175 _disposed = true ;
0 commit comments