Skip to content

Commit 5c59237

Browse files
authored
add spec folder to collect mc spec (#64)
1 parent a68b8d0 commit 5c59237

File tree

5 files changed

+344
-2
lines changed

5 files changed

+344
-2
lines changed

commands.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"go.goms.io/aks/AKSFlexNode/pkg/bootstrapper"
1515
"go.goms.io/aks/AKSFlexNode/pkg/config"
1616
"go.goms.io/aks/AKSFlexNode/pkg/logger"
17+
"go.goms.io/aks/AKSFlexNode/pkg/spec"
1718
"go.goms.io/aks/AKSFlexNode/pkg/status"
1819
)
1920

@@ -124,7 +125,7 @@ func runDaemonLoop(ctx context.Context, cfg *config.Config) error {
124125
// Create status file directory - using runtime directory for service or temp for development
125126
statusFilePath := status.GetStatusFilePath()
126127
statusDir := filepath.Dir(statusFilePath)
127-
if err := os.MkdirAll(statusDir, 0750); err != nil {
128+
if err := os.MkdirAll(statusDir, 0o750); err != nil {
128129
return fmt.Errorf("failed to create status directory %s: %w", statusDir, err)
129130
}
130131

@@ -143,14 +144,21 @@ func runDaemonLoop(ctx context.Context, cfg *config.Config) error {
143144
// Create tickers for different intervals
144145
statusTicker := time.NewTicker(1 * time.Minute)
145146
bootstrapTicker := time.NewTicker(2 * time.Minute)
147+
specTicker := time.NewTicker(30 * time.Minute)
146148
defer statusTicker.Stop()
147149
defer bootstrapTicker.Stop()
150+
defer specTicker.Stop()
148151

149152
// Collect status immediately on start
150153
if err := collectAndWriteStatus(ctx, cfg, statusFilePath); err != nil {
151154
logger.Errorf("Failed to collect initial status: %v", err)
152155
}
153156

157+
// Collect managed cluster spec once on daemon startup.
158+
if err := collectAndWriteManagedClusterSpec(ctx, cfg); err != nil {
159+
logger.Warnf("Failed to collect initial managed cluster spec: %v", err)
160+
}
161+
154162
// Run the periodic collection and monitoring loop
155163
for {
156164
select {
@@ -173,10 +181,24 @@ func runDaemonLoop(ctx context.Context, cfg *config.Config) error {
173181
} else {
174182
logger.Infof("Bootstrap health check completed at %s", time.Now().Format("2006-01-02 15:04:05"))
175183
}
184+
case <-specTicker.C:
185+
logger.Infof("Starting periodic managed cluster spec collection at %s...", time.Now().Format("2006-01-02 15:04:05"))
186+
if err := collectAndWriteManagedClusterSpec(ctx, cfg); err != nil {
187+
logger.Warnf("Failed to collect managed cluster spec at %s: %v", time.Now().Format("2006-01-02 15:04:05"), err)
188+
} else {
189+
logger.Infof("Managed cluster spec collection completed at %s", time.Now().Format("2006-01-02 15:04:05"))
190+
}
176191
}
177192
}
178193
}
179194

195+
func collectAndWriteManagedClusterSpec(ctx context.Context, cfg *config.Config) error {
196+
logger := logger.GetLoggerFromContext(ctx)
197+
collector := spec.NewManagedClusterSpecCollector(cfg, logger)
198+
_, err := collector.Collect(ctx)
199+
return err
200+
}
201+
180202
// checkAndBootstrap checks if the node needs re-bootstrapping and performs it if necessary
181203
func checkAndBootstrap(ctx context.Context, cfg *config.Config) error {
182204
logger := logger.GetLoggerFromContext(ctx)
@@ -242,7 +264,7 @@ func collectAndWriteStatus(ctx context.Context, cfg *config.Config, statusFilePa
242264

243265
// Write to temporary file first, then rename (atomic operation)
244266
tempFile := statusFilePath + ".tmp"
245-
if err := os.WriteFile(tempFile, statusData, 0600); err != nil {
267+
if err := os.WriteFile(tempFile, statusData, 0o600); err != nil {
246268
return fmt.Errorf("failed to write status to temp file: %w", err)
247269
}
248270

pkg/spec/collector.go

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
package spec
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"os"
8+
"path/filepath"
9+
"time"
10+
11+
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v5"
12+
"github.com/sirupsen/logrus"
13+
14+
"go.goms.io/aks/AKSFlexNode/pkg/auth"
15+
"go.goms.io/aks/AKSFlexNode/pkg/config"
16+
"go.goms.io/aks/AKSFlexNode/pkg/utils"
17+
)
18+
19+
// ManagedClusterClient is the subset of the Azure SDK managed clusters client we need.
// It exists to allow lightweight mocking in unit tests.
type ManagedClusterClient interface {
	// Get retrieves the managed cluster resource; the signature mirrors
	// armcontainerservice.ManagedClustersClient.Get so the real client satisfies it.
	Get(ctx context.Context, resourceGroupName, resourceName string, options *armcontainerservice.ManagedClustersClientGetOptions) (armcontainerservice.ManagedClustersClientGetResponse, error)
}
24+
25+
// ManagedClusterSpecEnricher enriches the collected spec snapshot based on the managed cluster response.
// It enables adding more spec signals in the future without changing the collector control-flow.
// An enricher returning a non-nil error aborts the collection (no spec file is written).
type ManagedClusterSpecEnricher func(spec *ManagedClusterSpec, resp armcontainerservice.ManagedClustersClientGetResponse) error
28+
29+
// ManagedClusterSpecCollector collects spec from the target AKS managed cluster
// and persists it to a local file for later checking.
type ManagedClusterSpecCollector struct {
	cfg    *config.Config // source of target cluster identity and subscription
	logger *logrus.Logger // replaced with logrus.New() in Collect if nil

	authProvider *auth.AuthProvider   // supplies the credential for the lazily built client
	client       ManagedClusterClient // created lazily in Collect; injectable for tests

	// outputPath is where the JSON snapshot is written
	// (defaults to GetManagedClusterSpecFilePath()).
	outputPath string

	// enrichers run in order against the Get response; any error aborts collection.
	enrichers []ManagedClusterSpecEnricher
}
42+
43+
// NewManagedClusterSpecCollector creates a collector that writes to the default spec path.
44+
// The Azure client is created lazily on the first Collect call.
45+
func NewManagedClusterSpecCollector(cfg *config.Config, logger *logrus.Logger) *ManagedClusterSpecCollector {
46+
c := &ManagedClusterSpecCollector{
47+
cfg: cfg,
48+
logger: logger,
49+
authProvider: auth.NewAuthProvider(),
50+
outputPath: GetManagedClusterSpecFilePath(),
51+
}
52+
// Keep KubernetesVersion, fqdn required for now; more enrichers can be added over time.
53+
c.enrichers = []ManagedClusterSpecEnricher{enrichKubernetesVersionRequired, enrichFQDNRequired}
54+
return c
55+
}
56+
57+
// NewManagedClusterSpecCollectorWithClient allows injecting a ManagedClusterClient and output path (primarily for tests).
58+
func NewManagedClusterSpecCollectorWithClient(cfg *config.Config, logger *logrus.Logger, client ManagedClusterClient, outputPath string) *ManagedClusterSpecCollector {
59+
c := NewManagedClusterSpecCollector(cfg, logger)
60+
c.client = client
61+
if outputPath != "" {
62+
c.outputPath = outputPath
63+
}
64+
return c
65+
}
66+
67+
// AddEnricher appends a spec enricher.
68+
func (c *ManagedClusterSpecCollector) AddEnricher(enricher ManagedClusterSpecEnricher) {
69+
if c == nil || enricher == nil {
70+
return
71+
}
72+
c.enrichers = append(c.enrichers, enricher)
73+
}
74+
75+
// Collect queries the Azure managed cluster resource to retrieve a spec snapshot.
76+
// It writes a JSON payload to the configured output path and returns the collected spec.
77+
func (c *ManagedClusterSpecCollector) Collect(ctx context.Context) (*ManagedClusterSpec, error) {
78+
if c == nil {
79+
return nil, fmt.Errorf("collector is nil")
80+
}
81+
if c.cfg == nil {
82+
return nil, fmt.Errorf("config is nil")
83+
}
84+
if c.logger == nil {
85+
c.logger = logrus.New()
86+
}
87+
88+
clusterName := c.cfg.GetTargetClusterName()
89+
clusterRG := c.cfg.GetTargetClusterResourceGroup()
90+
if clusterName == "" || clusterRG == "" {
91+
return nil, fmt.Errorf("target cluster name/resourceGroup missing (name=%q, resourceGroup=%q)", clusterName, clusterRG)
92+
}
93+
94+
if c.client == nil {
95+
subscriptionID := c.cfg.GetTargetClusterSubscriptionID()
96+
if subscriptionID == "" {
97+
subscriptionID = c.cfg.GetSubscriptionID()
98+
}
99+
if subscriptionID == "" {
100+
return nil, fmt.Errorf("subscription ID missing")
101+
}
102+
103+
cred, err := c.authProvider.UserCredential(c.cfg)
104+
if err != nil {
105+
return nil, fmt.Errorf("failed to get credential: %w", err)
106+
}
107+
108+
mcClient, err := armcontainerservice.NewManagedClustersClient(subscriptionID, cred, nil)
109+
if err != nil {
110+
return nil, fmt.Errorf("failed to create managed clusters client: %w", err)
111+
}
112+
c.client = mcClient
113+
}
114+
115+
c.logger.Infof("Collecting managed cluster spec for %s/%s", clusterRG, clusterName)
116+
resp, err := c.client.Get(ctx, clusterRG, clusterName, nil)
117+
if err != nil {
118+
return nil, fmt.Errorf("failed to get AKS managed cluster via SDK: %w", err)
119+
}
120+
121+
spec := &ManagedClusterSpec{
122+
SchemaVersion: ManagedClusterSpecSchemaVersion,
123+
ClusterResourceID: c.cfg.GetTargetClusterID(),
124+
ClusterName: clusterName,
125+
ResourceGroup: clusterRG,
126+
CollectedAt: time.Now().UTC(),
127+
}
128+
129+
for _, enricher := range c.enrichers {
130+
if enricher == nil {
131+
continue
132+
}
133+
if err := enricher(spec, resp); err != nil {
134+
return nil, err
135+
}
136+
}
137+
138+
data, err := json.MarshalIndent(spec, "", " ")
139+
if err != nil {
140+
return nil, fmt.Errorf("failed to marshal spec JSON: %w", err)
141+
}
142+
data = append(data, '\n')
143+
144+
if err := os.MkdirAll(filepath.Dir(c.outputPath), 0o750); err != nil {
145+
return nil, fmt.Errorf("failed to create spec output directory: %w", err)
146+
}
147+
if err := utils.WriteFileAtomicSystem(c.outputPath, data, 0o644); err != nil {
148+
return nil, fmt.Errorf("failed to write managed cluster spec file: %w", err)
149+
}
150+
151+
return spec, nil
152+
}
153+
154+
func enrichKubernetesVersionRequired(spec *ManagedClusterSpec, resp armcontainerservice.ManagedClustersClientGetResponse) error {
155+
if spec == nil {
156+
return fmt.Errorf("spec is nil")
157+
}
158+
// set kubernetesVersion
159+
if resp.Properties == nil || resp.Properties.KubernetesVersion == nil || *resp.Properties.KubernetesVersion == "" {
160+
return fmt.Errorf("managed cluster kubernetesVersion is empty")
161+
}
162+
spec.KubernetesVersion = *resp.Properties.KubernetesVersion
163+
164+
// set currentKubernetesVersion
165+
if resp.Properties == nil || resp.Properties.CurrentKubernetesVersion == nil || *resp.Properties.CurrentKubernetesVersion == "" {
166+
return fmt.Errorf("managed cluster currentKubernetesVersion is empty")
167+
}
168+
spec.CurrentKubernetesVersion = *resp.Properties.CurrentKubernetesVersion
169+
return nil
170+
}
171+
172+
func enrichFQDNRequired(spec *ManagedClusterSpec, resp armcontainerservice.ManagedClustersClientGetResponse) error {
173+
if spec == nil {
174+
return fmt.Errorf("spec is nil")
175+
}
176+
if resp.Properties == nil || resp.Properties.Fqdn == nil || *resp.Properties.Fqdn == "" {
177+
return fmt.Errorf("managed cluster FQDN is empty")
178+
}
179+
spec.Fqdn = *resp.Properties.Fqdn
180+
return nil
181+
}

pkg/spec/collector_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package spec
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"os"
7+
"path/filepath"
8+
"testing"
9+
10+
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v5"
11+
"github.com/sirupsen/logrus"
12+
13+
"go.goms.io/aks/AKSFlexNode/pkg/config"
14+
)
15+
16+
// fakeManagedClusterClient is a canned-response ManagedClusterClient for tests.
type fakeManagedClusterClient struct {
	resp armcontainerservice.ManagedClustersClientGetResponse // returned by Get
	err  error                                                // returned by Get
}

// Get returns the preconfigured response and error, ignoring all arguments.
func (f *fakeManagedClusterClient) Get(ctx context.Context, resourceGroupName, resourceName string, options *armcontainerservice.ManagedClustersClientGetOptions) (armcontainerservice.ManagedClustersClientGetResponse, error) {
	return f.resp, f.err
}
24+
25+
// ptr returns a pointer to a copy of v; convenience for building SDK fixtures.
func ptr[T any](v T) *T {
	p := v
	return &p
}
26+
27+
func TestManagedClusterSpecCollector_Collect_WritesFile(t *testing.T) {
28+
cfg := &config.Config{
29+
Azure: config.AzureConfig{
30+
SubscriptionID: "sub",
31+
TargetCluster: &config.TargetClusterConfig{
32+
Name: "c1",
33+
ResourceGroup: "rg1",
34+
ResourceID: "/subscriptions/sub/resourceGroups/rg1/providers/Microsoft.ContainerService/managedClusters/c1",
35+
},
36+
},
37+
}
38+
39+
outDir := t.TempDir()
40+
outPath := filepath.Join(outDir, "managedcluster.json")
41+
42+
resp := armcontainerservice.ManagedClustersClientGetResponse{
43+
ManagedCluster: armcontainerservice.ManagedCluster{
44+
Properties: &armcontainerservice.ManagedClusterProperties{
45+
KubernetesVersion: ptr("1.30.1"),
46+
CurrentKubernetesVersion: ptr("1.30.9"),
47+
Fqdn: ptr("c1-12345.hcp.eastus.azmk8s.io"),
48+
},
49+
},
50+
}
51+
52+
collector := NewManagedClusterSpecCollectorWithClient(cfg, logrus.New(), &fakeManagedClusterClient{resp: resp}, outPath)
53+
_, err := collector.Collect(context.Background())
54+
if err != nil {
55+
t.Fatalf("Collect() error = %v", err)
56+
}
57+
58+
b, err := os.ReadFile(outPath)
59+
if err != nil {
60+
t.Fatalf("ReadFile() error = %v", err)
61+
}
62+
63+
var got ManagedClusterSpec
64+
if err := json.Unmarshal(b, &got); err != nil {
65+
t.Fatalf("Unmarshal() error = %v", err)
66+
}
67+
if got.KubernetesVersion != "1.30.1" {
68+
t.Fatalf("expected version 1.30.1, got %q", got.KubernetesVersion)
69+
}
70+
if got.CurrentKubernetesVersion != "1.30.9" {
71+
t.Fatalf("expected current version 1.30.9, got %q", got.CurrentKubernetesVersion)
72+
}
73+
if got.Fqdn != "c1-12345.hcp.eastus.azmk8s.io" {
74+
t.Fatalf("expected fqdn %q, got %q", "c1-12345.hcp.eastus.azmk8s.io", got.Fqdn)
75+
}
76+
if got.SchemaVersion != ManagedClusterSpecSchemaVersion {
77+
t.Fatalf("expected schemaVersion %d, got %d", ManagedClusterSpecSchemaVersion, got.SchemaVersion)
78+
}
79+
if got.ClusterName != "c1" || got.ResourceGroup != "rg1" {
80+
t.Fatalf("unexpected cluster metadata: %+v", got)
81+
}
82+
}
83+
84+
func TestManagedClusterSpecCollector_Collect_MissingClusterInfo(t *testing.T) {
85+
cfg := &config.Config{Azure: config.AzureConfig{SubscriptionID: "sub", TargetCluster: &config.TargetClusterConfig{}}}
86+
collector := NewManagedClusterSpecCollectorWithClient(cfg, logrus.New(), &fakeManagedClusterClient{}, filepath.Join(t.TempDir(), "x.json"))
87+
if _, err := collector.Collect(context.Background()); err == nil {
88+
t.Fatalf("expected error, got nil")
89+
}
90+
}

pkg/spec/paths.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package spec
2+
3+
import (
4+
"os/user"
5+
"path/filepath"
6+
)
7+
8+
// GetSpecDir returns the appropriate directory for spec artifacts.
// Uses /run/aks-flex-node when running as the aks-flex-node user (systemd service);
// uses /tmp/aks-flex-node for direct user execution (testing/development).
// A user.Current() failure falls through to the development path.
func GetSpecDir() string {
	if u, err := user.Current(); err == nil && u.Username == "aks-flex-node" {
		return "/run/aks-flex-node"
	}
	return "/tmp/aks-flex-node"
}
19+
20+
// GetManagedClusterSpecFilePath returns the path where the managed cluster spec snapshot is stored.
21+
func GetManagedClusterSpecFilePath() string {
22+
return filepath.Join(GetSpecDir(), "managedcluster-spec.json")
23+
}

pkg/spec/types.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package spec
2+
3+
import "time"
4+
5+
const (
	// ManagedClusterSpecSchemaVersion is incremented when the persisted JSON schema changes.
	// Readers can use it to detect snapshots written by older/newer binaries.
	ManagedClusterSpecSchemaVersion = 1
)
9+
10+
// ManagedClusterSpec is the persisted spec snapshot of the target AKS managed cluster.
// It is intentionally extensible so we can add more fields over time without rewriting the collector.
type ManagedClusterSpec struct {
	// SchemaVersion records the ManagedClusterSpecSchemaVersion the snapshot was written with.
	SchemaVersion int `json:"schemaVersion"`

	// Identity of the cluster the snapshot was taken from.
	ClusterResourceID string `json:"clusterResourceId,omitempty"`
	ClusterName       string `json:"clusterName,omitempty"`
	ResourceGroup     string `json:"resourceGroup,omitempty"`

	// KubernetesVersion is kept as a first-class field because many components care about it.
	KubernetesVersion        string `json:"kubernetesVersion,omitempty"`        // e.g., 1.32
	CurrentKubernetesVersion string `json:"currentKubernetesVersion,omitempty"` // e.g., 1.32.7
	Fqdn                     string `json:"fqdn,omitempty"`

	// metadata: CollectedAt is the UTC time the snapshot was collected.
	CollectedAt time.Time `json:"collectedAt"`
}

0 commit comments

Comments (0)