Skip to content

Commit 105d0cd

Browse files
authored
Merge pull request #33 from absolutelightning/lock-free
Lock free and single event loop
2 parents b0eb8fb + 79b899c commit 105d0cd

File tree

11 files changed

+3
-62
lines changed

11 files changed

+3
-62
lines changed

datastructures/hnsw/graph.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"math"
66
"math/rand"
77
"sort"
8-
"sync"
98
"time"
109

1110
"github.com/absolutelightning/gods/queues/priorityqueue"
@@ -24,7 +23,6 @@ type HNSW struct {
2423
LayerFactor float64 // Probability factor, but we won't use it with the current randomLevel
2524
EfSearch int // number of candidates during search
2625
DistFunc DistanceFunc // distance function
27-
lock sync.Mutex // lock for thread-safe operations
2826
EntryPoint *Node // top entry point into the graph
2927
Rand *rand.Rand // random number generator
3028
}
@@ -96,9 +94,6 @@ func (h *HNSW) randomLevel() int {
9694

9795
// Insert adds a new element `vector` into the HNSW graph.
9896
func (h *HNSW) Insert(vector Vector) string {
99-
h.lock.Lock()
100-
defer h.lock.Unlock()
101-
10297
level := h.randomLevel()
10398

10499
// Create the new node
@@ -440,9 +435,6 @@ func min(a, b int) int {
440435
}
441436

442437
func (h *HNSW) Delete(nodeID string) bool {
443-
h.lock.Lock()
444-
defer h.lock.Unlock()
445-
446438
if len(h.Layers) == 0 {
447439
return false
448440
}

main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ func main() {
9292
log.Fatal(gnet.Run(
9393
tredsServer,
9494
"tcp://0.0.0.0:"+strconv.Itoa(tredsServer.Port),
95-
gnet.WithMulticore(true),
95+
// Single Event loop
96+
gnet.WithMulticore(false),
9697
gnet.WithReusePort(false),
9798
gnet.WithTCPKeepAlive(300*time.Second),
9899
))

server/discard.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@ func executeDiscard() ExecutionHook {
3535
return gnet.None
3636
}
3737

38-
ts.LockClientTransaction()
39-
defer ts.UnlockClientTransaction()
4038
delete(ts.GetClientTransaction(), c.RemoteAddr().String())
4139

4240
res := "OK"

server/exec.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,6 @@ func executeExec() ExecutionHook {
3636
return gnet.None
3737
}
3838

39-
ts.LockClientTransaction()
40-
defer ts.UnlockClientTransaction()
41-
4239
clientTransaction, ok := ts.GetClientTransaction()[c.RemoteAddr().String()]
4340
if !ok {
4441
ts.RespondErr(c, fmt.Errorf("no transaction started"))

server/multi.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,6 @@ func executeMulti() ExecutionHook {
4444
return gnet.None
4545
}
4646

47-
ts.LockClientTransaction()
48-
defer ts.UnlockClientTransaction()
49-
5047
ts.GetClientTransaction()[c.RemoteAddr().String()] = make([]string, 0)
5148

5249
res := "OK"

server/psubscribe.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,6 @@ func executePSubscribeCommand() ExecutionHook {
6666
allChannels[channelPrefix] = struct{}{}
6767
}
6868

69-
ts.LockChannelSubs()
70-
defer ts.UnlockChannelSubs()
71-
7269
for channel := range allChannels {
7370
prevData, ok := subscriptionData.Get([]byte(channel))
7471
if !ok {

server/punsubscribe.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,6 @@ func executePUnsubscribeCommand() ExecutionHook {
6666
allChannels[channelPrefix] = struct{}{}
6767
}
6868

69-
ts.LockChannelSubs()
70-
defer ts.UnlockChannelSubs()
7169
for channel := range allChannels {
7270
prevData, ok := subscriptionData.Get([]byte(channel))
7371
if !ok {

server/server.go

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"path/filepath"
1212
"strconv"
1313
"strings"
14-
"sync"
1514
"time"
1615

1716
wal "github.com/hashicorp/raft-wal"
@@ -39,10 +38,8 @@ type Server struct {
3938
tredsCommandRegistry commands.CommandRegistry
4039
tredsServerCommandRegistry ServerCommandRegistry
4140
clientTransaction map[string][]string
42-
clientTransactionLock *sync.Mutex
4341

4442
channelSubscriptionData *radix.Tree
45-
channelSubscriptionLock *sync.Mutex
4643
connectionSubscription map[string]map[string]struct{}
4744

4845
connectionMap map[string]gnet.Conn
@@ -206,10 +203,8 @@ func New(port, segmentSize int, bindAddr, advertiseAddr, serverId string, applyT
206203
id: config.LocalID,
207204
raftApplyTimeout: applyTimeout,
208205
clientTransaction: make(map[string][]string),
209-
clientTransactionLock: &sync.Mutex{},
210206
connP: connPool.NewConnPool(time.Second * 5),
211207
channelSubscriptionData: radix.New(),
212-
channelSubscriptionLock: &sync.Mutex{},
213208
connectionSubscription: make(map[string]map[string]struct{}),
214209
connectionMap: make(map[string]gnet.Conn),
215210
}, nil
@@ -276,22 +271,6 @@ func (ts *Server) GetRaftApplyTimeout() time.Duration {
276271
return ts.raftApplyTimeout
277272
}
278273

279-
func (ts *Server) LockClientTransaction() {
280-
ts.clientTransactionLock.Lock()
281-
}
282-
283-
func (ts *Server) UnlockClientTransaction() {
284-
ts.clientTransactionLock.Unlock()
285-
}
286-
287-
func (ts *Server) LockChannelSubs() {
288-
ts.channelSubscriptionLock.Lock()
289-
}
290-
291-
func (ts *Server) UnlockChannelSubs() {
292-
ts.channelSubscriptionLock.Unlock()
293-
}
294-
295274
func (ts *Server) SetChannelSubscriptionData(data *radix.Tree) {
296275
ts.channelSubscriptionData = data
297276
}
@@ -323,8 +302,6 @@ func (ts *Server) OnTraffic(c gnet.Conn) gnet.Action {
323302

324303
// Check for transaction first, if transaction just enqueue the command
325304
if _, ok := ts.clientTransaction[c.RemoteAddr().String()]; ok {
326-
ts.clientTransactionLock.Lock()
327-
defer ts.clientTransactionLock.Unlock()
328305
ts.clientTransaction[c.RemoteAddr().String()] = append(ts.clientTransaction[c.RemoteAddr().String()], inp)
329306
res := "QUEUED"
330307
_, errConn := c.Write([]byte(resp.EncodeSimpleString(res)))
@@ -428,14 +405,10 @@ func (ts *Server) OnClose(c gnet.Conn, _ error) gnet.Action {
428405
}
429406

430407
func (ts *Server) CleanUpClientTransaction(c gnet.Conn) {
431-
ts.clientTransactionLock.Lock()
432-
defer ts.clientTransactionLock.Unlock()
433408
delete(ts.clientTransaction, c.RemoteAddr().String())
434409
}
435410

436411
func (ts *Server) CleanUpChannelSubscriptions(c gnet.Conn) {
437-
ts.channelSubscriptionLock.Lock()
438-
defer ts.channelSubscriptionLock.Unlock()
439412
// use connectionSubscription map to delete all subscriptions for this connection
440413
if _, ok := ts.connectionSubscription[c.RemoteAddr().String()]; ok {
441414
for channel := range ts.connectionSubscription[c.RemoteAddr().String()] {

server/subscribe.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,6 @@ func executeSubscribeCommandName() ExecutionHook {
5757
allChannels[channel] = struct{}{}
5858
}
5959

60-
ts.LockChannelSubs()
61-
defer ts.UnlockChannelSubs()
6260
for channel := range allChannels {
6361
prevData, ok := subscriptionData.Get([]byte(channel))
6462
if !ok {

server/treds_fsm.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"io"
66
"log"
77
"strings"
8-
"sync"
98
"time"
109

1110
"github.com/hashicorp/raft"
@@ -22,7 +21,6 @@ type TredsFsm struct {
2221
cmdRegistry commands.CommandRegistry
2322
tredsStore store.Store
2423
conn gnet.Conn
25-
storeLock *sync.Mutex
2624
}
2725

2826
func (t *TredsFsm) Apply(log *raft.Log) interface{} {
@@ -35,10 +33,6 @@ func (t *TredsFsm) Apply(log *raft.Log) interface{} {
3533
if err != nil {
3634
return err
3735
}
38-
if commandReg.IsWrite {
39-
t.storeLock.Lock()
40-
defer t.storeLock.Unlock()
41-
}
4236
currentStore := t.tredsStore
4337
if currentStore != nil {
4438
return commandReg.Execute(args, currentStore)
@@ -85,8 +79,6 @@ func (t *TredsFsm) Restore(old io.ReadCloser) error {
8579
return err
8680
}
8781
ts := store.NewTredsStore()
88-
t.storeLock.Lock()
89-
defer t.storeLock.Unlock()
9082
err = ts.Restore(data)
9183
t.tredsStore = ts
9284
if err != nil {
@@ -96,5 +88,5 @@ func (t *TredsFsm) Restore(old io.ReadCloser) error {
9688
}
9789

9890
func NewTredsFsm(registry commands.CommandRegistry, store store.Store) *TredsFsm {
99-
return &TredsFsm{cmdRegistry: registry, tredsStore: store, storeLock: &sync.Mutex{}}
91+
return &TredsFsm{cmdRegistry: registry, tredsStore: store}
10092
}

0 commit comments

Comments
 (0)