Skip to content

Commit 12d2fbe

Browse files
committed
feat: enqueue --wait returns task exit code
Signed-off-by: aavarghese <[email protected]>
1 parent 22e7554 commit 12d2fbe

File tree

16 files changed

+187
-14
lines changed

16 files changed

+187
-14
lines changed

cmd/subcommands/enqueue/file.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package enqueue
22

33
import (
44
"fmt"
5+
"os"
56

67
"github.com/spf13/cobra"
78

@@ -19,7 +20,7 @@ func NewEnqueueFileCmd() *cobra.Command {
1920

2021
var wait bool
2122
var verbose bool
22-
cmd.Flags().BoolVarP(&wait, "wait", "w", false, "Wait for the task to be completed")
23+
cmd.Flags().BoolVarP(&wait, "wait", "w", false, "Wait for the task to be completed, and exit with the exit code of that task")
2324
cmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Verbose output")
2425

2526
cmd.RunE = func(cmd *cobra.Command, args []string) error {
@@ -29,7 +30,11 @@ func NewEnqueueFileCmd() *cobra.Command {
2930
return fmt.Errorf("TODO")
3031
}
3132

32-
return queue.EnqueueFile(args[0], queue.EnqueueFileOptions{Wait: wait, Verbose: verbose})
33+
exitcode, err := queue.EnqueueFile(args[0], queue.EnqueueFileOptions{Wait: wait, Verbose: verbose})
34+
if exitcode != 0 {
35+
os.Exit(exitcode)
36+
}
37+
return err
3338
}
3439

3540
return cmd

pkg/runtime/queue/enqueue.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,25 @@ type EnqueueFileOptions struct {
1919
Verbose bool
2020
}
2121

22-
func EnqueueFile(task string, opts EnqueueFileOptions) error {
22+
func EnqueueFile(task string, opts EnqueueFileOptions) (int, error) {
2323
c, err := NewS3Client()
2424
if err != nil {
25-
return err
25+
return 0, err
2626
}
2727

2828
if err := c.Mkdirp(c.Paths.Bucket); err != nil {
29-
return err
29+
return 0, err
3030
}
3131

3232
if err := c.Upload(c.Paths.Bucket, task, filepath.Join(c.Paths.PoolPrefix, c.Paths.Inbox, filepath.Base(task))); err != nil {
33-
return err
33+
return 0, err
3434
}
3535

3636
if opts.Wait {
3737
return c.WaitForCompletion(filepath.Base(task), opts.Verbose)
3838
}
3939

40-
return nil
40+
return 0, nil
4141
}
4242

4343
func EnqueueFromS3(fullpath, endpoint, accessKeyId, secretAccessKey string, repeat int) error {

pkg/runtime/queue/s3.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package queue
44
// can share this with the runtime/worker/s3.go
55

66
import (
7+
"bytes"
78
"context"
89
"fmt"
910
"io"
@@ -167,6 +168,16 @@ func (s3 S3Client) ListObjects(bucket, filePath string, recursive bool) <-chan m
167168
})
168169
}
169170

171+
func (s3 S3Client) Get(bucket, filePath string) (string, error) {
172+
var content bytes.Buffer
173+
s, err := s3.client.GetObject(context.Background(), bucket, filePath, minio.GetObjectOptions{})
174+
if err != nil {
175+
return "", err
176+
}
177+
io.Copy(io.Writer(&content), s)
178+
return content.String(), nil
179+
}
180+
170181
func (s3 S3Client) Cat(bucket, filePath string) error {
171182
s, err := s3.client.GetObject(context.Background(), bucket, filePath, minio.GetObjectOptions{})
172183
if err != nil {

pkg/runtime/queue/wait.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,15 @@ import (
55
"os"
66
"path/filepath"
77
"slices"
8+
"strconv"
89
"time"
910
)
1011

11-
func (c S3Client) WaitForCompletion(task string, verbose bool) error {
12+
func (c S3Client) WaitForCompletion(task string, verbose bool) (int, error) {
1213
for {
1314
doneTasks, err := c.Lsf(c.Paths.Bucket, filepath.Join(c.Paths.PoolPrefix, c.Paths.Outbox))
1415
if err != nil {
15-
return err
16+
return 0, err
1617
}
1718

1819
if idx := slices.IndexFunc(doneTasks, func(otask string) bool { return otask == task }); idx >= 0 {
@@ -29,5 +30,18 @@ func (c S3Client) WaitForCompletion(task string, verbose bool) error {
2930
fmt.Fprintf(os.Stderr, "Task completed %s\n", task)
3031
}
3132

32-
return nil
33+
codeFile := filepath.Join(c.Paths.PoolPrefix, c.Paths.Outbox, task+".code")
34+
if code, err := c.Get(c.Paths.Bucket, codeFile); err != nil {
35+
return 0, err
36+
} else {
37+
if verbose {
38+
fmt.Fprintf(os.Stderr, "Task completed %s with return code %s\n", task, code)
39+
}
40+
41+
exitcode, err := strconv.Atoi(code)
42+
if err != nil {
43+
return 0, err
44+
}
45+
return exitcode, nil
46+
}
3347
}

pkg/runtime/worker/watcher.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,6 @@ func startWatch(handler []string, client queue.S3Client) error {
125125
err = handlercmd.Run()
126126
if err != nil {
127127
fmt.Println("Internal Error running the handler:", err)
128-
continue
129128
}
130129
EC := handlercmd.ProcessState.ExitCode()
131130

@@ -151,7 +150,6 @@ func startWatch(handler []string, client queue.S3Client) error {
151150
if err != nil {
152151
fmt.Println("Internal Error creating succeeded marker:", err)
153152
}
154-
// fmt.Println("handler success: " + in)
155153
} else {
156154
err = client.Touch(bucket, failed)
157155
if err != nil {

tests/bin/helpers.sh

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,15 @@ function waitForIt {
114114
lunchpail qcat outbox/${output}.code)
115115
if [[ $code = 0 ]] || [[ $code = -1 ]] || [[ $code = 143 ]] || [[ $code = 137 ]]
116116
then echo "✅ PASS run-controller test=$name output=$output code=0"
117-
else echo "❌ FAIL run-controller non-zero exit code test=$name output=$output code=$code" && return 1
117+
else
118+
if [[ -n "$expectTaskFailure" ]]
119+
then
120+
if [[ ! "$code" =~ $expectTaskFailure ]]
121+
then echo "Missing expected task failure output from code=$code" && return 1
122+
fi
123+
echo "✅ PASS run-controller got expected non-zero exit code test=$name output=$output code=$code"
124+
else echo "❌ FAIL run-controller non-zero exit code test=$name output=$output code=$code" && return 1
125+
fi
118126
fi
119127

120128
stdout=$(kubectl exec $(kubectl get pod -n $ns -l app.kubernetes.io/component=$S3C -o name) -n $ns -- \
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# test7-wait-withfail
2+
3+
Same as test7, except this test uses the ParameterSweep in "wait" mode with one failing task.
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
#!/usr/bin/env bash
2+
3+
SCRIPTDIR=$(cd $(dirname "$0") && pwd)
4+
5+
export NAMESPACE=$1
6+
7+
# number of task
8+
N=${2-10}
9+
10+
# name of s3 bucket in which to store the tasks
11+
BUCKET=${3-test7}
12+
RUN_NAME=$BUCKET
13+
14+
B=$(mktemp -d)/$BUCKET # bucket path
15+
D=$B/$BUCKET # data path; in this case the bucket name and the folder name are both the run name
16+
mkdir -p $D
17+
echo "Staging to $D" 1>&2
18+
19+
for idx in $(seq 1 $N) # for each iteration
20+
do
21+
# if we are doing a test, then make sure to use a
22+
# repeatable name for the task files, so that we know what
23+
# to look for when confirming that the tasks were
24+
# processed by the workers
25+
if [[ -n "$CI" ]] || [[ -n "$RUNNING_CODEFLARE_TESTS" ]]; then
26+
id=$idx
27+
else
28+
# otherwise, use a more random name, so that we can
29+
# inject multiple batches of tasks across executions
30+
# of this script
31+
id=$(uuidgen)
32+
fi
33+
34+
echo "this is task idx=$idx" > $D/task.$id.txt
35+
done
36+
37+
"$SCRIPTDIR"/../../../tests/bin/add-data.sh $B
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#!/usr/bin/env bash
2+
3+
SCRIPTDIR=$(cd $(dirname "$0") && pwd)
4+
5+
# make sure these values are compatible with the values in ./settings.sh
6+
NUM_TASKS=6
7+
8+
# $1: namespace
9+
10+
"$SCRIPTDIR"/add-data-to-queue.sh \
11+
$1 \
12+
$NUM_TASKS \
13+
${TEST_NAME-test7}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
apiVersion: lunchpail.io/v1alpha1
2+
kind: Application
3+
metadata:
4+
name: test7-wait-withfail
5+
spec:
6+
role: worker
7+
code:
8+
- name: main.sh
9+
source: |
10+
#!/usr/bin/env sh
11+
12+
# $1 input filepath
13+
# $2 output filepath
14+
in="$1"
15+
out="$2"
16+
17+
dataset_name=test # match with below!
18+
bucket_name=test7-wait-withfail
19+
folder_name=test7-wait-withfail
20+
N=$(ls $dataset_name/$bucket_name/$folder_name | wc -l | xargs)
21+
22+
echo "Processing $N $(basename $in)"
23+
sleep 5
24+
25+
if [ $(basename $in) = "task.3.txt" ]
26+
then
27+
echo "Error!" 1>&2
28+
exit 64
29+
fi
30+
31+
echo "Done with $(basename $in)"
32+
33+
command: ./main.sh
34+
minSize: auto
35+
securityContext:
36+
runAsUser: 2000 # lunchpail, same as is specified Dockerfile
37+
runAsGroup: 0 # root, ibid
38+
containerSecurityContext:
39+
runAsUser: 2000 # lunchpail, same as is specified Dockerfile
40+
runAsGroup: 0 # root, ibid
41+
42+
datasets:
43+
- name: test
44+
s3:
45+
secret: test7data
46+
copyIn:
47+
path: "test7-wait-withfail/"

0 commit comments

Comments
 (0)