Skip to content

Commit 9ffea2e

Browse files
authored
feat: parameterize WAL (#113)
* feat: parameterize WAL * perf: optimize flush timer * fix: wakeup inflight append after close wal * fix: wakeup typo * fix: close log file * test: wal config
1 parent 64a5a5a commit 9ffea2e

File tree

14 files changed

+957
-278
lines changed

14 files changed

+957
-278
lines changed
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Copyright 2022 Linkall Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package block
16+
17+
import (
18+
// standard libraries.
19+
"sync"
20+
21+
// third-party libraries.
22+
"github.com/ncw/directio"
23+
)
24+
25+
type Allocator struct {
26+
size int
27+
next int64
28+
emptyBuf []byte
29+
pool sync.Pool
30+
}
31+
32+
func NewAllocator(size int, so int64) *Allocator {
33+
a := &Allocator{
34+
size: size,
35+
next: so,
36+
emptyBuf: make([]byte, size),
37+
}
38+
a.pool = sync.Pool{
39+
New: func() interface{} {
40+
return a.rawAlloc()
41+
},
42+
}
43+
return a
44+
}
45+
46+
func (a *Allocator) BlockSize() int {
47+
return a.size
48+
}
49+
50+
func (a *Allocator) rawAlloc() *Block {
51+
buf := directio.AlignedBlock(a.size)
52+
return &Block{
53+
block: block{
54+
buf: buf,
55+
},
56+
}
57+
}
58+
59+
func (a *Allocator) Next() *Block {
60+
b, _ := a.pool.Get().(*Block)
61+
// Reset block.
62+
copy(b.buf, a.emptyBuf)
63+
b.wp = 0
64+
b.fp = 0
65+
b.cp = 0
66+
b.SO = a.next
67+
a.next += int64(a.size)
68+
return b
69+
}
70+
71+
func (a *Allocator) Free(b *Block) {
72+
a.pool.Put(b)
73+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Copyright 2022 Linkall Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package block
16+
17+
import (
18+
// standard libraries.
19+
"testing"
20+
21+
// third-party libraries.
22+
. "github.com/smartystreets/goconvey/convey"
23+
// this project.
24+
)
25+
26+
var emptyBuf = make([]byte, blockSize)
27+
28+
func TestAllocator(t *testing.T) {
29+
Convey("wal block allocator", t, func() {
30+
allocator := NewAllocator(blockSize, blockSize)
31+
So(allocator, ShouldNotBeNil)
32+
So(allocator.BlockSize(), ShouldEqual, blockSize)
33+
34+
b0 := allocator.Next()
35+
So(b0.Capacity(), ShouldEqual, blockSize)
36+
So(b0.Size(), ShouldEqual, 0)
37+
So(b0.Committed(), ShouldEqual, 0)
38+
So(b0.SO, ShouldEqual, blockSize)
39+
So(b0.fp, ShouldEqual, 0)
40+
So(b0.buf, ShouldResemble, emptyBuf)
41+
42+
b1 := allocator.Next()
43+
So(b1.Capacity(), ShouldEqual, blockSize)
44+
So(b1.Size(), ShouldEqual, 0)
45+
So(b1.Committed(), ShouldEqual, 0)
46+
So(b1.SO, ShouldEqual, 2*blockSize)
47+
So(b1.fp, ShouldEqual, 0)
48+
So(b1.buf, ShouldResemble, emptyBuf)
49+
50+
allocator.Free(b0)
51+
52+
b2 := allocator.Next()
53+
So(b2.Capacity(), ShouldEqual, blockSize)
54+
So(b2.Size(), ShouldEqual, 0)
55+
So(b2.Committed(), ShouldEqual, 0)
56+
So(b2.SO, ShouldEqual, 3*blockSize)
57+
So(b2.fp, ShouldEqual, 0)
58+
So(b2.buf, ShouldResemble, emptyBuf)
59+
})
60+
}

internal/store/wal/block.go renamed to internal/store/wal/block/block.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,12 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
package wal
15+
package block
1616

1717
import (
18+
// standard libraries.
19+
"os"
20+
1821
// this project.
1922
"github.com/linkall-labs/vanus/internal/store/wal/io"
2023
"github.com/linkall-labs/vanus/internal/store/wal/record"
@@ -40,6 +43,10 @@ func (b *block) Size() int {
4043
return b.wp
4144
}
4245

46+
func (b *block) Committed() int {
47+
return b.cp
48+
}
49+
4350
func (b *block) Remaining() int {
4451
return b.remaining(b.Size())
4552
}
@@ -52,7 +59,7 @@ func (b *block) Full() bool {
5259
return b.Remaining() < record.HeaderSize
5360
}
5461

55-
func (b *block) full(off int) bool {
62+
func (b *block) FullWithOff(off int) bool {
5663
return b.remaining(off) < record.HeaderSize
5764
}
5865

@@ -84,3 +91,19 @@ func (b *block) Flush(writer io.WriterAt, offset int, base int64, cb FlushCallba
8491
}
8592
})
8693
}
94+
95+
func (b *block) RecoverFromFile(f *os.File, at int64, committed int) error {
96+
if _, err := f.ReadAt(b.buf, at); err != nil {
97+
return err
98+
}
99+
b.wp = committed
100+
b.fp = committed
101+
b.cp = committed
102+
return nil
103+
}
104+
105+
type Block struct {
106+
block
107+
// SO is start offset
108+
SO int64
109+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Copyright 2022 Linkall Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package block
16+
17+
import (
18+
// standard libraries.
19+
"testing"
20+
21+
// third-party libraries.
22+
. "github.com/smartystreets/goconvey/convey"
23+
24+
// this project.
25+
"github.com/linkall-labs/vanus/internal/store/wal/io"
26+
"github.com/linkall-labs/vanus/internal/store/wal/record"
27+
)
28+
29+
const (
30+
blockSize = 4 * 1024
31+
)
32+
33+
func TestBlock(t *testing.T) {
34+
Convey("wal block", t, func() {
35+
b := Block{
36+
block: block{
37+
buf: make([]byte, blockSize),
38+
},
39+
}
40+
41+
Convey("properties", func() {
42+
So(b.Capacity(), ShouldEqual, blockSize)
43+
So(b.Size(), ShouldEqual, 0)
44+
So(b.Committed(), ShouldEqual, 0)
45+
So(b.Remaining(), ShouldEqual, blockSize)
46+
So(b.Full(), ShouldBeFalse)
47+
})
48+
49+
Convey("append", func() {
50+
dataSize := blockSize - record.HeaderSize*2 + 1
51+
records := record.Pack(make([]byte, dataSize), dataSize+record.HeaderSize, 0)
52+
r := records[0]
53+
54+
n, err := b.Append(r)
55+
56+
So(err, ShouldBeNil)
57+
So(n, ShouldEqual, r.Size())
58+
So(b.Full(), ShouldBeTrue)
59+
So(b.FullWithOff(n-1), ShouldBeFalse)
60+
So(b.Committed(), ShouldEqual, 0)
61+
62+
Convey("flush", func() {
63+
b.Flush(io.WriteAtFunc(func(b []byte, base int64,
64+
cb io.WriteCallback,
65+
) {
66+
So(b[:n], ShouldResemble, r.Marshal())
67+
So(base, ShouldEqual, 0)
68+
69+
cb(len(b), nil)
70+
}), n, 0, func(commit int64, err error) {
71+
So(err, ShouldBeNil)
72+
So(commit, ShouldEqual, n)
73+
})
74+
75+
So(b.Committed(), ShouldEqual, n)
76+
77+
b.Flush(nil, 0, 0, func(commit int64, err error) {
78+
So(err, ShouldBeNil)
79+
So(commit, ShouldEqual, n)
80+
})
81+
})
82+
})
83+
84+
Convey("append with too large data", func() {
85+
dataSize := blockSize - record.HeaderSize + 1
86+
records := record.Pack(make([]byte, dataSize), dataSize+record.HeaderSize, 0)
87+
r := records[0]
88+
89+
_, err := b.Append(r)
90+
91+
So(err, ShouldNotBeNil)
92+
})
93+
})
94+
}

internal/store/wal/block_test.go

Lines changed: 0 additions & 15 deletions
This file was deleted.

0 commit comments

Comments
 (0)