Skip to content

Commit a65efdb

Browse files
committed
Add logic to check the number of bytes read
1 parent 49c3a10 commit a65efdb

File tree

1 file changed

+29
-18
lines changed

1 file changed

+29
-18
lines changed

pkg/rsstorage/internal/chunks.go

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ func (w *DefaultChunkUtils) ReadChunked(
200200
address string,
201201
) (io.ReadCloser, *types.ChunksInfo, int64, time.Time, error) {
202202
chunkDir := filepath.Join(dir, address)
203+
var err error
203204

204205
infoFile, _, _, _, ok, err := w.Server.Get(ctx, chunkDir, "info.json")
205206
if err != nil {
@@ -217,7 +218,12 @@ func (w *DefaultChunkUtils) ReadChunked(
217218
}
218219

219220
pR, pW := io.Pipe()
220-
go w.readChunks(ctx, address, chunkDir, info.NumChunks, info.Complete, pW)
221+
go func() {
222+
err = w.readChunks(ctx, address, chunkDir, info.NumChunks, info.Complete, info.ChunkSize, pW)
223+
}()
224+
if err != nil {
225+
return nil, nil, 0, time.Time{}, err
226+
}
221227

222228
return pR, &info, int64(info.FileSize), info.ModTime, nil
223229
}
@@ -228,21 +234,26 @@ func (w *DefaultChunkUtils) readChunks(
228234
chunkDir string,
229235
numChunks uint64,
230236
complete bool,
237+
fileSize uint64,
231238
writer *io.PipeWriter,
232-
) {
239+
) error {
233240
// TODO: Handle this error
234241
defer writer.Close()
235242

243+
totalBytesWritten := uint64(0)
244+
236245
for i := uint64(1); i <= numChunks; i++ {
237-
err := w.retryingChunkRead(ctx, i, address, chunkDir, complete, writer)
246+
bytesRead, err := w.retryingChunkRead(ctx, i, address, chunkDir, complete, writer)
238247
if err != nil {
239-
// TODO: Handle this error
240-
writer.CloseWithError(err)
241-
return
248+
return writer.CloseWithError(err)
242249
}
250+
totalBytesWritten += bytesRead
251+
}
252+
if totalBytesWritten != fileSize {
253+
return fmt.Errorf("expected to read '%d' bytes from file but only read '%d' bytes", fileSize, totalBytesWritten)
243254
}
244255

245-
return
256+
return nil
246257
}
247258

248259
func (w *DefaultChunkUtils) retryingChunkRead(
@@ -252,12 +263,12 @@ func (w *DefaultChunkUtils) retryingChunkRead(
252263
chunkDir string,
253264
complete bool,
254265
writer *io.PipeWriter,
255-
) (err error) {
266+
) (bytesRead uint64, err error) {
256267
attempts := 0
257268
for {
258269
attempts += 1
259270
var done bool
260-
done, err = w.tryChunkRead(ctx, attempts, chunkIndex, address, chunkDir, complete, writer)
271+
done, bytesRead, err = w.tryChunkRead(ctx, attempts, chunkIndex, address, chunkDir, complete, writer)
261272
if err != nil || done {
262273
return
263274
}
@@ -272,42 +283,42 @@ func (w *DefaultChunkUtils) tryChunkRead(
272283
chunkDir string,
273284
complete bool,
274285
writer *io.PipeWriter,
275-
) (bool, error) {
286+
) (bool, uint64, error) {
276287
chunkFile := fmt.Sprintf("%08d", chunkIndex)
277288

278289
// Open the chunks sequentially
279290
chunk, _, _, _, ok, err := w.Server.Get(ctx, chunkDir, chunkFile)
280291
if err != nil {
281-
return false, fmt.Errorf("error opening chunk file at %s: %s", chunkDir, err)
292+
return false, 0, fmt.Errorf("error opening chunk file at %s: %s", chunkDir, err)
282293
} else if !ok {
283294
if !complete {
284295
// If we've waited 5 minutes for this chunk to appear, err to avoid
285296
// blocking forever
286297
if attempts > w.MaxAttempts {
287-
return false, rsstorage.ErrNoChunk
298+
return false, 0, rsstorage.ErrNoChunk
288299
}
289300
// Wait for the next chunk, then retry in for loop.
290301
w.Waiter.WaitForChunk(ctx, &types.ChunkNotification{
291302
Timeout: w.PollTimeout,
292303
Address: address,
293304
Chunk: chunkIndex,
294305
})
295-
return false, nil
306+
return false, 0, nil
296307
} else {
297308
// If already done, return error
298-
return false, rsstorage.ErrNoChunk
309+
return false, 0, rsstorage.ErrNoChunk
299310
}
300311
}
301312
// TODO: handle this error
302313
defer chunk.Close()
303314

304315
// Read the current chunk
305-
_, err = io.Copy(writer, chunk)
306-
if err != nil {
307-
return false, fmt.Errorf("error reading from chunk: %s", err)
316+
bytesCopied, copyErr := io.Copy(writer, chunk)
317+
if copyErr != nil {
318+
return false, 0, fmt.Errorf("error reading from chunk: %s", err)
308319
}
309320

310-
return true, nil
321+
return true, uint64(bytesCopied), nil
311322
}
312323

313324
func FilterChunks(input []types.StoredItem) []types.StoredItem {

0 commit comments

Comments
 (0)