Skip to content

Commit f8803fe

Browse files
author
mobus
committed
add lamport HAL
1 parent 5cc0a37 commit f8803fe

File tree

8 files changed

+283
-12
lines changed

8 files changed

+283
-12
lines changed

go.mod

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,12 @@ module github.com/sunvim/utils
33
go 1.16
44

55
require (
6-
github.com/golang/protobuf v1.5.0
6+
github.com/go-git/go-billy/v5 v5.3.1
77
github.com/hslam/buffer v0.0.0-20211027181515-93d623f7e213
88
github.com/hslam/scheduler v0.0.0-20211028175315-641598104976
99
github.com/hslam/sendfile v1.0.1
1010
github.com/hslam/splice v1.0.3
1111
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
1212
github.com/stretchr/testify v1.7.0
13-
google.golang.org/protobuf v1.27.1
1413
gopkg.in/eapache/queue.v1 v1.1.0
1514
)

go.sum

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
1+
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
12
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
23
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3-
github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4=
4-
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
5-
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
6-
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
4+
github.com/go-git/go-billy/v5 v5.3.1 h1:CPiOUAzKtMRvolEKw+bG1PLRpT7D3LIs3/3ey4Aiu34=
5+
github.com/go-git/go-billy/v5 v5.3.1/go.mod h1:pmpqyWchKfYfrkb/UVH4otLvyi/5gJlGI4Hb3ZqZ3W0=
76
github.com/hslam/buffer v0.0.0-20211027181515-93d623f7e213 h1:0jr8x0IYeDlcIbFTlq9MigsrlOCtqaI1xM+4UotacG4=
87
github.com/hslam/buffer v0.0.0-20211027181515-93d623f7e213/go.mod h1:Gvbj40hnzR54zoUOuDZqDi7aziar8UlkHXk6NVYLg2U=
98
github.com/hslam/mmap v1.0.0 h1:GSp55lZrPDhctob3yE0SqESBjzgCn9cP4iu4Pmmm+gE=
@@ -14,20 +13,23 @@ github.com/hslam/sendfile v1.0.1 h1:0OLb5VRwjdN3q9hkslfk5nPQIrH5UmGTjSuJtV4Zq+8=
1413
github.com/hslam/sendfile v1.0.1/go.mod h1:IVInXNh7ccvv6fdFkcC3gRGCH7E+fRsTlyBnftCAk5A=
1514
github.com/hslam/splice v1.0.3 h1:CwSmzu6AAm8sb2wYgSGvTwixy3seqA6xJ9NXQ0ff3j4=
1615
github.com/hslam/splice v1.0.3/go.mod h1:7D1QlFptoG0ruXzcAwpzckKxUN4+ZpvrIhwfbcAQcx8=
16+
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
17+
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
18+
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
19+
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
20+
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
21+
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
1722
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
1823
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
1924
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
2025
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
2126
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
2227
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
2328
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
24-
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
25-
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
26-
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
27-
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
28-
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
29-
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
29+
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
3030
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
31+
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
32+
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
3133
gopkg.in/eapache/queue.v1 v1.1.0 h1:EldqoJEGtXYiVCMRo2C9mePO2UUGnYn2+qLmlQSqPdc=
3234
gopkg.in/eapache/queue.v1 v1.1.0/go.mod h1:wNtmx1/O7kZSR9zNT1TTOJ7GLpm3Vn7srzlfylFbQwU=
3335
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=

lamport/clock.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package lamport
2+
3+
// Time is the value of a Clock.
4+
type Time uint64
5+
6+
// Clock is a Lamport logical clock
7+
type Clock interface {
8+
// Time is used to return the current value of the lamport clock
9+
Time() Time
10+
// Increment is used to return the value of the lamport clock and increment it afterwards
11+
Increment() (Time, error)
12+
// Witness is called to update our local clock if necessary after
13+
// witnessing a clock value received from another process
14+
Witness(time Time) error
15+
}

