
Commit 8408dc6

Update data cache logic to support multi-pod scenario
1 parent b675d19 commit 8408dc6

File tree

4 files changed: +313 additions, -223 deletions


deploy/kubernetes/images/stable-master/image.yaml

Lines changed: 1 addition & 1 deletion
@@ -51,6 +51,6 @@ imageTag:
   name: gke.gcr.io/gcp-compute-persistent-disk-csi-driver
   # Don't change stable image without changing pdImagePlaceholder in
   # test/k8s-integration/main.go
-  newName: gcr.io/songsunny-joonix/gcp-compute-persistent-disk-csi-driver
+  newName: gcr.io/snehaaradhey-joonix/gcp-compute-persistent-disk-csi-driver
   newTag: "datacache"
 ---

pkg/common/runcmd.go

Lines changed: 1 addition & 0 deletions
@@ -18,6 +18,7 @@ const (
 func RunCommand(cmd string, args ...string) ([]byte, error) {
 	klog.V(2).Infof("====== Start RunCommand ======")
 	execCmd := exec.Command(cmd, args...)
+	klog.V(2).Infof("=======Running command %v==========", execCmd)
 	output, err := execCmd.CombinedOutput()
 	if err != nil {
 		if err.Error() == errNoChildProcesses {
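
Worth noting for readers of the new cache code below: RunCommand returns CombinedOutput, so on failure the returned byte slice holds the command's own stdout/stderr (the error text) rather than parseable output. A minimal sketch of the calling pattern, assuming only packages this commit already imports:

package main

import (
	"k8s.io/klog/v2"

	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
)

func main() {
	// On failure, info holds vgscan's combined stdout/stderr (the error
	// message), which is why the cache code logs it next to err instead
	// of parsing it for further steps.
	info, err := common.RunCommand("vgscan")
	if err != nil {
		klog.Errorf("vgscan error %v: %s", err, info)
	}
	klog.V(2).Infof("vgscan output: %s", info)
}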

pkg/gce-pd-csi-driver/cache.go

Lines changed: 292 additions & 0 deletions
@@ -0,0 +1,292 @@
+package gceGCEDriver
+
+import (
+	"fmt"
+	"strings"
+
+	csi "github.com/container-storage-interface/spec/lib/go/csi"
+
+	"k8s.io/klog/v2"
+
+	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
+)
+
+const cacheSuffix = "csi-fast"
+const mainLvSuffix = "csi-main"
+
+func SetupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId string) (string, error) {
+	volumeId := req.GetVolumeId()
+	volumeGroupName := getVolumeGroupName(nodeId)
+	mainDevicePath := "/dev/" + volumeGroupName + "/" + getLvName(mainLvSuffix, volumeId)
+	klog.V(2).Infof("====== Start LVM PoC NodeStageVolume Steps ======")
+	klog.V(2).Infof("====== volumeGroupName is %v ======", volumeGroupName)
+
+	klog.V(2).Infof("====== vgscan before vgcreate ======")
+	args := []string{}
+	info, err := common.RunCommand("vgscan", args...)
+	if err != nil {
+		klog.Errorf("vgscan error %v: %s", err, info)
+	}
+	klog.V(2).Infof("====== vgscan info %v ======", string(info))
+	klog.V(2).Infof("====== vgscan info contains volumeGroupName or not %v ======", strings.Contains(string(info), volumeGroupName))
+	// Check if the required volume group already exists
+	if strings.Contains(string(info), volumeGroupName) {
+		klog.V(2).Infof("============= VG exists, now check if PD is part of VG============")
+
+		// Clean up Volume Group before adding the PD
+		reduceVolumeGroup(volumeGroupName)
+	} else {
+		err := createVg(volumeGroupName, devicePath)
+		if err != nil {
+			return mainDevicePath, err
+		}
+		// VG did not exist, so it was created above
+		klog.V(2).Infof("====== Created Volume group %s on node ======", volumeGroupName)
+	}
+
+	// Verify if the PD is already in the required volume group
+	// args = []string{
+	// 	"--select",
+	// 	"vg_name",
+	// 	volumeGroupName
+	// }
+	// vgPvs, err := common.RunCommand("pvs", args...)
+	// if err != nil {
+	// 	klog.Errorf("Errored while checking Physical Volume(PV)s for the volume group %v: %s", err, vgPvs)
+	// }
+	// if strings.Contains(string(vgPvs), devicePath) {
+	// 	klog.V(2).Infof("====Physical Volume(PV) already exists in the Volume Group=====")
+	// } else {
+
+	// Check if the Physical Volume(PV) is part of some other volume group
+	args = []string{
+		"--select",
+		"pv_name=" + devicePath,
+		"-o",
+		"vg_name",
+	}
+	info, err = common.RunCommand("pvs", args...)
+	if err != nil {
+		klog.Errorf("errored while checking physical volume details %v: %s", err, info)
+		// On error info contains the error message which we cannot use for further steps
+	}
+
+	klog.V(2).Infof("==========Got Volume group details from PV %s=======", info)
+
+	infoString := strings.TrimSpace(strings.ReplaceAll(string(info), "\n", " "))
+	infoString = strings.ReplaceAll(infoString, ".", "")
+	infoString = strings.ReplaceAll(infoString, "\"", "")
+	infoSlice := strings.Split(strings.TrimSpace(infoString), " ")
+	klog.V(2).Infof("============ infoSlice %v: %v =============", infoSlice[len(infoSlice)-1], infoSlice)
+	vgNameForPv := strings.TrimSpace(infoSlice[len(infoSlice)-1])
+	klog.V(2).Infof("============ Physical volume is part of Volume group: %v=======", vgNameForPv)
+	if vgNameForPv == volumeGroupName {
+		klog.V(2).Infof("====Physical Volume(PV) already exists in the Volume Group=====")
+	} else if vgNameForPv != "VG" && vgNameForPv != "" {
+
+		klog.V(2).Infof("=========Deactivate VG %s========", vgNameForPv)
+		info, err = common.RunCommand("vgchange", []string{"-an", vgNameForPv}...)
+		if err != nil {
+			klog.Errorf("Errored while deactivating VG %v: err: %v: %s", vgNameForPv, err, info)
+		}
+
+		reduceVolumeGroup(vgNameForPv)
+		klog.V(2).Infof("==========Merge VG %v to Node VG %v==========", vgNameForPv, volumeGroupName)
+		info, err = common.RunCommand("vgmerge", []string{volumeGroupName, vgNameForPv}...)
+		if err != nil {
+			klog.Errorf("Errored while merging Volume group %s into %s %v: %s", vgNameForPv, volumeGroupName, err, info)
+		}
+
+		klog.V(2).Infof("==========Remove VG from node %v ==========", vgNameForPv)
+		info, err = common.RunCommand("vgremove", []string{vgNameForPv, "-y"}...)
+		if err != nil {
+			klog.Errorf("Errored while removing Volume group %s: error:%v info:%s", vgNameForPv, err, info)
+		}
+
+	} else {
+		klog.V(2).Infof("==========Extend Node VG %v for PV %v==========", volumeGroupName, devicePath)
+		info, err := common.RunCommand("vgextend", []string{volumeGroupName, devicePath}...)
+		if err != nil {
+			klog.Errorf("Errored while extending VGs %v: %s", err, info)
+		}
+	}
+
+	mainLvName := getLvName(mainLvSuffix, volumeId)
+	// Create LV if not already created
+	args = []string{
+		"--select",
+		"vg_name=" + volumeGroupName,
+		"-o",
+		"lv_name",
+	}
+	lvList, err := common.RunCommand("lvs", args...)
+	if err != nil {
+		klog.V(2).Infof("====== lvs error %v: %s ======", err, lvList)
+		return mainDevicePath, fmt.Errorf("lv list error %w: %s", err, lvList)
+	}
+	klog.Infof("=============== Got LVs %s on Volume group %s ============", lvList, volumeGroupName)
+	if !strings.Contains(string(lvList), mainLvName) {
+		// lvcreate -n main -l 100%PVS cachegroup /dev/sdb
+		klog.V(2).Infof("====== lvcreate main cache layer ======")
+		args = []string{
+			"--yes",
+			"-n",
+			mainLvName,
+			"-l",
+			"100%PVS",
+			volumeGroupName,
+			devicePath,
+		}
+		info, err = common.RunCommand("lvcreate", args...)
+		if err != nil {
+			klog.V(2).Infof("====== lvcreate error %v: %s ======", err, info)
+			return mainDevicePath, fmt.Errorf("lvcreate error %w: %s", err, info)
+		}
+
+	}
+	// Replace this with RAIDed Local SSD
+	cachePoolName := "/dev/nvme0n1"
+	// Verify caching is setup for PD
+	args = []string{
+		"--select",
+		"lv_name=" + mainLvName,
+		"-o",
+		"pool_lv",
+	}
+	poolName, err := common.RunCommand("lvs", args...)
+	if err != nil {
+		klog.V(2).Infof("====== lvs error %v: %s ======", err, poolName)
+		return mainDevicePath, fmt.Errorf("lvs error %w: %s", err, poolName)
+	}
+	cacheLvName := getLvName(cacheSuffix, volumeId)
+	if strings.Contains(string(poolName), "csi-fast") {
+		// Validate that cache is setup for required size
+		klog.V(2).Infof("================Validate Cache is setup for correct size and mode===============")
+	} else {
+		fastCacheSize := req.GetPublishContext()[contexLocalSsdCacheSize] + "Gi"
+		klog.V(2).Infof("====== fastCacheSize is %v ======", fastCacheSize)
+		// lvcreate -n fast -L 50G cachegroup /dev/nvme0n1
+		klog.V(2).Infof("====== lvcreate fast cache layer again with the VolumeGroup %v======", volumeGroupName)
+		args = []string{
+			"--yes",
+			"-n",
+			cacheLvName,
+			"-L",
+			fastCacheSize,
+			volumeGroupName,
+			cachePoolName,
+		}
+		info, err = common.RunCommand("lvcreate", args...)
+		if err != nil {
+			klog.V(2).Infof("====== lvcreate error %v: %s ======", err, info)
+			return mainDevicePath, fmt.Errorf("lvcreate error %w: %s", err, info)
+		}
+
+		// Once caching is setup, link the PD to cache
+		// lvconvert --type cache --cachevol fast --zero y --cachemode writethrough cachegroup/main --force -y
+		klog.V(2).Infof("====== lvconvert fast and main to cache ======")
+		args = []string{
+			"--type",
+			"cache",
+			"--cachevol",
+			cacheLvName,
+			"--zero",
+			"y",
+			"--cachemode",
+			req.GetPublishContext()[contextDataCacheMode],
+			volumeGroupName + "/" + mainLvName,
+			"--force",
+			"-y",
+		}
+		info, err = common.RunCommand("lvconvert", args...)
+		if err != nil {
+			klog.V(2).Infof("====== lvconvert error %v: %s ======", err, info)
+			return mainDevicePath, fmt.Errorf("lvconvert error %w: %s", err, info)
+		}
+	}
+
+	// activate all the LVs in the Volume group
+	klog.V(2).Infof("====== Activate Volume group %s ======", volumeGroupName)
+	info, err = common.RunCommand("vgchange", []string{"-ay", volumeGroupName}...)
+	if err != nil {
+		klog.Errorf("Failed to activate VG %v %v:%s", volumeGroupName, err, info)
+	}
+
+	return mainDevicePath, nil
+}
+
+func CleanupCache(volumeId string, nodeId string) error {
+
+	volumeGroupName := getVolumeGroupName(nodeId)
+	klog.V(2).Infof("=============Deactivating volume %s/%s=====", volumeGroupName, volumeId)
+	args := []string{
+		"-an",
+		"/dev/" + volumeGroupName + "/" + getLvName(mainLvSuffix, volumeId),
+	}
+	info, err := common.RunCommand("lvchange", args...)
+	if err != nil {
+		klog.Errorf("Errored while deactivating the disk %v: %s", err, info)
+	}
+	args = []string{
+		"--uncache",
+		volumeGroupName + "/" + getLvName(mainLvSuffix, volumeId),
+	}
+	info, err = common.RunCommand("lvconvert", args...)
+	if err != nil {
+		klog.Errorf("Errored while uncaching the disk %v: %s", err, info)
+		return fmt.Errorf("errored while uncaching the disk %w: %s", err, info)
+	}
+	reduceVolumeGroup(volumeGroupName)
+	return nil
+}
+
+func getVolumeGroupName(nodePath string) string {
+	nodeSlice := strings.Split(nodePath, "/")
+	nodeId := nodeSlice[len(nodeSlice)-1]
+	return fmt.Sprintf("csi-vg-%s", nodeId)
+}
+
+func getLvName(suffix string, volumeId string) string {
+	pvcNameStringSlice := strings.Split(volumeId, "/")
+	pvcName := pvcNameStringSlice[len(pvcNameStringSlice)-1]
+	return fmt.Sprintf("%s-%s", suffix, pvcName)
+}
+
+func createVg(volumeGroupName string, devicePath string) error {
+	// No existing volume group
+	klog.V(2).Infof("====== vgcreate ======")
+	args := []string{
+		"--zero",
+		"y",
+		volumeGroupName,
+		"/dev/nvme0n1",
+	}
+	info, err := common.RunCommand("vgcreate", args...)
+	if err != nil {
+		klog.Errorf("vgcreate error %v: %s", err, info)
+		return fmt.Errorf("vgcreate error %w: %s", err, info)
+	}
+
+	klog.V(2).Infof("====== vgscan after vgcreate ======")
+	args = []string{}
+	info, err = common.RunCommand("vgscan", args...)
+	if err != nil {
+		klog.Errorf("vgscan error %v: %s", err, info)
+	} else {
+		klog.V(2).Infof("====== vgscan info %s ======", info)
+	}
+	return nil
+}
+
+func reduceVolumeGroup(volumeGroupName string) {
+	klog.V(2).Infof("=========Cleanup VG========")
+	args := []string{
+		"--removemissing",
+		"--force",
+		volumeGroupName,
+	}
+	info, err := common.RunCommand("vgreduce", args...)
+	if err != nil {
+		klog.Errorf("Errored while cleaning up volume group %v: %s", err, info)
+	}
+}
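
Taken together, SetupCaching places the PD and the node's local SSD into a per-node LVM volume group and returns the cached logical volume's device path, while CleanupCache reverses this on unstage. A minimal sketch of how a node service might wire in these helpers; the function names and call sites below are hypothetical, not part of this commit:

package gceGCEDriver

// Illustrative only: hypothetical call sites showing how the new helpers
// compose on the node-stage and node-unstage paths of the driver.

import (
	csi "github.com/container-storage-interface/spec/lib/go/csi"

	"k8s.io/klog/v2"
)

func stageWithDataCache(devicePath string, req *csi.NodeStageVolumeRequest, nodeId string) (string, error) {
	// SetupCaching ensures the per-node VG (csi-vg-<node>), carves the
	// csi-main LV over the PD and the csi-fast LV over the local SSD, and
	// lvconverts them into a cached LV. The returned path, e.g.
	// /dev/csi-vg-<node>/csi-main-<pvc>, is what should be formatted and
	// mounted instead of the raw PD device.
	return SetupCaching(devicePath, req, nodeId)
}

func unstageWithDataCache(volumeId, nodeId string) error {
	// CleanupCache deactivates the main LV, uncaches it (flushing cached
	// writes from the local SSD back to the PD), then shrinks the node VG.
	if err := CleanupCache(volumeId, nodeId); err != nil {
		klog.Errorf("Data cache cleanup failed for volume %s: %v", volumeId, err)
		return err
	}
	return nil
}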
