@@ -29,149 +29,165 @@ import (
29
29
// NOTE: This implementation of SnapshotMetadata service is used for demo and CI testing purpose only.
30
30
// This should not be used in production or as an example about how to write a real driver.
31
31
32
- func (hp * hostPath ) getAllocatedBlockMetadata (ctx context.Context , filePath string , startingOffset , blockSize int64 , maxResult int32 , allocBlocksChan chan <- []* csi.BlockMetadata ) error {
33
- klog .V (4 ).Infof ("finding allocated blocks in the file: %s" , filePath )
34
- defer close (allocBlocksChan )
32
+ type fileBlockReader struct {
33
+ base * os.File
34
+ target * os.File
35
+ offset int64
36
+ blockSize int64
37
+ blockMetadataType csi.BlockMetadataType
38
+ maxResult int32
39
+ }
35
40
36
- file , err := os .Open (filePath )
41
+ func newFileBlockReader (
42
+ basePath ,
43
+ targetPath string ,
44
+ startingOffset int64 ,
45
+ blockSize int64 ,
46
+ blockMetadataType csi.BlockMetadataType ,
47
+ maxResult int32 ,
48
+ ) (* fileBlockReader , error ) {
49
+ base , target , err := openFiles (basePath , targetPath )
37
50
if err != nil {
38
- return err
39
- }
40
- defer file .Close ()
41
-
42
- if _ , err := file .Seek (startingOffset , 0 ); err != nil {
43
- return err
51
+ return nil , err
44
52
}
45
53
46
- return hp .compareBlocks (ctx , nil , file , startingOffset , blockSize , maxResult , allocBlocksChan )
54
+ return & fileBlockReader {
55
+ base : base ,
56
+ target : target ,
57
+ offset : startingOffset ,
58
+ blockSize : blockSize ,
59
+ blockMetadataType : blockMetadataType ,
60
+ maxResult : maxResult ,
61
+ }, nil
47
62
}
48
63
49
- func (hp * hostPath ) getChangedBlockMetadata (ctx context.Context , sourcePath , targetPath string , startingOffset , blockSize int64 , maxResult int32 , changedBlocksChan chan <- []* csi.BlockMetadata ) error {
50
- klog .V (4 ).Infof ("finding changed blocks between two files: %s, %s" , sourcePath , targetPath )
51
- defer close (changedBlocksChan )
52
-
53
- source , target , err := openFiles (sourcePath , targetPath )
54
- if err != nil {
64
+ func (cb * fileBlockReader ) seekToStartingOffset () error {
65
+ if _ , err := cb .target .Seek (cb .offset , io .SeekStart ); err != nil {
55
66
return err
56
67
}
57
- defer source . Close ()
58
- defer target . Close ()
59
-
60
- if err := seekToOffset ( source , target , startingOffset ); err != nil {
68
+ if cb . base == nil {
69
+ return nil
70
+ }
71
+ if _ , err := cb . base . Seek ( cb . offset , io . SeekStart ); err != nil {
61
72
return err
62
73
}
74
+ return nil
75
+ }
63
76
64
- return hp .compareBlocks (ctx , source , target , startingOffset , blockSize , maxResult , changedBlocksChan )
77
+ func (cb * fileBlockReader ) Close () error {
78
+ if cb .base != nil {
79
+ if err := cb .base .Close (); err != nil {
80
+ return err
81
+ }
82
+ }
83
+ if cb .target != nil {
84
+ if err := cb .target .Close (); err != nil {
85
+ return err
86
+ }
87
+ }
88
+ return nil
65
89
}
66
90
67
- func openFiles (sourcePath , targetPath string ) (source , target * os.File , err error ) {
68
- source , err = os .Open (sourcePath )
91
+ func openFiles (basePath , targetPath string ) (base , target * os.File , err error ) {
92
+ target , err = os .Open (targetPath )
69
93
if err != nil {
94
+ base .Close ()
70
95
return nil , nil , err
71
96
}
72
-
73
- target , err = os .Open (targetPath )
97
+ if basePath == "" {
98
+ return nil , target , nil
99
+ }
100
+ base , err = os .Open (basePath )
74
101
if err != nil {
75
- source .Close ()
76
102
return nil , nil , err
77
103
}
78
104
79
- return source , target , nil
105
+ return base , target , nil
80
106
}
81
107
82
- func seekToOffset (source , target * os.File , startingOffset int64 ) error {
83
- if _ , err := source .Seek (startingOffset , 0 ); err != nil {
84
- return err
85
- }
86
- if _ , err := target .Seek (startingOffset , 0 ); err != nil {
87
- return err
108
+ // getChangedBlockMetadata reads base and target files, compare block differences between them
109
+ // and returns list of changed block metadata. It reads all the blocks till it reaches EOF or size of changed block
110
+ // metadata list <= maxSize.
111
+ func (cb * fileBlockReader ) getChangedBlockMetadata (ctx context.Context ) ([]* csi.BlockMetadata , error ) {
112
+ if cb .base == nil {
113
+ klog .V (4 ).Infof ("finding allocated blocks by file: %s" , cb .target .Name ())
114
+ } else {
115
+ klog .V (4 ).Infof ("finding changed blocks between two files: %s, %s" , cb .base .Name (), cb .target .Name ())
88
116
}
89
- return nil
90
- }
91
117
92
- // Compare blocks from source and target, and send changed blocks to channel.
93
- // If source if nil, returns blocks allocated by target.
94
- func (hp * hostPath ) compareBlocks (ctx context.Context , source , target * os.File , startingOffset , blockSize int64 , maxResult int32 , changedBlocksChan chan <- []* csi.BlockMetadata ) error {
95
- blockIndex := startingOffset / blockSize
96
- sBuffer := make ([]byte , blockSize )
97
- tBuffer := make ([]byte , blockSize )
98
- eofSourceFile , eofTargetFile := false , false
99
-
100
- for {
101
- changedBlocks := []* csi.BlockMetadata {}
102
-
103
- // Read blocks and compare them. Create the list of changed blocks metadata.
104
- // Once the number of blocks reaches to maxResult, return the result and
105
- // compute next batch of blocks.
106
- for int32 (len (changedBlocks )) < maxResult {
107
- select {
108
- case <- ctx .Done ():
109
- klog .V (4 ).Infof ("handling cancellation signal, closing goroutine" )
110
- return nil
111
- default :
112
- targetReadBytes , eofTarget , err := readFileBlock (target , tBuffer , eofTargetFile )
113
- if err != nil {
114
- return err
115
- }
116
- eofTargetFile = eofTarget
117
-
118
- if source == nil {
119
- // If source is nil, return blocks allocated by target file.
120
- if eofTargetFile {
121
- if len (changedBlocks ) != 0 {
122
- changedBlocksChan <- changedBlocks
123
- }
124
- return nil
125
- }
126
- // if VARIABLE_LENGTH type is enabled, return blocks extend instead of individual blocks.
127
- blockMetadata := createBlockMetadata (blockIndex , blockSize )
128
- if extendBlock (changedBlocks , csi .BlockMetadataType (hp .config .SnapshotMetadataBlockType ), blockIndex , blockSize ) {
129
- changedBlocks [len (changedBlocks )- 1 ].SizeBytes += blockSize
130
- blockIndex ++
131
- continue
132
- }
133
- changedBlocks = append (changedBlocks , blockMetadata )
134
- blockIndex ++
135
- continue
136
- }
137
-
138
- sourceReadBytes , eofSource , err := readFileBlock (source , sBuffer , eofSourceFile )
139
- if err != nil {
140
- return err
141
- }
142
- eofSourceFile = eofSource
143
-
144
- // If both files have reached EOF, exit the loop.
145
- if eofSourceFile && eofTargetFile {
146
- klog .V (4 ).Infof ("reached end of the files" )
147
- if len (changedBlocks ) != 0 {
148
- changedBlocksChan <- changedBlocks
149
- }
150
- return nil
151
- }
152
-
153
- // Compare the two blocks and add result.
154
- // Even if one of the file reaches to end, continue to add block metadata of other file.
155
- if blockChanged (sBuffer [:sourceReadBytes ], tBuffer [:targetReadBytes ]) {
156
- blockMetadata := createBlockMetadata (blockIndex , blockSize )
157
- // if VARIABLE_LEGTH type is enabled, check if blocks are adjacent,
158
- // extend the previous block if adjacent blocks found instead of adding new entry.
159
- if extendBlock (changedBlocks , csi .BlockMetadataType (hp .config .SnapshotMetadataBlockType ), blockIndex , blockSize ) {
160
- changedBlocks [len (changedBlocks )- 1 ].SizeBytes += blockSize
161
- blockIndex ++
162
- continue
163
- }
164
- changedBlocks = append (changedBlocks , blockMetadata )
165
- }
118
+ blockIndex := cb .offset / cb .blockSize
119
+ sBuffer := make ([]byte , cb .blockSize )
120
+ tBuffer := make ([]byte , cb .blockSize )
121
+ eofBaseFile , eofTargetFile := false , false
122
+
123
+ changedBlocks := []* csi.BlockMetadata {}
124
+
125
+ // Read blocks and compare them. Create the list of changed blocks metadata.
126
+ // Once the number of blocks reaches to maxResult, return the result and
127
+ // compute next batch of blocks.
128
+ for int32 (len (changedBlocks )) < cb .maxResult {
129
+ select {
130
+ case <- ctx .Done ():
131
+ klog .V (4 ).Infof ("handling cancellation signal, closing goroutine" )
132
+ return nil , ctx .Err ()
133
+ default :
134
+ }
135
+ targetReadBytes , eofTarget , err := readFileBlock (cb .target , tBuffer , eofTargetFile )
136
+ if err != nil {
137
+ return nil , err
138
+ }
139
+ eofTargetFile = eofTarget
166
140
141
+ // If base is nil, return blocks allocated by target file.
142
+ if cb .base == nil {
143
+ if eofTargetFile {
144
+ return changedBlocks , io .EOF
145
+ }
146
+ // if VARIABLE_LENGTH type is enabled, return blocks extend instead of individual blocks.
147
+ blockMetadata := createBlockMetadata (blockIndex , cb .blockSize )
148
+ if extendBlock (changedBlocks , csi .BlockMetadataType (cb .blockMetadataType ), blockIndex , cb .blockSize ) {
149
+ changedBlocks [len (changedBlocks )- 1 ].SizeBytes += cb .blockSize
150
+ cb .offset += cb .blockSize
167
151
blockIndex ++
152
+ continue
168
153
}
154
+ changedBlocks = append (changedBlocks , blockMetadata )
155
+ cb .offset += cb .blockSize
156
+ blockIndex ++
157
+ continue
158
+ }
159
+
160
+ baseReadBytes , eofBase , err := readFileBlock (cb .base , sBuffer , eofBaseFile )
161
+ if err != nil {
162
+ return nil , err
163
+ }
164
+ eofBaseFile = eofBase
165
+
166
+ // If both files have reached EOF, exit the loop.
167
+ if eofBaseFile && eofTargetFile {
168
+ klog .V (4 ).Infof ("reached end of the files" )
169
+ return changedBlocks , io .EOF
169
170
}
170
171
171
- if len (changedBlocks ) > 0 {
172
- changedBlocksChan <- changedBlocks
172
+ // Compare the two blocks and add result.
173
+ // Even if one of the file reaches to end, continue to add block metadata of other file.
174
+ if blockChanged (sBuffer [:baseReadBytes ], tBuffer [:targetReadBytes ]) {
175
+ blockMetadata := createBlockMetadata (blockIndex , cb .blockSize )
176
+ // if VARIABLE_LEGTH type is enabled, check if blocks are adjacent,
177
+ // extend the previous block if adjacent blocks found instead of adding new entry.
178
+ if extendBlock (changedBlocks , csi .BlockMetadataType (cb .blockMetadataType ), blockIndex , cb .blockSize ) {
179
+ changedBlocks [len (changedBlocks )- 1 ].SizeBytes += cb .blockSize
180
+ cb .offset += cb .blockSize
181
+ blockIndex ++
182
+ continue
183
+ }
184
+ changedBlocks = append (changedBlocks , blockMetadata )
173
185
}
186
+
187
+ cb .offset += cb .blockSize
188
+ blockIndex ++
174
189
}
190
+ return changedBlocks , nil
175
191
}
176
192
177
193
// readFileBlock reads blocks from a file.
@@ -188,8 +204,8 @@ func readFileBlock(file *os.File, buffer []byte, eof bool) (int, bool, error) {
188
204
return bytesRead , err == io .EOF , nil
189
205
}
190
206
191
- func blockChanged (sourceBlock , targetBlock []byte ) bool {
192
- return ! bytes .Equal (sourceBlock , targetBlock )
207
+ func blockChanged (baseBlock , targetBlock []byte ) bool {
208
+ return ! bytes .Equal (baseBlock , targetBlock )
193
209
}
194
210
195
211
func createBlockMetadata (blockIndex , blockSize int64 ) * csi.BlockMetadata {
0 commit comments