Skip to content

Commit 57eff9b

Browse files
committed
Implement SnapshotMetadataService
Signed-off-by: Prasad Ghangal <[email protected]>
1 parent a785248 commit 57eff9b

File tree

5 files changed

+207
-4
lines changed

5 files changed

+207
-4
lines changed

pkg/hostpath/hostpath.go

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package hostpath
1818

1919
import (
20+
"bytes"
2021
"errors"
2122
"fmt"
2223
"io"
@@ -26,6 +27,7 @@ import (
2627
"sync"
2728

2829
"github.com/container-storage-interface/spec/lib/go/csi"
30+
"golang.org/x/net/context"
2931
"google.golang.org/grpc/codes"
3032
"google.golang.org/grpc/status"
3133
"k8s.io/apimachinery/pkg/api/resource"
@@ -54,6 +56,7 @@ type hostPath struct {
5456
csi.UnimplementedControllerServer
5557
csi.UnimplementedNodeServer
5658
csi.UnimplementedGroupControllerServer
59+
csi.UnimplementedSnapshotMetadataServer
5760
config Config
5861

5962
// gRPC calls involving any of the fields below must be serialized
@@ -80,6 +83,7 @@ type Config struct {
8083
EnableTopology bool
8184
EnableVolumeExpansion bool
8285
EnableControllerModifyVolume bool
86+
EnableSnapshotMetadata bool
8387
AcceptedMutableParameterNames StringArray
8488
DisableControllerExpansion bool
8589
DisableNodeExpansion bool
@@ -130,7 +134,7 @@ func NewHostPathDriver(cfg Config) (*hostPath, error) {
130134
func (hp *hostPath) Run() error {
131135
s := NewNonBlockingGRPCServer()
132136
// hp itself implements ControllerServer, NodeServer, and IdentityServer.
133-
s.Start(hp.config.Endpoint, hp, hp, hp, hp)
137+
s.Start(hp.config.Endpoint, hp, hp, hp, hp, hp)
134138
s.Wait()
135139

136140
return nil
@@ -410,3 +414,87 @@ func (hp *hostPath) createSnapshotFromVolume(vol state.Volume, file string, opts
410414

411415
return nil
412416
}
417+
418+
func (hp *hostPath) getChangedBlockMetadata(ctx context.Context, sourcePath, targetPath string, startingOffset, blockSize int64, maxResult int32, changedBlocksChan chan<- []*csi.BlockMetadata) error {
419+
klog.V(4).Infof("finding changed blocks between two files: %s, %s", sourcePath, targetPath)
420+
defer close(changedBlocksChan)
421+
// Open the two files
422+
source, err := os.Open(sourcePath)
423+
if err != nil {
424+
klog.Errorf("failed to read file: %v", err)
425+
return err
426+
}
427+
defer source.Close()
428+
target, err := os.Open(targetPath)
429+
if err != nil {
430+
klog.Errorf("failed to read file: %v", err)
431+
return err
432+
}
433+
defer target.Close()
434+
435+
// Seek to the desired offset in both files
436+
_, err = source.Seek(startingOffset, 0)
437+
if err != nil {
438+
klog.Errorf("failed to seek file: %v", err)
439+
return err
440+
}
441+
_, err = target.Seek(startingOffset, 0)
442+
if err != nil {
443+
klog.Errorf("failed to seek file: %v", err)
444+
return err
445+
}
446+
447+
// Read both files block by block and compare them
448+
var blockIndex int64
449+
buffer1 := make([]byte, blockSize)
450+
buffer2 := make([]byte, blockSize)
451+
for {
452+
changedBlocks := []*csi.BlockMetadata{}
453+
// Read blocks and compare them. Create the list of changed blocks metadata.
454+
// Once the number of blocks reaches to maxResult, return the result and
455+
// compute next batch of blocks.
456+
for i := 1; i <= int(maxResult); i++ {
457+
select {
458+
case <-ctx.Done(): // Detect cancellation from the client
459+
klog.Infof("Stream canceled by client. Exiting goroutine.")
460+
return nil
461+
default:
462+
// Read block from source and target files
463+
n1, err1 := source.Read(buffer1)
464+
n2, err2 := target.Read(buffer2)
465+
466+
// If both files have reached EOF, exit the loop
467+
if err1 == io.EOF && err2 == io.EOF {
468+
klog.Infof("reached to end of the file\n")
469+
return nil
470+
}
471+
if err1 != nil && err1 != io.EOF {
472+
klog.Errorf("failed to read to buffer: %v", err)
473+
return err1
474+
}
475+
if err2 != nil && err2 != io.EOF {
476+
klog.Errorf("failed to read to buffer: %v", err)
477+
return err2
478+
}
479+
480+
// If the number of bytes read differs, the files are different
481+
if n1 != n2 {
482+
klog.Infof("files differ in size at block %d\n", blockIndex)
483+
return nil
484+
}
485+
486+
// Compare the two blocks and add result
487+
if !bytes.Equal(buffer1[:n1], buffer2[:n2]) {
488+
changedBlocks = append(changedBlocks, &csi.BlockMetadata{
489+
ByteOffset: blockIndex * blockSize,
490+
SizeBytes: int64(blockSize),
491+
})
492+
}
493+
blockIndex++
494+
}
495+
}
496+
if len(changedBlocks) != 0 {
497+
changedBlocksChan <- changedBlocks
498+
}
499+
}
500+
}

pkg/hostpath/identityserver.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,13 @@ func (hp *hostPath) GetPluginCapabilities(ctx context.Context, req *csi.GetPlugi
6262
},
6363
},
6464
},
65+
{
66+
Type: &csi.PluginCapability_Service_{
67+
Service: &csi.PluginCapability_Service{
68+
Type: csi.PluginCapability_Service_SNAPSHOT_METADATA_SERVICE,
69+
},
70+
},
71+
},
6572
}
6673
if hp.config.EnableTopology {
6774
caps = append(caps, &csi.PluginCapability{

pkg/hostpath/server.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,11 @@ type nonBlockingGRPCServer struct {
4040
cleanup func()
4141
}
4242

43-
func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer, gcs csi.GroupControllerServer) {
43+
func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer, gcs csi.GroupControllerServer, sms csi.SnapshotMetadataServer) {
4444

4545
s.wg.Add(1)
4646

47-
go s.serve(endpoint, ids, cs, ns, gcs)
47+
go s.serve(endpoint, ids, cs, ns, gcs, sms)
4848

4949
return
5050
}
@@ -63,7 +63,7 @@ func (s *nonBlockingGRPCServer) ForceStop() {
6363
s.cleanup()
6464
}
6565

66-
func (s *nonBlockingGRPCServer) serve(ep string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer, gcs csi.GroupControllerServer) {
66+
func (s *nonBlockingGRPCServer) serve(ep string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer, gcs csi.GroupControllerServer, sms csi.SnapshotMetadataServer) {
6767
listener, cleanup, err := endpoint.Listen(ep)
6868
if err != nil {
6969
klog.Fatalf("Failed to listen: %v", err)
@@ -88,6 +88,9 @@ func (s *nonBlockingGRPCServer) serve(ep string, ids csi.IdentityServer, cs csi.
8888
if gcs != nil {
8989
csi.RegisterGroupControllerServer(server, gcs)
9090
}
91+
if sms != nil {
92+
csi.RegisterSnapshotMetadataServer(server, sms)
93+
}
9194

9295
klog.Infof("Listening for connections on address: %#v", listener.Addr())
9396

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package hostpath
18+
19+
import (
20+
"fmt"
21+
22+
"github.com/container-storage-interface/spec/lib/go/csi"
23+
"github.com/kubernetes-csi/csi-driver-host-path/pkg/state"
24+
"google.golang.org/grpc/codes"
25+
"google.golang.org/grpc/status"
26+
"k8s.io/klog/v2"
27+
)
28+
29+
func (hp *hostPath) GetMetadataAllocated(req *csi.GetMetadataAllocatedRequest, stream csi.SnapshotMetadata_GetMetadataAllocatedServer) error {
30+
return nil
31+
}
32+
33+
func (hp *hostPath) GetMetadataDelta(req *csi.GetMetadataDeltaRequest, stream csi.SnapshotMetadata_GetMetadataDeltaServer) error {
34+
ctx := stream.Context()
35+
// Check arguments
36+
baseSnapID := req.GetBaseSnapshotId()
37+
targetSnapID := req.GetTargetSnapshotId()
38+
if len(baseSnapID) == 0 {
39+
return status.Error(codes.InvalidArgument, "BaseSnapshotID missing in request")
40+
}
41+
if len(targetSnapID) == 0 {
42+
return status.Error(codes.InvalidArgument, "TargetSnapshotID missing in request")
43+
}
44+
45+
// Load snapshots
46+
source, err := hp.state.GetSnapshotByID(baseSnapID)
47+
if err != nil {
48+
return status.Error(codes.Internal, "Cannot find the source snapshot")
49+
}
50+
target, err := hp.state.GetSnapshotByID(targetSnapID)
51+
if err != nil {
52+
return status.Error(codes.Internal, "Cannot find the target snapshot")
53+
}
54+
55+
if !source.ReadyToUse {
56+
return status.Error(codes.Unavailable, fmt.Sprintf("snapshot %v is not yet ready to use", baseSnapID))
57+
}
58+
if !target.ReadyToUse {
59+
return status.Error(codes.Unavailable, fmt.Sprintf("snapshot %v is not yet ready to use", targetSnapID))
60+
}
61+
62+
if source.VolID != target.VolID {
63+
return status.Error(codes.InvalidArgument, "Snapshots don't belong to the same Volume")
64+
}
65+
vol, err := hp.state.GetVolumeByID(source.VolID)
66+
if err != nil {
67+
return err
68+
}
69+
if vol.VolAccessType != state.BlockAccess {
70+
return status.Error(codes.InvalidArgument, "Source volume does not have block mode access type")
71+
}
72+
73+
changedBlocks := make(chan []*csi.BlockMetadata)
74+
go func() {
75+
err := hp.getChangedBlockMetadata(ctx, hp.getSnapshotPath(baseSnapID), hp.getSnapshotPath(targetSnapID), req.StartingOffset, state.BlockSizeBytes, req.MaxResults, changedBlocks)
76+
if err != nil {
77+
klog.Errorf("failed to get changed block metadata: %v", err)
78+
}
79+
}()
80+
81+
for {
82+
select {
83+
case cb, ok := <-changedBlocks:
84+
if !ok {
85+
klog.V(4).Info("channel closed, returning")
86+
return nil
87+
}
88+
resp := csi.GetMetadataDeltaResponse{
89+
BlockMetadataType: csi.BlockMetadataType_FIXED_LENGTH,
90+
VolumeCapacityBytes: vol.VolSize,
91+
BlockMetadata: cb,
92+
}
93+
if err := stream.Send(&resp); err != nil {
94+
return err
95+
}
96+
case <-ctx.Done():
97+
klog.V(4).Info("received cancellation signal, returning")
98+
return nil
99+
}
100+
}
101+
return nil
102+
}

pkg/state/state.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ type AccessType int
3434
const (
3535
MountAccess AccessType = iota
3636
BlockAccess
37+
38+
// BlockSizeBytes represents the default block size.
39+
BlockSizeBytes = 4096
3740
)
3841

3942
type Volume struct {

0 commit comments

Comments
 (0)