Skip to content

Commit d699c45

Browse files
committed
feat: add distributed lock
1 parent 1623595 commit d699c45

File tree

21 files changed

+1633
-200
lines changed

21 files changed

+1633
-200
lines changed

pkg/distlock/README.md

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
# 分布式锁
2+
3+
支持以下类型的分布式锁:
4+
- Noop(假的,勿使用)
5+
- MySQL
6+
- PostgreSQL
7+
- Redis
8+
- Etcd
9+
- Zookeeper
10+
- Consul
11+
- Memcached
12+
- MongoDB
13+
14+
## 测试情况
15+
16+
- 已测试:MySQL、PostgreSQL、Redis
17+
- 未测试(使用前建议你自己充分测试下):Etcd、Zookeeper、Consul、Memcached、MongoDB
18+
19+
## GPT Prompt
20+
21+
```
22+
package distlock
23+
24+
import (
25+
"context"
26+
"fmt"
27+
"sync"
28+
"time"
29+
30+
"github.com/go-sql-driver/mysql"
31+
"github.com/jackc/pgx/v5/pgconn"
32+
"gorm.io/gorm"
33+
34+
"github.com/superproj/onex/pkg/logger"
35+
)
36+
37+
// GORMLocker provides a distributed locking mechanism using GORM.
38+
type GORMLocker struct {
39+
db *gorm.DB
40+
lockName string
41+
lockTimeout time.Duration
42+
renewTicker *time.Ticker
43+
stopChan chan struct{}
44+
mu sync.Mutex
45+
ownerID string
46+
logger logger.Logger
47+
}
48+
49+
// Lock represents a database record for a distributed lock.
50+
type Lock struct {
51+
ID uint `gorm:"primarykey"`
52+
Name string `gorm:"unique"`
53+
OwnerID string
54+
ExpiredAt time.Time
55+
CreatedAt time.Time
56+
UpdatedAt time.Time
57+
}
58+
59+
// Ensure GORMLocker implements the Locker interface.
60+
var _ Locker = (*GORMLocker)(nil)
61+
62+
// NewGORMLocker initializes a new GORMLocker instance.
63+
func NewGORMLocker(db *gorm.DB, opts ...Option) (*GORMLocker, error) {
64+
o := ApplyOptions(opts...)
65+
66+
if err := db.AutoMigrate(&Lock{}); err != nil {
67+
return nil, err
68+
}
69+
70+
locker := &GORMLocker{
71+
db: db,
72+
ownerID: o.ownerID,
73+
lockName: o.lockName,
74+
lockTimeout: o.lockTimeout,
75+
stopChan: make(chan struct{}),
76+
logger: o.logger,
77+
}
78+
79+
locker.logger.Info("GORMLocker initialized", "lockName", locker.lockName, "ownerID", locker.ownerID)
80+
81+
return locker, nil
82+
}
83+
84+
// Lock acquires the distributed lock.
85+
func (l *GORMLocker) Lock(ctx context.Context) error {
86+
l.mu.Lock()
87+
defer l.mu.Unlock()
88+
89+
now := time.Now()
90+
expiredAt := now.Add(l.lockTimeout)
91+
92+
err := l.db.Transaction(func(tx *gorm.DB) error {
93+
if err := tx.Create(&Lock{Name: l.lockName, OwnerID: l.ownerID, ExpiredAt: expiredAt}).Error; err != nil {
94+
if !isDuplicateEntry(err) {
95+
l.logger.Error("failed to create lock", "error", err)
96+
return err
97+
}
98+
99+
var lock Lock
100+
if err := tx.First(&lock, "name = ?", l.lockName).Error; err != nil {
101+
l.logger.Error("failed to fetch existing lock", "error", err)
102+
return err
103+
}
104+
105+
if !lock.ExpiredAt.Before(now) {
106+
l.logger.Warn("lock is already held by another owner", "ownerID", lock.OwnerID)
107+
return fmt.Errorf("lock is already held by %s", lock.OwnerID)
108+
}
109+
110+
lock.OwnerID = l.ownerID
111+
lock.ExpiredAt = expiredAt
112+
if err := tx.Save(&lock).Error; err != nil {
113+
l.logger.Error("failed to update expired lock", "error", err)
114+
return err
115+
}
116+
l.logger.Info("Lock expired, updated owner", "lockName", l.lockName, "newOwnerID", l.ownerID)
117+
}
118+
119+
l.renewTicker = time.NewTicker(l.lockTimeout / 2)
120+
go l.renewLock(ctx)
121+
122+
l.logger.Info("Lock acquired", "lockName", l.lockName, "ownerID", l.ownerID)
123+
return nil
124+
})
125+
126+
return err
127+
}
128+
129+
// Unlock releases the distributed lock.
130+
func (l *GORMLocker) Unlock(ctx context.Context) error {
131+
l.mu.Lock()
132+
defer l.mu.Unlock()
133+
134+
if l.renewTicker != nil {
135+
l.renewTicker.Stop()
136+
l.renewTicker = nil
137+
l.logger.Info("Stopped renewing lock", "lockName", l.lockName)
138+
}
139+
140+
err := l.db.Delete(&Lock{}, "name = ?", l.lockName).Error
141+
if err != nil {
142+
l.logger.Error("failed to delete lock", "error", err)
143+
return err
144+
}
145+
146+
l.logger.Info("Lock released", "lockName", l.lockName)
147+
return nil
148+
}
149+
150+
// Renew refreshes the lease for the distributed lock.
151+
func (l *GORMLocker) Renew(ctx context.Context) error {
152+
l.mu.Lock()
153+
defer l.mu.Unlock()
154+
155+
now := time.Now()
156+
expiredAt := now.Add(l.lockTimeout)
157+
158+
err := l.db.Model(&Lock{}).Where("name = ?", l.lockName).Update("expired_at", expiredAt).Error
159+
if err != nil {
160+
l.logger.Error("failed to renew lock", "error", err)
161+
return err
162+
}
163+
164+
l.logger.Info("Lock renewed", "lockName", l.lockName, "newExpiration", expiredAt)
165+
return nil
166+
}
167+
168+
// renewLock periodically renews the lock lease.
169+
func (l *GORMLocker) renewLock(ctx context.Context) {
170+
for {
171+
select {
172+
case <-l.stopChan:
173+
return
174+
case <-l.renewTicker.C:
175+
if err := l.Renew(ctx); err != nil {
176+
l.logger.Error("failed to renew lock", "error", err)
177+
}
178+
}
179+
}
180+
}
181+
182+
// isDuplicateEntry checks if the error is a duplicate entry error for MySQL and PostgreSQL.
183+
func isDuplicateEntry(err error) bool {
184+
if err == nil {
185+
return false
186+
}
187+
188+
if mysqlErr, ok := err.(*mysql.MySQLError); ok {
189+
return mysqlErr.Number == 1062 // MySQL error code for duplicate entry
190+
}
191+
192+
if pgErr, ok := err.(*pgconn.PgError); ok {
193+
return pgErr.Code == "23505" // PostgreSQL error code for unique violation
194+
}
195+
196+
return false
197+
}
198+
199+
参考上述基于MySQL的分布式锁实现,使用Consul实现一个分布式锁,该分布式锁同样实现了以下接口:
200+
201+
type Locker interface {
202+
Lock(ctx context.Context) error
203+
Unlock(ctx context.Context) error
204+
Renew(ctx context.Context) error
205+
}
206+
207+
并且需要满足以下要求:
208+
1. 分布式锁启动后,会有一个异步的协程序,根据创建时的参数,定期续锁;
209+
2. 如果代码需要创建数据库表,需要使用gorm的AutoMigrate方法自动创建表
210+
3. 使用logger记录必要的日志
211+
```

pkg/distlock/client.go

Lines changed: 0 additions & 107 deletions
This file was deleted.

0 commit comments

Comments
 (0)