Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions internal/core/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
stonithResourceMissing string = "notconfigured"
stonithAgent string = "stonith:"
sbdFencingAgentName string = "external/sbd"
unknownState string = "unknown"
)

type DiscoveryTools struct {
Expand All @@ -41,6 +42,7 @@ type DiscoveryTools struct {
SBDConfigPath string
CommandExecutor utils.CommandExecutor
SystemdConnector systemd.Systemd
CmdClient CmdClient
}

type BasicInfo struct {
Expand All @@ -57,6 +59,7 @@ type Cluster struct {
DC bool
Provider string
Online bool
State string
}

func Md5sumFile(filePath string) (string, error) {
Expand Down Expand Up @@ -90,6 +93,7 @@ func NewCluster(ctx context.Context) (*Cluster, error) {
SBDConfigPath: SBDConfigPath,
CommandExecutor: utils.Executor{},
SystemdConnector: systemdConn,
CmdClient: NewDefaultCmdClient(),
})
}

Expand All @@ -105,7 +109,7 @@ func NewClusterWithDiscoveryTools(ctx context.Context, discoveryTools *Discovery
if !isHostOnline(ctx, discoveryTools) {
return makeOfflineHostPayload(detectedCluster)
}
return makeOnlineHostPayload(detectedCluster, discoveryTools)
return makeOnlineHostPayload(ctx, detectedCluster, discoveryTools)
}

func detectCluster(discoveryTools *DiscoveryTools) (BasicInfo, bool, error) {
Expand Down Expand Up @@ -153,14 +157,22 @@ func makeOfflineHostPayload(detectedCluster BasicInfo) (*Cluster, error) {
}, nil
}

func makeOnlineHostPayload(detectedCluster BasicInfo, discoveryTools *DiscoveryTools) (*Cluster, error) {
func makeOnlineHostPayload(
ctx context.Context, detectedCluster BasicInfo, discoveryTools *DiscoveryTools,
) (*Cluster, error) {
cibParser := cib.NewCibAdminParser(discoveryTools.CibAdmPath)

cibConfig, err := cibParser.Parse()
if err != nil {
return nil, err
}

state, err := discoveryTools.CmdClient.GetState(ctx)
if err != nil {
slog.Error("Error discovering cluster state", "error", err)
state = unknownState
}

var cluster = &Cluster{
Cib: cib.Root{}, //nolint
Crmmon: crmmon.Root{}, //nolint
Expand All @@ -170,6 +182,7 @@ func makeOnlineHostPayload(detectedCluster BasicInfo, discoveryTools *DiscoveryT
DC: false,
Provider: "",
Online: true,
State: state,
}

cluster.Cib = cibConfig
Expand Down
47 changes: 47 additions & 0 deletions internal/core/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/trento-project/agent/internal/core/cluster"
"github.com/trento-project/agent/internal/core/cluster/cib"
"github.com/trento-project/agent/internal/core/cluster/crmmon"
mocksCluster "github.com/trento-project/agent/internal/core/cluster/mocks"
mocksSystemd "github.com/trento-project/agent/internal/core/systemd/mocks"
"github.com/trento-project/agent/pkg/utils/mocks"
"github.com/trento-project/agent/test/helpers"
Expand Down Expand Up @@ -42,6 +43,9 @@ func (suite *ClusterTestSuite) TestNewClusterWithDiscoveryTools() {
mockSystemd.On("IsActive", ctx, "corosync.service").Return(true, nil)
mockSystemd.On("IsActive", ctx, "pacemaker.service").Return(true, nil)

mockCmdClient := new(mocksCluster.MockCmdClient)
mockCmdClient.On("GetState", ctx).Return("S_IDLE", nil)

c, err := cluster.NewClusterWithDiscoveryTools(ctx, &cluster.DiscoveryTools{
CibAdmPath: helpers.GetFixturePath("discovery/cluster/fake_cibadmin.sh"),
CrmmonAdmPath: helpers.GetFixturePath("discovery/cluster/fake_crm_mon.sh"),
Expand All @@ -51,13 +55,15 @@ func (suite *ClusterTestSuite) TestNewClusterWithDiscoveryTools() {
SBDConfigPath: helpers.GetFixturePath("discovery/cluster/sbd/sbd_config"),
CommandExecutor: mockCommand,
SystemdConnector: mockSystemd,
CmdClient: mockCmdClient,
})

suite.Equal("hana_cluster", c.Name)
suite.Equal("47d1190ffb4f781974c8356d7f863b03", c.ID)
suite.Equal(false, c.DC)
suite.Equal("azure", c.Provider)
suite.Equal("/dev/vdc;/dev/vdb", c.SBD.Config["SBD_DEVICE"])
suite.Equal("S_IDLE", c.State)
suite.NoError(err)
}

Expand All @@ -71,6 +77,9 @@ func (suite *ClusterTestSuite) TestNewClusterDisklessSBD() {
mockSystemd.On("IsActive", ctx, "corosync.service").Return(true, nil)
mockSystemd.On("IsActive", ctx, "pacemaker.service").Return(true, nil)

mockCmdClient := new(mocksCluster.MockCmdClient)
mockCmdClient.On("GetState", ctx).Return("S_IDLE", nil)

c, err := cluster.NewClusterWithDiscoveryTools(ctx, &cluster.DiscoveryTools{
CibAdmPath: helpers.GetFixturePath("discovery/cluster/fake_cibadmin.sh"),
CrmmonAdmPath: helpers.GetFixturePath("discovery/cluster/fake_crm_mon_diskless_sbd.sh"),
Expand All @@ -80,6 +89,7 @@ func (suite *ClusterTestSuite) TestNewClusterDisklessSBD() {
SBDConfigPath: helpers.GetFixturePath("discovery/cluster/sbd/sbd_config_no_device"),
CommandExecutor: mockCommand,
SystemdConnector: mockSystemd,
CmdClient: mockCmdClient,
})

suite.Equal("hana_cluster", c.Name)
Expand Down Expand Up @@ -116,6 +126,7 @@ func (suite *ClusterTestSuite) TestNewClusterWithOfflineHost() {
SBDConfigPath: helpers.GetFixturePath("discovery/cluster/sbd/sbd_config"),
CommandExecutor: mockCommand,
SystemdConnector: mockSystemd,
CmdClient: new(mocksCluster.MockCmdClient),
})

suite.Equal("hana_cluster", c.Name)
Expand Down Expand Up @@ -148,6 +159,7 @@ func (suite *ClusterTestSuite) TestNewClusterWithOfflineHostNoName() {
SBDConfigPath: helpers.GetFixturePath("discovery/cluster/sbd/sbd_config"),
CommandExecutor: mockCommand,
SystemdConnector: mockSystemd,
CmdClient: new(mocksCluster.MockCmdClient),
})

suite.Equal("", c.Name)
Expand All @@ -156,6 +168,39 @@ func (suite *ClusterTestSuite) TestNewClusterWithOfflineHostNoName() {
suite.NoError(err)
}

// TestNewClusterWithUnknownState verifies that a failing cluster state lookup
// does not fail the discovery: when CmdClient.GetState returns an error, the
// discovered cluster is expected to report the "unknown" state and no error.
func (suite *ClusterTestSuite) TestNewClusterWithUnknownState() {
	ctx := context.Background()
	mockCommand := new(mocks.MockCommandExecutor)
	mockCommand.On("Output", "/usr/sbin/dmidecode", "-s", "chassis-asset-tag").
		Return([]byte("7783-7084-3265-9085-8269-3286-77"), nil)
	mockCommand.On("Output", "/usr/sbin/sbd", "-d", "/dev/vdb", "dump").Return(mockSbdDump(), nil)
	mockCommand.On("Output", "/usr/sbin/sbd", "-d", "/dev/vdb", "list").Return(mockSbdList(), nil)
	mockCommand.On("Output", "/usr/sbin/sbd", "-d", "/dev/vdc", "dump").Return(mockSbdDump(), nil)
	mockCommand.On("Output", "/usr/sbin/sbd", "-d", "/dev/vdc", "list").Return(mockSbdList(), nil)

	// Both services are reported as active; presumably this is what makes the
	// discovery take the online-host path where the state is queried.
	mockSystemd := new(mocksSystemd.MockSystemd)
	mockSystemd.On("IsActive", ctx, "corosync.service").Return(true, nil)
	mockSystemd.On("IsActive", ctx, "pacemaker.service").Return(true, nil)

	// The state lookup fails, which is the scenario under test.
	mockCmdClient := new(mocksCluster.MockCmdClient)
	mockCmdClient.On("GetState", ctx).Return("", errors.New(""))

	c, err := cluster.NewClusterWithDiscoveryTools(ctx, &cluster.DiscoveryTools{
		CibAdmPath:         helpers.GetFixturePath("discovery/cluster/fake_cibadmin.sh"),
		CrmmonAdmPath:      helpers.GetFixturePath("discovery/cluster/fake_crm_mon.sh"),
		CorosyncKeyPath:    helpers.GetFixturePath("discovery/cluster/authkey"),
		CorosyncConfigPath: helpers.GetFixturePath("discovery/cluster/corosync.conf"),
		SBDPath:            "/usr/sbin/sbd",
		SBDConfigPath:      helpers.GetFixturePath("discovery/cluster/sbd/sbd_config"),
		CommandExecutor:    mockCommand,
		SystemdConnector:   mockSystemd,
		CmdClient:          mockCmdClient,
	})

	// Check the error first and stop on failure: on a discovery error c is
	// nil, and reading c.State would panic instead of failing the test cleanly.
	suite.Require().NoError(err)
	suite.Equal("unknown", c.State)
}

func (suite *ClusterTestSuite) TestNewClusterCorosyncNotConfigured() {
ctx := context.Background()
c, err := cluster.NewClusterWithDiscoveryTools(ctx, &cluster.DiscoveryTools{
Expand All @@ -167,6 +212,7 @@ func (suite *ClusterTestSuite) TestNewClusterCorosyncNotConfigured() {
SBDConfigPath: helpers.GetFixturePath("discovery/cluster/sbd/sbd_config_no_device"),
CommandExecutor: new(mocks.MockCommandExecutor),
SystemdConnector: new(mocksSystemd.MockSystemd),
CmdClient: new(mocksCluster.MockCmdClient),
})

suite.Nil(c)
Expand All @@ -184,6 +230,7 @@ func (suite *ClusterTestSuite) TestNewClusterCorosyncNoAuthkeyConfigured() {
SBDConfigPath: helpers.GetFixturePath("discovery/cluster/sbd/sbd_config_no_device"),
CommandExecutor: new(mocks.MockCommandExecutor),
SystemdConnector: new(mocksSystemd.MockSystemd),
CmdClient: new(mocksCluster.MockCmdClient),
})

suite.Nil(c)
Expand Down
37 changes: 27 additions & 10 deletions internal/core/cluster/cmdclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ import (
"errors"
"fmt"
"log/slog"
"regexp"
"strings"
"time"

"github.com/trento-project/agent/pkg/utils"
)

const resourceRefreshedMessage = "got reply (done)"

var clusterIdlePatternCompiled = regexp.MustCompile("S_IDLE")
const idleState = "S_IDLE"

type CmdClient interface {
GetState(ctx context.Context) (string, error)
IsHostOnline(ctx context.Context) bool
IsIdle(ctx context.Context) (bool, error)
ResourceRefresh(ctx context.Context, resourceID, nodeID string) error
Expand All @@ -42,6 +42,27 @@ func NewCmdClient(executor utils.CommandExecutor, logger *slog.Logger) CmdClient
}
}

// GetState reports the current cluster state as printed by crmadmin.
// It first asks crmadmin for the DC node and then queries the state of that
// node; the returned value is the whitespace-trimmed command output.
// The full list of possible states is available here:
// https://github.com/ClusterLabs/pacemaker/blob/main/daemons/controld/controld_fsa.h
func (c *client) GetState(ctx context.Context) (string, error) {
	// crmadmin can hang forever on a recently started cluster whose DC has not
	// been selected yet, so both invocations share a single 2-second deadline.
	timedCtx, stop := context.WithTimeout(ctx, 2*time.Second)
	defer stop()

	dcOutput, dcErr := c.executor.CombinedOutputContext(timedCtx, "crmadmin", "-qD")
	if dcErr != nil {
		return "", fmt.Errorf("error getting DC node with crmadmin: %w", dcErr)
	}

	dcName := strings.TrimSpace(string(dcOutput))

	stateOutput, stateErr := c.executor.CombinedOutputContext(timedCtx, "crmadmin", "-qS", dcName)
	if stateErr != nil {
		return "", fmt.Errorf("error getting cluster state with crmadmin: %w", stateErr)
	}

	return strings.TrimSpace(string(stateOutput)), nil
}

func (c *client) IsHostOnline(ctx context.Context) bool {
output, err := c.executor.CombinedOutputContext(ctx, "crm", "status")
if err != nil {
Expand Down Expand Up @@ -76,16 +97,12 @@ func (c *client) StopCluster(ctx context.Context) error {
}

func (c *client) IsIdle(ctx context.Context) (bool, error) {
idleOutput, err := c.executor.CombinedOutputContext(ctx, "cs_clusterstate", "-i")
state, err := c.GetState(ctx)
if err != nil {
return false, fmt.Errorf("error running cs_clusterstate: %w", err)
}

if !clusterIdlePatternCompiled.Match(idleOutput) {
return false, nil
return false, err
}

return true, nil
return state == idleState, nil
}

// ResourceRefresh runs the `crm resource refresh [<rsc>] [<node>]` command.
Expand Down
Loading
Loading