Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 65 additions & 9 deletions builder/deploy/cluster/cluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"opencsg.com/csghub-server/builder/store/database"
"opencsg.com/csghub-server/common/config"
"opencsg.com/csghub-server/common/types"
rtypes "opencsg.com/csghub-server/runner/types"
lwscli "sigs.k8s.io/lws/client-go/clientset/versioned"
)

Expand Down Expand Up @@ -251,6 +252,28 @@ func buildCluster(kubeconfig *rest.Config, id string, index int, connectMode typ
}
clusterStore := database.NewClusterInfoStore()
region := fmt.Sprintf("region-%d", index)

// Read StorageClass and NetworkInterface from ConfigMap before AddByClusterID
storageClass := ""
networkInterface := ""
if connectMode == types.ConnectModeInCluster && len(config.Runner.RunnerNamespace) > 0 && len(config.Runner.WatchConfigmapName) > 0 {
ctxConfigMap, cancelConfigMap := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelConfigMap()
cm, err := client.CoreV1().ConfigMaps(config.Runner.RunnerNamespace).Get(ctxConfigMap, config.Runner.WatchConfigmapName, metav1.GetOptions{})
if err == nil && cm != nil {
if val, ok := cm.Data[rtypes.KeyStorageClass]; ok && len(val) > 0 {
storageClass = val
slog.Debug("read storage class from ConfigMap", slog.String("storage_class", storageClass))
}
if val, ok := cm.Data[rtypes.KeyNetworkInterface]; ok && len(val) > 0 {
networkInterface = val
slog.Debug("read network interface from ConfigMap", slog.String("network_interface", networkInterface))
}
} else if err != nil {
slog.Debug("failed to read ConfigMap during initialization, will use default values", slog.String("namespace", config.Runner.RunnerNamespace), slog.String("configmap", config.Runner.WatchConfigmapName), slog.Any("error", err))
}
}

