Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions cache.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package freecache

import (
"bufio"
"encoding/binary"
"io"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -366,3 +368,61 @@ func (cache *Cache) ResetStatistics() {
cache.locks[i].Unlock()
}
}

// Backup the cache to writer.
func (cache *Cache) Backup(w io.Writer) error {
it := cache.NewIterator()
for {
e := it.Next()
if e == nil {
return nil
}

chunk := encodeEntry(e)
_, err := w.Write(chunk)
if err != nil {
return err
}
}
}

// Restore from backup.
func (cache *Cache) Restore(rd io.Reader) error {
timer := cache.segments[0].timer
buf := make([]byte, 8*1024) // 8KB

r := bufio.NewReader(rd)
for {
bsize, err := r.Peek(8)
if err != nil {
if err == io.EOF {
return nil
}
return err
}

size := getEntrySize(bsize)
if len(buf) < size {
buf = make([]byte, size*2)
}

_, err = io.ReadFull(r, buf[:size])
if err != nil {
return err
}

e := decodeEntry(buf)
if int(e.ExpireAt) == 0 {
err = cache.Set(e.Key, e.Value, 0)
} else {
now := timer.Now()
if !isExpired(e.ExpireAt, now) {
ttl := int(e.ExpireAt - now)
err = cache.Set(e.Key, e.Value, ttl)
}
}
if err != nil {
return err
}
}
}
77 changes: 77 additions & 0 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package freecache