lamport/clock_testing.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package lamport
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func testClock(t *testing.T, c Clock) {
10+
assert.Equal(t, Time(1), c.Time())
11+
12+
val, err := c.Increment()
13+
assert.NoError(t, err)
14+
assert.Equal(t, Time(2), val)
15+
assert.Equal(t, Time(2), c.Time())
16+
17+
err = c.Witness(42)
18+
assert.NoError(t, err)
19+
assert.Equal(t, Time(42), c.Time())
20+
21+
err = c.Witness(42)
22+
assert.NoError(t, err)
23+
assert.Equal(t, Time(42), c.Time())
24+
25+
err = c.Witness(30)
26+
assert.NoError(t, err)
27+
assert.Equal(t, Time(42), c.Time())
28+
}

lamport/mem_clock.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
3+
This Source Code Form is subject to the terms of the Mozilla Public
4+
License, v. 2.0. If a copy of the MPL was not distributed with this file,
5+
You can obtain one at http://mozilla.org/MPL/2.0/.
6+
7+
Copyright (c) 2013, Armon Dadgar [email protected]
8+
Copyright (c) 2013, Mitchell Hashimoto [email protected]
9+
10+
Alternatively, the contents of this file may be used under the terms
11+
of the GNU General Public License Version 3 or later, as described below:
12+
13+
This file is free software: you may copy, redistribute and/or modify
14+
it under the terms of the GNU General Public License as published by the
15+
Free Software Foundation, either version 3 of the License, or (at your
16+
option) any later version.
17+
18+
This file is distributed in the hope that it will be useful, but
19+
WITHOUT ANY WARRANTY; without even the implied warranty of
20+
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
21+
Public License for more details.
22+
23+
You should have received a copy of the GNU General Public License
24+
along with this program. If not, see http://www.gnu.org/licenses/.
25+
26+
*/
27+
28+
// Note: this code originally originate from Hashicorp's Serf but has been changed since to fit git-bug's need.
29+
30+
// Note: this Lamport clock implementation is different than the algorithms you can find, notably Wikipedia or the
31+
// original Serf implementation. The reason is lie to what constitute an event in this distributed system.
32+
// Commonly, events happen when messages are sent or received, whereas in git-bug events happen when some data is
33+
// written, but *not* when read. This is why Witness set the time to the max seen value instead of max seen value +1.
34+
// See https://cs.stackexchange.com/a/133730/129795
35+
36+
package lamport
37+
38+
import (
39+
"sync/atomic"
40+
)
41+
42+
var _ Clock = &MemClock{}
43+
44+
// MemClock is a thread safe implementation of a lamport clock. It
45+
// uses efficient atomic operations for all of its functions, falling back
46+
// to a heavy lock only if there are enough CAS failures.
47+
type MemClock struct {
48+
counter uint64
49+
}
50+
51+
// NewMemClock create a new clock with the value 1.
52+
// Value 0 is considered as invalid.
53+
func NewMemClock() *MemClock {
54+
return &MemClock{
55+
counter: 1,
56+
}
57+
}
58+
59+
// NewMemClockWithTime create a new clock with a value.
60+
func NewMemClockWithTime(time uint64) *MemClock {
61+
return &MemClock{
62+
counter: time,
63+
}
64+
}
65+
66+
// Time is used to return the current value of the lamport clock
67+
func (mc *MemClock) Time() Time {
68+
return Time(atomic.LoadUint64(&mc.counter))
69+
}
70+
71+
// Increment is used to return the value of the lamport clock and increment it afterwards
72+
func (mc *MemClock) Increment() (Time, error) {
73+
return Time(atomic.AddUint64(&mc.counter, 1)), nil
74+
}
75+
76+
// Witness is called to update our local clock if necessary after
77+
// witnessing a clock value received from another process
78+
func (mc *MemClock) Witness(v Time) error {
79+
WITNESS:
80+
// If the other value is old, we do not need to do anything
81+
cur := atomic.LoadUint64(&mc.counter)
82+
other := uint64(v)
83+
if other <= cur {
84+
return nil
85+
}
86+
87+
// Ensure that our local clock is at least one ahead.
88+
if !atomic.CompareAndSwapUint64(&mc.counter, cur, other) {
89+
// CAS: CompareAndSwap
90+
// The CAS failed, so we just retry. Eventually our CAS should
91+
// succeed or a future witness will pass us by and our witness
92+
// will end.
93+
goto WITNESS
94+
}
95+
96+
return nil
97+
}

