@@ -70,6 +70,8 @@ public sealed class ForEachObjectCommand : PSCmdlet, IDisposable
70
70
71
71
#endregion
72
72
73
+ #region Common Parameters
74
+
73
75
/// <summary>
74
76
/// This parameter specifies the current pipeline object.
75
77
/// </summary>
@@ -85,6 +87,8 @@ public PSObject InputObject
85
87
86
88
private PSObject _inputObject = AutomationNull . Value ;
87
89
90
+ #endregion
91
+
88
92
#region ScriptBlockSet
89
93
90
94
private List < ScriptBlock > _scripts = new List < ScriptBlock > ( ) ;
@@ -355,6 +359,7 @@ public void Dispose()
355
359
_taskTimer ? . Dispose ( ) ;
356
360
_taskDataStreamWriter ? . Dispose ( ) ;
357
361
_taskPool ? . Dispose ( ) ;
362
+ _taskCollection ? . Dispose ( ) ;
358
363
}
359
364
360
365
#endregion
@@ -368,6 +373,8 @@ public void Dispose()
368
373
private Dictionary < string , object > _usingValuesMap ;
369
374
private Timer _taskTimer ;
370
375
private PSTaskJob _taskJob ;
376
+ private PSDataCollection < System . Management . Automation . PSTasks . PSTask > _taskCollection ;
377
+ private Exception _taskCollectionException ;
371
378
372
379
private void InitParallelParameterSet ( )
373
380
{
@@ -411,6 +418,7 @@ private void InitParallelParameterSet()
411
418
412
419
if ( AsJob )
413
420
{
421
+ // Set up for returning a job object.
414
422
if ( MyInvocation . BoundParameters . ContainsKey ( nameof ( TimeoutSeconds ) ) )
415
423
{
416
424
ThrowTerminatingError (
@@ -424,24 +432,78 @@ private void InitParallelParameterSet()
424
432
_taskJob = new PSTaskJob (
425
433
Parallel . ToString ( ) ,
426
434
ThrottleLimit ) ;
435
+
436
+ return ;
427
437
}
428
- else
438
+
439
+ // Set up for synchronous processing and data streaming.
440
+ _taskCollection = new PSDataCollection < System . Management . Automation . PSTasks . PSTask > ( ) ;
441
+ _taskDataStreamWriter = new PSTaskDataStreamWriter ( this ) ;
442
+ _taskPool = new PSTaskPool ( ThrottleLimit ) ;
443
+ _taskPool . PoolComplete += ( sender , args ) =>
429
444
{
430
- _taskDataStreamWriter = new PSTaskDataStreamWriter ( this ) ;
431
- _taskPool = new PSTaskPool ( ThrottleLimit ) ;
432
- _taskPool . PoolComplete += ( sender , args ) =>
433
- {
434
- _taskDataStreamWriter . Close ( ) ;
435
- } ;
436
- if ( TimeoutSeconds != 0 )
437
- {
438
- _taskTimer = new Timer (
439
- ( _ ) => _taskPool . StopAll ( ) ,
440
- null ,
441
- TimeoutSeconds * 1000 ,
442
- Timeout . Infinite ) ;
443
- }
445
+ _taskDataStreamWriter . Close ( ) ;
446
+ } ;
447
+
448
+ // Create timeout timer if requested.
449
+ if ( TimeoutSeconds != 0 )
450
+ {
451
+ _taskTimer = new Timer (
452
+ callback : ( _ ) => { _taskCollection . Complete ( ) ; _taskPool . StopAll ( ) ; } ,
453
+ state : null ,
454
+ dueTime : TimeoutSeconds * 1000 ,
455
+ period : Timeout . Infinite ) ;
444
456
}
457
+
458
+ // Task collection handler.
459
+ System . Threading . ThreadPool . QueueUserWorkItem (
460
+ ( _ ) =>
461
+ {
462
+ // As piped input are converted to PSTasks and added to the _taskCollection,
463
+ // transfer the task to the _taskPool on this dedicated thread.
464
+ // The _taskPool will block this thread when it is full, and allow more tasks to
465
+ // be added only when a currently running task completes and makes space in the pool.
466
+ // Continue adding any tasks appearing in _taskCollection until the collection is closed.
467
+ while ( true )
468
+ {
469
+ // This handle will unblock the thread when a new task is available or the _taskCollection
470
+ // is closed.
471
+ _taskCollection . WaitHandle . WaitOne ( ) ;
472
+
473
+ // Task collection open state is volatile.
474
+ // Record current task collection open state here, to be checked after processing.
475
+ bool isOpen = _taskCollection . IsOpen ;
476
+
477
+ try
478
+ {
479
+ // Read all tasks in the collection.
480
+ foreach ( var task in _taskCollection . ReadAll ( ) )
481
+ {
482
+ // This _taskPool method will block if the pool is full and will unblock
483
+ // only after a task completes making more space.
484
+ _taskPool . Add ( task ) ;
485
+ }
486
+ }
487
+ catch ( Exception ex )
488
+ {
489
+ // Close the _taskCollection on an unexpected exception so the pool closes and
490
+ // lets any running tasks complete.
491
+ _taskCollection . Complete ( ) ;
492
+ _taskCollectionException = ex ;
493
+ break ;
494
+ }
495
+
496
+ // Loop is exited only when task collection is closed and all task
497
+ // collection tasks are processed.
498
+ if ( ! isOpen )
499
+ {
500
+ break ;
501
+ }
502
+ }
503
+
504
+ // We are done adding tasks and can close the task pool.
505
+ _taskPool . Close ( ) ;
506
+ } ) ;
445
507
}
446
508
447
509
private void ProcessParallelParameterSet ( )
@@ -461,53 +523,77 @@ private void ProcessParallelParameterSet()
461
523
462
524
if ( AsJob )
463
525
{
526
+ // Add child task job.
464
527
var taskChildJob = new PSTaskChildJob (
465
528
Parallel ,
466
529
_usingValuesMap ,
467
530
InputObject ) ;
468
531
469
532
_taskJob . AddJob ( taskChildJob ) ;
533
+
534
+ return ;
470
535
}
471
- else
472
- {
473
- // Write any streaming data
474
- _taskDataStreamWriter . WriteImmediate ( ) ;
475
536
476
- var task = new System . Management . Automation . PSTasks . PSTask (
477
- Parallel ,
478
- _usingValuesMap ,
479
- InputObject ,
480
- _taskDataStreamWriter ) ;
537
+ // Write any streaming data
538
+ _taskDataStreamWriter . WriteImmediate ( ) ;
481
539
482
- // Add task to task pool.
483
- // Block if the pool is full and wait until task can be added.
484
- _taskPool . Add ( task , _taskDataStreamWriter ) ;
540
+ // Add to task collection for processing.
541
+ if ( _taskCollection . IsOpen )
542
+ {
543
+ try
544
+ {
545
+ // Create a PSTask based on this piped input and add it to the task collection.
546
+ // A dedicated thread will add it to the PSTask pool in a performant manner.
547
+ _taskCollection . Add (
548
+ new System . Management . Automation . PSTasks . PSTask (
549
+ Parallel ,
550
+ _usingValuesMap ,
551
+ InputObject ,
552
+ _taskDataStreamWriter ) ) ;
553
+ }
554
+ catch ( InvalidOperationException )
555
+ {
556
+ // This exception is thrown if the task collection is closed, which should not happen.
557
+ Dbg . Assert ( false , "Should not add to a closed PSTask collection" ) ;
558
+ }
485
559
}
486
560
}
487
561
488
562
private void EndParallelParameterSet ( )
489
563
{
490
564
if ( AsJob )
491
565
{
566
+ // Start and return parent job object.
492
567
_taskJob . Start ( ) ;
493
568
JobRepository . Add ( _taskJob ) ;
494
569
WriteObject ( _taskJob ) ;
570
+
571
+ return ;
495
572
}
496
- else
497
- {
498
- _taskDataStreamWriter . WriteImmediate ( ) ;
573
+
574
+ // Close task collection and wait for processing to complete while streaming data.
575
+ _taskDataStreamWriter . WriteImmediate ( ) ;
576
+ _taskCollection . Complete ( ) ;
577
+ _taskDataStreamWriter . WaitAndWrite ( ) ;
499
578
500
- _taskPool . Close ( ) ;
501
- _taskDataStreamWriter . WaitAndWrite ( ) ;
579
+ // Check for an unexpected error from the _taskCollection handler thread and report here.
580
+ var ex = _taskCollectionException ;
581
+ if ( ex != null )
582
+ {
583
+ var msg = string . Format ( CultureInfo . InvariantCulture , InternalCommandStrings . ParallelPipedInputProcessingError , ex ) ;
584
+ WriteError (
585
+ new ErrorRecord (
586
+ exception : new InvalidOperationException ( msg ) ,
587
+ errorId : "ParallelPipedInputProcessingError" ,
588
+ errorCategory : ErrorCategory . InvalidOperation ,
589
+ targetObject : this ) ) ;
502
590
}
503
591
}
504
592
505
593
private void StopParallelProcessing ( )
506
594
{
507
- if ( ! AsJob )
508
- {
509
- _taskPool . StopAll ( ) ;
510
- }
595
+ _taskCollection ? . Complete ( ) ;
596
+ _taskPool ? . StopAll ( ) ;
511
597
}
512
598
513
599
#endregion
0 commit comments