Skip to content

Commit edee0f1

Browse files
authored
Merge pull request #16 from microyahoo/notes
add notes for concurrency election
2 parents 1758230 + fd6cb36 commit edee0f1

File tree

2 files changed

+9
-4
lines changed

2 files changed

+9
-4
lines changed

client/v3/concurrency/election.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,16 +70,19 @@ func (e *Election) Campaign(ctx context.Context, val string) error {
7070
s := e.session
7171
client := e.session.Client()
7272

73+
// 如果 session 没有指定 lease id,则会为每个 session 生成新的 lease id
74+
// key 的格式为 prefix+leaseID
7375
k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())
76+
// 当 key 不存在时,createRevision 是 0
7477
txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
75-
txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
78+
txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease()))) // 为 key attach 租约
7679
txn = txn.Else(v3.OpGet(k))
7780
resp, err := txn.Commit()
7881
if err != nil {
7982
return err
8083
}
8184
e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
82-
if !resp.Succeeded {
85+
if !resp.Succeeded { // 如果 key 已经存在了
8386
kv := resp.Responses[0].GetResponseRange().Kvs[0]
8487
e.leaderRev = kv.CreateRevision
8588
if string(kv.Value) != val {
@@ -107,6 +110,7 @@ func (e *Election) Campaign(ctx context.Context, val string) error {
107110
}
108111

109112
// Proclaim lets the leader announce a new value without another election.
113+
// leader 宣布新的值,即修改 leaderKey 对应的 value。如果提交失败则清空 leaderKey
110114
func (e *Election) Proclaim(ctx context.Context, val string) error {
111115
if e.leaderSession == nil {
112116
return ErrElectionNotLeader
@@ -134,6 +138,7 @@ func (e *Election) Resign(ctx context.Context) (err error) {
134138
return nil
135139
}
136140
client := e.session.Client()
141+
// 将 leaderkey, leaderRev 从数据库中删除,从而发起新的选举
137142
cmp := v3.Compare(v3.CreateRevision(e.leaderKey), "=", e.leaderRev)
138143
resp, err := client.Txn(ctx).If(cmp).Then(v3.OpDelete(e.leaderKey)).Commit()
139144
if err == nil {

client/v3/concurrency/mutex.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,9 @@ func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
120120
// put self in lock waiters via myKey; oldest waiter holds lock
121121
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
122122
// reuse key in case this session already holds the lock
123-
get := v3.OpGet(m.myKey)
123+
get := v3.OpGet(m.myKey) // 这里是使用 mykey 来进行查询的
124124
// fetch current holder to complete uncontended path with only one RPC
125-
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
125+
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...) // 这里是使用 prefix 来进行查询的
126126
// 如果 key 不存在则将 key 写入数据库,如果已经存在则读取 key 的记录信息,所有的操作在一个事务中
127127
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
128128
if err != nil {

0 commit comments

Comments
 (0)