|
| 1 | +// Copyright (c) 2025, R.I. Pienaar and the Choria Project contributors |
| 2 | +// |
| 3 | +// SPDX-License-Identifier: Apache-2.0 |
| 4 | + |
| 5 | +package ccmmanifestwatcher |
| 6 | + |
| 7 | +import ( |
| 8 | + "bytes" |
| 9 | + "context" |
| 10 | + "encoding/json" |
| 11 | + "fmt" |
| 12 | + "io" |
| 13 | + "math/rand/v2" |
| 14 | + "os" |
| 15 | + "path/filepath" |
| 16 | + "sync" |
| 17 | + "time" |
| 18 | + |
| 19 | + "github.com/ghodss/yaml" |
| 20 | + |
| 21 | + "github.com/choria-io/ccm/manager" |
| 22 | + ccmmodel "github.com/choria-io/ccm/model" |
| 23 | + "github.com/choria-io/ccm/resources/apply" |
| 24 | + "github.com/choria-io/go-choria/aagent/model" |
| 25 | + "github.com/choria-io/go-choria/aagent/util" |
| 26 | + "github.com/choria-io/go-choria/aagent/watchers/event" |
| 27 | + "github.com/choria-io/go-choria/aagent/watchers/watcher" |
| 28 | + iu "github.com/choria-io/go-choria/internal/util" |
| 29 | +) |
| 30 | + |
| 31 | +type State int |
| 32 | + |
| 33 | +const ( |
| 34 | + Unknown State = iota |
| 35 | + Error |
| 36 | + Stable |
| 37 | + Changes |
| 38 | + Skipped |
| 39 | + |
| 40 | + wtype = "ccm_manifest" |
| 41 | + version = "v1" |
| 42 | +) |
| 43 | + |
| 44 | +var stateNames = map[State]string{ |
| 45 | + Unknown: "unknown", |
| 46 | + Error: "error", |
| 47 | + Stable: "stable", |
| 48 | + Changes: "changes", |
| 49 | + Skipped: "skipped", |
| 50 | +} |
| 51 | + |
| 52 | +type Properties struct { |
| 53 | + ManifestFile string `mapstructure:"manifest_file"` |
| 54 | + Manifest map[string]any `mapstructure:"manifest"` |
| 55 | + Noop bool `mapstructure:"noop"` |
| 56 | + HealthCheckOnly bool `mapstructure:"healthcheck_only"` |
| 57 | + Governor string `mapstructure:"governor"` |
| 58 | + GovernorTimeout time.Duration `mapstructure:"governor_timeout"` |
| 59 | + Splay bool `mapstructure:"splay"` |
| 60 | + Timeout time.Duration |
| 61 | +} |
| 62 | + |
| 63 | +type Watcher struct { |
| 64 | + *watcher.Watcher |
| 65 | + |
| 66 | + name string |
| 67 | + machine model.Machine |
| 68 | + previous State |
| 69 | + interval time.Duration |
| 70 | + previousRunTime time.Duration |
| 71 | + properties *Properties |
| 72 | + |
| 73 | + lastWatch time.Time |
| 74 | + |
| 75 | + wmu *sync.Mutex |
| 76 | + mu *sync.Mutex |
| 77 | +} |
| 78 | + |
| 79 | +func New(machine model.Machine, name string, states []string, required []model.ForeignMachineState, failEvent string, successEvent string, interval string, ai time.Duration, rawprop map[string]any) (any, error) { |
| 80 | + var err error |
| 81 | + |
| 82 | + ccm := &Watcher{ |
| 83 | + machine: machine, |
| 84 | + name: name, |
| 85 | + mu: &sync.Mutex{}, |
| 86 | + wmu: &sync.Mutex{}, |
| 87 | + } |
| 88 | + |
| 89 | + ccm.Watcher, err = watcher.NewWatcher(name, wtype, ai, states, required, machine, failEvent, successEvent) |
| 90 | + if err != nil { |
| 91 | + return nil, err |
| 92 | + } |
| 93 | + |
| 94 | + err = ccm.setProperties(rawprop) |
| 95 | + if err != nil { |
| 96 | + return nil, fmt.Errorf("could not set properties: %v", err) |
| 97 | + } |
| 98 | + |
| 99 | + if interval != "" { |
| 100 | + ccm.interval, err = iu.ParseDuration(interval) |
| 101 | + if err != nil { |
| 102 | + return nil, fmt.Errorf("invalid interval: %v", err) |
| 103 | + } |
| 104 | + |
| 105 | + if ccm.interval < 500*time.Millisecond { |
| 106 | + return nil, fmt.Errorf("interval %v is too small", ccm.interval) |
| 107 | + } |
| 108 | + } |
| 109 | + |
| 110 | + return ccm, nil |
| 111 | +} |
| 112 | + |
| 113 | +func (w *Watcher) setProperties(props map[string]any) error { |
| 114 | + if w.properties == nil { |
| 115 | + w.properties = &Properties{} |
| 116 | + } |
| 117 | + |
| 118 | + err := util.ParseMapStructure(props, w.properties) |
| 119 | + if err != nil { |
| 120 | + return err |
| 121 | + } |
| 122 | + |
| 123 | + return w.validate() |
| 124 | +} |
| 125 | + |
| 126 | +func (w *Watcher) validate() error { |
| 127 | + if w.properties.ManifestFile == "" && len(w.properties.Manifest) == 0 { |
| 128 | + return fmt.Errorf("manifest_file or manifest is required") |
| 129 | + } |
| 130 | + |
| 131 | + if w.properties.Timeout == 0 { |
| 132 | + w.properties.Timeout = time.Minute |
| 133 | + } |
| 134 | + |
| 135 | + if w.properties.Governor != "" && w.properties.GovernorTimeout == 0 { |
| 136 | + w.Infof("Setting Governor timeout to 5 minutes while unset") |
| 137 | + w.properties.GovernorTimeout = 5 * time.Minute |
| 138 | + } |
| 139 | + |
| 140 | + return nil |
| 141 | +} |
| 142 | + |
| 143 | +func (w *Watcher) Run(ctx context.Context, wg *sync.WaitGroup) { |
| 144 | + defer wg.Done() |
| 145 | + |
| 146 | + if len(w.properties.Manifest) > 0 { |
| 147 | + w.Infof("Watcher for embedded manifest starting") |
| 148 | + } else { |
| 149 | + w.Infof("Watcher for %s starting", w.properties.ManifestFile) |
| 150 | + } |
| 151 | + |
| 152 | + if w.interval != 0 { |
| 153 | + wg.Add(1) |
| 154 | + go w.intervalWatcher(ctx, wg) |
| 155 | + } |
| 156 | + |
| 157 | + for { |
| 158 | + select { |
| 159 | + case <-w.Watcher.StateChangeC(): |
| 160 | + w.performWatch(ctx, true) |
| 161 | + |
| 162 | + case <-ctx.Done(): |
| 163 | + w.Infof("Stopping on context interrupt") |
| 164 | + w.CancelGovernor() |
| 165 | + return |
| 166 | + } |
| 167 | + } |
| 168 | +} |
| 169 | + |
| 170 | +func (w *Watcher) intervalWatcher(ctx context.Context, wg *sync.WaitGroup) { |
| 171 | + defer wg.Done() |
| 172 | + |
| 173 | + tick := time.NewTicker(time.Millisecond) |
| 174 | + if w.properties.Splay { |
| 175 | + splay := rand.N(30 * time.Second) |
| 176 | + w.Infof("Performing initial execution after %v", splay) |
| 177 | + if splay < 1 { |
| 178 | + splay = 1 |
| 179 | + } |
| 180 | + tick.Reset(splay) |
| 181 | + } |
| 182 | + |
| 183 | + for { |
| 184 | + select { |
| 185 | + case <-tick.C: |
| 186 | + w.performWatch(ctx, false) |
| 187 | + tick.Reset(w.interval) |
| 188 | + |
| 189 | + case <-ctx.Done(): |
| 190 | + tick.Stop() |
| 191 | + return |
| 192 | + } |
| 193 | + } |
| 194 | +} |
| 195 | + |
| 196 | +func (w *Watcher) performWatch(ctx context.Context, force bool) { |
| 197 | + w.wmu.Lock() |
| 198 | + defer w.wmu.Unlock() |
| 199 | + |
| 200 | + if !force && time.Since(w.lastWatch) < w.interval { |
| 201 | + return |
| 202 | + } |
| 203 | + |
| 204 | + err := w.handleCheck(w.watch(ctx)) |
| 205 | + if err != nil { |
| 206 | + w.Errorf("could not handle watcher event: %s", err) |
| 207 | + } |
| 208 | +} |
| 209 | + |
| 210 | +func (w *Watcher) watch(ctx context.Context) (state State, err error) { |
| 211 | + if !w.ShouldWatch() { |
| 212 | + return Skipped, nil |
| 213 | + } |
| 214 | + |
| 215 | + var manifest io.Reader |
| 216 | + var manifestType string |
| 217 | + |
| 218 | + if len(w.properties.Manifest) > 0 { |
| 219 | + y, err := yaml.Marshal(w.properties.Manifest) |
| 220 | + if err != nil { |
| 221 | + w.Errorf("Could not marshal manifest: %v", err) |
| 222 | + return Error, fmt.Errorf("invalid manifest: %w", err) |
| 223 | + } |
| 224 | + manifest = bytes.NewReader(y) |
| 225 | + manifestType = "embedded" |
| 226 | + } else { |
| 227 | + y, err := os.ReadFile(filepath.Join(w.machine.Directory(), filepath.Base(w.properties.ManifestFile))) |
| 228 | + if err != nil { |
| 229 | + w.Errorf("Could not read manifest file: %v", err) |
| 230 | + return Error, fmt.Errorf("invalid manifest file: %w", err) |
| 231 | + } |
| 232 | + manifest = bytes.NewReader(y) |
| 233 | + manifestType = w.properties.ManifestFile |
| 234 | + } |
| 235 | + |
| 236 | + if w.properties.Governor != "" { |
| 237 | + fin, err := w.EnterGovernor(ctx, w.properties.Governor, w.properties.GovernorTimeout) |
| 238 | + if err != nil { |
| 239 | + w.Errorf("Cannot enter Governor %s: %s", w.properties.Governor, err) |
| 240 | + return Error, err |
| 241 | + } |
| 242 | + defer fin() |
| 243 | + } |
| 244 | + |
| 245 | + start := time.Now() |
| 246 | + defer func() { |
| 247 | + w.mu.Lock() |
| 248 | + w.previousRunTime = time.Since(start) |
| 249 | + w.mu.Unlock() |
| 250 | + }() |
| 251 | + |
| 252 | + w.Infof("Applying %s manifest", manifestType) |
| 253 | + |
| 254 | + timeoutCtx, cancel := context.WithTimeout(ctx, w.properties.Timeout) |
| 255 | + defer cancel() |
| 256 | + |
| 257 | + mgr, ccmLog, err := w.ccmManager(w.machine.Data(), w.machine.Facts()) |
| 258 | + if err != nil { |
| 259 | + return 0, err |
| 260 | + } |
| 261 | + |
| 262 | + _, a, err := apply.ResolveManifestReader(timeoutCtx, mgr, w.machine.Directory(), manifest) |
| 263 | + if err != nil { |
| 264 | + w.Errorf("Could not resolve manifest: %v", err) |
| 265 | + return Error, fmt.Errorf("could not resolve manifest: %w", err) |
| 266 | + } |
| 267 | + |
| 268 | + _, err = a.Execute(timeoutCtx, mgr, w.properties.HealthCheckOnly, ccmLog) |
| 269 | + if err != nil { |
| 270 | + w.Errorf("Could not apply manifest: %v", err) |
| 271 | + return Error, fmt.Errorf("could not apply manifest: %w", err) |
| 272 | + } |
| 273 | + |
| 274 | + summary, err := mgr.SessionSummary() |
| 275 | + if err != nil { |
| 276 | + w.Errorf("Could not get session summary: %v", err) |
| 277 | + return Error, fmt.Errorf("could not get session summary: %w", err) |
| 278 | + } |
| 279 | + |
| 280 | + switch { |
| 281 | + case summary.TotalResources == summary.StableResources: |
| 282 | + w.Infof("Manifest applied successfully") |
| 283 | + return Stable, nil |
| 284 | + case summary.TotalErrors > 0: |
| 285 | + w.Errorf("Manifest failed to apply with %d errors", summary.TotalErrors) |
| 286 | + return Error, nil |
| 287 | + default: |
| 288 | + w.Infof("Manifest applied with %d changes", summary.ChangedResources) |
| 289 | + return Changes, nil |
| 290 | + } |
| 291 | +} |
| 292 | + |
| 293 | +func (w *Watcher) ccmManager(data map[string]any, facts json.RawMessage) (*manager.CCM, ccmmodel.Logger, error) { |
| 294 | + var fdata map[string]any |
| 295 | + err := json.Unmarshal(facts, &fdata) |
| 296 | + if err != nil { |
| 297 | + return nil, nil, fmt.Errorf("invalid facts: %w", err) |
| 298 | + } |
| 299 | + |
| 300 | + log := NewCCMLogger(w) |
| 301 | + var opts []manager.Option |
| 302 | + if w.properties.Noop { |
| 303 | + opts = append(opts, manager.WithNoop()) |
| 304 | + } |
| 305 | + |
| 306 | + mgr, err := manager.NewManager(log, log, opts...) |
| 307 | + if err != nil { |
| 308 | + return nil, nil, err |
| 309 | + } |
| 310 | + mgr.SetFacts(fdata) |
| 311 | + mgr.SetExternalData(data) |
| 312 | + |
| 313 | + // try to figure out a sane root for things like source, file() etc in manifests |
| 314 | + wd := w.machine.Directory() |
| 315 | + if w.properties.ManifestFile != "" { |
| 316 | + if filepath.IsAbs(w.properties.ManifestFile) { |
| 317 | + wd = filepath.Dir(w.properties.ManifestFile) |
| 318 | + } else { |
| 319 | + abs, err := filepath.Abs(filepath.Join(w.machine.Directory(), filepath.Dir(w.properties.ManifestFile))) |
| 320 | + if err != nil { |
| 321 | + wd = w.machine.Directory() |
| 322 | + } else { |
| 323 | + wd = abs |
| 324 | + } |
| 325 | + } |
| 326 | + } |
| 327 | + mgr.SetWorkingDirectory(wd) |
| 328 | + |
| 329 | + return mgr, log, nil |
| 330 | +} |
| 331 | + |
| 332 | +func (w *Watcher) CurrentState() any { |
| 333 | + w.mu.Lock() |
| 334 | + defer w.mu.Unlock() |
| 335 | + |
| 336 | + s := &StateNotification{ |
| 337 | + Event: event.New(w.name, wtype, version, w.machine), |
| 338 | + PreviousOutcome: stateNames[w.previous], |
| 339 | + PreviousRunTime: w.previousRunTime.Nanoseconds(), |
| 340 | + } |
| 341 | + |
| 342 | + return s |
| 343 | +} |
| 344 | + |
| 345 | +func (w *Watcher) handleCheck(s State, err error) error { |
| 346 | + w.mu.Lock() |
| 347 | + w.previous = s |
| 348 | + w.mu.Unlock() |
| 349 | + |
| 350 | + switch s { |
| 351 | + case Error: |
| 352 | + if err != nil { |
| 353 | + w.Errorf("Check failed: %s", err) |
| 354 | + } |
| 355 | + |
| 356 | + w.NotifyWatcherState(w.CurrentState()) |
| 357 | + return w.FailureTransition() |
| 358 | + |
| 359 | + case Changes: |
| 360 | + // in noop a change means something should be done, so we treat as failure |
| 361 | + if w.properties.Noop { |
| 362 | + w.Infof("Sending failure transition") |
| 363 | + w.NotifyWatcherState(w.CurrentState()) |
| 364 | + return w.FailureTransition() |
| 365 | + } |
| 366 | + |
| 367 | + w.NotifyWatcherState(w.CurrentState()) |
| 368 | + return w.SuccessTransition() |
| 369 | + |
| 370 | + case Stable: |
| 371 | + w.NotifyWatcherState(w.CurrentState()) |
| 372 | + return w.SuccessTransition() |
| 373 | + } |
| 374 | + |
| 375 | + return nil |
| 376 | +} |
0 commit comments