Skip to content

Commit 9cbfb7c

Browse files
author
Dev Agent
committed
Support network interface cfg for incluster mode
1 parent 019bd02 commit 9cbfb7c

File tree

5 files changed

+208
-14
lines changed

5 files changed

+208
-14
lines changed

builder/deploy/cluster/cluster_manager.go

Lines changed: 65 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"opencsg.com/csghub-server/builder/store/database"
2727
"opencsg.com/csghub-server/common/config"
2828
"opencsg.com/csghub-server/common/types"
29+
rtypes "opencsg.com/csghub-server/runner/types"
2930
lwscli "sigs.k8s.io/lws/client-go/clientset/versioned"
3031
)
3132

@@ -251,6 +252,28 @@ func buildCluster(kubeconfig *rest.Config, id string, index int, connectMode typ
251252
}
252253
clusterStore := database.NewClusterInfoStore()
253254
region := fmt.Sprintf("region-%d", index)
255+
256+
// Read StorageClass and NetworkInterface from ConfigMap before AddByClusterID
257+
storageClass := ""
258+
networkInterface := ""
259+
if connectMode == types.ConnectModeInCluster && len(config.Runner.RunnerNamespace) > 0 && len(config.Runner.WatchConfigmapName) > 0 {
260+
ctxConfigMap, cancelConfigMap := context.WithTimeout(context.Background(), 5*time.Second)
261+
defer cancelConfigMap()
262+
cm, err := client.CoreV1().ConfigMaps(config.Runner.RunnerNamespace).Get(ctxConfigMap, config.Runner.WatchConfigmapName, metav1.GetOptions{})
263+
if err == nil && cm != nil {
264+
if val, ok := cm.Data[rtypes.KeyStorageClass]; ok && len(val) > 0 {
265+
storageClass = val
266+
slog.Debug("read storage class from ConfigMap", slog.String("storage_class", storageClass))
267+
}
268+
if val, ok := cm.Data[rtypes.KeyNetworkInterface]; ok && len(val) > 0 {
269+
networkInterface = val
270+
slog.Debug("read network interface from ConfigMap", slog.String("network_interface", networkInterface))
271+
}
272+
} else if err != nil {
273+
slog.Debug("failed to read ConfigMap during initialization, will use default values", slog.String("namespace", config.Runner.RunnerNamespace), slog.String("configmap", config.Runner.WatchConfigmapName), slog.Any("error", err))
274+
}
275+
}
276+
254277
var cluster *database.ClusterInfo
255278
if connectMode == types.ConnectModeKubeConfig {
256279
cluster, err = clusterStore.Add(ctxTimeout, id, region, types.ConnectModeKubeConfig)
@@ -270,16 +293,49 @@ func buildCluster(kubeconfig *rest.Config, id string, index int, connectMode typ
270293
if !cluster.Enable {
271294
return nil, nil
272295
}
296+
297+
// Update StorageClass and NetworkInterface to database if read from ConfigMap
298+
if len(storageClass) > 0 || len(networkInterface) > 0 {
299+
updateEvent := types.ClusterEvent{
300+
ClusterID: cluster.ClusterID,
301+
ClusterConfig: cluster.ClusterConfig,
302+
Region: cluster.Region,
303+
Zone: cluster.Zone,
304+
Provider: cluster.Provider,
305+
StorageClass: cluster.StorageClass, // Use existing value as default
306+
NetworkInterface: cluster.NetworkInterface, // Use existing value as default
307+
Mode: cluster.Mode,
308+
Endpoint: cluster.RunnerEndpoint,
309+
AppEndpoint: cluster.AppEndpoint,
310+
}
311+
// Override with values from ConfigMap if available
312+
if len(storageClass) > 0 {
313+
updateEvent.StorageClass = storageClass
314+
cluster.StorageClass = storageClass
315+
}
316+
if len(networkInterface) > 0 {
317+
updateEvent.NetworkInterface = networkInterface
318+
cluster.NetworkInterface = networkInterface
319+
}
320+
err = clusterStore.UpdateByClusterID(ctxTimeout, updateEvent)
321+
if err != nil {
322+
slog.Warn("failed to update cluster info from ConfigMap", slog.Any("error", err))
323+
} else {
324+
slog.Info("updated cluster info from ConfigMap", slog.String("storage_class", updateEvent.StorageClass), slog.String("network_interface", updateEvent.NetworkInterface))
325+
}
326+
}
327+
273328
c := &Cluster{
274-
CID: id,
275-
ID: cluster.ClusterID,
276-
Client: client,
277-
KnativeClient: knativeClient,
278-
ArgoClient: argoClient,
279-
LWSClient: lwsclient,
280-
ConnectMode: connectMode,
281-
Region: region,
282-
StorageClass: cluster.StorageClass,
329+
CID: id,
330+
ID: cluster.ClusterID,
331+
Client: client,
332+
KnativeClient: knativeClient,
333+
ArgoClient: argoClient,
334+
LWSClient: lwsclient,
335+
ConnectMode: connectMode,
336+
Region: region,
337+
StorageClass: cluster.StorageClass,
338+
NetworkInterface: cluster.NetworkInterface,
283339
}
284340
return c, nil
285341
}

runner/component/cluster.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,12 @@ func (s *clusterComponentImpl) initCluster() {
5454
if c.ConnectMode == types.ConnectModeInCluster {
5555
go func(c *cluster.Cluster) {
5656
data := types.ClusterEvent{
57-
ClusterID: c.ID,
58-
ClusterConfig: types.DefaultClusterCongfig,
59-
Region: c.Region,
60-
Mode: c.ConnectMode,
57+
ClusterID: c.ID,
58+
ClusterConfig: types.DefaultClusterCongfig,
59+
Region: c.Region,
60+
StorageClass: c.StorageClass,
61+
Mode: c.ConnectMode,
62+
NetworkInterface: c.NetworkInterface,
6163
}
6264
event := &types.WebHookSendEvent{
6365
WebHookHeader: types.WebHookHeader{

runner/component/cluster_watch.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,19 @@ func (w *clusterWatcher) pushClusterChangeEvent(configmapData map[string]string)
6565
w.cluster.StorageClass = storageClass
6666
slog.Debug("update cluster storageclass", slog.Any("cluster", w.cluster))
6767
}
68+
networkInterface := configmapData[rtypes.KeyNetworkInterface]
69+
if len(networkInterface) < 1 && len(w.cluster.NetworkInterface) > 0 {
70+
networkInterface = w.cluster.NetworkInterface
71+
} else if len(networkInterface) > 0 {
72+
w.cluster.NetworkInterface = networkInterface
73+
slog.Debug("update cluster network interface", slog.Any("cluster", w.cluster), slog.String("network_interface", networkInterface))
74+
}
6875
data := types.ClusterEvent{
6976
ClusterID: w.cluster.ID,
7077
ClusterConfig: types.DefaultClusterCongfig,
7178
Mode: w.cluster.ConnectMode,
7279
StorageClass: storageClass,
73-
NetworkInterface: w.cluster.NetworkInterface,
80+
NetworkInterface: networkInterface,
7481
Status: types.ClusterStatusRunning,
7582
Region: configmapData[rtypes.KeyRunnerClusterRegion],
7683
Endpoint: configmapData[rtypes.KeyRunnerExposedEndpont],

runner/component/cluster_watch_test.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"net/http/httptest"
88
"sync"
99
"testing"
10+
"time"
1011

1112
"github.com/stretchr/testify/assert"
1213
"github.com/stretchr/testify/require"
@@ -127,3 +128,129 @@ func TestClusterWatcher_WatchCallback(t *testing.T) {
127128
})
128129
}
129130
}
131+
132+
func TestClusterWatcher_PushClusterChangeEvent(t *testing.T) {
133+
// mock http server to receive webhook
134+
var receivedEvent types.WebHookSendEvent
135+
var mu sync.Mutex
136+
137+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
138+
mu.Lock()
139+
defer mu.Unlock()
140+
body, err := io.ReadAll(r.Body)
141+
require.NoError(t, err)
142+
err = json.Unmarshal(body, &receivedEvent)
143+
require.NoError(t, err)
144+
w.WriteHeader(http.StatusOK)
145+
}))
146+
defer server.Close()
147+
148+
tests := []struct {
149+
name string
150+
configmapData map[string]string
151+
initialStorageClass string
152+
initialNetworkInterface string
153+
expectedStorageClass string
154+
expectedNetworkInterface string
155+
expectedEventData types.ClusterEvent
156+
}{
157+
{
158+
name: "should use ConfigMap values for StorageClass and NetworkInterface",
159+
configmapData: map[string]string{
160+
rtypes.KeyStorageClass: "fast-ssd",
161+
rtypes.KeyNetworkInterface: "eth1",
162+
rtypes.KeyRunnerClusterRegion: "region-1",
163+
rtypes.KeyRunnerExposedEndpont: server.URL,
164+
},
165+
initialStorageClass: "slow-ssd",
166+
initialNetworkInterface: "eth0",
167+
expectedStorageClass: "fast-ssd",
168+
expectedNetworkInterface: "eth1",
169+
expectedEventData: types.ClusterEvent{
170+
StorageClass: "fast-ssd",
171+
NetworkInterface: "eth1",
172+
},
173+
},
174+
{
175+
name: "should use cluster values when ConfigMap is empty",
176+
configmapData: map[string]string{
177+
rtypes.KeyRunnerClusterRegion: "region-1",
178+
rtypes.KeyRunnerExposedEndpont: server.URL,
179+
},
180+
initialStorageClass: "fast-ssd",
181+
initialNetworkInterface: "eth0",
182+
expectedStorageClass: "fast-ssd",
183+
expectedNetworkInterface: "eth0",
184+
expectedEventData: types.ClusterEvent{
185+
StorageClass: "fast-ssd",
186+
NetworkInterface: "eth0",
187+
},
188+
},
189+
{
190+
name: "should update cluster NetworkInterface when ConfigMap has value",
191+
configmapData: map[string]string{
192+
rtypes.KeyNetworkInterface: "eth2",
193+
rtypes.KeyRunnerClusterRegion: "region-1",
194+
rtypes.KeyRunnerExposedEndpont: server.URL,
195+
},
196+
initialStorageClass: "fast-ssd",
197+
initialNetworkInterface: "eth0",
198+
expectedStorageClass: "fast-ssd",
199+
expectedNetworkInterface: "eth2",
200+
expectedEventData: types.ClusterEvent{
201+
StorageClass: "fast-ssd",
202+
NetworkInterface: "eth2",
203+
},
204+
},
205+
}
206+
207+
for _, tt := range tests {
208+
t.Run(tt.name, func(t *testing.T) {
209+
// reset mock server state
210+
mu.Lock()
211+
receivedEvent = types.WebHookSendEvent{}
212+
mu.Unlock()
213+
214+
// setup
215+
mockCluster := &cluster.Cluster{
216+
ID: "test-cluster",
217+
CID: "test-cluster",
218+
ConnectMode: types.ConnectModeInCluster,
219+
Region: "test-region",
220+
StorageClass: tt.initialStorageClass,
221+
NetworkInterface: tt.initialNetworkInterface,
222+
}
223+
224+
cfg := &config.Config{}
225+
cfg.Runner.WebHookEndpoint = server.URL
226+
cfg.APIToken = "test-token"
227+
228+
watcher := &clusterWatcher{
229+
cluster: mockCluster,
230+
env: cfg,
231+
}
232+
233+
// execute
234+
err := watcher.pushClusterChangeEvent(tt.configmapData)
235+
236+
// assert
237+
assert.NoError(t, err)
238+
assert.Equal(t, tt.expectedStorageClass, mockCluster.StorageClass)
239+
assert.Equal(t, tt.expectedNetworkInterface, mockCluster.NetworkInterface)
240+
241+
// wait for async webhook call
242+
time.Sleep(100 * time.Millisecond)
243+
244+
// verify event data
245+
mu.Lock()
246+
if receivedEvent.Data != nil {
247+
eventData, ok := receivedEvent.Data.(types.ClusterEvent)
248+
if ok {
249+
assert.Equal(t, tt.expectedEventData.StorageClass, eventData.StorageClass)
250+
assert.Equal(t, tt.expectedEventData.NetworkInterface, eventData.NetworkInterface)
251+
}
252+
}
253+
mu.Unlock()
254+
})
255+
}
256+
}

runner/types/configmap_watch.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ const (
1414
KeyApplicationEndpoint = "STARHUB_SERVER_RUNNER_APPLICATION_ENDPOINT"
1515
// key name of configmap for storage class of PVC
1616
KeyStorageClass = "STARHUB_SERVER_RUNNER_STORAGE_CLASS"
17+
// key name of configmap for network interface, e.g., eth0
18+
KeyNetworkInterface = "STARHUB_SERVER_RUNNER_NETWORK_INTERFACE"
1719
)
1820

1921
var SubscribeKeyWithEventPush = map[string]Validator{

0 commit comments

Comments
 (0)