Skip to content

Commit 245a8ee

Browse files
authored
Merge pull request #569 from PrasadG193/snapshot-metadata-service
Implement CSI SnapshotMetadata service
2 parents ab4139c + 60a9768 commit 245a8ee

File tree

8 files changed

+972
-6
lines changed

8 files changed

+972
-6
lines changed

cmd/hostpathplugin/main.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@ import (
2525
"path"
2626
"syscall"
2727

28+
"k8s.io/klog/v2"
29+
30+
"github.com/container-storage-interface/spec/lib/go/csi"
2831
"github.com/kubernetes-csi/csi-driver-host-path/internal/proxy"
2932
"github.com/kubernetes-csi/csi-driver-host-path/pkg/hostpath"
30-
"k8s.io/klog/v2"
3133
)
3234

3335
var (
@@ -55,6 +57,8 @@ func main() {
5557

5658
flag.BoolVar(&cfg.EnableVolumeExpansion, "enable-volume-expansion", true, "Enables volume expansion feature.")
5759
flag.BoolVar(&cfg.EnableControllerModifyVolume, "enable-controller-modify-volume", false, "Enables Controller modify volume feature.")
60+
flag.BoolVar(&cfg.EnableSnapshotMetadata, "enable-snapshot-metadata", false, "Enables Snapshot Metadata service.")
61+
snapshotMetadataBlockType := flag.String("snapshot-metadata-block-type", "FIXED_LENGTH", "Expected Snapshot Metadata block type in response. Allowed valid types are FIXED_LENGTH or VARIABLE_LENGTH. If not specified, FIXED_LENGTH is used by default.")
5862
flag.Var(&cfg.AcceptedMutableParameterNames, "accepted-mutable-parameter-names", "Comma separated list of parameter names that can be modified on a persistent volume. This is only used when enable-controller-modify-volume is true. If unset, all parameters are mutable.")
5963
flag.BoolVar(&cfg.DisableControllerExpansion, "disable-controller-expansion", false, "Disables Controller volume expansion capability.")
6064
flag.BoolVar(&cfg.DisableNodeExpansion, "disable-node-expansion", false, "Disables Node volume expansion capability.")
@@ -106,6 +110,14 @@ func main() {
106110
cfg.MaxVolumeExpansionSizeNode = cfg.MaxVolumeSize
107111
}
108112

113+
// validate snapshot-metadata-type arg block type
114+
bt, ok := csi.BlockMetadataType_value[*snapshotMetadataBlockType]
115+
if !ok {
116+
fmt.Printf("invalid snapshot-metadata-block-type passed, please pass one of the - FIXED_LENGTH, VARIABLE_LENGTH")
117+
os.Exit(1)
118+
}
119+
cfg.SnapshotMetadataBlockType = csi.BlockMetadataType(bt)
120+
109121
driver, err := hostpath.NewHostPathDriver(cfg)
110122
if err != nil {
111123
fmt.Printf("Failed to initialize driver: %s", err.Error())

pkg/hostpath/hostpath.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ type hostPath struct {
5454
csi.UnimplementedControllerServer
5555
csi.UnimplementedNodeServer
5656
csi.UnimplementedGroupControllerServer
57+
csi.UnimplementedSnapshotMetadataServer
5758
config Config
5859

5960
// gRPC calls involving any of the fields below must be serialized
@@ -80,6 +81,8 @@ type Config struct {
8081
EnableTopology bool
8182
EnableVolumeExpansion bool
8283
EnableControllerModifyVolume bool
84+
EnableSnapshotMetadata bool
85+
SnapshotMetadataBlockType csi.BlockMetadataType
8386
AcceptedMutableParameterNames StringArray
8487
DisableControllerExpansion bool
8588
DisableNodeExpansion bool
@@ -129,8 +132,11 @@ func NewHostPathDriver(cfg Config) (*hostPath, error) {
129132

130133
func (hp *hostPath) Run() error {
131134
s := NewNonBlockingGRPCServer()
132-
// hp itself implements ControllerServer, NodeServer, and IdentityServer.
133-
s.Start(hp.config.Endpoint, hp, hp, hp, hp)
135+
var sms csi.SnapshotMetadataServer
136+
if hp.config.EnableSnapshotMetadata {
137+
sms = hp
138+
}
139+
s.Start(hp.config.Endpoint, hp, hp, hp, hp, sms)
134140
s.Wait()
135141

136142
return nil

pkg/hostpath/identityserver.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,15 @@ func (hp *hostPath) GetPluginCapabilities(ctx context.Context, req *csi.GetPlugi
7272
},
7373
})
7474
}
75+
if hp.config.EnableSnapshotMetadata {
76+
caps = append(caps, &csi.PluginCapability{
77+
Type: &csi.PluginCapability_Service_{
78+
Service: &csi.PluginCapability_Service{
79+
Type: csi.PluginCapability_Service_SNAPSHOT_METADATA_SERVICE,
80+
},
81+
},
82+
})
83+
}
7584

7685
return &csi.GetPluginCapabilitiesResponse{Capabilities: caps}, nil
7786
}

pkg/hostpath/server.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,11 @@ type nonBlockingGRPCServer struct {
4040
cleanup func()
4141
}
4242

43-
func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer, gcs csi.GroupControllerServer) {
43+
func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer, gcs csi.GroupControllerServer, sms csi.SnapshotMetadataServer) {
4444

4545
s.wg.Add(1)
4646

47-
go s.serve(endpoint, ids, cs, ns, gcs)
47+
go s.serve(endpoint, ids, cs, ns, gcs, sms)
4848

4949
return
5050
}
@@ -63,7 +63,7 @@ func (s *nonBlockingGRPCServer) ForceStop() {
6363
s.cleanup()
6464
}
6565

66-
func (s *nonBlockingGRPCServer) serve(ep string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer, gcs csi.GroupControllerServer) {
66+
func (s *nonBlockingGRPCServer) serve(ep string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer, gcs csi.GroupControllerServer, sms csi.SnapshotMetadataServer) {
6767
listener, cleanup, err := endpoint.Listen(ep)
6868
if err != nil {
6969
klog.Fatalf("Failed to listen: %v", err)
@@ -88,6 +88,9 @@ func (s *nonBlockingGRPCServer) serve(ep string, ids csi.IdentityServer, cs csi.
8888
if gcs != nil {
8989
csi.RegisterGroupControllerServer(server, gcs)
9090
}
91+
if sms != nil {
92+
csi.RegisterSnapshotMetadataServer(server, sms)
93+
}
9194

9295
klog.Infof("Listening for connections on address: %#v", listener.Addr())
9396

pkg/hostpath/snapshotmetadata.go

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package hostpath
18+
19+
import (
20+
"bytes"
21+
"io"
22+
"os"
23+
24+
"github.com/container-storage-interface/spec/lib/go/csi"
25+
"golang.org/x/net/context"
26+
"k8s.io/klog/v2"
27+
)
28+
29+
// NOTE: This implementation of SnapshotMetadata service is used for demo and CI testing purpose only.
30+
// This should not be used in production or as an example about how to write a real driver.
31+
32+
type fileBlockReader struct {
33+
base *os.File
34+
target *os.File
35+
offset int64
36+
blockSize int64
37+
blockMetadataType csi.BlockMetadataType
38+
maxResult int32
39+
}
40+
41+
func newFileBlockReader(
42+
basePath,
43+
targetPath string,
44+
startingOffset int64,
45+
blockSize int64,
46+
blockMetadataType csi.BlockMetadataType,
47+
maxResult int32,
48+
) (*fileBlockReader, error) {
49+
base, target, err := openFiles(basePath, targetPath)
50+
if err != nil {
51+
return nil, err
52+
}
53+
54+
return &fileBlockReader{
55+
base: base,
56+
target: target,
57+
offset: startingOffset,
58+
blockSize: blockSize,
59+
blockMetadataType: blockMetadataType,
60+
maxResult: maxResult,
61+
}, nil
62+
}
63+
64+
func (cb *fileBlockReader) seekToStartingOffset() error {
65+
if _, err := cb.target.Seek(cb.offset, io.SeekStart); err != nil {
66+
return err
67+
}
68+
if cb.base == nil {
69+
return nil
70+
}
71+
if _, err := cb.base.Seek(cb.offset, io.SeekStart); err != nil {
72+
return err
73+
}
74+
return nil
75+
}
76+
77+
func (cb *fileBlockReader) Close() error {
78+
if cb.base != nil {
79+
if err := cb.base.Close(); err != nil {
80+
return err
81+
}
82+
}
83+
if cb.target != nil {
84+
if err := cb.target.Close(); err != nil {
85+
return err
86+
}
87+
}
88+
return nil
89+
}
90+
91+
func openFiles(basePath, targetPath string) (base, target *os.File, err error) {
92+
target, err = os.Open(targetPath)
93+
if err != nil {
94+
return nil, nil, err
95+
}
96+
if basePath == "" {
97+
return nil, target, nil
98+
}
99+
base, err = os.Open(basePath)
100+
if err != nil {
101+
target.Close()
102+
return nil, nil, err
103+
}
104+
105+
return base, target, nil
106+
}
107+
108+
// getChangedBlockMetadata reads base and target files, compare block differences between them
109+
// and returns list of changed block metadata. It reads all the blocks till it reaches EOF or size of changed block
110+
// metadata list <= maxSize.
111+
func (cb *fileBlockReader) getChangedBlockMetadata(ctx context.Context) ([]*csi.BlockMetadata, error) {
112+
if cb.base == nil {
113+
klog.V(4).Infof("finding allocated blocks by file: %s", cb.target.Name())
114+
} else {
115+
klog.V(4).Infof("finding changed blocks between two files: %s, %s", cb.base.Name(), cb.target.Name())
116+
}
117+
118+
blockIndex := cb.offset / cb.blockSize
119+
sBuffer := make([]byte, cb.blockSize)
120+
tBuffer := make([]byte, cb.blockSize)
121+
eofBaseFile, eofTargetFile := false, false
122+
123+
changedBlocks := []*csi.BlockMetadata{}
124+
125+
// Read blocks and compare them. Create the list of changed blocks metadata.
126+
// Once the number of blocks reaches to maxResult, return the result and
127+
// compute next batch of blocks.
128+
for int32(len(changedBlocks)) < cb.maxResult {
129+
select {
130+
case <-ctx.Done():
131+
klog.V(4).Infof("handling cancellation signal, closing goroutine")
132+
return nil, ctx.Err()
133+
default:
134+
}
135+
targetReadBytes, eofTarget, err := readFileBlock(cb.target, tBuffer, eofTargetFile)
136+
if err != nil {
137+
return nil, err
138+
}
139+
eofTargetFile = eofTarget
140+
141+
// If base is nil, return blocks allocated by target file.
142+
if cb.base == nil {
143+
if eofTargetFile {
144+
return changedBlocks, io.EOF
145+
}
146+
// if VARIABLE_LENGTH type is enabled, return blocks extend instead of individual blocks.
147+
blockMetadata := createBlockMetadata(blockIndex, cb.blockSize)
148+
if extendBlock(changedBlocks, csi.BlockMetadataType(cb.blockMetadataType), blockIndex, cb.blockSize) {
149+
changedBlocks[len(changedBlocks)-1].SizeBytes += cb.blockSize
150+
cb.offset += cb.blockSize
151+
blockIndex++
152+
continue
153+
}
154+
changedBlocks = append(changedBlocks, blockMetadata)
155+
cb.offset += cb.blockSize
156+
blockIndex++
157+
continue
158+
}
159+
160+
baseReadBytes, eofBase, err := readFileBlock(cb.base, sBuffer, eofBaseFile)
161+
if err != nil {
162+
return nil, err
163+
}
164+
eofBaseFile = eofBase
165+
166+
// If both files have reached EOF, exit the loop.
167+
if eofBaseFile && eofTargetFile {
168+
klog.V(4).Infof("reached end of the files")
169+
return changedBlocks, io.EOF
170+
}
171+
172+
// Compare the two blocks and add result.
173+
// Even if one of the file reaches to end, continue to add block metadata of other file.
174+
if blockChanged(sBuffer[:baseReadBytes], tBuffer[:targetReadBytes]) {
175+
blockMetadata := createBlockMetadata(blockIndex, cb.blockSize)
176+
// if VARIABLE_LEGTH type is enabled, check if blocks are adjacent,
177+
// extend the previous block if adjacent blocks found instead of adding new entry.
178+
if extendBlock(changedBlocks, csi.BlockMetadataType(cb.blockMetadataType), blockIndex, cb.blockSize) {
179+
changedBlocks[len(changedBlocks)-1].SizeBytes += cb.blockSize
180+
cb.offset += cb.blockSize
181+
blockIndex++
182+
continue
183+
}
184+
changedBlocks = append(changedBlocks, blockMetadata)
185+
}
186+
187+
cb.offset += cb.blockSize
188+
blockIndex++
189+
}
190+
return changedBlocks, nil
191+
}
192+
193+
// readFileBlock reads blocks from a file.
194+
func readFileBlock(file *os.File, buffer []byte, eof bool) (int, bool, error) {
195+
if eof {
196+
return 0, true, nil
197+
}
198+
199+
bytesRead, err := file.Read(buffer)
200+
if err != nil && err != io.EOF {
201+
return 0, false, err
202+
}
203+
204+
return bytesRead, err == io.EOF, nil
205+
}
206+
207+
func blockChanged(baseBlock, targetBlock []byte) bool {
208+
return !bytes.Equal(baseBlock, targetBlock)
209+
}
210+
211+
func createBlockMetadata(blockIndex, blockSize int64) *csi.BlockMetadata {
212+
return &csi.BlockMetadata{
213+
ByteOffset: blockIndex * blockSize,
214+
SizeBytes: blockSize,
215+
}
216+
}
217+
218+
func extendBlock(changedBlocks []*csi.BlockMetadata, metadataType csi.BlockMetadataType, blockIndex, blockSize int64) bool {
219+
blockMetadata := createBlockMetadata(blockIndex, blockSize)
220+
// if VARIABLE_LEGTH type is enabled, check if blocks are adjacent,
221+
// extend the previous block if adjacent blocks found instead of adding new entry.
222+
if len(changedBlocks) < 1 {
223+
return false
224+
}
225+
lastBlock := changedBlocks[len(changedBlocks)-1]
226+
if blockMetadata.ByteOffset == lastBlock.ByteOffset+lastBlock.SizeBytes &&
227+
metadataType == csi.BlockMetadataType_VARIABLE_LENGTH {
228+
return true
229+
}
230+
return false
231+
}

0 commit comments

Comments
 (0)