lamport/mem_clock_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package lamport
2+
3+
import "testing"
4+
5+
func TestMemClock(t *testing.T) {
6+
c := NewMemClock()
7+
testClock(t, c)
8+
}

lamport/persisted_clock.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package lamport
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"io/ioutil"
7+
"os"
8+
9+
"github.com/go-git/go-billy/v5"
10+
"github.com/go-git/go-billy/v5/util"
11+
)
12+
13+
var ErrClockNotExist = errors.New("clock doesn't exist")
14+
15+
type PersistedClock struct {
16+
*MemClock
17+
root billy.Filesystem
18+
filePath string
19+
}
20+
21+
// NewPersistedClock create a new persisted Lamport clock
22+
func NewPersistedClock(root billy.Filesystem, filePath string) (*PersistedClock, error) {
23+
clock := &PersistedClock{
24+
MemClock: NewMemClock(),
25+
root: root,
26+
filePath: filePath,
27+
}
28+
29+
err := clock.Write()
30+
if err != nil {
31+
return nil, err
32+
}
33+
34+
return clock, nil
35+
}
36+
37+
// LoadPersistedClock load a persisted Lamport clock from a file
38+
func LoadPersistedClock(root billy.Filesystem, filePath string) (*PersistedClock, error) {
39+
clock := &PersistedClock{
40+
root: root,
41+
filePath: filePath,
42+
}
43+
44+
err := clock.read()
45+
if err != nil {
46+
return nil, err
47+
}
48+
49+
return clock, nil
50+
}
51+
52+
// Increment is used to return the value of the lamport clock and increment it afterwards
53+
func (pc *PersistedClock) Increment() (Time, error) {
54+
time, err := pc.MemClock.Increment()
55+
if err != nil {
56+
return 0, err
57+
}
58+
return time, pc.Write()
59+
}
60+
61+
// Witness is called to update our local clock if necessary after
62+
// witnessing a clock value received from another process
63+
func (pc *PersistedClock) Witness(time Time) error {
64+
// TODO: rework so that we write only when the clock was actually updated
65+
err := pc.MemClock.Witness(time)
66+
if err != nil {
67+
return err
68+
}
69+
return pc.Write()
70+
}
71+
72+
func (pc *PersistedClock) read() error {
73+
f, err := pc.root.Open(pc.filePath)
74+
if os.IsNotExist(err) {
75+
return ErrClockNotExist
76+
}
77+
if err != nil {
78+
return err
79+
}
80+
defer f.Close()
81+
82+
content, err := ioutil.ReadAll(f)
83+
if err != nil {
84+
return err
85+
}
86+
87+
var value uint64
88+
n, err := fmt.Sscanf(string(content), "%d", &value)
89+
if err != nil {
90+
return err
91+
}
92+
93+
if n != 1 {
94+
return fmt.Errorf("could not read the clock")
95+
}
96+
97+
pc.MemClock = NewMemClockWithTime(value)
98+
99+
return nil
100+
}
101+
102+
func (pc *PersistedClock) Write() error {
103+
data := []byte(fmt.Sprintf("%d", pc.counter))
104+
return util.WriteFile(pc.root, pc.filePath, data, 0644)
105+
}

lamport/persisted_clock_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package lamport
2+
3+
import (
4+
"testing"
5+
6+
"github.com/go-git/go-billy/v5/memfs"
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
func TestPersistedClock(t *testing.T) {
11+
root := memfs.New()
12+
13+
c, err := NewPersistedClock(root, "test-clock")
14+
require.NoError(t, err)
15+
16+
testClock(t, c)
17+
}

0 commit comments

Comments
 (0)