Skip to content

Commit 82abc80

Browse files
committed
(#2302) Support ccm manifests in watchers
Also move hiera to ccm hiera Signed-off-by: R.I.Pienaar <rip@devco.net>
1 parent 011e89c commit 82abc80

File tree

10 files changed

+656
-130
lines changed

10 files changed

+656
-130
lines changed
Lines changed: 376 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,376 @@
1+
// Copyright (c) 2025, R.I. Pienaar and the Choria Project contributors
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
package ccmmanifestwatcher
6+
7+
import (
8+
"bytes"
9+
"context"
10+
"encoding/json"
11+
"fmt"
12+
"io"
13+
"math/rand/v2"
14+
"os"
15+
"path/filepath"
16+
"sync"
17+
"time"
18+
19+
"github.com/ghodss/yaml"
20+
21+
"github.com/choria-io/ccm/manager"
22+
ccmmodel "github.com/choria-io/ccm/model"
23+
"github.com/choria-io/ccm/resources/apply"
24+
"github.com/choria-io/go-choria/aagent/model"
25+
"github.com/choria-io/go-choria/aagent/util"
26+
"github.com/choria-io/go-choria/aagent/watchers/event"
27+
"github.com/choria-io/go-choria/aagent/watchers/watcher"
28+
iu "github.com/choria-io/go-choria/internal/util"
29+
)
30+
31+
type State int
32+
33+
const (
34+
Unknown State = iota
35+
Error
36+
Stable
37+
Changes
38+
Skipped
39+
40+
wtype = "ccm_manifest"
41+
version = "v1"
42+
)
43+
44+
var stateNames = map[State]string{
45+
Unknown: "unknown",
46+
Error: "error",
47+
Stable: "stable",
48+
Changes: "changes",
49+
Skipped: "skipped",
50+
}
51+
52+
type Properties struct {
53+
ManifestFile string `mapstructure:"manifest_file"`
54+
Manifest map[string]any `mapstructure:"manifest"`
55+
Noop bool `mapstructure:"noop"`
56+
HealthCheckOnly bool `mapstructure:"healthcheck_only"`
57+
Governor string `mapstructure:"governor"`
58+
GovernorTimeout time.Duration `mapstructure:"governor_timeout"`
59+
Splay bool `mapstructure:"splay"`
60+
Timeout time.Duration
61+
}
62+
63+
type Watcher struct {
64+
*watcher.Watcher
65+
66+
name string
67+
machine model.Machine
68+
previous State
69+
interval time.Duration
70+
previousRunTime time.Duration
71+
properties *Properties
72+
73+
lastWatch time.Time
74+
75+
wmu *sync.Mutex
76+
mu *sync.Mutex
77+
}
78+
79+
func New(machine model.Machine, name string, states []string, required []model.ForeignMachineState, failEvent string, successEvent string, interval string, ai time.Duration, rawprop map[string]any) (any, error) {
80+
var err error
81+
82+
ccm := &Watcher{
83+
machine: machine,
84+
name: name,
85+
mu: &sync.Mutex{},
86+
wmu: &sync.Mutex{},
87+
}
88+
89+
ccm.Watcher, err = watcher.NewWatcher(name, wtype, ai, states, required, machine, failEvent, successEvent)
90+
if err != nil {
91+
return nil, err
92+
}
93+
94+
err = ccm.setProperties(rawprop)
95+
if err != nil {
96+
return nil, fmt.Errorf("could not set properties: %v", err)
97+
}
98+
99+
if interval != "" {
100+
ccm.interval, err = iu.ParseDuration(interval)
101+
if err != nil {
102+
return nil, fmt.Errorf("invalid interval: %v", err)
103+
}
104+
105+
if ccm.interval < 500*time.Millisecond {
106+
return nil, fmt.Errorf("interval %v is too small", ccm.interval)
107+
}
108+
}
109+
110+
return ccm, nil
111+
}
112+
113+
func (w *Watcher) setProperties(props map[string]any) error {
114+
if w.properties == nil {
115+
w.properties = &Properties{}
116+
}
117+
118+
err := util.ParseMapStructure(props, w.properties)
119+
if err != nil {
120+
return err
121+
}
122+
123+
return w.validate()
124+
}
125+
126+
func (w *Watcher) validate() error {
127+
if w.properties.ManifestFile == "" && len(w.properties.Manifest) == 0 {
128+
return fmt.Errorf("manifest_file or manifest is required")
129+
}
130+
131+
if w.properties.Timeout == 0 {
132+
w.properties.Timeout = time.Minute
133+
}
134+
135+
if w.properties.Governor != "" && w.properties.GovernorTimeout == 0 {
136+
w.Infof("Setting Governor timeout to 5 minutes while unset")
137+
w.properties.GovernorTimeout = 5 * time.Minute
138+
}
139+
140+
return nil
141+
}
142+
143+
func (w *Watcher) Run(ctx context.Context, wg *sync.WaitGroup) {
144+
defer wg.Done()
145+
146+
if len(w.properties.Manifest) > 0 {
147+
w.Infof("Watcher for embedded manifest starting")
148+
} else {
149+
w.Infof("Watcher for %s starting", w.properties.ManifestFile)
150+
}
151+
152+
if w.interval != 0 {
153+
wg.Add(1)
154+
go w.intervalWatcher(ctx, wg)
155+
}
156+
157+
for {
158+
select {
159+
case <-w.Watcher.StateChangeC():
160+
w.performWatch(ctx, true)
161+
162+
case <-ctx.Done():
163+
w.Infof("Stopping on context interrupt")
164+
w.CancelGovernor()
165+
return
166+
}
167+
}
168+
}
169+
170+
func (w *Watcher) intervalWatcher(ctx context.Context, wg *sync.WaitGroup) {
171+
defer wg.Done()
172+
173+
tick := time.NewTicker(time.Millisecond)
174+
if w.properties.Splay {
175+
splay := rand.N(30 * time.Second)
176+
w.Infof("Performing initial execution after %v", splay)
177+
if splay < 1 {
178+
splay = 1
179+
}
180+
tick.Reset(splay)
181+
}
182+
183+
for {
184+
select {
185+
case <-tick.C:
186+
w.performWatch(ctx, false)
187+
tick.Reset(w.interval)
188+
189+
case <-ctx.Done():
190+
tick.Stop()
191+
return
192+
}
193+
}
194+
}
195+
196+
func (w *Watcher) performWatch(ctx context.Context, force bool) {
197+
w.wmu.Lock()
198+
defer w.wmu.Unlock()
199+
200+
if !force && time.Since(w.lastWatch) < w.interval {
201+
return
202+
}
203+
204+
err := w.handleCheck(w.watch(ctx))
205+
if err != nil {
206+
w.Errorf("could not handle watcher event: %s", err)
207+
}
208+
}
209+
210+
func (w *Watcher) watch(ctx context.Context) (state State, err error) {
211+
if !w.ShouldWatch() {
212+
return Skipped, nil
213+
}
214+
215+
var manifest io.Reader
216+
var manifestType string
217+
218+
if len(w.properties.Manifest) > 0 {
219+
y, err := yaml.Marshal(w.properties.Manifest)
220+
if err != nil {
221+
w.Errorf("Could not marshal manifest: %v", err)
222+
return Error, fmt.Errorf("invalid manifest: %w", err)
223+
}
224+
manifest = bytes.NewReader(y)
225+
manifestType = "embedded"
226+
} else {
227+
y, err := os.ReadFile(filepath.Join(w.machine.Directory(), filepath.Base(w.properties.ManifestFile)))
228+
if err != nil {
229+
w.Errorf("Could not read manifest file: %v", err)
230+
return Error, fmt.Errorf("invalid manifest file: %w", err)
231+
}
232+
manifest = bytes.NewReader(y)
233+
manifestType = w.properties.ManifestFile
234+
}
235+
236+
if w.properties.Governor != "" {
237+
fin, err := w.EnterGovernor(ctx, w.properties.Governor, w.properties.GovernorTimeout)
238+
if err != nil {
239+
w.Errorf("Cannot enter Governor %s: %s", w.properties.Governor, err)
240+
return Error, err
241+
}
242+
defer fin()
243+
}
244+
245+
start := time.Now()
246+
defer func() {
247+
w.mu.Lock()
248+
w.previousRunTime = time.Since(start)
249+
w.mu.Unlock()
250+
}()
251+
252+
w.Infof("Applying %s manifest", manifestType)
253+
254+
timeoutCtx, cancel := context.WithTimeout(ctx, w.properties.Timeout)
255+
defer cancel()
256+
257+
mgr, ccmLog, err := w.ccmManager(w.machine.Data(), w.machine.Facts())
258+
if err != nil {
259+
return 0, err
260+
}
261+
262+
_, a, err := apply.ResolveManifestReader(timeoutCtx, mgr, w.machine.Directory(), manifest)
263+
if err != nil {
264+
w.Errorf("Could not resolve manifest: %v", err)
265+
return Error, fmt.Errorf("could not resolve manifest: %w", err)
266+
}
267+
268+
_, err = a.Execute(timeoutCtx, mgr, w.properties.MonitorOnly, ccmLog)
269+
if err != nil {
270+
w.Errorf("Could not apply manifest: %v", err)
271+
return Error, fmt.Errorf("could not apply manifest: %w", err)
272+
}
273+
274+
summary, err := mgr.SessionSummary()
275+
if err != nil {
276+
w.Errorf("Could not get session summary: %v", err)
277+
return Error, fmt.Errorf("could not get session summary: %w", err)
278+
}
279+
280+
switch {
281+
case summary.TotalResources == summary.StableResources:
282+
w.Infof("Manifest applied successfully")
283+
return Stable, nil
284+
case summary.TotalErrors > 0:
285+
w.Errorf("Manifest failed to apply with %d errors", summary.TotalErrors)
286+
return Error, nil
287+
default:
288+
w.Infof("Manifest applied with %d changes", summary.ChangedResources)
289+
return Changes, nil
290+
}
291+
}
292+
293+
func (w *Watcher) ccmManager(data map[string]any, facts json.RawMessage) (*manager.CCM, ccmmodel.Logger, error) {
294+
var fdata map[string]any
295+
err := json.Unmarshal(facts, &fdata)
296+
if err != nil {
297+
return nil, nil, fmt.Errorf("invalid facts: %w", err)
298+
}
299+
300+
log := NewCCMLogger(w)
301+
var opts []manager.Option
302+
if w.properties.Noop {
303+
opts = append(opts, manager.WithNoop())
304+
}
305+
306+
mgr, err := manager.NewManager(log, log, opts...)
307+
if err != nil {
308+
return nil, nil, err
309+
}
310+
mgr.SetFacts(fdata)
311+
mgr.SetExternalData(data)
312+
313+
// try to figure out a sane root for things like source, file() etc in manifests
314+
wd := w.machine.Directory()
315+
if w.properties.ManifestFile != "" {
316+
if filepath.IsAbs(w.properties.ManifestFile) {
317+
wd = filepath.Dir(w.properties.ManifestFile)
318+
} else {
319+
abs, err := filepath.Abs(filepath.Join(w.machine.Directory(), filepath.Dir(w.properties.ManifestFile)))
320+
if err != nil {
321+
wd = w.machine.Directory()
322+
} else {
323+
wd = abs
324+
}
325+
}
326+
}
327+
mgr.SetWorkingDirectory(wd)
328+
329+
return mgr, log, nil
330+
}
331+
332+
func (w *Watcher) CurrentState() any {
333+
w.mu.Lock()
334+
defer w.mu.Unlock()
335+
336+
s := &StateNotification{
337+
Event: event.New(w.name, wtype, version, w.machine),
338+
PreviousOutcome: stateNames[w.previous],
339+
PreviousRunTime: w.previousRunTime.Nanoseconds(),
340+
}
341+
342+
return s
343+
}
344+
345+
func (w *Watcher) handleCheck(s State, err error) error {
346+
w.mu.Lock()
347+
w.previous = s
348+
w.mu.Unlock()
349+
350+
switch s {
351+
case Error:
352+
if err != nil {
353+
w.Errorf("Check failed: %s", err)
354+
}
355+
356+
w.NotifyWatcherState(w.CurrentState())
357+
return w.FailureTransition()
358+
359+
case Changes:
360+
// in noop a change means something should be done, so we treat as failure
361+
if w.properties.Noop {
362+
w.Infof("Sending failure transition")
363+
w.NotifyWatcherState(w.CurrentState())
364+
return w.FailureTransition()
365+
}
366+
367+
w.NotifyWatcherState(w.CurrentState())
368+
return w.SuccessTransition()
369+
370+
case Stable:
371+
w.NotifyWatcherState(w.CurrentState())
372+
return w.SuccessTransition()
373+
}
374+
375+
return nil
376+
}

0 commit comments

Comments
 (0)