@@ -26,6 +26,8 @@ internal class DiskWriterQueue : IDisposable
2626
2727 private ConcurrentQueue < PageBuffer > _queue = new ConcurrentQueue < PageBuffer > ( ) ;
2828
29+ private int _running = 0 ;
30+
2931 public DiskWriterQueue ( Stream stream )
3032 {
3133 _stream = stream ;
@@ -54,10 +56,15 @@ public void Run()
5456 {
5557 lock ( _queue )
5658 {
57- if ( _queue . Count > 0 && ( _task == null || _task . IsCompleted ) )
59+ if ( _queue . Count == 0 ) return ;
60+
61+ var oldValue = Interlocked . CompareExchange ( ref _running , 1 , 0 ) ;
62+
63+ if ( oldValue == 0 )
5864 {
65+ // Schedule a new thread to process the pages in the queue.
5966 // https://blog.stephencleary.com/2013/08/startnew-is-dangerous.html
60- _task = Task . Run ( this . ExecuteQueue ) ;
67+ _task = Task . Run ( ExecuteQueue ) ;
6168 }
6269 }
6370 }
@@ -74,10 +81,7 @@ public void Wait()
7481 _task . Wait ( ) ;
7582 }
7683
77- if ( _queue . Count > 0 )
78- {
79- this . ExecuteQueue ( ) ;
80- }
84+ Run ( ) ;
8185 }
8286
8387 ENSURE ( _queue . Count == 0 , "queue should be empty after wait() call" ) ;
@@ -90,32 +94,50 @@ private void ExecuteQueue()
9094 {
9195 if ( _queue . Count == 0 ) return ;
9296
93- var count = 0 ;
94-
95- try
97+ do
9698 {
97- while ( _queue . TryDequeue ( out var page ) )
99+ if ( _queue . TryDequeue ( out var page ) )
98100 {
99- ENSURE ( page . ShareCounter > 0 , "page must be shared at least 1" ) ;
101+ WritePageToStream ( page ) ;
102+ }
100103
101- // set stream position according to page
102- _stream . Position = page . Position ;
104+ while ( page == null )
105+ {
106+ _stream . FlushToDisk ( ) ;
107+ Volatile . Write ( ref _running , 0 ) ;
103108
104- _stream . Write ( page . Array , page . Offset , PAGE_SIZE ) ;
109+ if ( ! _queue . Any ( ) ) return ;
105110
106- // release page here (no page use after this)
107- page . Release ( ) ;
111+ // Another item was added to the queue after we detected it was empty.
112+ var oldValue = Interlocked . CompareExchange ( ref _running , 1 , 0 ) ;
108113
109- count ++ ;
114+ if ( oldValue == 1 )
115+ {
116+ // A new thread was already scheduled for execution, this thread can return.
117+ return ;
118+ }
119+
120+ // This thread will continue to process the queue as a new thread was not scheduled.
121+ _queue . TryDequeue ( out page ) ;
122+ WritePageToStream ( page ) ;
110123 }
111124
112- // after this I will have 100% sure data are safe on log file
113- _stream . FlushToDisk ( ) ;
114- }
115- catch ( IOException )
116- {
117- //TODO: notify database to stop working (throw error in all operations)
118- }
125+ } while ( true ) ;
126+ }
127+
128+ private void WritePageToStream ( PageBuffer page )
129+ {
130+ if ( page == null ) return ;
131+
132+ ENSURE ( page . ShareCounter > 0 , "page must be shared at least 1" ) ;
133+
134+ // set stream position according to page
135+ _stream . Position = page . Position ;
136+
137+ _stream . Write ( page . Array , page . Offset , PAGE_SIZE ) ;
138+
139+ // release page here (no page use after this)
140+ page . Release ( ) ;
119141 }
120142
121143 public void Dispose ( )
0 commit comments