Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion plugins/inputs/nutanix/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,13 @@ The Nutanix PRISM plugin uses the Nutanix API to gather metrics.
enabled_services = ["cluster", "hosts", "disks", "vms"]

## Amount of time allowed to complete the HTTP(s) request.
# timeout = "5s"
# response_timeout = "5s"

## Maximum number of consecutive failures before skipping all services.
## After reaching this limit, all services will be skipped in subsequent gather cycles.
## The plugin will automatically retry when the connection is restored.
## Default is 3.
# max_retries = 3

## Optional TLS Config
# tls_ca = /path/to/cafile
Expand Down
76 changes: 71 additions & 5 deletions plugins/inputs/nutanix/nutanix.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@ type Nutanix struct {
EnabledServices []string `toml:"enabled_services"`
Log telegraf.Logger `toml:"-"`

ResponseTimeout config.Duration
ResponseTimeout config.Duration `toml:"response_timeout"`
MaxRetries int `toml:"max_retries"`
tls.ClientConfig

client *http.Client
sessionCookie []*http.Cookie

mu sync.Mutex
mu sync.Mutex
failureCount int // Track failure count for the entire connection
skipLogged bool // Track if we've already logged the skip message
}

func (*Nutanix) SampleConfig() string {
Expand All @@ -56,29 +59,77 @@ func (n *Nutanix) Init() error {
return errors.New("username or password can not be empty string")
}

if n.MaxRetries <= 0 {
n.MaxRetries = 3 // Default to 3 retries
}

client, err := n.createHTTPClient()
if err != nil {
return err
}

n.client = client
n.failureCount = 0
n.skipLogged = false

return nil
}

// Gather queries every enabled service on the Nutanix API and reports the
// results through acc. This implements the Input interface.
//
// Per-service failures are reported with acc.AddError instead of being
// returned, so Gather itself always returns nil. A cycle in which at least one
// service fails increments the consecutive-failure counter; a cycle without
// failures resets it. Once the counter reaches MaxRetries, Gather returns
// immediately without contacting the API and logs the skip only once.
//
// NOTE(review): after the limit is reached this function never issues another
// request, so nothing here can reset failureCount again (Init does reset it).
// The skip log message promises a retry; confirm whether a periodic probe is
// intended.
func (n *Nutanix) Gather(acc telegraf.Accumulator) error {
	// Decide under a single lock acquisition whether to skip and whether the
	// skip still has to be announced; log after releasing the lock.
	n.mu.Lock()
	skip := n.failureCount >= n.MaxRetries
	announce := skip && !n.skipLogged
	if announce {
		n.skipLogged = true
	}
	n.mu.Unlock()

	if skip {
		if announce {
			n.Log.Infof("Skipping all services: exceeded max consecutive failures (%d). Will retry on next successful connection.", n.MaxRetries)
		}
		return nil
	}

	// NOTE(review): callDuration is populated below but not read anywhere in
	// this function; confirm whether it should be emitted as a metric.
	callDuration := map[string]interface{}{}
	var lastError error

	for _, service := range n.EnabledServices {
		start := time.Now()
		if err := n.gatherGeneric(service, acc); err != nil {
			// Report each failed service exactly once and remember the most
			// recent error for the connection-level summary below.
			lastError = err
			acc.AddError(fmt.Errorf("failed to get resource %q: %w", service, err))
		}
		callDuration[service] = time.Since(start).Nanoseconds()
	}

	// Update the consecutive-failure bookkeeping based on the overall result.
	failed := lastError != nil
	var newFailCount int
	n.mu.Lock()
	if failed {
		n.failureCount++
		newFailCount = n.failureCount
	} else {
		n.failureCount = 0
		n.skipLogged = false
	}
	n.mu.Unlock()

	if !failed {
		return nil
	}

	// Summarise the connection state outside of the lock.
	if newFailCount >= n.MaxRetries {
		acc.AddError(fmt.Errorf("connection failed (exceeded max consecutive failures %d): %w", n.MaxRetries, lastError))
	} else {
		acc.AddError(fmt.Errorf("connection failed (consecutive failures %d/%d): %w", newFailCount, n.MaxRetries, lastError))
	}

	return nil
}

Expand Down Expand Up @@ -140,11 +191,18 @@ func (n *Nutanix) doWithSessionRetry(payload []byte) (*http.Response, error) {
if err != nil {
return nil, err
}
// If not unauthorized, check for other error status codes
if resp.StatusCode != http.StatusUnauthorized {
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body))
}
return resp, nil
}
resp.Body.Close()

// Retry with basic auth
req2, err := http.NewRequest("POST", n.URL, bytes.NewBuffer(payload))
if err != nil {
return nil, err
Expand All @@ -157,6 +215,7 @@ func (n *Nutanix) doWithSessionRetry(payload []byte) (*http.Response, error) {
return nil, err
}

// Check status code after retry
if resp2.StatusCode == http.StatusOK {
newCookies := []*http.Cookie{}
for _, c := range resp2.Cookies() {
Expand All @@ -167,9 +226,16 @@ func (n *Nutanix) doWithSessionRetry(payload []byte) (*http.Response, error) {
n.mu.Lock()
n.sessionCookie = newCookies
n.mu.Unlock()
return resp2, nil
}

return resp2, nil
// Authentication failed or other error
body, _ := io.ReadAll(resp2.Body)
resp2.Body.Close()
if resp2.StatusCode == http.StatusUnauthorized {
return nil, fmt.Errorf("authentication failed (401 Unauthorized): invalid username or password")
}
return nil, fmt.Errorf("API request failed with status %d: %s", resp2.StatusCode, string(body))
}

func parseMetrics(key string, body []byte, acc telegraf.Accumulator) error {
Expand Down
8 changes: 7 additions & 1 deletion plugins/inputs/nutanix/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@
enabled_services = ["cluster", "hosts", "disks", "vms"]

## Amount of time allowed to complete the HTTP(s) request.
# timeout = "5s"
# response_timeout = "5s"

## Maximum number of consecutive failures before skipping all services.
## After reaching this limit, all services will be skipped in subsequent gather cycles.
## The plugin will automatically retry when the connection is restored.
## Default is 3.
# max_retries = 3

## Optional TLS Config
# tls_ca = /path/to/cafile
Expand Down
Loading