Skip to content

Commit 007ba52

Browse files
authored
support metadata service (#49)
* support metadata service
* fix lint
* fix lint
* cache token
* fix lint
* fix tests
1 parent de67755 commit 007ba52

File tree

9 files changed

+774
-55
lines changed

9 files changed

+774
-55
lines changed

debian/usr/sbin/nebius-observability-agent-updater-run.sh

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -2,9 +2,15 @@
22
set -ex
33
UPDATER_ENDPOINT=observability-agent-manager.api.nebius.cloud
44

5-
UPDATER_ENDPOINT_PATH_OVERRIDE=/mnt/cloud-metadata/updater-endpoint-override
6-
if [ -f $UPDATER_ENDPOINT_PATH_OVERRIDE ]; then
7-
UPDATER_ENDPOINT=$(cat $UPDATER_ENDPOINT_PATH_OVERRIDE)
5+
METADATA_BASE_URL="http://metadata.nebius.internal"
6+
METADATA_FALLBACK_URL="http://169.254.169.254"
7+
METADATA_HEADER="Metadata: true"
8+
9+
# Try to get updater endpoint override from IMDS
10+
OVERRIDE=$(curl -s -f -H "$METADATA_HEADER" "${METADATA_BASE_URL}/v1/instance-data/o11y/updater-endpoint-override" 2>/dev/null || \
11+
curl -s -f -H "$METADATA_HEADER" "${METADATA_FALLBACK_URL}/v1/instance-data/o11y/updater-endpoint-override" 2>/dev/null || true)
12+
if [ -n "$OVERRIDE" ]; then
13+
UPDATER_ENDPOINT="$OVERRIDE"
814
fi
915

1016
export GOMAXPROCS=1

internal/client/client.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -251,13 +251,13 @@ func (s *Client) fillMetadataInfo(req *agentmanager.GetVersionRequest) {
251251
req.ParentId = parentId
252252
}
253253

254-
instanceId, cloudInitFallback, err := s.metadata.GetInstanceId()
254+
instanceId, instanceIdFallback, err := s.metadata.GetInstanceId()
255255
if err != nil {
256256
s.logger.Error("failed to get instance id", "error", err)
257257
} else {
258258
req.InstanceId = instanceId
259259
}
260-
req.InstanceIdUsedFallback = cloudInitFallback
260+
req.InstanceIdUsedFallback = instanceIdFallback
261261
}
262262

