Skip to content

Commit 873f5c6

Browse files
xkeymreiferson
authored andcommitted
proper handling of maxBytesPerFile for reads
1 parent f9543b6 commit 873f5c6

File tree

1 file changed

+28
-13
lines changed

1 file changed

+28
-13
lines changed

diskqueue.go

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,16 @@ type diskQueue struct {
6767
sync.RWMutex
6868

6969
// instantiation time metadata
70-
name string
71-
dataPath string
72-
maxBytesPerFile int64 // currently this cannot change once created
73-
minMsgSize int32
74-
maxMsgSize int32
75-
syncEvery int64 // number of writes per fsync
76-
syncTimeout time.Duration // duration of time per fsync
77-
exitFlag int32
78-
needSync bool
70+
name string
71+
dataPath string
72+
maxBytesPerFile int64 // cannot change once created
73+
maxBytesPerFileRead int64
74+
minMsgSize int32
75+
maxMsgSize int32
76+
syncEvery int64 // number of writes per fsync
77+
syncTimeout time.Duration // duration of time per fsync
78+
exitFlag int32
79+
needSync bool
7980

8081
// keeps track of the position where we have read
8182
// (but not yet sent over readChan)
@@ -293,6 +294,16 @@ func (d *diskQueue) readOne() ([]byte, error) {
293294
}
294295
}
295296

297+
// for "complete" files (i.e. not the "current" file), maxBytesPerFileRead
298+
// should be initialized to the file's size, or default to maxBytesPerFile
299+
d.maxBytesPerFileRead = d.maxBytesPerFile
300+
if d.readFileNum < d.writeFileNum {
301+
stat, err := d.readFile.Stat()
302+
if err == nil {
303+
d.maxBytesPerFileRead = stat.Size()
304+
}
305+
}
306+
296307
d.reader = bufio.NewReader(d.readFile)
297308
}
298309

@@ -326,10 +337,10 @@ func (d *diskQueue) readOne() ([]byte, error) {
326337
d.nextReadPos = d.readPos + totalBytes
327338
d.nextReadFileNum = d.readFileNum
328339

329-
// TODO: each data file should embed the maxBytesPerFile
330-
// as the first 8 bytes (at creation time) ensuring that
331-
// the value can change without affecting runtime
332-
if d.nextReadPos > d.maxBytesPerFile {
340+
// we only consider rotating if we're reading a "complete" file
341+
// and since we cannot know the size at which it was rotated, we
342+
// rely on maxBytesPerFileRead rather than maxBytesPerFile
343+
if d.readFileNum < d.writeFileNum && d.nextReadPos >= d.maxBytesPerFileRead {
333344
if d.readFile != nil {
334345
d.readFile.Close()
335346
d.readFile = nil
@@ -396,6 +407,10 @@ func (d *diskQueue) writeOne(data []byte) error {
396407
d.depth += 1
397408

398409
if d.writePos >= d.maxBytesPerFile {
410+
if d.readFileNum == d.writeFileNum {
411+
d.maxBytesPerFileRead = d.writePos
412+
}
413+
399414
d.writeFileNum++
400415
d.writePos = 0
401416

0 commit comments

Comments
 (0)