StartAt: Normalize Input
States:
  # Start of pre-processing
  Normalize Input:
    Comment: Normalizes input data
    Type: Task
    InputPath: "$"
    Resource: "${NormalizeInputLambdaFunctionArn}"
    Parameters:
      Input.$: "$"
      StateMachine:
        Id.$: "$$.StateMachine.Id"
    ResultPath: "$"
    OutputPath: "$"
    Next: JobReceived Callbacks Map
    Retry:
      - ErrorEquals:
          - MissingTaskTypeError
        MaxAttempts: 0
      - ErrorEquals:
          - States.ALL
        IntervalSeconds: 1
        MaxAttempts: 2
        BackoffRate: 1
    Catch:
      - ErrorEquals:
          - States.ALL
        ResultPath: "$.Error"
        Next: Normalize Input Catcher
  Normalize Input Catcher:
    Comment: Add a parameter so normalize errors can be identified
    Type: Pass
    InputPath: "$"
    Result: NORMALIZE_INPUT_ERROR
    ResultPath: "$.State"
    OutputPath: "$"
    Next: Add Empty TaskResults
  JobReceived Callbacks Map:
    Type: Map
    Comment: Iterates over all callback endpoints to indicate the job was received
    InputPath: "$"
    ItemsPath: "$.Job.Callbacks"
    Parameters:
      Callback.$: "$$.Map.Item.Value"
      StateMachine:
        Id.$: "$$.StateMachine.Id"
      Execution:
        Id.$: "$$.Execution.Id"
      Message: # The JSON of this value will be sent to endpoints
        JobReceived:
          Job:
            Id.$: "$.Job.Id"
          Execution:
            Id.$: "$$.Execution.Id"
          State: RECEIVED
    ResultPath: "$.Void" # The output of the iterator states is discarded
    OutputPath: "$"
    Next: Tasks Map
    MaxConcurrency: 0
    Iterator:
      StartAt: Send JobReceived Callback
      States:
        Send JobReceived Callback:
          Type: Task
          Comment: >-
            Sends a job received message for a single callback endpoint in
            the iterator
          InputPath: "$"
          Resource: "${CallbackLambdaFunctionArn}"
          ResultPath: "$"
          OutputPath: "$"
          End: true
          Retry:
            - ErrorEquals:
                - States.ALL
              IntervalSeconds: 5
              MaxAttempts: 3
              BackoffRate: 2
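    # Illustrative sketch (hypothetical values): for one element of
    # $.Job.Callbacks, the Parameters of this Map produce an iterator input
    # shaped like the following, and the callback Lambda sends the JSON form
    # of Message to that endpoint. "my-job-123" is a made-up job ID.
    #   Callback: <the callback element, passed through unchanged>
    #   StateMachine: { Id: <state machine ARN> }
    #   Execution: { Id: <execution ARN> }
    #   Message:
    #     JobReceived:
    #       Job: { Id: my-job-123 }
    #       Execution: { Id: <execution ARN> }
    #       State: RECEIVED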
  # Start of task execution
  Tasks Map:
    # The key output of this map state is the TaskResults array. There will
    # be one element for each task, whether it succeeded or failed. Successful
    # task elements will have a `Task` property, and failed tasks will have a
    # `Type` property.
    Type: Map
    Comment: Iterates over all tasks included in the job
    InputPath: "$"
    ItemsPath: "$.Job.Tasks"
    Parameters:
      # Parameters' value is what is passed to each state within the iterator,
      # but each state decides independently what is passed to its backing
      # resource (Lambda, etc.). For example, Callbacks is available to each
      # state, but is usually not passed into Lambdas, since most tasks don't
      # use that data.
      Job:
        Id.$: "$.Job.Id"
      Execution:
        Id.$: "$$.Execution.Id"
      TaskIndex.$: "$$.Map.Item.Index"
      Task.$: "$$.Map.Item.Value" # This value is a task defined in the job input, e.g., { "Type": "Copy" }
      Callbacks.$: "$.Job.Callbacks"
    ResultPath: "$.TaskResults"
    OutputPath: "$"
    Next: JobResult Callback Map
    MaxConcurrency: 0
    Iterator:
      StartAt: Route Task By Type
      States:
        Route Task By Type:
          Type: Choice
          Comment: >-
            For the current task being iterated, routes to the appropriate
            state
          Default: Unknown Task Type
          Choices:
            - Variable: "$.Task.Type"
              StringEquals: FFmpeg
              Next: FFmpeg Task Fargate Execution
        # This is a no-op. No callbacks are sent, and any record of this task
        # will be filtered out of the job result callbacks
        Unknown Task Type:
          Type: Pass
          Comment: Handles and blackholes tasks with an unknown type
          End: true
          ResultPath: "$"
          Result:
            Task: "Null"
        # Task operations
        # - The InputPath and OutputPath of all of these must be $, so that
        #   all data is available to states down the line.
        # - The ResultPath for states that are returning the final output of a
        #   task's operation (i.e., states immediately prior to the callbacks
        #   map) must be $.TaskResult, and their return value must be the
        #   expected standard task output (e.g., { Task: 'Copy' })
        # - All task operation states, not just the final state, should catch
        #   to the TaskResult Error Callback Map.
        FFmpeg Task Fargate Execution:
          Type: Task
          Comment: Runs FFmpeg
          Resource: arn:aws:states:::ecs:runTask.waitForTaskToken
          InputPath: "$"
          Parameters:
            Cluster: "${EcsClusterArn}"
            LaunchType: FARGATE
            Overrides:
              ContainerOverrides:
                - Environment:
                    - Name: STATE_MACHINE_ARN
                      Value.$: "$$.StateMachine.Id"
                    - Name: STATE_MACHINE_NAME
                      Value.$: "$$.StateMachine.Name"
                    - Name: STATE_MACHINE_EXECUTION_ID
                      Value.$: "$$.Execution.Id"
                    - Name: STATE_MACHINE_JOB_ID
                      Value.$: "$.Job.Id"
                    - Name: STATE_MACHINE_TASK_INDEX
                      Value.$: States.Format('{}', $.TaskIndex)
                    - Name: STATE_MACHINE_S3_DESTINATION_WRITER_ROLE
                      Value: "${S3DestinationWriterRoleArn}"
                    - Name: STATE_MACHINE_AWS_REGION
                      Value: "${AwsRegion}"
                    - Name: STATE_MACHINE_ARTIFACT_BUCKET_NAME
                      Value: "${ArtifactBucketName}"
                    - Name: STATE_MACHINE_FFMPEG_TASK_JSON
                      Value.$: States.JsonToString($.Task.FFmpeg)
                    - Name: STATE_MACHINE_TASK_TYPE
                      Value.$: "$.Task.Type"
                    - Name: STATE_MACHINE_TASK_TOKEN
                      Value.$: "$$.Task.Token"
                  Name: "${FFmpegContainerName}"
            NetworkConfiguration:
              AwsvpcConfiguration:
                AssignPublicIp: ENABLED
                Subnets:
                  - "${VpcPublicSubnet1}"
                  - "${VpcPublicSubnet2}"
            PropagateTags: TASK_DEFINITION
            TaskDefinition: "${FFmpegEcsTaskDefinitionArn}"
          ResultPath: "$.TaskResult"
          OutputPath: "$"
          Next: TaskResult Callbacks Map
          Retry:
            - ErrorEquals:
                - States.ALL
              IntervalSeconds: 15
              MaxAttempts: 5
              BackoffRate: 2
          Catch:
            - ErrorEquals:
                - States.ALL
              ResultPath: "$.Error"
              Next: TaskResult Error Callback Map
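          # Illustrative sketch of how the .waitForTaskToken integration
          # completes: this state starts the Fargate task, then pauses until
          # SendTaskSuccess or SendTaskFailure is called with the token passed
          # in STATE_MACHINE_TASK_TOKEN. The JSON sent with SendTaskSuccess
          # becomes $.TaskResult; a SendTaskFailure call fails this state, so
          # the Retry and Catch rules apply. The call below assumes the AWS CLI
          # is available in the container and the task role allows
          # states:SendTaskSuccess. The {"Task":"FFmpeg"} output is a
          # hypothetical example of the standard task output convention, not
          # something this file confirms.
          #   aws stepfunctions send-task-success \
          #     --task-token "$STATE_MACHINE_TASK_TOKEN" \
          #     --task-output '{"Task":"FFmpeg"}'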
        # Task callbacks
        # All task operations end up at one of these callback states,
        # depending on whether the task succeeded or failed. The value for
        # a given task in the TaskResults array coming out of the Tasks Map
        # will be the output of whichever of these callback states is used.
        #
        # For failed tasks, the task callback will include information about
        # the error. This information is not included in the job result
        # callback.
        #
        # The output for a successful task is the result of the task.
        # The output for a failed task is the original input task definition.
        TaskResult Callbacks Map:
          Type: Map
          Comment: >-
            Iterates over all callback endpoints to send messages when tasks
            are successful
          InputPath: "$"
          ItemsPath: "$.Callbacks"
          Parameters:
            Callback.$: "$$.Map.Item.Value"
            StateMachine:
              Id.$: "$$.StateMachine.Id"
            Execution:
              Id.$: "$$.Execution.Id"
            TaskIteratorIndex.$: "$.TaskIndex"
            Message: # The JSON of this value will be sent to endpoints
              Task.$: "$.Task"
              TaskResult:
                Job:
                  Id.$: "$.Job.Id"
                Execution:
                  Id.$: "$$.Execution.Id"
                Result.$: "$.TaskResult"
          ResultPath: "$.Void" # The output of the iterator states is discarded
          OutputPath: "$.TaskResult"
          End: true
          MaxConcurrency: 0
          Iterator:
            StartAt: Send TaskResult Callback
            States:
              Send TaskResult Callback:
                Type: Task
                Comment: >-
                  Sends a callback message to a single endpoint when tasks
                  are successful
                InputPath: "$"
                Resource: "${CallbackLambdaFunctionArn}"
                ResultPath: "$"
                OutputPath: "$"
                End: true
                Retry:
                  - ErrorEquals:
                      - States.ALL
                    IntervalSeconds: 5
                    MaxAttempts: 3
                    BackoffRate: 2
        TaskResult Error Callback Map:
          Type: Map
          Comment: >-
            Iterates over all callback endpoints to send messages when tasks
            fail
          InputPath: "$"
          ItemsPath: "$.Callbacks"
          Parameters:
            Callback.$: "$$.Map.Item.Value"
            StateMachine:
              Id.$: "$$.StateMachine.Id"
            Execution:
              Id.$: "$$.Execution.Id"
            TaskIteratorIndex.$: "$.TaskIndex"
            Message: # The JSON of this value will be sent to endpoints
              Task.$: "$.Task"
              TaskResult:
                Job:
                  Id.$: "$.Job.Id"
                Execution:
                  Id.$: "$$.Execution.Id"
                Error.$: "$.Error"
          ResultPath: "$.Void" # The output of the iterator states is discarded
          OutputPath: "$.Task"
          End: true
          MaxConcurrency: 0
          Iterator:
            StartAt: Send TaskResult Error Callback
            States:
              Send TaskResult Error Callback:
                Type: Task
                Comment: >-
                  Sends a callback message to a single endpoint when tasks
                  fail
                InputPath: "$"
                Resource: "${CallbackLambdaFunctionArn}"
                ResultPath: "$"
                OutputPath: "$"
                End: true
                Retry:
                  - ErrorEquals:
                      - States.ALL
                    IntervalSeconds: 5
                    MaxAttempts: 3
                    BackoffRate: 2
    Catch:
      - ErrorEquals:
          - States.ALL
        ResultPath: "$.Error"
        Next: Tasks Map Catcher
  Tasks Map Catcher:
    Comment: >-
      Add a parameter so task map errors can be identified. This state is NOT
      responsible for catching individual task execution errors (like if a
      Copy task fails)
    Type: Pass
    InputPath: "$"
    Result: ITERATOR_ERROR
    ResultPath: "$.State"
    OutputPath: "$"
    Next: Add Empty TaskResults
  # End of task execution
  # States below this will expect $.TaskResults to exist
  Add Empty TaskResults:
    Comment: >-
      Add a TaskResults key with an empty array value, for cases where the task
      iterator does not succeed and it's not otherwise added
    Type: Pass
    InputPath: "$"
    Result: []
    ResultPath: "$.TaskResults"
    OutputPath: "$"
    Next: JobResult Callback Map
  # Start of post-processing
  JobResult Callback Map:
    Type: Map
    Comment: Iterates over all callback endpoints to send job results
    InputPath: "$"
    ItemsPath: "$.Job.Callbacks"
    Parameters:
      Callback.$: "$$.Map.Item.Value"
      StateMachine:
        Id.$: "$$.StateMachine.Id"
      Execution:
        Id.$: "$$.Execution.Id"
      Message: # The JSON of this value will be sent to endpoints
        JobResult:
          Job:
            Id.$: "$.Job.Id"
          Execution:
            Id.$: "$$.Execution.Id"
          State.$: "$.State"
          # Task/Type = Null elements are filtered out.
          # Successful tasks will have a Task property
          TaskResults.$: "$.TaskResults.[?(@.Task && @.Task != 'Null')]"
          # Failed tasks will have a Type property
          FailedTasks.$: "$.TaskResults.[?(@.Type && @.Type != 'Null')]"
    ResultPath: "$.Void" # The output of the iterator states is discarded
    OutputPath: "$"
    Next: Normalize Output
    MaxConcurrency: 0
    Iterator:
      StartAt: Send JobResult Callback
      States:
        Send JobResult Callback:
          Type: Task
          Comment: >-
            Sends a callback message to a single endpoint in the iterator with
            a job result
          InputPath: "$"
          Resource: "${CallbackLambdaFunctionArn}"
          ResultPath: "$"
          OutputPath: "$"
          End: true
          Retry:
            - ErrorEquals:
                - States.ALL
              IntervalSeconds: 5
              MaxAttempts: 3
              BackoffRate: 2
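    # Illustrative sketch (hypothetical data) of the TaskResults/FailedTasks
    # filters in the Parameters of this Map. Suppose $.TaskResults were
    #   [ { Task: FFmpeg }, { Task: "Null" }, { Type: FFmpeg, FFmpeg: {...} } ]
    # i.e. one successful task result, one unknown-type task, and one failed
    # task (whose output is its original definition). The filters would yield
    #   TaskResults: [ { Task: FFmpeg } ]
    #   FailedTasks: [ { Type: FFmpeg, FFmpeg: {...} } ]
    # The unknown-type element matches neither filter, so it is dropped.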
  Normalize Output:
    Comment: Normalizes output data
    Type: Task
    InputPath: "$"
    Retry:
      - ErrorEquals:
          - States.ALL
        IntervalSeconds: 1
        MaxAttempts: 2
        BackoffRate: 1
    Parameters:
      StateMachine:
        Id.$: "$$.StateMachine.Id"
      Message:
        JobResult:
          Job:
            Id.$: "$.Job.Id"
          Execution:
            Id.$: "$$.Execution.Id"
          State.$: "$.State"
          TaskResults.$: "$.TaskResults.[?(@.Task && @.Task != 'Null')]"
          FailedTasks.$: "$.TaskResults.[?(@.Type && @.Type != 'Null')]"
    Resource: "${NormalizeOutputLambdaFunctionArn}"
    ResultPath: "$"
    OutputPath: "$"
    End: true
  # End of post-processing