14
14
package chaosd
15
15
16
16
import (
17
+ "context"
17
18
"fmt"
18
19
"os"
19
- "os/exec"
20
- "sync"
21
- "time"
22
20
23
21
"github.com/hashicorp/go-multierror"
24
22
"github.com/pingcap/log"
25
23
"go.uber.org/zap"
26
24
27
- "github.com/chaos-mesh/chaosd/pkg/server /utils"
25
+ pkgUtils "github.com/chaos-mesh/chaosd/pkg/utils"
28
26
29
27
"github.com/chaos-mesh/chaosd/pkg/core"
30
28
)
@@ -33,88 +31,71 @@ type diskAttack struct{}
33
31
34
32
var DiskAttack AttackType = diskAttack {}
35
33
36
- func (disk diskAttack ) Attack (options core.AttackConfig , env Environment ) error {
34
+ func handleDiskAttackOutput (output []byte , err error , c chan interface {}) {
35
+ if err != nil {
36
+ log .Error (string (output ), zap .Error (err ))
37
+ c <- err
38
+ }
39
+ log .Info (string (output ))
40
+ }
41
+
42
+ func (diskAttack ) Attack (options core.AttackConfig , env Environment ) error {
43
+ err := ApplyDiskAttack (options , env )
44
+ if err != nil {
45
+ return err
46
+ }
47
+ return nil
48
+ }
49
+
50
+ func handleOutputChannelError (c chan interface {}) error {
51
+ close (c )
52
+ var multiErrs error
53
+ for i := range c {
54
+ if err , ok := i .(error ); ok {
55
+ multiErrs = multierror .Append (multiErrs , err )
56
+ }
57
+ }
58
+ if multiErrs != nil {
59
+ return multiErrs
60
+ }
61
+ return nil
62
+ }
63
+
64
+ func ApplyDiskAttack (options core.AttackConfig , env Environment ) error {
37
65
var attackConf * core.DiskAttackConfig
38
66
var ok bool
39
67
if attackConf , ok = options .(* core.DiskAttackConfig ); ! ok {
40
68
return fmt .Errorf ("AttackConfig -> *DiskAttackConfig meet error" )
41
69
}
70
+ poolSize := getPoolSize (attackConf )
71
+ outputChan := make (chan interface {}, poolSize + 1 )
42
72
if attackConf .Action == core .DiskFillAction {
43
- if attackConf .FAllocateOption != nil {
44
- cmd := core .FAllocateCommand .Unmarshal (* attackConf .FAllocateOption )
45
- output , err := cmd .CombinedOutput ()
46
-
47
- if err != nil {
48
- log .Error (string (output ), zap .Error (err ))
49
- return err
50
- }
51
- log .Info (string (output ))
52
- return nil
53
- }
54
-
55
- for _ , DdOption := range * attackConf .DdOptions {
56
- cmd := core .DdCommand .Unmarshal (DdOption )
57
- output , err := cmd .CombinedOutput ()
58
-
59
- if err != nil {
60
- log .Error (string (output ), zap .Error (err ))
61
- return err
62
- }
63
- log .Info (string (output ))
64
- }
65
- return nil
73
+ cmdPool := pkgUtils .NewCommandPools (context .Background (), nil , poolSize )
74
+ env .Chaos .CmdPools [env .AttackUid ] = cmdPool
75
+ fillDisk (attackConf , cmdPool , NewOutputHandler (handleDiskAttackOutput , outputChan ))
76
+ cmdPool .Wait ()
77
+ cmdPool .Close ()
78
+ return handleOutputChannelError (outputChan )
66
79
}
67
80
68
81
if attackConf .DdOptions != nil {
69
- duration , _ := options .ScheduleDuration ()
70
- var deadline <- chan time.Time
71
- if duration != nil {
72
- deadline = time .After (* duration )
73
- }
74
-
75
- if len (* attackConf .DdOptions ) == 0 {
76
- return nil
77
- }
78
- rest := (* attackConf .DdOptions )[len (* attackConf .DdOptions )- 1 ]
79
- * attackConf .DdOptions = (* attackConf .DdOptions )[:len (* attackConf .DdOptions )- 1 ]
80
-
81
- cmd := core .DdCommand .Unmarshal (rest )
82
- err := utils .ExecWithDeadline (deadline , cmd )
83
- if err != nil {
84
- return err
82
+ var cmdPool * pkgUtils.CommandPools
83
+ deadline := getDeadline (options )
84
+ if deadline != nil {
85
+ cmdPool = pkgUtils .NewCommandPools (context .Background (), deadline , poolSize )
85
86
}
87
+ cmdPool = pkgUtils .NewCommandPools (context .Background (), nil , poolSize )
88
+ env .Chaos .CmdPools [env .AttackUid ] = cmdPool
86
89
87
- var wg sync.WaitGroup
88
- var mu sync.Mutex
89
- var errs error
90
- wg .Add (len (* attackConf .DdOptions ))
91
- for _ , ddOpt := range * attackConf .DdOptions {
92
- cmd := core .DdCommand .Unmarshal (ddOpt )
93
-
94
- go func (cmd * exec.Cmd ) {
95
- defer wg .Done ()
96
- err := utils .ExecWithDeadline (deadline , cmd )
97
- if err != nil {
98
- log .Error (cmd .String (), zap .Error (err ))
99
- mu .Lock ()
100
- defer mu .Unlock ()
101
- errs = multierror .Append (errs , err )
102
- return
103
- }
104
- }(cmd )
105
- }
106
-
107
- wg .Wait ()
108
-
109
- if errs != nil {
110
- return errs
111
- }
90
+ applyPayload (attackConf , cmdPool , NewOutputHandler (handleDiskAttackOutput , outputChan ))
91
+ cmdPool .Wait ()
92
+ cmdPool .Close ()
93
+ return handleOutputChannelError (outputChan )
112
94
}
113
95
return nil
114
-
115
96
}
116
97
117
- func (diskAttack ) Recover (exp core.Experiment , _ Environment ) error {
98
+ func (diskAttack ) Recover (exp core.Experiment , env Environment ) error {
118
99
attackConfig , err := exp .GetRequestCommand ()
119
100
if err != nil {
120
101
return err
@@ -127,5 +108,10 @@ func (diskAttack) Recover(exp core.Experiment, _ Environment) error {
127
108
log .Warn (fmt .Sprintf ("recover disk: remove %s failed" , config .Path ), zap .Error (err ))
128
109
}
129
110
}
111
+
112
+ if cmdPool , ok := env .Chaos .CmdPools [exp .Uid ]; ok {
113
+ cmdPool .Close ()
114
+ }
115
+
130
116
return nil
131
117
}
0 commit comments