From 9f9dc6358691b0888c849b6d0062a6112d4227ba Mon Sep 17 00:00:00 2001 From: Kris Watts Date: Fri, 26 Mar 2021 13:50:18 -0600 Subject: [PATCH 1/7] adding sync and close methods --- buffer.go | 34 ++++++++++++++++++++++++++++++++++ buffer_test.go | 6 ++++++ 2 files changed, 40 insertions(+) diff --git a/buffer.go b/buffer.go index fe2fd6e..5193527 100644 --- a/buffer.go +++ b/buffer.go @@ -6,10 +6,15 @@ import ( "os" "sync" "syscall" + "unsafe" "github.com/cloudflare/buffer/binary" ) +var ( + ErrNotOpen = errors.New("Not open") +) + // Buffer data format // Total size doesn't include header // (32 bytes) @@ -283,3 +288,32 @@ func New(filename string, capacity int) (*Buffer, error) { return b, nil } + +func (b *Buffer) Sync() (err error) { + b.Lock() + defer b.Unlock() + //check that our data isn't nil + if len(b.data) == 0 { + return ErrNotOpen + } + //call a blocking msync + base := uintptr(unsafe.Pointer(&b.data[0])) + sz := uintptr(len(b.data)) + if _, _, errno := syscall.Syscall(syscall.SYS_MSYNC, base, sz, 0x4); errno != 0 { + err = errno + } + return +} + +func (b *Buffer) Close() (err error) { + b.Lock() + defer b.Unlock() + //check that our data isn't nil + if b.data == nil { + return ErrNotOpen + } + //unmap and set the buffer to nil + err = syscall.Munmap(b.data) + b.data = nil + return +} diff --git a/buffer_test.go b/buffer_test.go index 3fa2360..15f6651 100644 --- a/buffer_test.go +++ b/buffer_test.go @@ -112,10 +112,16 @@ func TestAll(t *testing.T) { t.Logf("%v", string(val)) } } + if err = b.Sync(); err != nil { + t.Errorf("Failed to sync: %v", err) + } m := b.readmeta() if c.expect != m { t.Errorf("metadata structs do not match for %d. expect: %v, actual: %v", i, c.expect, m) } + if err = b.Close(); err != nil { + t.Errorf("Failed to close: %v", err) + } os.Remove(filename) } } From 696176a8d26fa7b2e262d5f3d7fb613a0f4ff521 Mon Sep 17 00:00:00 2001 From: kristopher watts Date: Thu, 1 Apr 2021 14:38:02 -0600 Subject: [PATCH 2/7] Adding in ability to open existing without truncating --- buffer.go | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/buffer.go b/buffer.go index 5193527..1ff1a01 100644 --- a/buffer.go +++ b/buffer.go @@ -246,7 +246,7 @@ func New(filename string, capacity int) (*Buffer, error) { // TODO: don't assume filename given is a good file. // TODO: get passed in open file? - if _, err := os.Stat(filename); os.IsNotExist(err) { + if _, err = os.Stat(filename); os.IsNotExist(err) { f, err = os.Create(filename) if err != nil { return nil, err @@ -261,7 +261,13 @@ func New(filename string, capacity int) (*Buffer, error) { if err := syscall.Truncate(filename, int64(capacity)); err != nil { return nil, err } + return open(f, newFile, capacity) +} +func open(f *os.File, newFile bool, capacity int) (*Buffer, error) { + if f == nil || capacity <= 0 { + return nil, errors.New("Bad parameters") + } data, err := syscall.Mmap( int(f.Fd()), 0, capacity, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED, @@ -272,7 +278,9 @@ func New(filename string, capacity int) (*Buffer, error) { // don't need this anymore f.Close() - b := &Buffer{data: data} + b := &Buffer{ + data: data, + } if newFile { m := meta{ @@ -289,6 +297,24 @@ func New(filename string, capacity int) (*Buffer, error) { return b, nil } +// Open will open an existing buffer, if the file does not exist an error is returned +func Open(filename string) (*Buffer, error) { + var ( + f *os.File + err error + capacity int64 + ) + f, err = os.OpenFile(filename, os.O_RDWR, 0644) + if err != nil { + return nil, err + } else if fi, err := f.Stat(); err != nil { + return nil, err + } else { + capacity = fi.Size() + } + return open(f, false, int(capacity)) +} + func (b *Buffer) Sync() (err error) { b.Lock() defer b.Unlock() From 569d0e7e8023ec8e96cff716346bdf7db0708d75 Mon Sep 17 00:00:00 2001 From: kristopher watts Date: Thu, 8 Apr 2021 16:45:37 -0600 Subject: [PATCH 3/7] working true circular buffer --- advise_linux.go | 10 +- buffer.go | 360 ++++++++++++++++++++++++++++++++++-------------- buffer_test.go | 241 ++++++++++++++++++++------------ flock.go | 71 ++++++++++ flock_others.go | 15 ++ go.mod | 3 + 6 files changed, 504 insertions(+), 196 deletions(-) create mode 100644 flock.go create mode 100644 flock_others.go create mode 100644 go.mod diff --git a/advise_linux.go b/advise_linux.go index ac8e16d..dc34e09 100644 --- a/advise_linux.go +++ b/advise_linux.go @@ -15,13 +15,11 @@ func (b *Buffer) Advise() error { b.Lock() defer b.Unlock() - m := b.readmeta() - // Start of next page above write offset - wo := int(m.woff) + (pageSize - int(m.woff)%pageSize) + wo := int(b.m.woff) + (pageSize - int(b.m.woff)%pageSize) // Start of page below read offset - ro := int(m.roff) - int(m.roff)%pageSize + ro := int(b.m.roff) - int(b.m.roff)%pageSize if ro > wo { return syscall.Madvise(b.data[wo:ro], syscall.MADV_DONTNEED) @@ -34,8 +32,8 @@ func (b *Buffer) Advise() error { return err } } - if wo < int(m.cap) { - return syscall.Madvise(b.data[wo:m.cap], syscall.MADV_DONTNEED) + if wo < int(b.m.cap) { + return syscall.Madvise(b.data[wo:b.m.cap], syscall.MADV_DONTNEED) } return nil } diff --git a/buffer.go b/buffer.go index 1ff1a01..8ff2d46 100644 --- a/buffer.go +++ b/buffer.go @@ -2,6 +2,7 @@ package buffer import ( + "bytes" "errors" "os" "sync" @@ -13,6 +14,8 @@ import ( var ( ErrNotOpen = errors.New("Not open") + ErrEmpty = errors.New("Empty file") + ErrCorrupt = errors.New("Corrupt") ) // Buffer data format @@ -33,43 +36,106 @@ var ( type Buffer struct { sync.Mutex + f *os.File data []byte + m *meta } type meta struct { - size uint64 - woff uint64 - roff uint64 - cap uint64 + magic [16]byte + size uint64 + woff uint64 + roff uint64 + cap uint64 } -// TODO: add magic + signatures const ( - offSize = iota * 8 - offNextRead = iota * 8 - offNextWrite = iota * 8 - offMaxCapacity = iota * 8 - offData = iota * 8 + magicHeaderSize = 16 +) + +var ( + magicHeader = []byte(`CLOUDFLAREBUFFER`) +) + +const ( + offMagic = 0 + offSize = offMagic + magicHeaderSize + offNextRead = offSize + 8 + offNextWrite = offNextRead + 8 + offMaxCapacity = offNextWrite + 8 + offData = offMaxCapacity + 8 ) // recordLength contains metadata about what's stored. // Currently only the recordLength const recMeta = 8 -func (b *Buffer) readmeta() meta { +func readmeta(f *os.File) (m meta, err error) { + var fi os.FileInfo + var buff []byte + if f == nil { + err = ErrNotOpen + } else if fi, err = f.Stat(); err != nil { + if sz := fi.Size(); sz == 0 { + err = ErrEmpty + } else if sz < offData { + //not a new file but metdata is bad + err = errors.New("corrupted file header") + } else { + var n int + //check the magic bytes + buff = make([]byte, offData) + if n, err = f.ReadAt(buff, 0); err != nil { + return + } else if n != len(buff) { + err = errors.New("Failed read") + } else if !bytes.Equal(buff[0:magicHeaderSize], magicHeader) { + err = ErrCorrupt + } + + } + } + if err != nil || len(buff) < offData { + return + } + //do the actual extration + m = getMeta(buff[offSize:]) + //check the capacity against the file size + if uint64(fi.Size()) != m.cap { + err = ErrCorrupt + } + + return +} + +func getMeta(b []byte) meta { return meta{ - size: binary.GetLittleEndianUint64(b.data, offSize), - roff: binary.GetLittleEndianUint64(b.data, offNextRead), - woff: binary.GetLittleEndianUint64(b.data, offNextWrite), - cap: binary.GetLittleEndianUint64(b.data, offMaxCapacity), + size: binary.GetLittleEndianUint64(b, offSize), + roff: binary.GetLittleEndianUint64(b, offNextRead), + woff: binary.GetLittleEndianUint64(b, offNextWrite), + cap: binary.GetLittleEndianUint64(b, offMaxCapacity), } } +func putMeta(m meta, b []byte) { + copy(b, magicHeader) + binary.PutLittleEndianUint64(b, offSize, m.size) + binary.PutLittleEndianUint64(b, offNextRead, m.roff) + binary.PutLittleEndianUint64(b, offNextWrite, m.woff) + binary.PutLittleEndianUint64(b, offMaxCapacity, m.cap) +} + +func (b *Buffer) readmeta() (m meta) { + b.Lock() + m = *b.m + b.Unlock() + return +} + func (b *Buffer) writemeta(m meta) { - binary.PutLittleEndianUint64(b.data, offSize, m.size) - binary.PutLittleEndianUint64(b.data, offNextRead, m.roff) - binary.PutLittleEndianUint64(b.data, offNextWrite, m.woff) - binary.PutLittleEndianUint64(b.data, offMaxCapacity, m.cap) + b.Lock() + *b.m = m + b.Unlock() } var ( @@ -82,29 +148,54 @@ var ( // Inserting data which will overflow the buffer fails // Inserting does not overwrite any existing data inside the buffer // if it would overwrite data, errOverwrite is returned -func (b *Buffer) Insert(data []byte) error { +func (b *Buffer) Insert(data []byte) (err error) { b.Lock() - defer b.Unlock() + err = b.insert(data) + b.Unlock() + return +} - m := b.readmeta() +// InsertWithOverwrite inserts data into the buffer, potentially removing existing data +// Inserting using this function CAN overwrite data if there isn't enough free space +func (b *Buffer) InsertWithOverwrite(data []byte) (err error) { + writeLen := uint64(len(data)) + // data is larger filesize - metadata + if writeLen > (b.m.cap - offData - recMeta) { + return errToBig + } + + b.Lock() + for writeLen > b.free() { + if _, err = b.read(true); err != nil { + break + } + } + if err == nil { + err = b.insert(data) + } + b.Unlock() + return +} + +func (b *Buffer) insert(data []byte) error { writeLen := uint64(len(data)) // data is larger filesize - metadata - if writeLen > (m.cap - offData - recMeta) { + if writeLen > (b.m.cap - offData - recMeta) { return errToBig } // data exceeds available space - if m.size+writeLen+offData+recMeta > m.cap { + if b.m.size+writeLen+offData+recMeta > b.m.cap { return errOverflow } wrap := false copyTo := 0 - endOff := m.woff + writeLen + recMeta - if endOff >= m.cap { + endOff := b.m.woff + writeLen + recMeta + if endOff >= b.m.cap { // Get to the start of the file - endOff = endOff % m.cap + endOff = endOff % b.m.cap // save for later copyTo = int(writeLen - endOff) // skip the metadata @@ -116,38 +207,37 @@ func (b *Buffer) Insert(data []byte) error { switch { // r e w // [_______++++++++____] - case m.roff < m.woff && m.roff < endOff && endOff < m.woff: + case b.m.roff < b.m.woff && b.m.roff < endOff && endOff < b.m.woff: return errOverwrite // w r e // [+++++++________++++] - case m.woff < m.roff && endOff > m.roff: + case b.m.woff < b.m.roff && endOff > b.m.roff: return errOverwrite // e w r // [+++++++________++++] - case m.woff < m.roff && endOff < m.woff: + case b.m.woff < b.m.roff && endOff < b.m.woff: return errOverwrite } switch { - case wrap && m.woff+recMeta > m.cap: + case wrap && b.m.woff+recMeta > b.m.cap: tmp := serializeMeta(writeLen) - left := m.cap - m.woff - // startData := (m.woff+recMeta)%m.cap + offData - copy(b.data[m.woff:m.cap], tmp[:left]) + left := b.m.cap - b.m.woff + // startData := (b.m.woff+recMeta)%b.m.cap + offData + copy(b.data[b.m.woff:b.m.cap], tmp[:left]) copy(b.data[offData:offData+recMeta-left], tmp[left:]) copy(b.data[offData+recMeta-left:endOff], data) case wrap: - binary.PutLittleEndianUint64(b.data, int(m.woff), uint64(writeLen)) - copy(b.data[m.woff+recMeta:m.cap], data[:copyTo]) + binary.PutLittleEndianUint64(b.data, int(b.m.woff), uint64(writeLen)) + copy(b.data[b.m.woff+recMeta:b.m.cap], data[:copyTo]) copy(b.data[offData:endOff], data[copyTo:]) default: // write doesn't wrap - binary.PutLittleEndianUint64(b.data, int(m.woff), uint64(writeLen)) - copy(b.data[m.woff+recMeta:endOff], data) + binary.PutLittleEndianUint64(b.data, int(b.m.woff), uint64(writeLen)) + copy(b.data[b.m.woff+recMeta:endOff], data) } - m.woff = endOff - m.size += (writeLen + recMeta) - b.writemeta(m) + b.m.woff = endOff + b.m.size += (writeLen + recMeta) return nil } @@ -163,12 +253,8 @@ func deserializeMeta(meta []byte) uint64 { } func (b *Buffer) read(mutate bool) ([]byte, error) { - b.Lock() - defer b.Unlock() - m := b.readmeta() - - if m.size == 0 { + if b.m.size == 0 { return nil, nil } @@ -176,22 +262,22 @@ func (b *Buffer) read(mutate bool) ([]byte, error) { readLen uint64 left uint64 ) - if m.roff+recMeta > m.cap { - left = m.cap - m.roff + if b.m.roff+recMeta > b.m.cap { + left = b.m.cap - b.m.roff tmp := make([]byte, recMeta) - copy(tmp[:left], b.data[m.roff:m.cap]) + copy(tmp[:left], b.data[b.m.roff:b.m.cap]) copy(tmp[left:], b.data[offData:offData+recMeta-left]) readLen = deserializeMeta(tmp) } else { - readLen = binary.GetLittleEndianUint64(b.data, int(m.roff)) + readLen = binary.GetLittleEndianUint64(b.data, int(b.m.roff)) } var copyTo int - endOff := m.roff + readLen + recMeta + endOff := b.m.roff + readLen + recMeta wrap := false - if endOff > m.cap { + if endOff > b.m.cap { // Get to the start of the file - endOff = endOff % m.cap + endOff = endOff % b.m.cap // save for later copyTo = int(readLen - endOff) // skip the metadata @@ -202,37 +288,61 @@ func (b *Buffer) read(mutate bool) ([]byte, error) { ret := make([]byte, readLen) switch { - case wrap && m.roff+recMeta > m.cap: + case wrap && b.m.roff+recMeta > b.m.cap: copy(ret, b.data[offData+recMeta-left:endOff]) case wrap: - copy(ret[:copyTo], b.data[m.roff+recMeta:m.cap]) + copy(ret[:copyTo], b.data[b.m.roff+recMeta:b.m.cap]) copy(ret[copyTo:], b.data[offData:endOff]) default: - copy(ret, b.data[m.roff+recMeta:endOff]) + copy(ret, b.data[b.m.roff+recMeta:endOff]) } if mutate { - m.roff = endOff - m.size -= (readLen + recMeta) - b.writemeta(m) + b.m.roff = endOff + b.m.size -= (readLen + recMeta) } return ret, nil } // Peek reads the first record and returns it without removing it from the buffer. -func (b *Buffer) Peek() ([]byte, error) { - return b.read(false) +func (b *Buffer) Peek() (buff []byte, err error) { + b.Lock() + buff, err = b.read(false) + b.Unlock() + return } // Pop removes and returns the first record in the buffer -func (b *Buffer) Pop() ([]byte, error) { - return b.read(true) +func (b *Buffer) Pop() (buff []byte, err error) { + b.Lock() + buff, err = b.read(true) + b.Unlock() + return } -func (b *Buffer) Size() int { +// Size returns the overall size of the held data +func (b *Buffer) Size() (v int) { b.Lock() - m := b.readmeta() + v = int(b.m.size) b.Unlock() - return int(m.size) + return +} + +// Free returns how much free data there is in the buffer +// we account for the record metadata, so if the actual free is less +func (b *Buffer) Free() (v int) { + b.Lock() + v = int(b.free()) + b.Unlock() + return +} + +func (b *Buffer) free() (v uint64) { + if v = b.m.cap - (b.m.size + offData); v < recMeta { + v = 0 + } else { + v -= recMeta + } + return } // New creates a new Buffer backed by the file given by filename. @@ -244,75 +354,118 @@ func New(filename string, capacity int) (*Buffer, error) { err error ) - // TODO: don't assume filename given is a good file. // TODO: get passed in open file? - if _, err = os.Stat(filename); os.IsNotExist(err) { + if _, err := os.Stat(filename); os.IsNotExist(err) { f, err = os.Create(filename) if err != nil { return nil, err } newFile = true } else { - f, err = os.OpenFile(filename, os.O_RDWR, 0644) - if err != nil { + if f, err = os.OpenFile(filename, os.O_RDWR, 0644); err != nil { + return nil, err + } else if _, err = readmeta(f); err != nil { + f.Close() return nil, err } } if err := syscall.Truncate(filename, int64(capacity)); err != nil { + f.Close() return nil, err } - return open(f, newFile, capacity) -} - -func open(f *os.File, newFile bool, capacity int) (*Buffer, error) { - if f == nil || capacity <= 0 { - return nil, errors.New("Bad parameters") - } - data, err := syscall.Mmap( - int(f.Fd()), 0, capacity, - syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED, - ) + b, err := open(f, capacity) if err != nil { + f.Close() return nil, err } - // don't need this anymore - f.Close() - - b := &Buffer{ - data: data, + if newFile { + b.Lock() + *b.m = meta{ + size: 0, + woff: offData, + roff: offData, + cap: uint64(capacity), + } + b.Unlock() } - if newFile { - m := meta{ + return b, nil +} + +// Open will open an existing Buffer backed by the given file +// If the file does not exist it will be created +// If the buffer is new it will be set to the given capacity +// if the file already exists, AND the given capacity is larger, the file will be expanded +// if the file already exists and the capacity is smaller, an error is returned +func Open(filename string, capacity int) (*Buffer, error) { + var ( + newFile bool + f *os.File + err error + m meta + ) + + if _, err := os.Stat(filename); os.IsNotExist(err) { + f, err = os.Create(filename) + if err != nil { + return nil, err + } + newFile = true + m = meta{ size: 0, woff: offData, roff: offData, cap: uint64(capacity), } + } else { + if f, err = os.OpenFile(filename, os.O_RDWR, 0644); err != nil { + return nil, err + } else if m, err = readmeta(f); err != nil { + f.Close() + return nil, err + } + if m.cap > uint64(capacity) { + f.Close() + return nil, errors.New("Cannot shrink existing file") + } else if uint64(capacity) > m.cap { + m.cap = uint64(capacity) + } + } + if err := syscall.Truncate(filename, int64(capacity)); err != nil { + f.Close() + return nil, err + } + b, err := open(f, capacity) + if err != nil { + f.Close() + return nil, err + } + if newFile { b.Lock() - b.writemeta(m) + *b.m = m b.Unlock() } return b, nil } -// Open will open an existing buffer, if the file does not exist an error is returned -func Open(filename string) (*Buffer, error) { - var ( - f *os.File - err error - capacity int64 +func open(f *os.File, capacity int) (*Buffer, error) { + data, err := syscall.Mmap( + int(f.Fd()), 0, capacity, + syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED, ) - f, err = os.OpenFile(filename, os.O_RDWR, 0644) if err != nil { return nil, err - } else if fi, err := f.Stat(); err != nil { + } + if err = flock(f, true); err != nil { return nil, err - } else { - capacity = fi.Size() } - return open(f, false, int(capacity)) + b := &Buffer{ + f: f, + data: data, + m: (*meta)(unsafe.Pointer(&data[0])), + } + return b, nil } func (b *Buffer) Sync() (err error) { @@ -338,8 +491,13 @@ func (b *Buffer) Close() (err error) { if b.data == nil { return ErrNotOpen } + funlock(b.f) //unmap and set the buffer to nil - err = syscall.Munmap(b.data) + if err = syscall.Munmap(b.data); err == nil { + err = b.f.Close() + } else { + b.f.Close() + } b.data = nil return } diff --git a/buffer_test.go b/buffer_test.go index 15f6651..a2e63fc 100644 --- a/buffer_test.go +++ b/buffer_test.go @@ -1,6 +1,7 @@ package buffer import ( + "fmt" "io/ioutil" "os" "strings" @@ -23,6 +24,21 @@ type tc struct { err error } +var ( + td string +) + +func TestMain(m *testing.M) { + var err error + if td, err = ioutil.TempDir(os.TempDir(), "buffer"); err != nil { + fmt.Println("Failed to make temporary directory", err) + os.Exit(-1) + } + r := m.Run() + os.RemoveAll(td) + os.Exit(r) +} + func TestAll(t *testing.T) { cases := []tc{ tc{ // happy test @@ -32,9 +48,9 @@ func TestAll(t *testing.T) { }, expect: meta{ size: 32, - woff: 32, - roff: 32, - cap: 64, + woff: 48, + roff: 48, + cap: 80, }, }, tc{ // happy test @@ -45,9 +61,9 @@ func TestAll(t *testing.T) { }, expect: meta{ size: 35, - woff: 102, - roff: 67, - cap: 108, + woff: 118, + roff: 83, + cap: 124, }, }, tc{ // Tests read/write for recMeta wrap @@ -59,9 +75,9 @@ func TestAll(t *testing.T) { }, expect: meta{ size: 0, - woff: 64, - roff: 64, - cap: 70, + woff: 80, + roff: 80, + cap: 86, }, }, tc{ // Tests read/write for data wrapping @@ -73,24 +89,14 @@ func TestAll(t *testing.T) { }, expect: meta{ size: 0, - woff: 42, - roff: 42, - cap: 70, + woff: 58, + roff: 58, + cap: 86, }, }, } - for i, c := range cases { - f, err := ioutil.TempFile("tests", "tmpBufTest") - if err != nil { - t.Fatalf("Unexpected failure creating temp file: %v", err) - } - // only want it for the name? TODO: figure a better way to do this - filename := f.Name() - err = os.Remove(filename) - if err != nil { - t.Fatalf("Unexpected failure removing temp file: %v", err) - } + filename := tempFileName(t) //located in testing.T tempdir, no need to remove b, err := New(filename, int(c.expect.cap)) if err != nil { t.Errorf("Unexpected failure creating buffer for case: %d", i) @@ -122,7 +128,29 @@ func TestAll(t *testing.T) { if err = b.Close(); err != nil { t.Errorf("Failed to close: %v", err) } - os.Remove(filename) + + //re-open the file with Open + if b, err = Open(filename, int(c.expect.cap)); err != nil { + t.Errorf("Failed to re-open: %v", err) + } + if m = b.readmeta(); c.expect != m { + t.Errorf("metadata structs do not match for %d. expect: %v, actual: %v", i, c.expect, m) + } + if err = b.Close(); err != nil { + t.Errorf("Failed to close: %v", err) + } + + //re-open the file with Open and an expansion + if b, err = Open(filename, int(c.expect.cap+1024)); err != nil { + t.Errorf("Failed to re-open: %v", err) + } + m.cap += 1024 + if m = b.readmeta(); c.expect != m { + t.Errorf("metadata structs do not match for %d. expect: %v, actual: %v", i, c.expect, m) + } + if err = b.Close(); err != nil { + t.Errorf("Failed to close: %v", err) + } } } @@ -136,9 +164,9 @@ func TestErrors(t *testing.T) { err: errOverflow, expect: meta{ size: 21, - woff: 53, - roff: 32, - cap: 64, + woff: 69, + roff: 48, + cap: 80, }, }, tc{ @@ -148,9 +176,9 @@ func TestErrors(t *testing.T) { err: errToBig, expect: meta{ size: 0, - woff: 32, - roff: 32, - cap: 48, + woff: 48, + roff: 48, + cap: 64, }, }, tc{ @@ -160,25 +188,14 @@ func TestErrors(t *testing.T) { err: nil, expect: meta{ size: 0, - woff: 32, - roff: 32, - cap: 64, + woff: 48, + roff: 48, + cap: 80, }, }, } - for i, c := range cases { - f, err := ioutil.TempFile("tests", "tmpBufTest") - if err != nil { - t.Fatalf("Unexpected failure creating temp file: %v", err) - } - - // only want it for the name? TODO: figure a better way to do this - filename := f.Name() - err = os.Remove(filename) - if err != nil { - t.Fatalf("Unexpected failure removing temp file: %v", err) - } + filename := tempFileName(t) //located in testing.T tempdir, no need to remove b, err := New(filename, int(c.expect.cap)) if err != nil { t.Errorf("Unexpected failure creating buffer for case: %d", i) @@ -209,7 +226,6 @@ func TestErrors(t *testing.T) { if c.expect != m { t.Errorf("metadata structs do not match for %d. expect: %v, actual: %v", i, c.expect, m) } - os.Remove(filename) } } @@ -223,23 +239,12 @@ func TestHelpers(t *testing.T) { err: errOverflow, expect: meta{ size: 21, - woff: 53, - roff: 32, - cap: 64, + woff: 69, + roff: 48, + cap: 80, }, } - - f, err := ioutil.TempFile("tests", "tmpBufTest") - if err != nil { - t.Fatalf("Unexpected failure creating temp file: %v", err) - } - - // only want it for the name? TODO: figure a better way to do this - filename := f.Name() - err = os.Remove(filename) - if err != nil { - t.Fatalf("Unexpected failure removing temp file: %v", err) - } + filename := tempFileName(t) //located in testing.T tempdir, no need to remove b, err := New(filename, int(c.expect.cap)) if err != nil { t.Errorf("Unexpected failure creating buffer: %v", err) @@ -256,7 +261,7 @@ func TestHelpers(t *testing.T) { t.Errorf("Unexpected Insert failure. err: %v", err) } } else { - _, err := b.Pop() + _, err = b.Pop() switch err { case c.err: break @@ -273,22 +278,10 @@ func TestHelpers(t *testing.T) { if b.Size() != int(m.size) { t.Errorf("Reported size does not match metadata") } - os.Remove(filename) } func TestAdvise(t *testing.T) { - f, err := ioutil.TempFile("tests", "tmpBufTest") - if err != nil { - t.Fatalf("Unexpected failure creating temp file: %v", err) - } - - // only want it for the name? TODO: figure a better way to do this - filename := f.Name() - err = os.Remove(filename) - if err != nil { - t.Fatalf("Unexpected failure removing temp file: %v", err) - } - + filename := tempFileName(t) //located in testing.T tempdir, no need to remove recordSize := 1000 // 20 records precisely cap := (20*(recordSize+recMeta) + offData) @@ -322,25 +315,79 @@ func TestAdvise(t *testing.T) { if err != nil { t.Errorf("unexpected error in advise: %v", err) } +} + +func TestOverlap(t *testing.T) { + c := tc{ // happy test + ops: []operation{ + {"Wooorldd", write}, + {"Helloooo", write}, + {"Helloooo", write}, + }, + expect: meta{ + size: 32, + woff: 64, //do to the loop + roff: 64, + cap: 80, + }, + } + filename := tempFileName(t) //located in testing.T tempdir, no need to remove + b, err := New(filename, int(c.expect.cap)) + if err != nil { + t.Errorf("Unexpected failure creating buffer") + } + for _, op := range c.ops { + if op.write { + err := b.InsertWithOverwrite([]byte(op.value)) + if err != nil { + t.Errorf("Unexpected Insert failure: %v", err) + } + } else { + val, err := b.Pop() + if err != nil { + t.Errorf("Unexpected Pop failure: %v", err) + } + if string(val) != op.value { + t.Errorf("Values do not match expected: %v actual: %v", op.value, val) + } + t.Logf("%v", string(val)) + } + } + if err = b.Sync(); err != nil { + t.Errorf("Failed to sync: %v", err) + } + m := b.readmeta() + if c.expect != m { + t.Errorf("metadata structs do not match expect: %v, actual: %v", c.expect, m) + } - os.Remove(filename) + //make sure we can only pop 2 + var buffs []string + for { + if buff, err := b.Pop(); err != nil { + t.Errorf("Failed to pop: %v", err) + } else if buff == nil { + break + } else { + buffs = append(buffs, string(buff)) + } + } + if err = b.Close(); err != nil { + t.Errorf("Failed to close: %v", err) + } + if len(buffs) != 2 { + t.Errorf("Too many entries came back out: %d != 2", len(buffs)) + } + if buffs[0] != string(c.ops[1].value) || buffs[1] != string(c.ops[2].value) { + t.Errorf("invalid output: %v", buffs) + } } // benchBuffer tests reading/writing count batches of size batch in sequential chunks // with file capacity cap func benchBuffer(cap, recordSize int, b *testing.B) { + filename := tempFileName(b) //located in testing.T tempdir, no need to remove - f, err := ioutil.TempFile("tests", "tmpBufTest") - if err != nil { - b.Fatalf("Unexpected failure creating temp file: %v", err) - } - - // only want it for the name? TODO: figure a better way to do this - filename := f.Name() - err = os.Remove(filename) - if err != nil { - b.Fatalf("Unexpected failure removing temp file: %v", err) - } buf, err := New(filename, int(cap)) if err != nil { b.Errorf("Unexpected failure creating buffer.") @@ -358,8 +405,6 @@ func benchBuffer(cap, recordSize int, b *testing.B) { b.Fatalf("Unexpected error while popping: %v", err) } } - - os.Remove(filename) } func BenchmarkBuffer50KBCap1KBRec(b *testing.B) { @@ -381,3 +426,21 @@ func BenchmarkBuffer500MBCap1MBRec(b *testing.B) { // 500MB file, 1MB record benchBuffer(500*1000*1000, 1000*1000, b) } + +func tempFileName(t ft) (s string) { + f, err := ioutil.TempFile(td, "tmpBufTest") + if err != nil { + t.Fatalf("Unexpected failure creating temp file: %v", err) + } + s = f.Name() + if err = f.Close(); err != nil { + t.Fatalf("Failed to close file: %v", err) + } else if err = os.Remove(s); err != nil { + t.Fatalf("Unexpected failure removing temp file: %v", err) + } + return +} + +type ft interface { + Fatalf(f string, args ...interface{}) +} diff --git a/flock.go b/flock.go new file mode 100644 index 0000000..9aa3d2d --- /dev/null +++ b/flock.go @@ -0,0 +1,71 @@ +// +build !windows,!plan9,!solaris + +//this package is based on the flock implementation used in boltdb +//which is MIT licensed and available at: +// https://github.com/boltdb/bolt/blob/master/bolt_unix.go + +package buffer + +import ( + "errors" + "os" + "syscall" +) + +var ( + ErrTimeout = errors.New("Timeout") + ErrLocked = errors.New("File is already locked") +) + +//flock locks a file for this process, this DOES NOT prevent the same process +//from opening the +func flock(f *os.File, exclusive bool) error { + var lock syscall.Flock_t + lock.Start = 0 + lock.Len = 0 + lock.Pid = 0 + lock.Whence = 0 + lock.Pid = 0 + if exclusive { + lock.Type = syscall.F_WRLCK + } else { + lock.Type = syscall.F_RDLCK + } + err := rawFdCall(f, func(fd uintptr) error { + return syscall.FcntlFlock(fd, syscall.F_SETLK, &lock) + }) + if err == nil { + return nil + } else if err == syscall.EAGAIN { + return ErrLocked + } + return err +} + +//funlock releases a lock held on a file descriptor +func funlock(f *os.File) error { + var lock syscall.Flock_t + lock.Start = 0 + lock.Len = 0 + lock.Type = syscall.F_UNLCK + lock.Whence = 0 + return rawFdCall(f, func(fd uintptr) error { + return syscall.FcntlFlock(fd, syscall.F_SETLK, &lock) + }) +} + +type controlFunc func(fd uintptr) error + +func rawFdCall(fio *os.File, cf controlFunc) error { + if fio == nil || cf == nil { + return errors.New("invalid parameters") + } + rc, err := fio.SyscallConn() + if err != nil { + return err + } + rc.Control(func(fd uintptr) { + err = cf(fd) + }) + return err +} diff --git a/flock_others.go b/flock_others.go new file mode 100644 index 0000000..48d75b8 --- /dev/null +++ b/flock_others.go @@ -0,0 +1,15 @@ +// +build windows,plan9,solaris + +package buffer + +import ( + "os" +) + +func flock(f *os.File, exclusive bool) error { + return nil //do nothing +} + +func funlock(f *os.File) error { + return nil +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..47fb608 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/cloudflare/buffer + +go 1.15 From d70330db3ea19d316f26ee8bbf7f860ca1d53819 Mon Sep 17 00:00:00 2001 From: Kris Watts Date: Thu, 8 Apr 2021 17:02:00 -0600 Subject: [PATCH 4/7] adding option to not specify a capacity on an existing buffer --- buffer.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/buffer.go b/buffer.go index 8ff2d46..76916d1 100644 --- a/buffer.go +++ b/buffer.go @@ -406,6 +406,9 @@ func Open(filename string, capacity int) (*Buffer, error) { ) if _, err := os.Stat(filename); os.IsNotExist(err) { + if capacity <= 0 { + return nil, errors.New("Bad capacity") + } f, err = os.Create(filename) if err != nil { return nil, err @@ -424,7 +427,9 @@ func Open(filename string, capacity int) (*Buffer, error) { f.Close() return nil, err } - if m.cap > uint64(capacity) { + if capacity == 0 { + capacity = int(m.cap) + } else if m.cap > uint64(capacity) { f.Close() return nil, errors.New("Cannot shrink existing file") } else if uint64(capacity) > m.cap { From e267467acf6e9627eba1cd2a23e8923ce0453ef6 Mon Sep 17 00:00:00 2001 From: Kris Watts Date: Thu, 8 Apr 2021 17:31:08 -0600 Subject: [PATCH 5/7] fixing header stuff --- buffer.go | 67 ++++++++++++++++++------------------- buffer_test.go | 89 ++++++++++++++++++++++++++++++-------------------- 2 files changed, 85 insertions(+), 71 deletions(-) diff --git a/buffer.go b/buffer.go index 76916d1..24b9606 100644 --- a/buffer.go +++ b/buffer.go @@ -4,6 +4,7 @@ package buffer import ( "bytes" "errors" + "fmt" "os" "sync" "syscall" @@ -53,9 +54,8 @@ const ( magicHeaderSize = 16 ) -var ( - magicHeader = []byte(`CLOUDFLAREBUFFER`) -) +//CLOUDFLAREBUFFER +var magicHeader = [16]byte{0x43, 0x4c, 0x4f, 0x55, 0x44, 0x46, 0x4c, 0x41, 0x52, 0x45, 0x42, 0x55, 0x46, 0x45, 0x52} const ( offMagic = 0 @@ -75,7 +75,7 @@ func readmeta(f *os.File) (m meta, err error) { var buff []byte if f == nil { err = ErrNotOpen - } else if fi, err = f.Stat(); err != nil { + } else if fi, err = f.Stat(); err == nil { if sz := fi.Size(); sz == 0 { err = ErrEmpty } else if sz < offData { @@ -89,17 +89,17 @@ func readmeta(f *os.File) (m meta, err error) { return } else if n != len(buff) { err = errors.New("Failed read") - } else if !bytes.Equal(buff[0:magicHeaderSize], magicHeader) { + } else if !bytes.Equal(buff[0:magicHeaderSize], magicHeader[:]) { err = ErrCorrupt } - + fmt.Println("READ", buff) } } - if err != nil || len(buff) < offData { + if err != nil { return } //do the actual extration - m = getMeta(buff[offSize:]) + m = getMeta(buff) //check the capacity against the file size if uint64(fi.Size()) != m.cap { err = ErrCorrupt @@ -108,17 +108,19 @@ func readmeta(f *os.File) (m meta, err error) { return } -func getMeta(b []byte) meta { - return meta{ - size: binary.GetLittleEndianUint64(b, offSize), - roff: binary.GetLittleEndianUint64(b, offNextRead), - woff: binary.GetLittleEndianUint64(b, offNextWrite), - cap: binary.GetLittleEndianUint64(b, offMaxCapacity), +func getMeta(b []byte) (m meta) { + m = meta{ + magic: magicHeader, + size: binary.GetLittleEndianUint64(b, offSize), + roff: binary.GetLittleEndianUint64(b, offNextRead), + woff: binary.GetLittleEndianUint64(b, offNextWrite), + cap: binary.GetLittleEndianUint64(b, offMaxCapacity), } + return } func putMeta(m meta, b []byte) { - copy(b, magicHeader) + copy(b, magicHeader[:]) binary.PutLittleEndianUint64(b, offSize, m.size) binary.PutLittleEndianUint64(b, offNextRead, m.roff) binary.PutLittleEndianUint64(b, offNextWrite, m.woff) @@ -381,10 +383,11 @@ func New(filename string, capacity int) (*Buffer, error) { if newFile { b.Lock() *b.m = meta{ - size: 0, - woff: offData, - roff: offData, - cap: uint64(capacity), + magic: magicHeader, + size: 0, + woff: offData, + roff: offData, + cap: uint64(capacity), } b.Unlock() } @@ -395,8 +398,7 @@ func New(filename string, capacity int) (*Buffer, error) { // Open will open an existing Buffer backed by the given file // If the file does not exist it will be created // If the buffer is new it will be set to the given capacity -// if the file already exists, AND the given capacity is larger, the file will be expanded -// if the file already exists and the capacity is smaller, an error is returned +// if the file already exists the capacity is ignored and we use the existing capacity func Open(filename string, capacity int) (*Buffer, error) { var ( newFile bool @@ -405,12 +407,16 @@ func Open(filename string, capacity int) (*Buffer, error) { m meta ) + fmt.Println("OPEN", capacity) if _, err := os.Stat(filename); os.IsNotExist(err) { if capacity <= 0 { return nil, errors.New("Bad capacity") } - f, err = os.Create(filename) - if err != nil { + if f, err = os.Create(filename); err != nil { + return nil, err + } + if err := syscall.Truncate(filename, int64(capacity)); err != nil { + f.Close() return nil, err } newFile = true @@ -427,24 +433,15 @@ func Open(filename string, capacity int) (*Buffer, error) { f.Close() return nil, err } - if capacity == 0 { - capacity = int(m.cap) - } else if m.cap > uint64(capacity) { - f.Close() - return nil, errors.New("Cannot shrink existing file") - } else if uint64(capacity) > m.cap { - m.cap = uint64(capacity) - } - } - if err := syscall.Truncate(filename, int64(capacity)); err != nil { - f.Close() - return nil, err + fmt.Printf("%+v\n", m) + capacity = int(m.cap) } b, err := open(f, capacity) if err != nil { f.Close() return nil, err } + fmt.Println("OPEN: ", capacity, m.cap) if newFile { b.Lock() *b.m = m diff --git a/buffer_test.go b/buffer_test.go index a2e63fc..3eb9631 100644 --- a/buffer_test.go +++ b/buffer_test.go @@ -47,10 +47,11 @@ func TestAll(t *testing.T) { {"Helloooo", write}, }, expect: meta{ - size: 32, - woff: 48, - roff: 48, - cap: 80, + magic: magicHeader, + size: 32, + woff: 48, + roff: 48, + cap: 80, }, }, tc{ // happy test @@ -60,10 +61,11 @@ func TestAll(t *testing.T) { {"小洞不补, 大洞吃苦.", read}, }, expect: meta{ - size: 35, - woff: 118, - roff: 83, - cap: 124, + magic: magicHeader, + size: 35, + woff: 118, + roff: 83, + cap: 124, }, }, tc{ // Tests read/write for recMeta wrap @@ -74,10 +76,11 @@ func TestAll(t *testing.T) { {"小洞不补, 大洞吃苦.", read}, }, expect: meta{ - size: 0, - woff: 80, - roff: 80, - cap: 86, + magic: magicHeader, + size: 0, + woff: 80, + roff: 80, + cap: 86, }, }, tc{ // Tests read/write for data wrapping @@ -88,10 +91,11 @@ func TestAll(t *testing.T) { {"小洞不补, 大洞吃苦.", read}, }, expect: meta{ - size: 0, - woff: 58, - roff: 58, - cap: 86, + magic: magicHeader, + size: 0, + woff: 58, + roff: 58, + cap: 86, }, }, } @@ -151,6 +155,14 @@ func TestAll(t *testing.T) { if err = b.Close(); err != nil { t.Errorf("Failed to close: %v", err) } + + //test open close with no capacity specified + if b, err = Open(filename, 0); err != nil { + t.Errorf("Failed to re-open: %v", err) + } else if err = b.Close(); err != nil { + t.Errorf("Failed to close: %v", err) + } + } } @@ -163,10 +175,11 @@ func TestErrors(t *testing.T) { }, err: errOverflow, expect: meta{ - size: 21, - woff: 69, - roff: 48, - cap: 80, + magic: magicHeader, + size: 21, + woff: 69, + roff: 48, + cap: 80, }, }, tc{ @@ -175,10 +188,11 @@ func TestErrors(t *testing.T) { }, err: errToBig, expect: meta{ - size: 0, - woff: 48, - roff: 48, - cap: 64, + magic: magicHeader, + size: 0, + woff: 48, + roff: 48, + cap: 64, }, }, tc{ @@ -187,10 +201,11 @@ func TestErrors(t *testing.T) { }, err: nil, expect: meta{ - size: 0, - woff: 48, - roff: 48, - cap: 80, + magic: magicHeader, + size: 0, + woff: 48, + roff: 48, + cap: 80, }, }, } @@ -238,10 +253,11 @@ func TestHelpers(t *testing.T) { }, err: errOverflow, expect: meta{ - size: 21, - woff: 69, - roff: 48, - cap: 80, + magic: magicHeader, + size: 21, + woff: 69, + roff: 48, + cap: 80, }, } filename := tempFileName(t) //located in testing.T tempdir, no need to remove @@ -325,10 +341,11 @@ func TestOverlap(t *testing.T) { {"Helloooo", write}, }, expect: meta{ - size: 32, - woff: 64, //do to the loop - roff: 64, - cap: 80, + magic: magicHeader, + size: 32, + woff: 64, //do to the loop + roff: 64, + cap: 80, }, } filename := tempFileName(t) //located in testing.T tempdir, no need to remove From 68eddfc2def90fbb59b76be50d7bd6eb50d58d7a Mon Sep 17 00:00:00 2001 From: Kris Watts Date: Thu, 8 Apr 2021 17:41:52 -0600 Subject: [PATCH 6/7] removed debug statements --- buffer.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/buffer.go b/buffer.go index 24b9606..ac5db00 100644 --- a/buffer.go +++ b/buffer.go @@ -4,7 +4,6 @@ package buffer import ( "bytes" "errors" - "fmt" "os" "sync" "syscall" @@ -92,7 +91,6 @@ func readmeta(f *os.File) (m meta, err error) { } else if !bytes.Equal(buff[0:magicHeaderSize], magicHeader[:]) { err = ErrCorrupt } - fmt.Println("READ", buff) } } if err != nil { @@ -407,7 +405,6 @@ func Open(filename string, capacity int) (*Buffer, error) { m meta ) - fmt.Println("OPEN", capacity) if _, err := os.Stat(filename); os.IsNotExist(err) { if capacity <= 0 { return nil, errors.New("Bad capacity") @@ -433,7 +430,6 @@ func Open(filename string, capacity int) (*Buffer, error) { f.Close() return nil, err } - fmt.Printf("%+v\n", m) capacity = int(m.cap) } b, err := open(f, capacity) @@ -441,7 +437,6 @@ func Open(filename string, capacity int) (*Buffer, error) { f.Close() return nil, err } - fmt.Println("OPEN: ", capacity, m.cap) if newFile { b.Lock() *b.m = m From 97e100fbf3c57006a2da574f6f33e7bc92de4cda Mon Sep 17 00:00:00 2001 From: Kris Watts Date: Thu, 8 Apr 2021 18:07:08 -0600 Subject: [PATCH 7/7] OK, it totally works now --- buffer.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/buffer.go b/buffer.go index ac5db00..ffdf962 100644 --- a/buffer.go +++ b/buffer.go @@ -405,7 +405,7 @@ func Open(filename string, capacity int) (*Buffer, error) { m meta ) - if _, err := os.Stat(filename); os.IsNotExist(err) { + if fi, err := os.Stat(filename); os.IsNotExist(err) || (err == nil && fi.Size() == 0) { if capacity <= 0 { return nil, errors.New("Bad capacity") } @@ -418,13 +418,16 @@ func Open(filename string, capacity int) (*Buffer, error) { } newFile = true m = meta{ - size: 0, - woff: offData, - roff: offData, - cap: uint64(capacity), + magic: magicHeader, + size: 0, + woff: offData, + roff: offData, + cap: uint64(capacity), } + } else if err != nil { + return nil, err //some other error } else { - if f, err = os.OpenFile(filename, os.O_RDWR, 0644); err != nil { + if f, err = os.OpenFile(filename, os.O_RDWR, 0640); err != nil { return nil, err } else if m, err = readmeta(f); err != nil { f.Close()