263263
func (s *Client) fillOSInfo(req *agentmanager.GetVersionRequest) {

internal/client/client_test.go

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -38,7 +38,7 @@ func (m *mockMetadataReader) GetParentId() (string, error) {
3838

3939
func (m *mockMetadataReader) GetInstanceId() (string, bool, error) {
4040
args := m.Called()
41-
return args.String(0), false, args.Error(1)
41+
return args.String(0), args.Bool(1), args.Error(2)
4242
}
4343

4444
func (m *mockMetadataReader) GetIamToken() (string, error) {
@@ -212,7 +212,7 @@ func TestSendAgentData(t *testing.T) {
212212

213213
// Set up mock expectations
214214
metadata.On("GetParentId").Return("parent-123", nil)
215-
metadata.On("GetInstanceId").Return("instance-456", nil)
215+
metadata.On("GetInstanceId").Return("instance-456", false, nil)
216216
oh.On("GetDebVersion", mock.Anything).Return("1.0.0", nil)
217217
oh.On("GetServiceUptime", mock.Anything).Return(10*time.Minute, nil)
218218
oh.On("GetSystemUptime").Return(1*time.Hour, nil)
@@ -265,7 +265,7 @@ func TestFillRequest(t *testing.T) {
265265

266266
// Set up mock expectations
267267
metadata.On("GetParentId").Return("parent-123", nil)
268-
metadata.On("GetInstanceId").Return("instance-456", nil)
268+
metadata.On("GetInstanceId").Return("instance-456", false, nil)
269269
oh.On("GetDebVersion", mock.Anything).Return("1.0.0", nil)
270270
oh.On("GetServiceUptime", mock.Anything).Return(10*time.Minute, nil)
271271
oh.On("GetSystemUptime").Return(1*time.Hour, nil)
@@ -396,7 +396,7 @@ func TestSendAgentDataWithRetry(t *testing.T) {
396396

397397
// Set up mock expectations
398398
metadata.On("GetParentId").Return("parent-123", nil)
399-
metadata.On("GetInstanceId").Return("instance-456", nil)
399+
metadata.On("GetInstanceId").Return("instance-456", false, nil)
400400
oh.On("GetDebVersion", mock.Anything).Return("1.0.0", nil)
401401
oh.On("GetServiceUptime", mock.Anything).Return(10*time.Minute, nil)
402402
oh.On("GetSystemUptime").Return(1*time.Hour, nil)
@@ -470,7 +470,7 @@ func TestSendAgentDataWithRetryFailure(t *testing.T) {
470470

471471
// Set up mock expectations (same as in the previous test)
472472
metadata.On("GetParentId").Return("parent-123", nil)
473-
metadata.On("GetInstanceId").Return("instance-456", nil)
473+
metadata.On("GetInstanceId").Return("instance-456", false, nil)
474474
oh.On("GetDebVersion", mock.Anything).Return("1.0.0", nil)
475475
oh.On("GetServiceUptime", mock.Anything).Return(10*time.Minute, nil)
476476
oh.On("GetSystemUptime").Return(1*time.Hour, nil)
@@ -534,7 +534,7 @@ func TestFillRequestDebNotFound(t *testing.T) {
534534
oh.On("GetDebVersion", "test-agent-package").Return("", osutils.ErrDebNotFound)
535535
oh.On("GetDebVersion", "nebius-observability-agent-updater").Return("", osutils.ErrDebNotFound)
536536
metadata.On("GetParentId").Return("parent-123", nil)
537-
metadata.On("GetInstanceId").Return("instance-456", nil)
537+
metadata.On("GetInstanceId").Return("instance-456", false, nil)
538538
oh.On("GetOsName").Return("Linux", nil)
539539
oh.On("GetUname").Return("Linux 5.4.0-generic", nil)
540540
oh.On("GetArch").Return("x86_64", nil)

internal/config/config.go

Lines changed: 8 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -25,10 +25,14 @@ func GetDefaultConfig() *Config {
2525
PollInterval: time.Minute,
2626
PollJitter: 30 * time.Second,
2727
Metadata: metadata.Config{
28-
Path: "/mnt/cloud-metadata",
29-
ParentIdFilename: "parent-id",
30-
InstanceIdFilename: "instance-id",
31-
IamTokenFilename: "tsa-token",
28+
Path: "/mnt/cloud-metadata",
29+
ParentIdFilename: "parent-id",
30+
InstanceIdFilename: "instance-id",
31+
IamTokenFilename: "tsa-token",
32+
UseMetadataService: true,
33+
MetadataServiceURL: "http://metadata.nebius.internal",
34+
MetadataServiceFallbackURL: "http://169.254.169.254",
35+
MetadataTokenType: "tsa",
3236
},
3337
Mk8sClusterIdPath: "/usr/local/etc/mk8s-cluster-id",
3438
HealthCheckPath: "/var/log/nebius-logs",

internal/metadata/metadata.go

Lines changed: 187 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -2,55 +2,225 @@ package metadata
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
7+
"io"
68
"log/slog"
9+
"net/http"
710
"os"
8-
"os/exec"
911
"strings"
12+
"sync"
1013
"time"
1114
)
1215

1316
type Config struct {
14-
Path string `yaml:"path"`
15-
ParentIdFilename string `yaml:"parent_id_filename"`
16-
InstanceIdFilename string `yaml:"instance_id_filename"`
17-
IamTokenFilename string `yaml:"iam_token_filename"`
18-
Mk8sClusterIdFilename string `yaml:"mk8s_cluster_id_filename"`
17+
Path string `yaml:"path"`
18+
ParentIdFilename string `yaml:"parent_id_filename"`
19+
InstanceIdFilename string `yaml:"instance_id_filename"`
20+
IamTokenFilename string `yaml:"iam_token_filename"`
21+
Mk8sClusterIdFilename string `yaml:"mk8s_cluster_id_filename"`
22+
UseMetadataService bool `yaml:"use_metadata_service"`
23+
MetadataServiceURL string `yaml:"metadata_service_url"`
24+
MetadataServiceFallbackURL string `yaml:"metadata_service_fallback_url"`
25+
MetadataTokenType string `yaml:"metadata_token_type"`
26+
}
27+
28+
type instanceData struct {
29+
ID string `json:"id"`
30+
ParentID string `json:"parent_id"`
31+
}
32+
33+
const instanceDataCacheTTL = 5 * time.Minute
34+
35+
// tokenRefreshMargin is how long before expiry we refresh the token
36+
const tokenRefreshMargin = 1 * time.Hour
37+
38+
type cachedToken struct {
39+
token string
40+
expiresAt time.Time
1941
}
2042

2143
type Reader struct {
2244
cfg Config
2345
logger *slog.Logger
46+
client *http.Client
47+
48+
mu sync.Mutex
49+
cachedInstance *instanceData
50+
cachedFetchedAt time.Time
51+
52+
tokenMu sync.Mutex
53+
cachedIAM *cachedToken
2454
}
2555

2656
func NewReader(cfg Config, logger *slog.Logger) *Reader {
27-
return &Reader{cfg: cfg, logger: logger}
57+
return &Reader{
58+
cfg: cfg,
59+
logger: logger,
60+
client: &http.Client{Timeout: 5 * time.Second},
61+
}
2862
}
2963

3064
func (r *Reader) GetParentId() (string, error) {
65+
if r.cfg.UseMetadataService {
66+
data, err := r.getInstanceData()
67+
if err == nil {
68+
return data.ParentID, nil
69+
}
70+
r.logger.Warn("Failed to get parent_id from IMDS, falling back to file", "error", err)
71+
}
3172
return r.readAndTrimFile(r.cfg.Path + "/" + r.cfg.ParentIdFilename)
3273
}
3374

3475
func (r *Reader) GetInstanceId() (instanceId string, isFallback bool, err error) {
35-
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
36-
defer cancel()
76+
instanceId, err = r.readAndTrimFile(r.cfg.Path + "/" + r.cfg.InstanceIdFilename)
77+
if err == nil {
78+
return instanceId, false, nil
79+
}
80+
r.logger.Warn("Failed to get instance_id from file, falling back to IMDS", "error", err)
3781

38-
cmd := exec.CommandContext(ctx, "cloud-init", "query", "instance-id")
39-
output, err := cmd.Output()
40-
if err != nil {
41-
instanceId, err2 := r.readAndTrimFile(r.cfg.Path + "/" + r.cfg.InstanceIdFilename)
42-
if err2 != nil {
43-
return "", true, fmt.Errorf("failed to call cloud-init query instance-id: %w and read from file: %w", err, err2)
82+
if r.cfg.UseMetadataService {
83+
data, imdsErr := r.getInstanceData()
84+
if imdsErr == nil {
85+
return data.ID, true, nil
4486
}
45-
return instanceId, true, nil
87+
return "", true, fmt.Errorf("failed to get instance_id from file: %w and from IMDS: %w", err, imdsErr)
4688
}
47-
return strings.TrimSpace(string(output)), false, nil
89+
return "", false, err
4890
}
4991

5092
func (r *Reader) GetIamToken() (string, error) {
93+
if r.cfg.UseMetadataService {
94+
token, err := r.getCachedIAMToken()
95+
if err == nil {
96+
return token, nil
97+
}
98+
r.logger.Warn("Failed to get IAM token from IMDS, falling back to file", "error", err)
99+
}
51100
return r.readAndTrimFile(r.cfg.Path + "/" + r.cfg.IamTokenFilename)
52101
}
53102

103+
func (r *Reader) getCachedIAMToken() (string, error) {
104+
r.tokenMu.Lock()
105+
defer r.tokenMu.Unlock()
106+
107+
if r.cachedIAM != nil && time.Until(r.cachedIAM.expiresAt) > tokenRefreshMargin {
108+
return r.cachedIAM.token, nil
109+
}
110+
111+
tokenPath := fmt.Sprintf("/v1/iam/%s/token/access_token", r.cfg.MetadataTokenType)
112+
body, err := r.fetchFromMetadataService(tokenPath)
113+
if err != nil {
114+
if r.cachedIAM != nil && time.Until(r.cachedIAM.expiresAt) > 0 {
115+
r.logger.Warn("Failed to refresh IAM token, using cached token until expiry", "error", err, "expires_at", r.cachedIAM.expiresAt)
116+
return r.cachedIAM.token, nil
117+
}
118+
return "", fmt.Errorf("failed to fetch IAM token from IMDS: %w", err)
119+
}
120+
token := strings.TrimSpace(string(body))
121+
122+
expiresAt, err := r.fetchTokenExpiresAt()
123+
if err != nil {
124+
r.logger.Warn("Failed to get token expiry from IMDS, using default TTL", "error", err)
125+
expiresAt = time.Now().Add(instanceDataCacheTTL)
126+
}
127+
128+
if time.Until(expiresAt) <= 0 {
129+
return "", fmt.Errorf("token from IMDS is already expired (expires_at: %s)", expiresAt.Format(time.RFC3339Nano))
130+
}
131+
132+
r.cachedIAM = &cachedToken{
133+
token: token,
134+
expiresAt: expiresAt,
135+
}
136+
return token, nil
137+
}
138+
139+
func (r *Reader) fetchTokenExpiresAt() (time.Time, error) {
140+
expiresAtPath := fmt.Sprintf("/v1/iam/%s/token/expires_at", r.cfg.MetadataTokenType)
141+
body, err := r.fetchFromMetadataService(expiresAtPath)
142+
if err != nil {
143+
return time.Time{}, fmt.Errorf("failed to fetch token expires_at: %w", err)
144+
}
145+
expiresAt, err := time.Parse(time.RFC3339Nano, strings.TrimSpace(string(body)))
146+
if err != nil {
147+
return time.Time{}, fmt.Errorf("failed to parse expires_at timestamp: %w", err)
148+
}
149+
return expiresAt, nil
150+
}
151+
152+
func (r *Reader) getInstanceData() (*instanceData, error) {
153+
r.mu.Lock()
154+
defer r.mu.Unlock()
155+
156+
if r.cachedInstance != nil && time.Since(r.cachedFetchedAt) < instanceDataCacheTTL {
157+
return r.cachedInstance, nil
158+
}
159+
160+
body, err := r.fetchFromMetadataService("/v1/instance-data")
161+
if err != nil {
162+
if r.cachedInstance != nil {
163+
r.logger.Warn("Failed to refresh instance-data from IMDS, using stale cache", "error", err)
164+
return r.cachedInstance, nil
165+
}
166+
return nil, fmt.Errorf("failed to fetch instance-data from IMDS: %w", err)
167+
}
168+
169+
var data instanceData
170+
if err := json.Unmarshal(body, &data); err != nil {
171+
if r.cachedInstance != nil {
172+
r.logger.Warn("Failed to parse instance-data JSON, using stale cache", "error", err)
173+
return r.cachedInstance, nil
174+
}
175+
return nil, fmt.Errorf("failed to parse instance-data JSON: %w", err)
176+
}
177+
178+
r.cachedInstance = &data
179+
r.cachedFetchedAt = time.Now()
180+
return r.cachedInstance, nil
181+
}
182+
183+
func (r *Reader) fetchFromMetadataService(path string) ([]byte, error) {
184+
urls := []string{r.cfg.MetadataServiceURL, r.cfg.MetadataServiceFallbackURL}
185+
var lastErr error
186+
for _, baseURL := range urls {
187+
body, err := r.doMetadataRequest(baseURL + path)
188+
if err == nil {
189+
return body, nil
190+
}
191+
lastErr = err
192+
r.logger.Debug("IMDS request failed", "url", baseURL+path, "error", err)
193+
}
194+
return nil, fmt.Errorf("all IMDS URLs failed: %w", lastErr)
195+
}
196+
197+
func (r *Reader) doMetadataRequest(url string) ([]byte, error) {
198+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
199+
defer cancel()
200+
201+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
202+
if err != nil {
203+
return nil, fmt.Errorf("failed to create request: %w", err)
204+
}
205+
req.Header.Set("Metadata", "true")
206+
207+
resp, err := r.client.Do(req)
208+
if err != nil {
209+
return nil, fmt.Errorf("request failed: %w", err)
210+
}
211+
defer resp.Body.Close()
212+
213+
if resp.StatusCode != http.StatusOK {
214+
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
215+
}
216+
217+
body, err := io.ReadAll(resp.Body)
218+
if err != nil {
219+
return nil, fmt.Errorf("failed to read response body: %w", err)
220+
}
221+
return body, nil
222+
}
223+
54224
func (r *Reader) readAndTrimFile(filename string) (string, error) {
55225
r.logger.Debug("Reading file", "filename", filename)
56226
content, err := os.ReadFile(filename)

0 commit comments

Comments
 (0)