diff --git a/advise_linux.go b/advise_linux.go index ac8e16d..dc34e09 100644 --- a/advise_linux.go +++ b/advise_linux.go @@ -15,13 +15,11 @@ func (b *Buffer) Advise() error { b.Lock() defer b.Unlock() - m := b.readmeta() - // Start of next page above write offset - wo := int(m.woff) + (pageSize - int(m.woff)%pageSize) + wo := int(b.m.woff) + (pageSize - int(b.m.woff)%pageSize) // Start of page below read offset - ro := int(m.roff) - int(m.roff)%pageSize + ro := int(b.m.roff) - int(b.m.roff)%pageSize if ro > wo { return syscall.Madvise(b.data[wo:ro], syscall.MADV_DONTNEED) @@ -34,8 +32,8 @@ func (b *Buffer) Advise() error { return err } } - if wo < int(m.cap) { - return syscall.Madvise(b.data[wo:m.cap], syscall.MADV_DONTNEED) + if wo < int(b.m.cap) { + return syscall.Madvise(b.data[wo:b.m.cap], syscall.MADV_DONTNEED) } return nil } diff --git a/buffer.go b/buffer.go index fe2fd6e..ffdf962 100644 --- a/buffer.go +++ b/buffer.go @@ -2,14 +2,22 @@ package buffer import ( + "bytes" "errors" "os" "sync" "syscall" + "unsafe" "github.com/cloudflare/buffer/binary" ) +var ( + ErrNotOpen = errors.New("Not open") + ErrEmpty = errors.New("Empty file") + ErrCorrupt = errors.New("Corrupt") +) + // Buffer data format // Total size doesn't include header // (32 bytes) @@ -28,43 +36,106 @@ import ( type Buffer struct { sync.Mutex + f *os.File data []byte + m *meta } type meta struct { - size uint64 - woff uint64 - roff uint64 - cap uint64 + magic [16]byte + size uint64 + woff uint64 + roff uint64 + cap uint64 } -// TODO: add magic + signatures const ( - offSize = iota * 8 - offNextRead = iota * 8 - offNextWrite = iota * 8 - offMaxCapacity = iota * 8 - offData = iota * 8 + magicHeaderSize = 16 +) + +//CLOUDFLAREBUFFER +var magicHeader = [16]byte{0x43, 0x4c, 0x4f, 0x55, 0x44, 0x46, 0x4c, 0x41, 0x52, 0x45, 0x42, 0x55, 0x46, 0x45, 0x52} + +const ( + offMagic = 0 + offSize = offMagic + magicHeaderSize + offNextRead = offSize + 8 + offNextWrite = offNextRead + 8 + offMaxCapacity = offNextWrite + 8 + offData = offMaxCapacity + 8 ) // recordLength contains metadata about what's stored. // Currently only the recordLength const recMeta = 8 -func (b *Buffer) readmeta() meta { - return meta{ - size: binary.GetLittleEndianUint64(b.data, offSize), - roff: binary.GetLittleEndianUint64(b.data, offNextRead), - woff: binary.GetLittleEndianUint64(b.data, offNextWrite), - cap: binary.GetLittleEndianUint64(b.data, offMaxCapacity), +func readmeta(f *os.File) (m meta, err error) { + var fi os.FileInfo + var buff []byte + if f == nil { + err = ErrNotOpen + } else if fi, err = f.Stat(); err == nil { + if sz := fi.Size(); sz == 0 { + err = ErrEmpty + } else if sz < offData { + //not a new file but metdata is bad + err = errors.New("corrupted file header") + } else { + var n int + //check the magic bytes + buff = make([]byte, offData) + if n, err = f.ReadAt(buff, 0); err != nil { + return + } else if n != len(buff) { + err = errors.New("Failed read") + } else if !bytes.Equal(buff[0:magicHeaderSize], magicHeader[:]) { + err = ErrCorrupt + } + } + } + if err != nil { + return + } + //do the actual extration + m = getMeta(buff) + //check the capacity against the file size + if uint64(fi.Size()) != m.cap { + err = ErrCorrupt + } + + return +} + +func getMeta(b []byte) (m meta) { + m = meta{ + magic: magicHeader, + size: binary.GetLittleEndianUint64(b, offSize), + roff: binary.GetLittleEndianUint64(b, offNextRead), + woff: binary.GetLittleEndianUint64(b, offNextWrite), + cap: binary.GetLittleEndianUint64(b, offMaxCapacity), } + return +} + +func putMeta(m meta, b []byte) { + copy(b, magicHeader[:]) + binary.PutLittleEndianUint64(b, offSize, m.size) + binary.PutLittleEndianUint64(b, offNextRead, m.roff) + binary.PutLittleEndianUint64(b, offNextWrite, m.woff) + binary.PutLittleEndianUint64(b, offMaxCapacity, m.cap) +} + +func (b *Buffer) readmeta() (m meta) { + b.Lock() + m = *b.m + b.Unlock() + return } func (b *Buffer) writemeta(m meta) { - binary.PutLittleEndianUint64(b.data, offSize, m.size) - binary.PutLittleEndianUint64(b.data, offNextRead, m.roff) - binary.PutLittleEndianUint64(b.data, offNextWrite, m.woff) - binary.PutLittleEndianUint64(b.data, offMaxCapacity, m.cap) + b.Lock() + *b.m = m + b.Unlock() } var ( @@ -77,29 +148,54 @@ var ( // Inserting data which will overflow the buffer fails // Inserting does not overwrite any existing data inside the buffer // if it would overwrite data, errOverwrite is returned -func (b *Buffer) Insert(data []byte) error { +func (b *Buffer) Insert(data []byte) (err error) { b.Lock() - defer b.Unlock() + err = b.insert(data) + b.Unlock() + return +} + +// InsertWithOverwrite inserts data into the buffer, potentially removing existing data +// Inserting using this function CAN overwrite data if there isn't enough free space +func (b *Buffer) InsertWithOverwrite(data []byte) (err error) { + writeLen := uint64(len(data)) + // data is larger filesize - metadata + if writeLen > (b.m.cap - offData - recMeta) { + return errToBig + } - m := b.readmeta() + b.Lock() + for writeLen > b.free() { + if _, err = b.read(true); err != nil { + break + } + } + if err == nil { + err = b.insert(data) + } + b.Unlock() + return +} + +func (b *Buffer) insert(data []byte) error { writeLen := uint64(len(data)) // data is larger filesize - metadata - if writeLen > (m.cap - offData - recMeta) { + if writeLen > (b.m.cap - offData - recMeta) { return errToBig } // data exceeds available space - if m.size+writeLen+offData+recMeta > m.cap { + if b.m.size+writeLen+offData+recMeta > b.m.cap { return errOverflow } wrap := false copyTo := 0 - endOff := m.woff + writeLen + recMeta - if endOff >= m.cap { + endOff := b.m.woff + writeLen + recMeta + if endOff >= b.m.cap { // Get to the start of the file - endOff = endOff % m.cap + endOff = endOff % b.m.cap // save for later copyTo = int(writeLen - endOff) // skip the metadata @@ -111,38 +207,37 @@ func (b *Buffer) Insert(data []byte) error { switch { // r e w // [_______++++++++____] - case m.roff < m.woff && m.roff < endOff && endOff < m.woff: + case b.m.roff < b.m.woff && b.m.roff < endOff && endOff < b.m.woff: return errOverwrite // w r e // [+++++++________++++] - case m.woff < m.roff && endOff > m.roff: + case b.m.woff < b.m.roff && endOff > b.m.roff: return errOverwrite // e w r // [+++++++________++++] - case m.woff < m.roff && endOff < m.woff: + case b.m.woff < b.m.roff && endOff < b.m.woff: return errOverwrite } switch { - case wrap && m.woff+recMeta > m.cap: + case wrap && b.m.woff+recMeta > b.m.cap: tmp := serializeMeta(writeLen) - left := m.cap - m.woff - // startData := (m.woff+recMeta)%m.cap + offData - copy(b.data[m.woff:m.cap], tmp[:left]) + left := b.m.cap - b.m.woff + // startData := (b.m.woff+recMeta)%b.m.cap + offData + copy(b.data[b.m.woff:b.m.cap], tmp[:left]) copy(b.data[offData:offData+recMeta-left], tmp[left:]) copy(b.data[offData+recMeta-left:endOff], data) case wrap: - binary.PutLittleEndianUint64(b.data, int(m.woff), uint64(writeLen)) - copy(b.data[m.woff+recMeta:m.cap], data[:copyTo]) + binary.PutLittleEndianUint64(b.data, int(b.m.woff), uint64(writeLen)) + copy(b.data[b.m.woff+recMeta:b.m.cap], data[:copyTo]) copy(b.data[offData:endOff], data[copyTo:]) default: // write doesn't wrap - binary.PutLittleEndianUint64(b.data, int(m.woff), uint64(writeLen)) - copy(b.data[m.woff+recMeta:endOff], data) + binary.PutLittleEndianUint64(b.data, int(b.m.woff), uint64(writeLen)) + copy(b.data[b.m.woff+recMeta:endOff], data) } - m.woff = endOff - m.size += (writeLen + recMeta) - b.writemeta(m) + b.m.woff = endOff + b.m.size += (writeLen + recMeta) return nil } @@ -158,12 +253,8 @@ func deserializeMeta(meta []byte) uint64 { } func (b *Buffer) read(mutate bool) ([]byte, error) { - b.Lock() - defer b.Unlock() - - m := b.readmeta() - if m.size == 0 { + if b.m.size == 0 { return nil, nil } @@ -171,22 +262,22 @@ func (b *Buffer) read(mutate bool) ([]byte, error) { readLen uint64 left uint64 ) - if m.roff+recMeta > m.cap { - left = m.cap - m.roff + if b.m.roff+recMeta > b.m.cap { + left = b.m.cap - b.m.roff tmp := make([]byte, recMeta) - copy(tmp[:left], b.data[m.roff:m.cap]) + copy(tmp[:left], b.data[b.m.roff:b.m.cap]) copy(tmp[left:], b.data[offData:offData+recMeta-left]) readLen = deserializeMeta(tmp) } else { - readLen = binary.GetLittleEndianUint64(b.data, int(m.roff)) + readLen = binary.GetLittleEndianUint64(b.data, int(b.m.roff)) } var copyTo int - endOff := m.roff + readLen + recMeta + endOff := b.m.roff + readLen + recMeta wrap := false - if endOff > m.cap { + if endOff > b.m.cap { // Get to the start of the file - endOff = endOff % m.cap + endOff = endOff % b.m.cap // save for later copyTo = int(readLen - endOff) // skip the metadata @@ -197,37 +288,61 @@ func (b *Buffer) read(mutate bool) ([]byte, error) { ret := make([]byte, readLen) switch { - case wrap && m.roff+recMeta > m.cap: + case wrap && b.m.roff+recMeta > b.m.cap: copy(ret, b.data[offData+recMeta-left:endOff]) case wrap: - copy(ret[:copyTo], b.data[m.roff+recMeta:m.cap]) + copy(ret[:copyTo], b.data[b.m.roff+recMeta:b.m.cap]) copy(ret[copyTo:], b.data[offData:endOff]) default: - copy(ret, b.data[m.roff+recMeta:endOff]) + copy(ret, b.data[b.m.roff+recMeta:endOff]) } if mutate { - m.roff = endOff - m.size -= (readLen + recMeta) - b.writemeta(m) + b.m.roff = endOff + b.m.size -= (readLen + recMeta) } return ret, nil } // Peek reads the first record and returns it without removing it from the buffer. -func (b *Buffer) Peek() ([]byte, error) { - return b.read(false) +func (b *Buffer) Peek() (buff []byte, err error) { + b.Lock() + buff, err = b.read(false) + b.Unlock() + return } // Pop removes and returns the first record in the buffer -func (b *Buffer) Pop() ([]byte, error) { - return b.read(true) +func (b *Buffer) Pop() (buff []byte, err error) { + b.Lock() + buff, err = b.read(true) + b.Unlock() + return } -func (b *Buffer) Size() int { +// Size returns the overall size of the held data +func (b *Buffer) Size() (v int) { b.Lock() - m := b.readmeta() + v = int(b.m.size) b.Unlock() - return int(m.size) + return +} + +// Free returns how much free data there is in the buffer +// we account for the record metadata, so if the actual free is less +func (b *Buffer) Free() (v int) { + b.Lock() + v = int(b.free()) + b.Unlock() + return +} + +func (b *Buffer) free() (v uint64) { + if v = b.m.cap - (b.m.size + offData); v < recMeta { + v = 0 + } else { + v -= recMeta + } + return } // New creates a new Buffer backed by the file given by filename. @@ -239,7 +354,6 @@ func New(filename string, capacity int) (*Buffer, error) { err error ) - // TODO: don't assume filename given is a good file. // TODO: get passed in open file? if _, err := os.Stat(filename); os.IsNotExist(err) { f, err = os.Create(filename) @@ -248,38 +362,142 @@ func New(filename string, capacity int) (*Buffer, error) { } newFile = true } else { - f, err = os.OpenFile(filename, os.O_RDWR, 0644) - if err != nil { + if f, err = os.OpenFile(filename, os.O_RDWR, 0644); err != nil { + return nil, err + } else if _, err = readmeta(f); err != nil { + f.Close() return nil, err } } if err := syscall.Truncate(filename, int64(capacity)); err != nil { + f.Close() return nil, err } - - data, err := syscall.Mmap( - int(f.Fd()), 0, capacity, - syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED, - ) + b, err := open(f, capacity) if err != nil { + f.Close() return nil, err } - // don't need this anymore - f.Close() + if newFile { + b.Lock() + *b.m = meta{ + magic: magicHeader, + size: 0, + woff: offData, + roff: offData, + cap: uint64(capacity), + } + b.Unlock() + } - b := &Buffer{data: data} + return b, nil +} - if newFile { - m := meta{ - size: 0, - woff: offData, - roff: offData, - cap: uint64(capacity), +// Open will open an existing Buffer backed by the given file +// If the file does not exist it will be created +// If the buffer is new it will be set to the given capacity +// if the file already exists the capacity is ignored and we use the existing capacity +func Open(filename string, capacity int) (*Buffer, error) { + var ( + newFile bool + f *os.File + err error + m meta + ) + + if fi, err := os.Stat(filename); os.IsNotExist(err) || (err == nil && fi.Size() == 0) { + if capacity <= 0 { + return nil, errors.New("Bad capacity") + } + if f, err = os.Create(filename); err != nil { + return nil, err + } + if err := syscall.Truncate(filename, int64(capacity)); err != nil { + f.Close() + return nil, err + } + newFile = true + m = meta{ + magic: magicHeader, + size: 0, + woff: offData, + roff: offData, + cap: uint64(capacity), } + } else if err != nil { + return nil, err //some other error + } else { + if f, err = os.OpenFile(filename, os.O_RDWR, 0640); err != nil { + return nil, err + } else if m, err = readmeta(f); err != nil { + f.Close() + return nil, err + } + capacity = int(m.cap) + } + b, err := open(f, capacity) + if err != nil { + f.Close() + return nil, err + } + if newFile { b.Lock() - b.writemeta(m) + *b.m = m b.Unlock() } return b, nil } + +func open(f *os.File, capacity int) (*Buffer, error) { + data, err := syscall.Mmap( + int(f.Fd()), 0, capacity, + syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED, + ) + if err != nil { + return nil, err + } + if err = flock(f, true); err != nil { + return nil, err + } + b := &Buffer{ + f: f, + data: data, + m: (*meta)(unsafe.Pointer(&data[0])), + } + return b, nil +} + +func (b *Buffer) Sync() (err error) { + b.Lock() + defer b.Unlock() + //check that our data isn't nil + if len(b.data) == 0 { + return ErrNotOpen + } + //call a blocking msync + base := uintptr(unsafe.Pointer(&b.data[0])) + sz := uintptr(len(b.data)) + if _, _, errno := syscall.Syscall(syscall.SYS_MSYNC, base, sz, 0x4); errno != 0 { + err = errno + } + return +} + +func (b *Buffer) Close() (err error) { + b.Lock() + defer b.Unlock() + //check that our data isn't nil + if b.data == nil { + return ErrNotOpen + } + funlock(b.f) + //unmap and set the buffer to nil + if err = syscall.Munmap(b.data); err == nil { + err = b.f.Close() + } else { + b.f.Close() + } + b.data = nil + return +} diff --git a/buffer_test.go b/buffer_test.go index 3fa2360..3eb9631 100644 --- a/buffer_test.go +++ b/buffer_test.go @@ -1,6 +1,7 @@ package buffer import ( + "fmt" "io/ioutil" "os" "strings" @@ -23,6 +24,21 @@ type tc struct { err error } +var ( + td string +) + +func TestMain(m *testing.M) { + var err error + if td, err = ioutil.TempDir(os.TempDir(), "buffer"); err != nil { + fmt.Println("Failed to make temporary directory", err) + os.Exit(-1) + } + r := m.Run() + os.RemoveAll(td) + os.Exit(r) +} + func TestAll(t *testing.T) { cases := []tc{ tc{ // happy test @@ -31,10 +47,11 @@ func TestAll(t *testing.T) { {"Helloooo", write}, }, expect: meta{ - size: 32, - woff: 32, - roff: 32, - cap: 64, + magic: magicHeader, + size: 32, + woff: 48, + roff: 48, + cap: 80, }, }, tc{ // happy test @@ -44,10 +61,11 @@ func TestAll(t *testing.T) { {"小洞不补, 大洞吃苦.", read}, }, expect: meta{ - size: 35, - woff: 102, - roff: 67, - cap: 108, + magic: magicHeader, + size: 35, + woff: 118, + roff: 83, + cap: 124, }, }, tc{ // Tests read/write for recMeta wrap @@ -58,10 +76,11 @@ func TestAll(t *testing.T) { {"小洞不补, 大洞吃苦.", read}, }, expect: meta{ - size: 0, - woff: 64, - roff: 64, - cap: 70, + magic: magicHeader, + size: 0, + woff: 80, + roff: 80, + cap: 86, }, }, tc{ // Tests read/write for data wrapping @@ -72,25 +91,16 @@ func TestAll(t *testing.T) { {"小洞不补, 大洞吃苦.", read}, }, expect: meta{ - size: 0, - woff: 42, - roff: 42, - cap: 70, + magic: magicHeader, + size: 0, + woff: 58, + roff: 58, + cap: 86, }, }, } - for i, c := range cases { - f, err := ioutil.TempFile("tests", "tmpBufTest") - if err != nil { - t.Fatalf("Unexpected failure creating temp file: %v", err) - } - // only want it for the name? TODO: figure a better way to do this - filename := f.Name() - err = os.Remove(filename) - if err != nil { - t.Fatalf("Unexpected failure removing temp file: %v", err) - } + filename := tempFileName(t) //located in testing.T tempdir, no need to remove b, err := New(filename, int(c.expect.cap)) if err != nil { t.Errorf("Unexpected failure creating buffer for case: %d", i) @@ -112,11 +122,47 @@ func TestAll(t *testing.T) { t.Logf("%v", string(val)) } } + if err = b.Sync(); err != nil { + t.Errorf("Failed to sync: %v", err) + } m := b.readmeta() if c.expect != m { t.Errorf("metadata structs do not match for %d. expect: %v, actual: %v", i, c.expect, m) } - os.Remove(filename) + if err = b.Close(); err != nil { + t.Errorf("Failed to close: %v", err) + } + + //re-open the file with Open + if b, err = Open(filename, int(c.expect.cap)); err != nil { + t.Errorf("Failed to re-open: %v", err) + } + if m = b.readmeta(); c.expect != m { + t.Errorf("metadata structs do not match for %d. expect: %v, actual: %v", i, c.expect, m) + } + if err = b.Close(); err != nil { + t.Errorf("Failed to close: %v", err) + } + + //re-open the file with Open and an expansion + if b, err = Open(filename, int(c.expect.cap+1024)); err != nil { + t.Errorf("Failed to re-open: %v", err) + } + m.cap += 1024 + if m = b.readmeta(); c.expect != m { + t.Errorf("metadata structs do not match for %d. expect: %v, actual: %v", i, c.expect, m) + } + if err = b.Close(); err != nil { + t.Errorf("Failed to close: %v", err) + } + + //test open close with no capacity specified + if b, err = Open(filename, 0); err != nil { + t.Errorf("Failed to re-open: %v", err) + } else if err = b.Close(); err != nil { + t.Errorf("Failed to close: %v", err) + } + } } @@ -129,10 +175,11 @@ func TestErrors(t *testing.T) { }, err: errOverflow, expect: meta{ - size: 21, - woff: 53, - roff: 32, - cap: 64, + magic: magicHeader, + size: 21, + woff: 69, + roff: 48, + cap: 80, }, }, tc{ @@ -141,10 +188,11 @@ func TestErrors(t *testing.T) { }, err: errToBig, expect: meta{ - size: 0, - woff: 32, - roff: 32, - cap: 48, + magic: magicHeader, + size: 0, + woff: 48, + roff: 48, + cap: 64, }, }, tc{ @@ -153,26 +201,16 @@ func TestErrors(t *testing.T) { }, err: nil, expect: meta{ - size: 0, - woff: 32, - roff: 32, - cap: 64, + magic: magicHeader, + size: 0, + woff: 48, + roff: 48, + cap: 80, }, }, } - for i, c := range cases { - f, err := ioutil.TempFile("tests", "tmpBufTest") - if err != nil { - t.Fatalf("Unexpected failure creating temp file: %v", err) - } - - // only want it for the name? TODO: figure a better way to do this - filename := f.Name() - err = os.Remove(filename) - if err != nil { - t.Fatalf("Unexpected failure removing temp file: %v", err) - } + filename := tempFileName(t) //located in testing.T tempdir, no need to remove b, err := New(filename, int(c.expect.cap)) if err != nil { t.Errorf("Unexpected failure creating buffer for case: %d", i) @@ -203,7 +241,6 @@ func TestErrors(t *testing.T) { if c.expect != m { t.Errorf("metadata structs do not match for %d. expect: %v, actual: %v", i, c.expect, m) } - os.Remove(filename) } } @@ -216,24 +253,14 @@ func TestHelpers(t *testing.T) { }, err: errOverflow, expect: meta{ - size: 21, - woff: 53, - roff: 32, - cap: 64, + magic: magicHeader, + size: 21, + woff: 69, + roff: 48, + cap: 80, }, } - - f, err := ioutil.TempFile("tests", "tmpBufTest") - if err != nil { - t.Fatalf("Unexpected failure creating temp file: %v", err) - } - - // only want it for the name? TODO: figure a better way to do this - filename := f.Name() - err = os.Remove(filename) - if err != nil { - t.Fatalf("Unexpected failure removing temp file: %v", err) - } + filename := tempFileName(t) //located in testing.T tempdir, no need to remove b, err := New(filename, int(c.expect.cap)) if err != nil { t.Errorf("Unexpected failure creating buffer: %v", err) @@ -250,7 +277,7 @@ func TestHelpers(t *testing.T) { t.Errorf("Unexpected Insert failure. err: %v", err) } } else { - _, err := b.Pop() + _, err = b.Pop() switch err { case c.err: break @@ -267,22 +294,10 @@ func TestHelpers(t *testing.T) { if b.Size() != int(m.size) { t.Errorf("Reported size does not match metadata") } - os.Remove(filename) } func TestAdvise(t *testing.T) { - f, err := ioutil.TempFile("tests", "tmpBufTest") - if err != nil { - t.Fatalf("Unexpected failure creating temp file: %v", err) - } - - // only want it for the name? TODO: figure a better way to do this - filename := f.Name() - err = os.Remove(filename) - if err != nil { - t.Fatalf("Unexpected failure removing temp file: %v", err) - } - + filename := tempFileName(t) //located in testing.T tempdir, no need to remove recordSize := 1000 // 20 records precisely cap := (20*(recordSize+recMeta) + offData) @@ -316,25 +331,80 @@ func TestAdvise(t *testing.T) { if err != nil { t.Errorf("unexpected error in advise: %v", err) } +} - os.Remove(filename) +func TestOverlap(t *testing.T) { + c := tc{ // happy test + ops: []operation{ + {"Wooorldd", write}, + {"Helloooo", write}, + {"Helloooo", write}, + }, + expect: meta{ + magic: magicHeader, + size: 32, + woff: 64, //do to the loop + roff: 64, + cap: 80, + }, + } + filename := tempFileName(t) //located in testing.T tempdir, no need to remove + b, err := New(filename, int(c.expect.cap)) + if err != nil { + t.Errorf("Unexpected failure creating buffer") + } + for _, op := range c.ops { + if op.write { + err := b.InsertWithOverwrite([]byte(op.value)) + if err != nil { + t.Errorf("Unexpected Insert failure: %v", err) + } + } else { + val, err := b.Pop() + if err != nil { + t.Errorf("Unexpected Pop failure: %v", err) + } + if string(val) != op.value { + t.Errorf("Values do not match expected: %v actual: %v", op.value, val) + } + t.Logf("%v", string(val)) + } + } + if err = b.Sync(); err != nil { + t.Errorf("Failed to sync: %v", err) + } + m := b.readmeta() + if c.expect != m { + t.Errorf("metadata structs do not match expect: %v, actual: %v", c.expect, m) + } + + //make sure we can only pop 2 + var buffs []string + for { + if buff, err := b.Pop(); err != nil { + t.Errorf("Failed to pop: %v", err) + } else if buff == nil { + break + } else { + buffs = append(buffs, string(buff)) + } + } + if err = b.Close(); err != nil { + t.Errorf("Failed to close: %v", err) + } + if len(buffs) != 2 { + t.Errorf("Too many entries came back out: %d != 2", len(buffs)) + } + if buffs[0] != string(c.ops[1].value) || buffs[1] != string(c.ops[2].value) { + t.Errorf("invalid output: %v", buffs) + } } // benchBuffer tests reading/writing count batches of size batch in sequential chunks // with file capacity cap func benchBuffer(cap, recordSize int, b *testing.B) { + filename := tempFileName(b) //located in testing.T tempdir, no need to remove - f, err := ioutil.TempFile("tests", "tmpBufTest") - if err != nil { - b.Fatalf("Unexpected failure creating temp file: %v", err) - } - - // only want it for the name? TODO: figure a better way to do this - filename := f.Name() - err = os.Remove(filename) - if err != nil { - b.Fatalf("Unexpected failure removing temp file: %v", err) - } buf, err := New(filename, int(cap)) if err != nil { b.Errorf("Unexpected failure creating buffer.") @@ -352,8 +422,6 @@ func benchBuffer(cap, recordSize int, b *testing.B) { b.Fatalf("Unexpected error while popping: %v", err) } } - - os.Remove(filename) } func BenchmarkBuffer50KBCap1KBRec(b *testing.B) { @@ -375,3 +443,21 @@ func BenchmarkBuffer500MBCap1MBRec(b *testing.B) { // 500MB file, 1MB record benchBuffer(500*1000*1000, 1000*1000, b) } + +func tempFileName(t ft) (s string) { + f, err := ioutil.TempFile(td, "tmpBufTest") + if err != nil { + t.Fatalf("Unexpected failure creating temp file: %v", err) + } + s = f.Name() + if err = f.Close(); err != nil { + t.Fatalf("Failed to close file: %v", err) + } else if err = os.Remove(s); err != nil { + t.Fatalf("Unexpected failure removing temp file: %v", err) + } + return +} + +type ft interface { + Fatalf(f string, args ...interface{}) +} diff --git a/flock.go b/flock.go new file mode 100644 index 0000000..9aa3d2d --- /dev/null +++ b/flock.go @@ -0,0 +1,71 @@ +// +build !windows,!plan9,!solaris + +//this package is based on the flock implementation used in boltdb +//which is MIT licensed and available at: +// https://github.com/boltdb/bolt/blob/master/bolt_unix.go + +package buffer + +import ( + "errors" + "os" + "syscall" +) + +var ( + ErrTimeout = errors.New("Timeout") + ErrLocked = errors.New("File is already locked") +) + +//flock locks a file for this process, this DOES NOT prevent the same process +//from opening the +func flock(f *os.File, exclusive bool) error { + var lock syscall.Flock_t + lock.Start = 0 + lock.Len = 0 + lock.Pid = 0 + lock.Whence = 0 + lock.Pid = 0 + if exclusive { + lock.Type = syscall.F_WRLCK + } else { + lock.Type = syscall.F_RDLCK + } + err := rawFdCall(f, func(fd uintptr) error { + return syscall.FcntlFlock(fd, syscall.F_SETLK, &lock) + }) + if err == nil { + return nil + } else if err == syscall.EAGAIN { + return ErrLocked + } + return err +} + +//funlock releases a lock held on a file descriptor +func funlock(f *os.File) error { + var lock syscall.Flock_t + lock.Start = 0 + lock.Len = 0 + lock.Type = syscall.F_UNLCK + lock.Whence = 0 + return rawFdCall(f, func(fd uintptr) error { + return syscall.FcntlFlock(fd, syscall.F_SETLK, &lock) + }) +} + +type controlFunc func(fd uintptr) error + +func rawFdCall(fio *os.File, cf controlFunc) error { + if fio == nil || cf == nil { + return errors.New("invalid parameters") + } + rc, err := fio.SyscallConn() + if err != nil { + return err + } + rc.Control(func(fd uintptr) { + err = cf(fd) + }) + return err +} diff --git a/flock_others.go b/flock_others.go new file mode 100644 index 0000000..48d75b8 --- /dev/null +++ b/flock_others.go @@ -0,0 +1,15 @@ +// +build windows,plan9,solaris + +package buffer + +import ( + "os" +) + +func flock(f *os.File, exclusive bool) error { + return nil //do nothing +} + +func funlock(f *os.File) error { + return nil +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..47fb608 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/cloudflare/buffer + +go 1.15