Skip to content

Commit 122700b

Browse files
committed
start handling begin
1 parent 8e7eacb commit 122700b

File tree

3 files changed

+136
-121
lines changed

3 files changed

+136
-121
lines changed

atlas/authorization.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@ import (
2828
)
2929

3030
type Authorizer struct {
31-
boundTime time.Duration
32-
mu sync.RWMutex
33-
LastTables map[string]struct{}
31+
boundTime time.Duration
32+
mu sync.RWMutex
33+
LastTables map[string]struct{}
34+
ForceReadonly bool
3435
}
3536

3637
var writeOps []sqlite.OpType = []sqlite.OpType{
@@ -94,6 +95,10 @@ func (a *Authorizer) isAtlasChange(action sqlite.Action) bool {
9495
func (a *Authorizer) Authorize(action sqlite.Action) sqlite.AuthResult {
9596
Logger.Info("Auth", zap.Any("action", action), zap.String("table", action.Table()))
9697

98+
if a.ForceReadonly && a.isWrite(action) {
99+
return sqlite.AuthResultDeny
100+
}
101+
97102
if action.Table() != "" && action.Database() != "" && action.Table() != "sqlite_master" {
98103
name := strings.ToUpper(action.Database()) + "." + strings.ToUpper(action.Table())
99104
a.LastTables[name] = struct{}{}

atlas/ownership.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,51 @@ type TableOwnerships struct {
4242
mu sync.RWMutex
4343
subscriptions map[string][]chan<- TableOwnershipChange
4444
commitTimes map[string]time.Time
45+
holds map[string]int
4546
}
4647

4748
var Ownership = &TableOwnerships{
4849
own: map[string]struct{}{},
4950
subscriptions: map[string][]chan<- TableOwnershipChange{},
5051
commitTimes: map[string]time.Time{},
52+
holds: map[string]int{},
53+
}
54+
55+
func (t *TableOwnerships) Hold(table string) {
56+
t.mu.Lock()
57+
defer t.mu.Unlock()
58+
59+
if c, ok := t.holds[table]; ok {
60+
t.holds[table] = c + 1
61+
return
62+
}
63+
64+
t.holds[table] = 1
65+
}
66+
67+
func (t *TableOwnerships) Release(table string) {
68+
t.mu.Lock()
69+
defer t.mu.Unlock()
70+
71+
if c, ok := t.holds[table]; ok {
72+
if c == 1 {
73+
delete(t.holds, table)
74+
} else {
75+
t.holds[table] = c - 1
76+
}
77+
}
78+
79+
if c, ok := t.holds[table]; ok && c <= 0 {
80+
delete(t.holds, table)
81+
}
82+
}
83+
84+
func (t *TableOwnerships) IsHeld(table string) bool {
85+
t.mu.RLock()
86+
defer t.mu.RUnlock()
87+
88+
_, ok := t.holds[table]
89+
return ok
5190
}
5291

5392
func (t *TableOwnerships) Add(table string, version int64) {

atlas/socket/handler.go

Lines changed: 89 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -305,28 +305,8 @@ ready:
305305
err = s.PerformBind(cmd)
306306
goto handleError
307307
case "BEGIN":
308-
if s.inTransaction {
309-
err = makeFatal(errors.New("the transaction is already in progress"))
310-
goto handleError
311-
}
312-
if t, ok := cmd.SelectNormalizedCommand(1); ok && t == "IMMEDIATE" {
313-
_, err = atlas.ExecuteSQL(ctx, "BEGIN IMMEDIATE", s.sql, false)
314-
if err != nil {
315-
err = makeFatal(err)
316-
goto handleError
317-
}
318-
} else {
319-
_, err = atlas.ExecuteSQL(ctx, "BEGIN", s.sql, false)
320-
if err != nil {
321-
err = makeFatal(err)
322-
goto handleError
323-
}
324-
}
325-
s.inTransaction = true
326-
327-
s.session, err = atlas.InitializeSession(ctx, s.sql)
308+
_, err = s.PerformBegin(ctx, cmd)
328309
if err != nil {
329-
err = makeFatal(err)
330310
goto handleError
331311
}
332312

@@ -480,103 +460,6 @@ ready:
480460
}
481461
}
482462

483-
func (s *Socket) SanitizeBegin(cmd commands.Command) (tables, views, triggers []string, err error) {
484-
if s.inTransaction {
485-
err = errors.New("the transaction is already in progress")
486-
return
487-
}
488-
489-
extractList := func() (list []string, err error) {
490-
var rip []string
491-
n := 3
492-
expectingFirst := true
493-
expectingLast := true
494-
for {
495-
first, _ := cmd.SelectNormalizedCommand(n)
496-
rip = append(rip, first)
497-
n++
498-
isLast := false
499-
first = strings.TrimSuffix(first, ",")
500-
if first == "" {
501-
break
502-
}
503-
if first == "(" {
504-
expectingFirst = false
505-
continue
506-
}
507-
if first == ")" {
508-
expectingLast = false
509-
break
510-
}
511-
if strings.HasPrefix(first, "(") {
512-
first = strings.TrimPrefix(first, "(")
513-
expectingFirst = false
514-
}
515-
if strings.HasSuffix(first, ")") {
516-
expectingLast = false
517-
first = strings.TrimSuffix(first, ")")
518-
isLast = true
519-
}
520-
if expectingFirst {
521-
err = errors.New("expected table name in parentheses")
522-
return
523-
}
524-
525-
if first == "," {
526-
continue
527-
}
528-
529-
list = append(list, cmd.NormalizeName(first))
530-
if isLast {
531-
break
532-
}
533-
}
534-
if expectingFirst || expectingLast {
535-
err = errors.New("expected table name in parentheses")
536-
return
537-
}
538-
cmd = cmd.ReplaceCommand(strings.Join(rip, " "), "BEGIN IMMEDIATE")
539-
return
540-
}
541-
542-
if t, ok := cmd.SelectNormalizedCommand(1); ok && t == "IMMEDIATE" {
543-
if err = cmd.CheckMinLen(3); err != nil {
544-
return
545-
}
546-
for {
547-
switch t, _ = cmd.SelectNormalizedCommand(2); t {
548-
case "TABLE":
549-
if err = cmd.CheckMinLen(4); err != nil {
550-
return
551-
}
552-
tables, err = extractList()
553-
if err != nil {
554-
return
555-
}
556-
case "VIEW":
557-
if err = cmd.CheckMinLen(4); err != nil {
558-
return
559-
}
560-
views, err = extractList()
561-
if err != nil {
562-
return
563-
}
564-
case "TRIGGER":
565-
if err = cmd.CheckMinLen(4); err != nil {
566-
return
567-
}
568-
triggers, err = extractList()
569-
if err != nil {
570-
return
571-
}
572-
default:
573-
return
574-
}
575-
}
576-
}
577-
return
578-
}
579-
580463
func (s *Socket) PerformFinalize(cmd *commands.CommandString) (err error) {
581464
if err = cmd.CheckExactLen(2); err != nil {
582465
return
@@ -599,6 +482,94 @@ func (s *Socket) PerformFinalize(cmd *commands.CommandString) (err error) {
599482
return
600483
}
601484

485+
func (s *Socket) PerformBegin(ctx context.Context, cmd *commands.CommandString) (begin *Begin, err error) {
486+
if begin, err = ParseBegin(cmd); err != nil {
487+
return
488+
}
489+
if err = begin.Handle(s); err != nil {
490+
return
491+
}
492+
493+
if begin.isReadonly {
494+
_, err = atlas.ExecuteSQL(ctx, "BEGIN", s.sql, false)
495+
return
496+
}
497+
498+
q := make(map[string]consensus.Quorum)
499+
500+
conn, err := atlas.MigrationsPool.Take(ctx)
501+
if err != nil {
502+
return
503+
}
504+
defer atlas.MigrationsPool.Put(conn)
505+
506+
tr := consensus.GetDefaultTableRepository(ctx, conn)
507+
508+
// before doing anything, we need to flag the tables as being held so we do not give them up if we become an owner
509+
for _, p := range begin.tables {
510+
// see if this table already exists
511+
var t *consensus.Table
512+
t, err = tr.GetTable(p)
513+
if err != nil {
514+
return
515+
}
516+
if t == nil {
517+
// we either expect the table to be created in the new transaction, or we do not have any information about it yet
518+
continue
519+
}
520+
521+
atlas.Ownership.Hold(p)
522+
defer func(p string) {
523+
if err != nil {
524+
atlas.Ownership.Release(p)
525+
}
526+
}(p)
527+
528+
q[p], err = consensus.GetDefaultQuorumManager(ctx).GetQuorum(ctx, p)
529+
if err != nil {
530+
return
531+
}
532+
533+
retry:
534+
535+
var resp *consensus.StealTableOwnershipResponse
536+
resp, err = q[p].StealTableOwnership(ctx, &consensus.StealTableOwnershipRequest{
537+
Sender: consensus.ConstructCurrentNode(),
538+
Reason: consensus.StealReason_queryReason,
539+
Table: t,
540+
})
541+
if err != nil {
542+
return
543+
}
544+
545+
// we do not have the table and this could be because the table was updated or some other reason
546+
switch resp.GetResponse().(type) {
547+
case *consensus.StealTableOwnershipResponse_Failure:
548+
// update the table if newer
549+
if resp.GetFailure().GetTable().GetVersion() > t.GetVersion() {
550+
t = resp.GetFailure().GetTable()
551+
err = tr.UpdateTable(t)
552+
if err != nil {
553+
return
554+
}
555+
goto retry
556+
}
557+
// We were denied ownership for some reason unrelated to the table being updated.
558+
// This is most likely because the table owner has refused the movement to the current region.
559+
err = errors.New("forwarding required")
560+
return
561+
case *consensus.StealTableOwnershipResponse_Success:
562+
// check that we were promised the ownership
563+
if resp.GetPromised() {
564+
// we have the table and now we can proceed once we apply any missing migrations
565+
566+
}
567+
}
568+
}
569+
570+
return
571+
}
572+
602573
// PerformPrepare parses a SQL query and creates an appropriate Prepare object for execution.
603574
func (s *Socket) PerformPrepare(cmd *commands.CommandString) (prepare *Prepare, err error) {
604575
if prepare, err = ParsePrepare(cmd); err != nil {

0 commit comments

Comments
 (0)