import (
"bytes"
"compress/gzip"
"crypto/rand"
"encoding/binary"
"errors"
"fmt"
"log"
mrand "math/rand"
"os"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -1086,3 +1088,78 @@ func TestUpdate(t *testing.T) {
found, replaced, err = cache.Update(key, updater)
assertExpectations(4, true, false, val2, val2)
}

func TestBackupRestore(t *testing.T) {
iters := 100
minTimeExpire := 5
totalExpire := 10

// TestBackup
func() {
cache := NewCache(1024)

mrand.Seed(7)
for i := 0; i < iters; i++ {
key := mrand.Int()
val := strconv.Itoa(i)
err := cache.SetInt(int64(key), []byte(val), minTimeExpire+i)
if err != nil {
t.Errorf("err: %s", err)
}
}

f, err := os.Create("/tmp/cache_backup.bin")
if err != nil {
t.Fatalf("err: %s", err)
}
defer f.Close()

gz := gzip.NewWriter(f)
defer gz.Close()

err = cache.Backup(gz)
if err != nil {
t.Fatalf("err: %s", err)
}
}()

// TestRestore
func() {
cache := NewCache(1024)

f, err := os.Open("/tmp/cache_backup.bin")
if err != nil {
t.Fatalf("err: %s", err)
}
defer f.Close()

gz, err := gzip.NewReader(f)
if err != nil {
t.Fatalf("err: %s", err)
}
defer gz.Close()
err = cache.Restore(gz)
if err != nil {
t.Fatalf("err: %s", err)
}

time.Sleep(time.Duration(minTimeExpire+totalExpire) * time.Second)

mrand.Seed(7)
for i := 0; i < iters; i++ {
key := mrand.Int()
val := strconv.Itoa(i)
val2, err := cache.GetInt(int64(key))
if err != nil {
if i < (totalExpire+1) && err == ErrNotFound {
continue
}
t.Errorf("err: %s", err)
}

if string(val2) != val {
t.Errorf("err: %v %q==%q", key, val, val2)
}
}
}()
}
39 changes: 37 additions & 2 deletions iterator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package freecache

import (
"encoding/binary"
"unsafe"
)

Expand All @@ -14,8 +15,41 @@ type Iterator struct {

// Entry represents a key/value pair.
type Entry struct {
Key []byte
Value []byte
Key []byte
Value []byte
ExpireAt uint32
}

const headerEntry = 4 + 4 + 4

func getEntrySize(b []byte) int {
return headerEntry + int(binary.LittleEndian.Uint32(b[0:4])) + int(binary.LittleEndian.Uint32(b[4:8]))
}

func encodeEntry(e *Entry) []byte {
size := headerEntry + len(e.Key) + len(e.Value)
buf := make([]byte, size)

// header
binary.LittleEndian.PutUint32(buf[0:4], uint32(len(e.Key)))
binary.LittleEndian.PutUint32(buf[4:8], uint32(len(e.Value)))
binary.LittleEndian.PutUint32(buf[8:12], e.ExpireAt)

// data
copy(buf[12:], e.Key)
copy(buf[12+len(e.Key):], e.Value)
return buf
}

func decodeEntry(b []byte) *Entry {
ks := binary.LittleEndian.Uint32(b[0:4])
vs := binary.LittleEndian.Uint32(b[4:8])
expire := binary.LittleEndian.Uint32(b[8:12])
return &Entry{
Key: b[12 : 12+ks],
Value: b[12+ks : 12+ks+vs],
ExpireAt: expire,
}
}

// Next returns the next entry for the iterator.
Expand Down Expand Up @@ -63,6 +97,7 @@ func (it *Iterator) nextForSlot(seg *segment, slotId int) *Entry {
entry := new(Entry)
entry.Key = make([]byte, hdr.keyLen)
entry.Value = make([]byte, hdr.valLen)
entry.ExpireAt = hdr.expireAt
seg.rb.ReadAt(entry.Key, ptr.offset+ENTRY_HDR_SIZE)
seg.rb.ReadAt(entry.Value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
return entry
Expand Down
12 changes: 11 additions & 1 deletion ringbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ func NewRingBuf(size int, begin int64) (rb RingBuf) {
// Reset the ring buffer
//
// Parameters:
// begin: beginning offset of the data stream
//
// begin: beginning offset of the data stream
func (rb *RingBuf) Reset(begin int64) {
rb.begin = begin
rb.end = begin
Expand Down Expand Up @@ -60,6 +61,9 @@ func (rb *RingBuf) End() int64 {

// read up to len(p), at off of the data stream.
func (rb *RingBuf) ReadAt(p []byte, off int64) (n int, err error) {
if len(p) == 0 {
return
}
if off > rb.end || off < rb.begin {
err = ErrOutOfRange
return
Expand Down Expand Up @@ -96,6 +100,9 @@ func (rb *RingBuf) getDataOff(off int64) int {
// Slice returns a slice of the supplied range of the ring buffer. It will
// not alloc unless the requested range wraps the ring buffer.
func (rb *RingBuf) Slice(off, length int64) ([]byte, error) {
if length == 0 {
return nil, nil
}
if off > rb.end || off < rb.begin {
return nil, ErrOutOfRange
}
Expand Down Expand Up @@ -136,6 +143,9 @@ func (rb *RingBuf) Write(p []byte) (n int, err error) {
}

func (rb *RingBuf) WriteAt(p []byte, off int64) (n int, err error) {
if len(p) == 0 {
return
}
if off+int64(len(p)) > rb.end || off < rb.begin {
err = ErrOutOfRange
return
Expand Down
16 changes: 10 additions & 6 deletions segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
const HASH_ENTRY_SIZE = 16
const ENTRY_HDR_SIZE = 24

var ErrLargeKey = errors.New("The key is larger than 65535")
var ErrLargeEntry = errors.New("The entry size is larger than 1/1024 of cache size")
var ErrNotFound = errors.New("Entry not found")
var ErrLargeKey = errors.New("the key is larger than 65535")
var ErrLargeEntry = errors.New("the entry size is larger than 1/1024 of cache size")
var ErrNotFound = errors.New("entry not found")

// entry pointer struct points to an entry in ring buffer
type entryPtr struct {
Expand Down Expand Up @@ -241,16 +241,20 @@ func (seg *segment) get(key, buf []byte, hashVal uint64, peek bool) (value []byt
return
}
expireAt = hdr.expireAt
if !peek {
atomic.AddInt64(&seg.hitCount, 1)
}
if int(hdr.valLen) == 0 {
return
}

if cap(buf) >= int(hdr.valLen) {
value = buf[:hdr.valLen]
} else {
value = make([]byte, hdr.valLen)
}

seg.rb.ReadAt(value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
if !peek {
atomic.AddInt64(&seg.hitCount, 1)
}
return
}

Expand Down