Skip to content

Commit 3b2b102

Browse files
committed
Add unit tests
Implement GetMetadataAllocated RPC handler Signed-off-by: Prasad Ghangal <[email protected]>
1 parent 57eff9b commit 3b2b102

File tree

4 files changed

+650
-88
lines changed

4 files changed

+650
-88
lines changed

pkg/hostpath/hostpath.go

Lines changed: 0 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ limitations under the License.
1717
package hostpath
1818

1919
import (
20-
"bytes"
2120
"errors"
2221
"fmt"
2322
"io"
@@ -27,7 +26,6 @@ import (
2726
"sync"
2827

2928
"github.com/container-storage-interface/spec/lib/go/csi"
30-
"golang.org/x/net/context"
3129
"google.golang.org/grpc/codes"
3230
"google.golang.org/grpc/status"
3331
"k8s.io/apimachinery/pkg/api/resource"
@@ -83,7 +81,6 @@ type Config struct {
8381
EnableTopology bool
8482
EnableVolumeExpansion bool
8583
EnableControllerModifyVolume bool
86-
EnableSnapshotMetadata bool
8784
AcceptedMutableParameterNames StringArray
8885
DisableControllerExpansion bool
8986
DisableNodeExpansion bool
@@ -414,87 +411,3 @@ func (hp *hostPath) createSnapshotFromVolume(vol state.Volume, file string, opts
414411

415412
return nil
416413
}
417-
418-
func (hp *hostPath) getChangedBlockMetadata(ctx context.Context, sourcePath, targetPath string, startingOffset, blockSize int64, maxResult int32, changedBlocksChan chan<- []*csi.BlockMetadata) error {
419-
klog.V(4).Infof("finding changed blocks between two files: %s, %s", sourcePath, targetPath)
420-
defer close(changedBlocksChan)
421-
// Open the two files
422-
source, err := os.Open(sourcePath)
423-
if err != nil {
424-
klog.Errorf("failed to read file: %v", err)
425-
return err
426-
}
427-
defer source.Close()
428-
target, err := os.Open(targetPath)
429-
if err != nil {
430-
klog.Errorf("failed to read file: %v", err)
431-
return err
432-
}
433-
defer target.Close()
434-
435-
// Seek to the desired offset in both files
436-
_, err = source.Seek(startingOffset, 0)
437-
if err != nil {
438-
klog.Errorf("failed to seek file: %v", err)
439-
return err
440-
}
441-
_, err = target.Seek(startingOffset, 0)
442-
if err != nil {
443-
klog.Errorf("failed to seek file: %v", err)
444-
return err
445-
}
446-
447-
// Read both files block by block and compare them
448-
var blockIndex int64
449-
buffer1 := make([]byte, blockSize)
450-
buffer2 := make([]byte, blockSize)
451-
for {
452-
changedBlocks := []*csi.BlockMetadata{}
453-
// Read blocks and compare them. Create the list of changed blocks metadata.
454-
// Once the number of blocks reaches to maxResult, return the result and
455-
// compute next batch of blocks.
456-
for i := 1; i <= int(maxResult); i++ {
457-
select {
458-
case <-ctx.Done(): // Detect cancellation from the client
459-
klog.Infof("Stream canceled by client. Exiting goroutine.")
460-
return nil
461-
default:
462-
// Read block from source and target files
463-
n1, err1 := source.Read(buffer1)
464-
n2, err2 := target.Read(buffer2)
465-
466-
// If both files have reached EOF, exit the loop
467-
if err1 == io.EOF && err2 == io.EOF {
468-
klog.Infof("reached to end of the file\n")
469-
return nil
470-
}
471-
if err1 != nil && err1 != io.EOF {
472-
klog.Errorf("failed to read to buffer: %v", err)
473-
return err1
474-
}
475-
if err2 != nil && err2 != io.EOF {
476-
klog.Errorf("failed to read to buffer: %v", err)
477-
return err2
478-
}
479-
480-
// If the number of bytes read differs, the files are different
481-
if n1 != n2 {
482-
klog.Infof("files differ in size at block %d\n", blockIndex)
483-
return nil
484-
}
485-
486-
// Compare the two blocks and add result
487-
if !bytes.Equal(buffer1[:n1], buffer2[:n2]) {
488-
changedBlocks = append(changedBlocks, &csi.BlockMetadata{
489-
ByteOffset: blockIndex * blockSize,
490-
SizeBytes: int64(blockSize),
491-
})
492-
}
493-
blockIndex++
494-
}
495-
}
496-
if len(changedBlocks) != 0 {
497-
changedBlocksChan <- changedBlocks
498-
}
499-
}
500-
}

pkg/hostpath/snapshotmetadata.go

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package hostpath
18+
19+
import (
20+
"bytes"
21+
"io"
22+
"os"
23+
24+
"github.com/container-storage-interface/spec/lib/go/csi"
25+
"golang.org/x/net/context"
26+
"k8s.io/klog/v2"
27+
)
28+
29+
func (hp *hostPath) getAllocatedBlockMetadata(ctx context.Context, filePath string, startingOffset, blockSize int64, maxResult int32, allocBlocksChan chan<- []*csi.BlockMetadata) error {
30+
klog.V(4).Infof("finding allocated blocks in the file: %s", filePath)
31+
defer close(allocBlocksChan)
32+
33+
file, err := os.Open(filePath)
34+
if err != nil {
35+
return err
36+
}
37+
defer file.Close()
38+
39+
if _, err := file.Seek(startingOffset, 0); err != nil {
40+
return err
41+
}
42+
43+
return hp.compareBlocks(ctx, nil, file, startingOffset, blockSize, maxResult, allocBlocksChan)
44+
}
45+
46+
func (hp *hostPath) getChangedBlockMetadata(ctx context.Context, sourcePath, targetPath string, startingOffset, blockSize int64, maxResult int32, changedBlocksChan chan<- []*csi.BlockMetadata) error {
47+
klog.V(4).Infof("finding changed blocks between two files: %s, %s", sourcePath, targetPath)
48+
defer close(changedBlocksChan)
49+
50+
source, target, err := openFiles(sourcePath, targetPath)
51+
if err != nil {
52+
return err
53+
}
54+
defer source.Close()
55+
defer target.Close()
56+
57+
if err := seekToOffset(source, target, startingOffset); err != nil {
58+
return err
59+
}
60+
61+
return hp.compareBlocks(ctx, source, target, startingOffset, blockSize, maxResult, changedBlocksChan)
62+
}
63+
64+
func openFiles(sourcePath, targetPath string) (source, target *os.File, err error) {
65+
source, err = os.Open(sourcePath)
66+
if err != nil {
67+
return nil, nil, err
68+
}
69+
70+
target, err = os.Open(targetPath)
71+
if err != nil {
72+
source.Close()
73+
return nil, nil, err
74+
}
75+
76+
return source, target, nil
77+
}
78+
79+
func seekToOffset(source, target *os.File, startingOffset int64) error {
80+
if _, err := source.Seek(startingOffset, 0); err != nil {
81+
return err
82+
}
83+
if _, err := target.Seek(startingOffset, 0); err != nil {
84+
return err
85+
}
86+
return nil
87+
}
88+
89+
// Compare blocks from source and target, and send changed blocks to channel.
90+
// If source if nil, returns blocks allocated by target.
91+
func (hp *hostPath) compareBlocks(ctx context.Context, source, target *os.File, startingOffset, blockSize int64, maxResult int32, changedBlocksChan chan<- []*csi.BlockMetadata) error {
92+
blockIndex := startingOffset / blockSize
93+
sBuffer := make([]byte, blockSize)
94+
tBuffer := make([]byte, blockSize)
95+
eofSourceFile, eofTargetFile := false, false
96+
97+
for {
98+
changedBlocks := []*csi.BlockMetadata{}
99+
100+
// Read blocks and compare them. Create the list of changed blocks metadata.
101+
// Once the number of blocks reaches to maxResult, return the result and
102+
// compute next batch of blocks.
103+
for int32(len(changedBlocks)) < maxResult {
104+
select {
105+
case <-ctx.Done():
106+
klog.V(4).Infof("handling cancellation signal, closing goroutine")
107+
return nil
108+
default:
109+
targetReadBytes, eofTarget, err := readFileBlock(target, tBuffer, eofTargetFile)
110+
if err != nil {
111+
return err
112+
}
113+
eofTargetFile = eofTarget
114+
115+
if source == nil {
116+
// If source is nil, return blocks allocated by target file.
117+
if eofTargetFile {
118+
if len(changedBlocks) != 0 {
119+
changedBlocksChan <- changedBlocks
120+
}
121+
return nil
122+
}
123+
changedBlocks = append(changedBlocks, createBlockMetadata(blockIndex, blockSize))
124+
blockIndex++
125+
continue
126+
}
127+
128+
sourceReadBytes, eofSource, err := readFileBlock(source, sBuffer, eofSourceFile)
129+
if err != nil {
130+
return err
131+
}
132+
eofSourceFile = eofSource
133+
134+
// If both files have reached EOF, exit the loop.
135+
if eofSourceFile && eofTargetFile {
136+
klog.V(4).Infof("reached end of the files")
137+
if len(changedBlocks) != 0 {
138+
changedBlocksChan <- changedBlocks
139+
}
140+
return nil
141+
}
142+
143+
// Compare the two blocks and add result.
144+
// Even if one of the file reaches to end, continue to add block metadata of other file.
145+
if blockChanged(sBuffer[:sourceReadBytes], tBuffer[:targetReadBytes]) {
146+
// TODO: Support for VARIABLE sized block metadata
147+
changedBlocks = append(changedBlocks, createBlockMetadata(blockIndex, blockSize))
148+
}
149+
150+
blockIndex++
151+
}
152+
}
153+
154+
if len(changedBlocks) > 0 {
155+
changedBlocksChan <- changedBlocks
156+
}
157+
}
158+
}
159+
160+
// readFileBlock reads blocks from a file.
161+
func readFileBlock(file *os.File, buffer []byte, eof bool) (int, bool, error) {
162+
if eof {
163+
return 0, true, nil
164+
}
165+
166+
bytesRead, err := file.Read(buffer)
167+
if err != nil && err != io.EOF {
168+
return 0, false, err
169+
}
170+
171+
return bytesRead, err == io.EOF, nil
172+
}
173+
174+
func blockChanged(sourceBlock, targetBlock []byte) bool {
175+
return !bytes.Equal(sourceBlock, targetBlock)
176+
}
177+
178+
func createBlockMetadata(blockIndex, blockSize int64) *csi.BlockMetadata {
179+
return &csi.BlockMetadata{
180+
ByteOffset: blockIndex * blockSize,
181+
SizeBytes: blockSize,
182+
}
183+
}

0 commit comments

Comments
 (0)