var cluster *database.ClusterInfo
if connectMode == types.ConnectModeKubeConfig {
cluster, err = clusterStore.Add(ctxTimeout, id, region, types.ConnectModeKubeConfig)
Expand All @@ -270,16 +293,49 @@ func buildCluster(kubeconfig *rest.Config, id string, index int, connectMode typ
if !cluster.Enable {
return nil, nil
}

// Update StorageClass and NetworkInterface to database if read from ConfigMap
if len(storageClass) > 0 || len(networkInterface) > 0 {
updateEvent := types.ClusterEvent{
ClusterID: cluster.ClusterID,
ClusterConfig: cluster.ClusterConfig,
Region: cluster.Region,
Zone: cluster.Zone,
Provider: cluster.Provider,
StorageClass: cluster.StorageClass, // Use existing value as default
NetworkInterface: cluster.NetworkInterface, // Use existing value as default
Mode: cluster.Mode,
Endpoint: cluster.RunnerEndpoint,
AppEndpoint: cluster.AppEndpoint,
}
// Override with values from ConfigMap if available
if len(storageClass) > 0 {
updateEvent.StorageClass = storageClass
cluster.StorageClass = storageClass
}
if len(networkInterface) > 0 {
updateEvent.NetworkInterface = networkInterface
cluster.NetworkInterface = networkInterface
}
err = clusterStore.UpdateByClusterID(ctxTimeout, updateEvent)
if err != nil {
slog.Warn("failed to update cluster info from ConfigMap", slog.Any("error", err))
} else {
slog.Info("updated cluster info from ConfigMap", slog.String("storage_class", updateEvent.StorageClass), slog.String("network_interface", updateEvent.NetworkInterface))
}
}

c := &Cluster{
CID: id,
ID: cluster.ClusterID,
Client: client,
KnativeClient: knativeClient,
ArgoClient: argoClient,
LWSClient: lwsclient,
ConnectMode: connectMode,
Region: region,
StorageClass: cluster.StorageClass,
CID: id,
ID: cluster.ClusterID,
Client: client,
KnativeClient: knativeClient,
ArgoClient: argoClient,
LWSClient: lwsclient,
ConnectMode: connectMode,
Region: region,
StorageClass: cluster.StorageClass,
NetworkInterface: cluster.NetworkInterface,
}
return c, nil
}
Expand Down
10 changes: 6 additions & 4 deletions runner/component/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@ func (s *clusterComponentImpl) initCluster() {
if c.ConnectMode == types.ConnectModeInCluster {
go func(c *cluster.Cluster) {
data := types.ClusterEvent{
ClusterID: c.ID,
ClusterConfig: types.DefaultClusterCongfig,
Region: c.Region,
Mode: c.ConnectMode,
ClusterID: c.ID,
ClusterConfig: types.DefaultClusterCongfig,
Region: c.Region,
StorageClass: c.StorageClass,
Mode: c.ConnectMode,
NetworkInterface: c.NetworkInterface,
}
event := &types.WebHookSendEvent{
WebHookHeader: types.WebHookHeader{
Expand Down
9 changes: 8 additions & 1 deletion runner/component/cluster_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,19 @@ func (w *clusterWatcher) pushClusterChangeEvent(configmapData map[string]string)
w.cluster.StorageClass = storageClass
slog.Debug("update cluster storageclass", slog.Any("cluster", w.cluster))
}
networkInterface := configmapData[rtypes.KeyNetworkInterface]
if len(networkInterface) < 1 && len(w.cluster.NetworkInterface) > 0 {
networkInterface = w.cluster.NetworkInterface
} else if len(networkInterface) > 0 {
w.cluster.NetworkInterface = networkInterface
slog.Debug("update cluster network interface", slog.Any("cluster", w.cluster), slog.String("network_interface", networkInterface))
}
data := types.ClusterEvent{
ClusterID: w.cluster.ID,
ClusterConfig: types.DefaultClusterCongfig,
Mode: w.cluster.ConnectMode,
StorageClass: storageClass,
NetworkInterface: w.cluster.NetworkInterface,
NetworkInterface: networkInterface,
Status: types.ClusterStatusRunning,
Region: configmapData[rtypes.KeyRunnerClusterRegion],
Endpoint: configmapData[rtypes.KeyRunnerExposedEndpont],
Expand Down
127 changes: 127 additions & 0 deletions runner/component/cluster_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http/httptest"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -127,3 +128,129 @@ func TestClusterWatcher_WatchCallback(t *testing.T) {
})
}
}

// TestClusterWatcher_PushClusterChangeEvent verifies that pushClusterChangeEvent
// resolves StorageClass and NetworkInterface from the ConfigMap data (falling
// back to the values already stored on the cluster), updates the cluster in
// place, and sends the resolved values in the webhook payload.
func TestClusterWatcher_PushClusterChangeEvent(t *testing.T) {
	// Decoded webhook payloads are handed from the HTTP handler goroutine to
	// the test goroutine through this channel instead of a shared variable.
	events := make(chan types.WebHookSendEvent, 8)

	// mock http server to receive webhook
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// The handler runs outside the test goroutine, so only non-fatal
		// assertions (t.Errorf based) are allowed here; FailNow is not.
		body, err := io.ReadAll(r.Body)
		if !assert.NoError(t, err) {
			w.WriteHeader(http.StatusBadRequest)
			return
		}
		var event types.WebHookSendEvent
		if !assert.NoError(t, json.Unmarshal(body, &event)) {
			w.WriteHeader(http.StatusBadRequest)
			return
		}
		// Never block the handler if nobody is reading anymore.
		select {
		case events <- event:
		default:
		}
		w.WriteHeader(http.StatusOK)
	}))
	defer server.Close()

	// drainEvents discards payloads left over from a previous sub-test.
	drainEvents := func() {
		for {
			select {
			case <-events:
			default:
				return
			}
		}
	}

	tests := []struct {
		name                     string
		configmapData            map[string]string
		initialStorageClass      string
		initialNetworkInterface  string
		expectedStorageClass     string
		expectedNetworkInterface string
		expectedEventData        types.ClusterEvent
	}{
		{
			name: "should use ConfigMap values for StorageClass and NetworkInterface",
			configmapData: map[string]string{
				rtypes.KeyStorageClass:          "fast-ssd",
				rtypes.KeyNetworkInterface:      "eth1",
				rtypes.KeyRunnerClusterRegion:   "region-1",
				rtypes.KeyRunnerExposedEndpont:  server.URL,
			},
			initialStorageClass:      "slow-ssd",
			initialNetworkInterface:  "eth0",
			expectedStorageClass:     "fast-ssd",
			expectedNetworkInterface: "eth1",
			expectedEventData: types.ClusterEvent{
				StorageClass:     "fast-ssd",
				NetworkInterface: "eth1",
			},
		},
		{
			name: "should use cluster values when ConfigMap is empty",
			configmapData: map[string]string{
				rtypes.KeyRunnerClusterRegion:  "region-1",
				rtypes.KeyRunnerExposedEndpont: server.URL,
			},
			initialStorageClass:      "fast-ssd",
			initialNetworkInterface:  "eth0",
			expectedStorageClass:     "fast-ssd",
			expectedNetworkInterface: "eth0",
			expectedEventData: types.ClusterEvent{
				StorageClass:     "fast-ssd",
				NetworkInterface: "eth0",
			},
		},
		{
			name: "should update cluster NetworkInterface when ConfigMap has value",
			configmapData: map[string]string{
				rtypes.KeyNetworkInterface:     "eth2",
				rtypes.KeyRunnerClusterRegion:  "region-1",
				rtypes.KeyRunnerExposedEndpont: server.URL,
			},
			initialStorageClass:      "fast-ssd",
			initialNetworkInterface:  "eth0",
			expectedStorageClass:     "fast-ssd",
			expectedNetworkInterface: "eth2",
			expectedEventData: types.ClusterEvent{
				StorageClass:     "fast-ssd",
				NetworkInterface: "eth2",
			},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// reset mock server state
			drainEvents()

			// setup
			mockCluster := &cluster.Cluster{
				ID:               "test-cluster",
				CID:              "test-cluster",
				ConnectMode:      types.ConnectModeInCluster,
				Region:           "test-region",
				StorageClass:     tt.initialStorageClass,
				NetworkInterface: tt.initialNetworkInterface,
			}

			cfg := &config.Config{}
			cfg.Runner.WebHookEndpoint = server.URL
			cfg.APIToken = "test-token"

			watcher := &clusterWatcher{
				cluster: mockCluster,
				env:     cfg,
			}

			// execute
			err := watcher.pushClusterChangeEvent(tt.configmapData)

			// assert
			assert.NoError(t, err)
			assert.Equal(t, tt.expectedStorageClass, mockCluster.StorageClass)
			assert.Equal(t, tt.expectedNetworkInterface, mockCluster.NetworkInterface)

			// Wait for the webhook call with a bounded timeout rather than a
			// fixed sleep, so the test is fast when the event arrives and
			// fails loudly when it never does.
			select {
			case received := <-events:
				// After JSON decoding, an interface-typed Data field holds a
				// generic map, never a types.ClusterEvent, so a type assertion
				// would always fail. Re-encode and decode into the concrete
				// type to actually verify the payload.
				raw, err := json.Marshal(received.Data)
				require.NoError(t, err)
				var eventData types.ClusterEvent
				require.NoError(t, json.Unmarshal(raw, &eventData))
				assert.Equal(t, tt.expectedEventData.StorageClass, eventData.StorageClass)
				assert.Equal(t, tt.expectedEventData.NetworkInterface, eventData.NetworkInterface)
			case <-time.After(2 * time.Second):
				t.Fatal("webhook event was not received within timeout")
			}
		})
	}
}
2 changes: 2 additions & 0 deletions runner/types/configmap_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ const (
KeyApplicationEndpoint = "STARHUB_SERVER_RUNNER_APPLICATION_ENDPOINT"
// key name of configmap for storage class of PVC
KeyStorageClass = "STARHUB_SERVER_RUNNER_STORAGE_CLASS"
// key name of configmap for network interface, e.g., eth0
KeyNetworkInterface = "STARHUB_SERVER_RUNNER_NETWORK_INTERFACE"
)

var SubscribeKeyWithEventPush = map[string]Validator{
Expand Down
Loading