44 stdCtx "context"
55 "fmt"
66 "maps"
7+ "os"
8+ "path/filepath"
9+ "strings"
710
811 "github.com/pkg/errors"
912 "golang.org/x/sync/errgroup"
@@ -41,9 +44,10 @@ func (r *parallelRunner) Exec(
4144 e * executable.Executable ,
4245 eng engine.Engine ,
4346 inputEnv map [string ]string ,
47+ inputArgs []string ,
4448) error {
4549 parallelSpec := e .Parallel
46- if err := envUtils .SetEnv (ctx .Config .CurrentVaultName (), e .Env (), ctx . Args , inputEnv ); err != nil {
50+ if err := envUtils .SetEnv (ctx .Config .CurrentVaultName (), e .Env (), inputArgs , inputEnv ); err != nil {
4751 return errors .Wrap (err , "unable to set parameters to env" )
4852 }
4953
@@ -52,7 +56,7 @@ func (r *parallelRunner) Exec(
5256 e .FlowFilePath (),
5357 e .WorkspacePath (),
5458 e .Env (),
55- ctx . Args ,
59+ inputArgs ,
5660 inputEnv ,
5761 ); err != nil {
5862 ctx .AddCallback (cb )
@@ -83,12 +87,11 @@ func (r *parallelRunner) Exec(
8387 return fmt .Errorf ("no parallel executables to run" )
8488}
8589
86- //nolint:funlen,gocognit
8790func handleExec (
8891 ctx * context.Context , parent * executable.Executable ,
8992 eng engine.Engine ,
9093 parallelSpec * executable.ParallelExecutableType ,
91- promptedEnv map [string ]string ,
94+ inputEnv map [string ]string ,
9295 cacheData map [string ]string ,
9396) error {
9497 groupCtx , cancel := stdCtx .WithCancel (ctx .Ctx )
@@ -100,18 +103,44 @@ func handleExec(
100103 }
101104 group .SetLimit (limit )
102105
103- dataMap := expr .ExpressionEnv (ctx , parent , cacheData , promptedEnv )
106+ // Expand the directory of the parallel execution. The root / parent's directory is used if one is not specified.
107+ var root * executable.Executable
108+ if ctx .RootExecutable != nil {
109+ root = ctx .RootExecutable
110+ } else {
111+ root = parent
112+ }
113+ if parallelSpec .Dir == "" {
114+ parallelSpec .Dir = executable .Directory (filepath .Dir (root .FlowFilePath ()))
115+ }
116+ targetDir , isTmp , err := parallelSpec .Dir .ExpandDirectory (
117+ root .WorkspacePath (),
118+ root .FlowFilePath (),
119+ ctx .ProcessTmpDir ,
120+ inputEnv ,
121+ )
122+ if err != nil {
123+ return errors .Wrap (err , "unable to expand directory" )
124+ } else if isTmp && ctx .ProcessTmpDir == "" {
125+ ctx .ProcessTmpDir = targetDir
126+ }
104127
128+ // Build the list of steps to execute
105129 var execs []engine.Exec
130+ conditionalData := expr .ExpressionEnv (ctx , parent , cacheData , inputEnv )
131+
106132 for i , refConfig := range parallelSpec .Execs {
133+ // Skip over steps that do not match the condition
107134 if refConfig .If != "" {
108- if truthy , err := expr .IsTruthy (refConfig .If , & dataMap ); err != nil {
135+ if truthy , err := expr .IsTruthy (refConfig .If , & conditionalData ); err != nil {
109136 return err
110137 } else if ! truthy {
111138 logger .Log ().Debugf ("skipping execution %d/%d" , i + 1 , len (parallelSpec .Execs ))
112139 continue
113140 }
114141 }
142+
143+ // Get the executable for the step
115144 var exec * executable.Executable
116145 switch {
117146 case len (refConfig .Ref ) > 0 :
@@ -126,8 +155,10 @@ func handleExec(
126155 return errors .New ("parallel executable must have a ref or cmd" )
127156 }
128157
129- execPromptedEnv := make (map [string ]string )
130- maps .Copy (promptedEnv , execPromptedEnv )
158+ // Prepare the environment and arguments for the child executable
159+ childEnv := make (map [string ]string )
160+ childArgs := make ([]string , 0 )
161+ maps .Copy (childEnv , inputEnv )
131162 if len (refConfig .Args ) > 0 {
132163 execEnv := exec .Env ()
133164 if execEnv == nil || execEnv .Args == nil {
@@ -136,14 +167,31 @@ func handleExec(
136167 exec .Ref ().String (),
137168 )
138169 } else {
139- a , err := envUtils .BuildArgsEnvMap (execEnv .Args , refConfig .Args , execPromptedEnv )
170+ for _ , arg := range os .Environ () {
171+ kv := strings .SplitN (arg , "=" , 2 )
172+ if len (kv ) == 2 {
173+ childEnv [kv [0 ]] = kv [1 ]
174+ }
175+ }
176+
177+ if parallelSpec .Args == nil {
178+ childArgs = refConfig .Args
179+ } else {
180+ childArgs = envUtils .BuildArgsFromEnv (execEnv .Args , childEnv )
181+ if len (childArgs ) == 0 {
182+ childArgs = refConfig .Args // If no resolved args, fallback to original args
183+ }
184+ }
185+
186+ a , err := envUtils .BuildArgsEnvMap (execEnv .Args , childArgs , childEnv )
140187 if err != nil {
141188 logger .Log ().Error (err , "unable to process arguments" )
142189 }
143- maps .Copy (execPromptedEnv , a )
190+ maps .Copy (childEnv , a )
144191 }
145192 }
146193
194+ // Set log fields and directory for the executable
147195 switch {
148196 case exec .Exec != nil :
149197 fields := map [string ]interface {}{"step" : exec .Ref ().String ()}
@@ -159,10 +207,18 @@ func handleExec(
159207 if parallelSpec .Dir != "" && exec .Serial .Dir == "" {
160208 exec .Serial .Dir = parallelSpec .Dir
161209 }
210+ case exec .Request != nil :
211+ if exec .Request .ResponseFile != nil && parallelSpec .Dir != "" && exec .Request .ResponseFile .Dir == "" {
212+ exec .Request .ResponseFile .Dir = parallelSpec .Dir
213+ }
214+ case exec .Render != nil :
215+ if parallelSpec .Dir != "" && exec .Render .Dir == "" {
216+ exec .Render .Dir = parallelSpec .Dir
217+ }
162218 }
163219
164220 runExec := func () error {
165- err := runner .Exec (ctx , exec , eng , execPromptedEnv )
221+ err := runner .Exec (ctx , exec , eng , childEnv , childArgs )
166222 if err != nil {
167223 return err
168224 }
@@ -171,6 +227,7 @@ func handleExec(
171227
172228 execs = append (execs , engine.Exec {ID : exec .Ref ().String (), Function : runExec , MaxRetries : refConfig .Retries })
173229 }
230+
174231 results := eng .Execute (
175232 ctx .Ctx , execs ,
176233 engine .WithMode (engine .Parallel ),
0 commit comments