Skip to content

Commit 0166952

Browse files
committed
make it easier to steal tables
1 parent e446d80 commit 0166952

File tree

3 files changed

+211
-2
lines changed

3 files changed

+211
-2
lines changed

atlas/consensus/majority-quorum.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,21 @@ import (
2525
"google.golang.org/grpc"
2626
"google.golang.org/protobuf/types/known/emptypb"
2727
"sync"
28+
"sync/atomic"
2829
)
2930

3031
type majorityQuorum struct {
31-
q1 []*QuorumNode
32-
q2 []*QuorumNode
32+
q1 []*QuorumNode
33+
q2 []*QuorumNode
34+
nextMigrationVersion atomic.Int64
35+
}
36+
37+
func (m *majorityQuorum) SetNextMigrationVersion(version int64) {
38+
m.nextMigrationVersion.Store(version)
39+
}
40+
41+
func (m *majorityQuorum) GetNextMigrationVersion() int64 {
42+
return m.nextMigrationVersion.Load()
3343
}
3444

3545
func (m *majorityQuorum) Gossip(ctx context.Context, in *GossipMigration, opts ...grpc.CallOption) (*emptypb.Empty, error) {

atlas/consensus/quorum.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ func (q *defaultQuorumManager) AddNode(node *Node) {
7373

7474
type Quorum interface {
7575
ConsensusClient
76+
SetNextMigrationVersion(version int64)
77+
GetNextMigrationVersion() int64
7678
}
7779

7880
type QuorumNode struct {

atlas/socket/server/steal.go

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
/*
2+
* This file is part of Atlas-DB.
3+
*
4+
* Atlas-DB is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of
7+
* the License, or (at your option) any later version.
8+
*
9+
* Atlas-DB is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with Atlas-DB. If not, see <https://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
19+
package server
20+
21+
import (
22+
"bytes"
23+
"context"
24+
"fmt"
25+
"github.com/bottledcode/atlas-db/atlas"
26+
"github.com/bottledcode/atlas-db/atlas/consensus"
27+
"strings"
28+
"zombiezen.com/go/sqlite"
29+
)
30+
31+
type principalKey struct {
32+
Name string
33+
}
34+
35+
type principalValue struct {
36+
Value string
37+
}
38+
39+
func AttachPrincipal(ctx context.Context, name, value string) context.Context {
40+
return context.WithValue(ctx, principalKey{Name: name}, principalValue{Value: value})
41+
}
42+
43+
func getTable(ctx context.Context, table string) (*consensus.Table, error) {
44+
conn, err := atlas.MigrationsPool.Take(ctx)
45+
if err != nil {
46+
return nil, err
47+
}
48+
defer atlas.MigrationsPool.Put(conn)
49+
tr := consensus.GetDefaultTableRepository(ctx, conn)
50+
t, err := tr.GetTable(table)
51+
if err != nil {
52+
return nil, err
53+
}
54+
55+
if t != nil && len(t.GetShardPrincipals()) > 0 {
56+
shards := make([]*consensus.Principal, 0, len(t.GetShardPrincipals()))
57+
for _, p := range t.GetShardPrincipals() {
58+
if v, ok := ctx.Value(principalKey{Name: p}).(principalValue); !ok {
59+
return nil, fmt.Errorf("missing principal: %s", p)
60+
} else {
61+
shards = append(shards, &consensus.Principal{Name: p, Value: v.Value})
62+
}
63+
}
64+
shard, err := tr.GetShard(t, shards)
65+
if err != nil {
66+
return nil, err
67+
}
68+
if shard == nil {
69+
return nil, fmt.Errorf("shard isn't found: %s", table)
70+
}
71+
72+
return shard.GetShard(), nil
73+
}
74+
75+
return t, nil
76+
}
77+
78+
func StealTableByName(ctx context.Context, tableName string) (consensus.Quorum, error) {
79+
table, err := getTable(ctx, tableName)
80+
if err != nil {
81+
return nil, err
82+
}
83+
84+
if table == nil {
85+
return nil, fmt.Errorf("table isn't found: %s", tableName)
86+
}
87+
88+
// resolve groups
89+
goUp:
90+
if table.GetGroup() != "" {
91+
table, err = getTable(ctx, table.GetGroup())
92+
if err != nil {
93+
return nil, err
94+
}
95+
goto goUp
96+
}
97+
98+
return stealTable(ctx, table)
99+
}
100+
101+
func stealTable(ctx context.Context, table *consensus.Table) (consensus.Quorum, error) {
102+
req := &consensus.StealTableOwnershipRequest{
103+
Sender: consensus.ConstructCurrentNode(),
104+
Reason: consensus.StealReason_queryReason,
105+
Table: table,
106+
}
107+
108+
qm := consensus.GetDefaultQuorumManager(ctx)
109+
q, err := qm.GetQuorum(ctx, table.GetName())
110+
if err != nil {
111+
return nil, err
112+
}
113+
114+
response, err := q.StealTableOwnership(ctx, req)
115+
if err != nil {
116+
return nil, err
117+
}
118+
119+
if !response.GetPromised() {
120+
return nil, fmt.Errorf("unable to steal ownership for table: %s", table.GetName())
121+
}
122+
123+
var conn *sqlite.Conn
124+
if strings.HasPrefix(table.GetName(), "ATLAS.") {
125+
conn, err = atlas.MigrationsPool.Take(ctx)
126+
if err != nil {
127+
return nil, err
128+
}
129+
defer atlas.MigrationsPool.Put(conn)
130+
} else {
131+
conn, err = atlas.Pool.Take(ctx)
132+
if err != nil {
133+
return nil, err
134+
}
135+
defer atlas.Pool.Put(conn)
136+
}
137+
138+
// apply any missing migrations
139+
_, err = atlas.ExecuteSQL(ctx, "BEGIN IMMEDIATE", conn, false)
140+
if err != nil {
141+
return nil, err
142+
}
143+
defer func() {
144+
if err != nil {
145+
_, _ = atlas.ExecuteSQL(ctx, "ROLLBACK", conn, false)
146+
}
147+
}()
148+
149+
nextVersion := int64(1)
150+
151+
for _, missing := range response.GetSuccess().GetMissingMigrations() {
152+
nextVersion = missing.GetVersion().GetMigrationVersion() + 1
153+
switch missing.GetMigration().(type) {
154+
case *consensus.Migration_Data:
155+
for _, data := range missing.GetData().GetSession() {
156+
reader := bytes.NewReader(data)
157+
err = conn.ApplyChangeset(reader, nil, func(conflictType sqlite.ConflictType, iterator *sqlite.ChangesetIterator) sqlite.ConflictAction {
158+
return sqlite.ChangesetReplace
159+
})
160+
if err != nil {
161+
return nil, err
162+
}
163+
}
164+
case *consensus.Migration_Schema:
165+
var stmt *sqlite.Stmt
166+
hasRow := true
167+
for _, command := range missing.GetSchema().GetCommands() {
168+
stmt, _, err = conn.PrepareTransient(command)
169+
if err != nil {
170+
return nil, err
171+
}
172+
for hasRow {
173+
hasRow, err = stmt.Step()
174+
if err != nil {
175+
return nil, err
176+
}
177+
if !hasRow {
178+
break
179+
}
180+
}
181+
err = stmt.Finalize()
182+
if err != nil {
183+
return nil, err
184+
}
185+
}
186+
}
187+
}
188+
189+
q.SetNextMigrationVersion(nextVersion)
190+
191+
_, err = atlas.ExecuteSQL(ctx, "COMMIT", conn, false)
192+
if err != nil {
193+
return nil, err
194+
}
195+
196+
return q, nil
197+
}

0 commit comments

Comments
 (0)