Skip to content

Commit 617c4d2

Browse files
Changes to Async classes (#16)
- Fix creating async delegates with different return types - Add ability to access delegate argument from $args, $PSItem, or the parameter name from the delegates invoke method. - New instances of PSTaskFactory now have their own runspace pool - Await is no longer blocking
1 parent 88dc0d7 commit 617c4d2

File tree

1 file changed

+180
-33
lines changed

1 file changed

+180
-33
lines changed

module/Classes/Async.ps1

Lines changed: 180 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -3,49 +3,84 @@ using namespace System.Collections.ObjectModel
33
using namespace System.Linq.Expressions
44
using namespace System.Management.Automation
55
using namespace System.Management.Automation.Runspaces
6+
using namespace System.Threading
67
using namespace System.Threading.Tasks
78

89
# Static class that facilitates the use of traditional .NET async techniques in PowerShell.
9-
class AsyncOps
10-
{
10+
class AsyncOps {
1111
static [PSTaskFactory] $Factory = [PSTaskFactory]::new();
12-
static [RunspacePool] $RunspacePool;
1312

14-
# Hides the result property on a Task object. This is done because the getter for Result waits
15-
# for the task to finish, even if just output to the console.
16-
static [Task] HideResult([Task] $target) {
13+
static [Task] ContinueWithCodeMethod([psobject] $instance, [scriptblock] $continuationAction) {
14+
$delegate = [AsyncOps]::CreateAsyncDelegate(
15+
$continuationAction,
16+
[Action[Task[Collection[psobject]]]])
17+
18+
return [AsyncOps]::PrepareTask($instance.psadapted.ContinueWith($delegate))
19+
}
20+
21+
# - Hides the result property on a Task object. This is done because the getter for Result waits
22+
# for the task to finish, even if just output to the console.
23+
# - Adds ContinueWith code method that wraps scriptblocks with CreateAsyncDelegate
24+
static [Task] PrepareTask([Task] $target) {
1725
$propertyList = $target.psobject.Properties.Name -notmatch 'Result' -as [string[]]
1826
$propertySet = [PSPropertySet]::new('DefaultDisplayPropertySet', $propertyList) -as [PSMemberInfo[]]
1927
$standardMembers = [PSMemberSet]::new('PSStandardMembers', $propertySet)
2028

2129
$target.psobject.Members.Add($standardMembers)
2230

31+
$target.psobject.Methods.Add(
32+
[PSCodeMethod]::new(
33+
'ContinueWith',
34+
[AsyncOps].GetMethod('ContinueWithCodeMethod')))
35+
2336
return $target
2437
}
2538

26-
# Create a delegate from a scriptblock that can be used in threads without runspaces, like those
27-
# used in Tasks or AsyncCallbacks.
2839
static [MulticastDelegate] CreateAsyncDelegate([scriptblock] $function, [type] $delegateType) {
29-
# Create a runspace pool the first time this method is invoked.
30-
if (-not [AsyncOps]::RunspacePool) {
31-
[AsyncOps]::RunspacePool = [runspacefactory]::CreateRunspacePool(1, 4)
32-
[AsyncOps]::RunspacePool.Open()
33-
}
40+
return [AsyncOps]::CreateAsyncDelegate($function, $delegateType, [AsyncOps]::Factory)
41+
}
3442

43+
# Create a delegate from a scriptblock that can be used in threads without runspaces, like those
44+
# used in Tasks or AsyncCallbacks.
45+
static [MulticastDelegate] CreateAsyncDelegate([scriptblock] $function, [type] $delegateType, [PSTaskFactory] $factory) {
46+
$invokeMethod = $delegateType.GetMethod('Invoke')
47+
$returnType = $invokeMethod.ReturnType
3548
# Create a parameter expression for each parameter the delegate takes.
36-
$parameters = $delegateType.
37-
GetMethod('Invoke').
49+
$parameters = $invokeMethod.
3850
GetParameters().
3951
ForEach{ [Expression]::Parameter($PSItem.ParameterType, $PSItem.Name) }
4052

41-
# Set AsyncState variable that will hold delegate arguments and/or state.
42-
$preparedScript = 'param($AsyncState) . {{ {0} }}' -f $function
53+
$scriptParameters = [string]::Empty
54+
if ($parameters) {
55+
$scriptParameters = '$' + ($invokeMethod.GetParameters().Name -join ', $')
56+
}
57+
58+
# Allow access to parameters in the following ways:
59+
# - By the name given to them by the delegate's invoke method
60+
# - $args
61+
# - $PSItem/$_ (first parameter only)
62+
$preparedScript =
63+
'param({0}) process {{ return {{ {1} }}.InvokeReturnAsIs($PSBoundParameters.Values) }}' -f
64+
$scriptParameters,
65+
$function
4366

4467
# Prepare variable and constant expressions.
45-
$pool = [Expression]::Property($null, [AsyncOps], 'RunspacePool')
4668
$scriptText = [Expression]::Constant($preparedScript, [string])
4769
$ps = [Expression]::Variable([powershell], 'ps')
48-
$result = [Expression]::Variable([Collection[psobject]], 'result')
70+
$collectionResultType = [Collection[psobject]]
71+
if ($returnType -ne [void] -and $returnType -ne [Collection[psobject]]) {
72+
$collectionResultType = [Collection`1].MakeGenericType($returnType)
73+
}
74+
75+
$result = [Expression]::Variable($collectionResultType, 'result')
76+
$psInput = [Expression]::Variable([Object[]], 'psInput')
77+
$guid = [Expression]::Constant($factory.InstanceId, [guid])
78+
$pool = [Expression]::Property(
79+
[Expression]::Property(
80+
[Expression]::Property($null, [PSTaskFactory], 'Instances'),
81+
'Item',
82+
$guid),
83+
'RunspacePool')
4984

5085
# Group the expressions for the body by creating them in a scriptblock.
5186
[Expression[]]$expressions = & {
@@ -57,12 +92,34 @@ class AsyncOps
5792
[Expression]::Call($ps, 'AddArgument', @(), $parameter)
5893
}
5994

60-
[Expression]::Assign($result, [Expression]::Call($ps, 'Invoke', @(), @()))
95+
$invokeArgs = @()
96+
if ($parameters) {
97+
[Expression]::Assign(
98+
$psInput,
99+
[Expression]::NewArrayInit([object], $parameters[0] -as [Expression[]]))
100+
101+
$invokeArgs = @($psInput)
102+
}
103+
104+
$invokeTypeArgs = @()
105+
if ($returnType -ne [void] -and $returnType -ne [Collection[psobject]]) {
106+
$invokeTypeArgs = @($returnType)
107+
}
108+
109+
[Expression]::Assign($result, [Expression]::Call($ps, 'Invoke', $invokeTypeArgs, $invokeArgs))
61110
[Expression]::Call($ps, 'Dispose', @(), @())
62-
$result
111+
if ($returnType -ne [void]) {
112+
[Expression]::Call(
113+
[LanguagePrimitives],
114+
'ConvertTo',
115+
@($returnType),
116+
$result -as [Expression[]])
117+
} else {
118+
$result
119+
}
63120
}
64121

65-
$block = [Expression]::Block([ParameterExpression[]]($ps, $result), $expressions)
122+
$block = [Expression]::Block([ParameterExpression[]]($ps, $result, $psInput), $expressions)
66123
$lambda = [Expression]::Lambda(
67124
$delegateType,
68125
$block,
@@ -73,16 +130,75 @@ class AsyncOps
73130

74131
# A TaskFactory implementation that creates tasks that run scriptblocks in a runspace pool.
75132
class PSTaskFactory : TaskFactory[Collection[psobject]] {
133+
hidden static [Dictionary[guid, PSTaskFactory]] $Instances = [Dictionary[guid, PSTaskFactory]]::new()
134+
135+
hidden [RunspacePool] $RunspacePool;
136+
hidden [guid] $InstanceId;
137+
hidden [bool] $IsDisposed = $false;
138+
139+
PSTaskFactory() : base() {
140+
$this.Initialize()
141+
}
142+
143+
PSTaskFactory([CancellationToken] $cancellationToken) : base($cancellationToken) {
144+
$this.Initialize()
145+
}
146+
147+
PSTaskFactory([TaskScheduler] $scheduler) : base($scheduler) {
148+
$this.Initialize()
149+
}
150+
151+
PSTaskFactory(
152+
[TaskCreationOptions] $creationOptions,
153+
[TaskContinuationOptions] $continuationOptions)
154+
: base($creationOptions, $continuationOptions) {
155+
$this.Initialize()
156+
}
157+
158+
PSTaskFactory(
159+
[CancellationToken] $cancellationToken,
160+
[TaskCreationOptions] $creationOptions,
161+
[TaskContinuationOptions] $continuationOptions,
162+
[TaskScheduler] $scheduler)
163+
: base($cancellationToken, $creationOptions, $continuationAction, $scheduler) {
164+
$this.Initialize()
165+
}
166+
167+
hidden [void] Initialize() {
168+
$this.RunspacePool = [runspacefactory]::CreateRunspacePool(1, 4)
169+
$this.RunspacePool.Open()
170+
$this.InstanceId = [guid]::NewGuid()
171+
[PSTaskFactory]::Instances.Add($this.InstanceId, $this)
172+
}
173+
174+
# Can't implement IDisposable while inheriting a generic class because of a parse error, need
175+
# to create an issue.
176+
[void] Dispose() {
177+
$this.AssertNotDisposed()
178+
[PSTaskFactory]::Instances.Remove($this.InstanceId)
179+
$this.RunspacePool.Dispose()
180+
$this.IsDisposed = $true
181+
}
182+
183+
hidden [void] AssertNotDisposed() {
184+
if ($this.IsDisposed) {
185+
throw [InvalidOperationException]::new(
186+
'Cannot perform operation because object "PSTaskFactory" has already been disposed.')
187+
}
188+
}
189+
76190
# Shortcut to AsyncOps.CreateAsyncDelegate
77191
hidden [MulticastDelegate] Wrap([scriptblock] $function, [type] $delegateType) {
78-
return [AsyncOps]::CreateAsyncDelegate($function, $delegateType)
192+
$this.AssertNotDisposed()
193+
return [AsyncOps]::CreateAsyncDelegate($function, $delegateType, $this)
79194
}
80195

81196
# The remaining functions implement methods from TaskFactory. All of these methods call the base
82197
# method after wrapping the scriptblock to create a delegate that will work in tasks.
83198
[Task[Collection[psobject]]] ContinueWhenAll([Task[]] $tasks, [scriptblock] $continuationAction) {
199+
$this.AssertNotDisposed()
84200
$delegateType = [Func`2].MakeGenericType([Task[]], [Collection[psobject]])
85-
return [AsyncOps]::HideResult(
201+
return [AsyncOps]::PrepareTask(
86202
([TaskFactory[Collection[psobject]]]$this).ContinueWhenAll(
87203
$tasks,
88204
$this.Wrap($continuationAction, $delegateType),
@@ -92,8 +208,9 @@ class PSTaskFactory : TaskFactory[Collection[psobject]] {
92208
}
93209

94210
[Task[Collection[psobject]]] ContinueWhenAny([Task[]] $tasks, [scriptblock] $continuationAction) {
95-
$delegateType = [Func`2].MakeGenericType([Task[]], [Collection[psobject]])
96-
return [AsyncOps]::HideResult(
211+
$this.AssertNotDisposed()
212+
$delegateType = [Func`2].MakeGenericType([Task], [Collection[psobject]])
213+
return [AsyncOps]::PrepareTask(
97214
([TaskFactory[Collection[psobject]]]$this).ContinueWhenAny(
98215
$tasks,
99216
$this.Wrap($continuationAction, $delegateType),
@@ -103,7 +220,8 @@ class PSTaskFactory : TaskFactory[Collection[psobject]] {
103220
}
104221

105222
[Task[Collection[psobject]]] StartNew([scriptblock] $function) {
106-
return [AsyncOps]::HideResult(
223+
$this.AssertNotDisposed()
224+
return [AsyncOps]::PrepareTask(
107225
([TaskFactory[Collection[psobject]]]$this).StartNew(
108226
$this.Wrap($function, [Func[Collection[psobject]]]),
109227
$this.CancellationToken,
@@ -112,7 +230,8 @@ class PSTaskFactory : TaskFactory[Collection[psobject]] {
112230
}
113231

114232
[Task[Collection[psobject]]] StartNew([scriptblock] $function, [object] $state) {
115-
return [AsyncOps]::HideResult(
233+
$this.AssertNotDisposed()
234+
return [AsyncOps]::PrepareTask(
116235
([TaskFactory[Collection[psobject]]]$this).StartNew(
117236
$this.Wrap($function, [Func[object, Collection[psobject]]]),
118237
$state,
@@ -133,6 +252,7 @@ function async {
133252
$ArgumentList
134253
)
135254
process {
255+
if (-not $scriptblock) { return }
136256
[AsyncOps]::Factory.StartNew($ScriptBlock, $ArgumentList)
137257
}
138258
}
@@ -148,10 +268,35 @@ function await {
148268
$taskList = [List[Task]]::new()
149269
}
150270
process {
151-
$taskList.Add($Task)
271+
if ($Task) { $taskList.Add($Task) }
152272
}
153273
end {
154-
return $taskList.Result
274+
if (-not $taskList.Count) { return }
275+
276+
$finished = $false
277+
while (-not $finished) {
278+
$finished = $taskList.TrueForAll({
279+
param([Task]$task)
280+
281+
$task.IsCompleted -or
282+
$task.IsCanceled -or
283+
$task.IsFaulted
284+
})
285+
}
286+
$taskList.ToArray().ForEach{
287+
if ($PSItem.IsFaulted -and $PSItem.Exception) {
288+
if (-not ($exception = $PSItem.Exception.InnerException)) {
289+
$exception = $PSItem.Exception
290+
}
291+
$PSCmdlet.WriteError(
292+
[ErrorRecord]::new(
293+
$exception,
294+
$exception.GetType().Name,
295+
[ErrorCategory]::InvalidResult,
296+
$PSItem))
297+
}
298+
$PSItem.Result
299+
}
155300
}
156301
}
157302

@@ -162,7 +307,7 @@ function ContinueWith {
162307
$ContinuationAction,
163308

164309
[switch]
165-
$Any,
310+
$WhenAny,
166311

167312
[Parameter(ValueFromPipeline)]
168313
[Task]
@@ -172,10 +317,12 @@ function ContinueWith {
172317
$taskList = [List[Task]]::new()
173318
}
174319
process {
175-
$taskList.Add($Task)
320+
if ($Task) { $taskList.Add($Task) }
176321
}
177322
end {
178-
if ($Any.IsPresent) {
323+
if (-not $taskList.Count) { return }
324+
325+
if ($WhenAny.IsPresent) {
179326
return [AsyncOps]::Factory.ContinueWhenAny($taskList, $ContinuationAction)
180327
}
181328
return [AsyncOps]::Factory.ContinueWhenAll($taskList, $ContinuationAction)

0 commit comments

Comments
 (0)