@@ -161,29 +161,22 @@ protected void ShutdownAllNodes(bool nodeReuse, NodeContextTerminateDelegate ter
161161 int timeout = 30 ;
162162
163163 // Attempt to connect to the process with the handshake without low priority.
164- using Stream nodeStream = TryConnectToProcess ( nodeProcess . Id , timeout , NodeProviderOutOfProc . GetHandshake ( nodeReuse , false ) ) ;
165- if ( nodeStream != null )
166- {
167- ShutdownNode ( terminateNode , factory , nodeProcess , nodeStream ) ;
168- }
169- else
164+ Stream nodeStream = TryConnectToProcess ( nodeProcess . Id , timeout , NodeProviderOutOfProc . GetHandshake ( nodeReuse , false ) ) ;
165+
166+ if ( nodeStream == null )
170167 {
171168 // If we couldn't connect attempt to connect to the process with the handshake including low priority.
172- using Stream lowPriorityConnection = TryConnectToProcess ( nodeProcess . Id , timeout , NodeProviderOutOfProc . GetHandshake ( nodeReuse , true ) ) ;
173-
174- if ( lowPriorityConnection != null )
175- {
176- ShutdownNode ( terminateNode , factory , nodeProcess , lowPriorityConnection ) ;
177- }
169+ nodeStream = TryConnectToProcess ( nodeProcess . Id , timeout , NodeProviderOutOfProc . GetHandshake ( nodeReuse , true ) ) ;
178170 }
179- }
180171
181- static void ShutdownNode ( NodeContextTerminateDelegate terminateNode , INodePacketFactory factory , Process nodeProcess , Stream nodeStream )
182- {
183- // If we're able to connect to such a process, send a packet requesting its termination
184- CommunicationsUtilities . Trace ( "Shutting down node with pid = {0}" , nodeProcess . Id ) ;
185- NodeContext nodeContext = new NodeContext ( 0 , nodeProcess , nodeStream , factory , terminateNode ) ;
186- nodeContext . SendData ( new NodeBuildComplete ( false /* no node reuse */ ) ) ;
172+ if ( nodeStream != null )
173+ {
174+ // If we're able to connect to such a process, send a packet requesting its termination
175+ CommunicationsUtilities . Trace ( "Shutting down node with pid = {0}" , nodeProcess . Id ) ;
176+ NodeContext nodeContext = new NodeContext ( 0 , nodeProcess , nodeStream , factory , terminateNode ) ;
177+ nodeContext . SendData ( new NodeBuildComplete ( false /* no node reuse */ ) ) ;
178+ nodeStream . Dispose ( ) ;
179+ }
187180 }
188181 }
189182
@@ -607,14 +600,19 @@ private enum ExitPacketState
607600 private readonly NodeContextTerminateDelegate _terminateDelegate ;
608601
609602 /// <summary>
610- /// A task representing the work to consume enqueued packets.
603+ /// A dedicated thread to consume enqueued packets.
604+ /// </summary>
605+ private readonly Thread _drainPacketQueueThread ;
606+
607+ /// <summary>
608+ /// Used to signal the consuming thread that a packet has been enqueued;
611609 /// </summary>
612- private readonly Task _packetWriteDrainTask ;
610+ private readonly AutoResetEvent _packetEnqueued ;
613611
614612 /// <summary>
615- /// Used to signal the consuming task that a packet has been enqueued .
613+ /// Used to signal that the exit packet has been sent and we no longer need to wait for the queue to drain .
616614 /// </summary>
617- private readonly SemaphoreSlim _packetEnqueued ;
615+ private readonly CancellationTokenSource _packetQueueDrainDelayCancellation ;
618616
619617 /// <summary>
620618 /// Tracks the state of the packet sent to terminate the node.
@@ -650,9 +648,13 @@ public NodeContext(int nodeId, Process process,
650648#endif
651649
652650 _packetWriteQueue = new ConcurrentQueue < INodePacket > ( ) ;
653- _packetEnqueued = new SemaphoreSlim ( 0 , 1 ) ;
651+ _packetEnqueued = new AutoResetEvent ( false ) ;
652+ _packetQueueDrainDelayCancellation = new CancellationTokenSource ( ) ;
654653
655- _packetWriteDrainTask = DrainPacketQueue ( ) ;
654+ // specify the smallest stack size - 64kb
655+ _drainPacketQueueThread = new Thread ( DrainPacketQueue , 64 * 1024 ) ;
656+ _drainPacketQueueThread . IsBackground = true ;
657+ _drainPacketQueueThread . Start ( this ) ;
656658 }
657659
658660 /// <summary>
@@ -758,35 +760,30 @@ public void SendData(INodePacket packet)
758760 {
759761 _exitPacketState = ExitPacketState . ExitPacketQueued ;
760762 }
761-
762763 _packetWriteQueue . Enqueue ( packet ) ;
763-
764- if ( _packetEnqueued . CurrentCount == 0 )
765- {
766- // If the semaphore is not already signaled, signal it to wake up the draining task.
767- _packetEnqueued . Release ( ) ;
768- }
764+ _packetEnqueued . Set ( ) ;
769765 }
770766
771767 /// <summary>
772- /// Use a threadpool thread to drain the queue and be careful not to block it where possible .
768+ /// We use a dedicated thread to avoid blocking a threadpool thread .
773769 /// </summary>
774770 /// <remarks>Usually there'll be a single packet in the queue, but sometimes
775771 /// a burst of SendData comes in, with 10-20 packets scheduled.</remarks>
776- private async Task DrainPacketQueue ( )
772+ private void DrainPacketQueue ( object state )
777773 {
778- MemoryStream writeStream = _writeBufferMemoryStream ;
779- Stream serverToClientStream = _pipeStream ;
780- ITranslator writeTranslator = _writeTranslator ;
774+ NodeContext context = ( NodeContext ) state ;
775+ MemoryStream writeStream = context . _writeBufferMemoryStream ;
776+ Stream serverToClientStream = context . _pipeStream ;
781777
782778 while ( true )
783779 {
784- await _packetEnqueued . WaitAsync ( ) ;
785- while ( _packetWriteQueue . TryDequeue ( out INodePacket packet ) )
780+ context . _packetEnqueued . WaitOne ( ) ;
781+ while ( context . _packetWriteQueue . TryDequeue ( out INodePacket packet ) )
786782 {
787783 // clear the buffer but keep the underlying capacity to avoid reallocations
788784 writeStream . SetLength ( 0 ) ;
789785
786+ ITranslator writeTranslator = context . _writeTranslator ;
790787 try
791788 {
792789 writeStream . WriteByte ( ( byte ) packet . Type ) ;
@@ -806,29 +803,22 @@ private async Task DrainPacketQueue()
806803 for ( int i = 0 ; i < writeStreamLength ; i += MaxPacketWriteSize )
807804 {
808805 int lengthToWrite = Math . Min ( writeStreamLength - i , MaxPacketWriteSize ) ;
809- #if NET
810- await serverToClientStream . WriteAsync ( writeStreamBuffer . AsMemory ( i , lengthToWrite ) ) ;
811- #else
812- await serverToClientStream . WriteAsync ( writeStreamBuffer , i , lengthToWrite ) ;
813- #endif
806+
807+ serverToClientStream . Write ( writeStreamBuffer , i , lengthToWrite ) ;
814808 }
815809
816810 if ( IsExitPacket ( packet ) )
817811 {
818- _exitPacketState = ExitPacketState . ExitPacketSent ;
812+ context . _exitPacketState = ExitPacketState . ExitPacketSent ;
813+ context . _packetQueueDrainDelayCancellation . Cancel ( ) ;
819814
820815 return ;
821816 }
822-
823- if ( packet is NodeBuildComplete )
824- {
825- return ;
826- }
827817 }
828818 catch ( IOException e )
829819 {
830820 // Do nothing here because any exception will be caught by the async read handler
831- CommunicationsUtilities . Trace ( _nodeId , "EXCEPTION in SendData: {0}" , e ) ;
821+ CommunicationsUtilities . Trace ( context . _nodeId , "EXCEPTION in SendData: {0}" , e ) ;
832822 }
833823 catch ( ObjectDisposedException ) // This happens if a child dies unexpectedly
834824 {
@@ -860,7 +850,6 @@ private static void WriteInt32(MemoryStream stream, int value)
860850 private void Close ( )
861851 {
862852 _pipeStream . Dispose ( ) ;
863- _packetEnqueued . Dispose ( ) ;
864853 _terminateDelegate ( _nodeId ) ;
865854 }
866855
@@ -874,16 +863,14 @@ public async Task WaitForExitAsync(ILoggingService loggingService)
874863 // Wait up to 100ms until all remaining packets are sent.
875864 // We don't need to wait long, just long enough for the Task to start running on the ThreadPool.
876865#if NET
877- await _packetWriteDrainTask . WaitAsync ( TimeSpan . FromMilliseconds ( 100 ) ) . ConfigureAwait ( ConfigureAwaitOptions . SuppressThrowing ) ;
866+ await Task . Delay ( 100 , _packetQueueDrainDelayCancellation . Token ) . ConfigureAwait ( ConfigureAwaitOptions . SuppressThrowing ) ;
878867#else
879- using ( var cts = new CancellationTokenSource ( 100 ) )
880- {
881- await Task . WhenAny ( _packetWriteDrainTask , Task . Delay ( 100 , cts . Token ) ) ;
882- cts . Cancel ( ) ;
883- }
868+ await Task . WhenAny ( Task . Delay ( 100 , _packetQueueDrainDelayCancellation . Token ) ) ;
884869#endif
885870 }
886871
872+ _packetQueueDrainDelayCancellation ? . Dispose ( ) ;
873+
887874 if ( _exitPacketState == ExitPacketState . ExitPacketSent )
888875 {
889876 CommunicationsUtilities . Trace ( "Waiting for node with pid = {0} to exit" , _process . Id ) ;
0 commit comments