Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]


permissions:
Expand Down
17 changes: 15 additions & 2 deletions atlas/authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@ package atlas
import (
"go.uber.org/zap"
"slices"
"strings"
"sync"
"time"
"zombiezen.com/go/sqlite"
)

type Authorizer struct {
boundTime time.Duration
mu sync.RWMutex
boundTime time.Duration
mu sync.RWMutex
LastTables map[string]struct{}
ForceReadonly bool
}

var writeOps []sqlite.OpType = []sqlite.OpType{
Expand Down Expand Up @@ -74,6 +77,7 @@ func (a *Authorizer) Reset() {
defer a.mu.Unlock()

a.boundTime = 0
a.LastTables = make(map[string]struct{})
}

func (a *Authorizer) isJournalModeChange(action sqlite.Action) bool {
Expand All @@ -91,6 +95,15 @@ func (a *Authorizer) isAtlasChange(action sqlite.Action) bool {
func (a *Authorizer) Authorize(action sqlite.Action) sqlite.AuthResult {
Logger.Info("Auth", zap.Any("action", action), zap.String("table", action.Table()))

if a.ForceReadonly && a.isWrite(action) {
return sqlite.AuthResultDeny
}

if action.Table() != "" && action.Database() != "" && action.Table() != "sqlite_master" {
name := strings.ToUpper(action.Database()) + "." + strings.ToUpper(action.Table())
a.LastTables[name] = struct{}{}
}

if a.isJournalModeChange(action) || a.isAtlasChange(action) {
return sqlite.AuthResultDeny
}
Expand Down
3 changes: 3 additions & 0 deletions atlas/bootstrap/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,9 @@ SET address = :address, port = :port, region = :region, active = 1
return err
}

// add the node to the quorum manager
qm.AddNode(node)

atlas.CurrentOptions.ServerId = node.Id

return err
Expand Down
2 changes: 1 addition & 1 deletion atlas/bootstrap/migrations.sql
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ create table tables
);
/* Insert the nodes table as a tracked table */
insert into tables
values ('atlas.nodes', 'global', null, current_timestamp, 0, '', '');
values ('ATLAS.NODES', 'global', null, current_timestamp, 0, '', '');

/* Migrations are defined as rows on this table.
They are ultimately numbered by the version and batch_part columns.
Expand Down
14 changes: 12 additions & 2 deletions atlas/consensus/majority-quorum.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,21 @@ import (
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
"sync"
"sync/atomic"
)

type majorityQuorum struct {
q1 []*QuorumNode
q2 []*QuorumNode
q1 []*QuorumNode
q2 []*QuorumNode
nextMigrationVersion atomic.Int64
}

func (m *majorityQuorum) SetNextMigrationVersion(version int64) {
m.nextMigrationVersion.Store(version)
}

func (m *majorityQuorum) GetNextMigrationVersion() int64 {
return m.nextMigrationVersion.Load()
}

func (m *majorityQuorum) Gossip(ctx context.Context, in *GossipMigration, opts ...grpc.CallOption) (*emptypb.Empty, error) {
Expand Down
2 changes: 2 additions & 0 deletions atlas/consensus/quorum.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ func (q *defaultQuorumManager) AddNode(node *Node) {

type Quorum interface {
ConsensusClient
SetNextMigrationVersion(version int64)
GetNextMigrationVersion() int64
}

type QuorumNode struct {
Expand Down
13 changes: 8 additions & 5 deletions atlas/consensus/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"zombiezen.com/go/sqlite"
)

const NodeTable = "atlas.nodes"
const NodeTable = "ATLAS.NODES"

type Server struct {
UnimplementedConsensusServer
Expand All @@ -53,6 +53,9 @@ func (s *Server) StealTableOwnership(ctx context.Context, req *StealTableOwnersh
}()

_, err = atlas.ExecuteSQL(ctx, "BEGIN IMMEDIATE", conn, false)
if err != nil {
return nil, err
}

tr := GetDefaultTableRepository(ctx, conn)
existingTable, err := tr.GetTable(req.GetTable().GetName())
Expand Down Expand Up @@ -303,7 +306,7 @@ func (s *Server) AcceptMigration(ctx context.Context, req *WriteMigrationRequest
}()

commitConn := conn
if !strings.HasPrefix(req.GetMigration().GetVersion().GetTableName(), "atlas.") {
if !strings.HasPrefix(req.GetMigration().GetVersion().GetTableName(), "ATLAS.") {
commitConn, err = atlas.Pool.Take(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -388,7 +391,7 @@ func (s *Server) applyMigration(migrations []*Migration, commitConn *sqlite.Conn
return nil
}

func constructCurrentNode() *Node {
func ConstructCurrentNode() *Node {
return &Node{
Id: atlas.CurrentOptions.ServerId,
Address: atlas.CurrentOptions.AdvertiseAddress,
Expand Down Expand Up @@ -515,7 +518,7 @@ VALUES (:id, :address, :port, :region, 1, current_timestamp, 0)`, conn, false, a
}

mreq := &WriteMigrationRequest{
Sender: constructCurrentNode(),
Sender: ConstructCurrentNode(),
Migration: migration,
}

Expand Down Expand Up @@ -635,7 +638,7 @@ func SendGossip(ctx context.Context, req *GossipMigration, conn *sqlite.Conn) er
Table: req.GetTable(),
PreviousMigration: req.GetPreviousMigration(),
Ttl: req.GetTtl() - 1,
Sender: constructCurrentNode(),
Sender: ConstructCurrentNode(),
}

wg := sync.WaitGroup{}
Expand Down
39 changes: 39 additions & 0 deletions atlas/ownership.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,51 @@ type TableOwnerships struct {
mu sync.RWMutex
subscriptions map[string][]chan<- TableOwnershipChange
commitTimes map[string]time.Time
holds map[string]int
}

var Ownership = &TableOwnerships{
own: map[string]struct{}{},
subscriptions: map[string][]chan<- TableOwnershipChange{},
commitTimes: map[string]time.Time{},
holds: map[string]int{},
}

func (t *TableOwnerships) Hold(table string) {
t.mu.Lock()
defer t.mu.Unlock()

if c, ok := t.holds[table]; ok {
t.holds[table] = c + 1
return
}

t.holds[table] = 1
}

func (t *TableOwnerships) Release(table string) {
t.mu.Lock()
defer t.mu.Unlock()

if c, ok := t.holds[table]; ok {
if c == 1 {
delete(t.holds, table)
} else {
t.holds[table] = c - 1
}
}

if c, ok := t.holds[table]; ok && c <= 0 {
delete(t.holds, table)
}
}
Comment on lines +67 to +82
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Remove redundant delete operation and add count validation.

The Release method has two issues:

  1. The second delete operation (lines 79-81) is redundant as the table entry is already deleted if count equals 1.
  2. The count can potentially become negative without proper validation.

Apply this diff to fix both issues:

 func (t *TableOwnerships) Release(table string) {
     t.mu.Lock()
     defer t.mu.Unlock()
 
     if c, ok := t.holds[table]; ok {
+        if c <= 0 {
+            delete(t.holds, table)
+            return
+        }
         if c == 1 {
             delete(t.holds, table)
         } else {
             t.holds[table] = c - 1
         }
     }
-
-    if c, ok := t.holds[table]; ok && c <= 0 {
-        delete(t.holds, table)
-    }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (t *TableOwnerships) Release(table string) {
t.mu.Lock()
defer t.mu.Unlock()
if c, ok := t.holds[table]; ok {
if c == 1 {
delete(t.holds, table)
} else {
t.holds[table] = c - 1
}
}
if c, ok := t.holds[table]; ok && c <= 0 {
delete(t.holds, table)
}
}
func (t *TableOwnerships) Release(table string) {
t.mu.Lock()
defer t.mu.Unlock()
if c, ok := t.holds[table]; ok {
if c <= 0 {
delete(t.holds, table)
return
}
if c == 1 {
delete(t.holds, table)
} else {
t.holds[table] = c - 1
}
}
}


func (t *TableOwnerships) IsHeld(table string) bool {
t.mu.RLock()
defer t.mu.RUnlock()

_, ok := t.holds[table]
return ok
}

func (t *TableOwnerships) Add(table string, version int64) {
Expand Down
10 changes: 0 additions & 10 deletions atlas/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
_ "embed"
"runtime"
"strings"
"sync"
"zombiezen.com/go/sqlite"
"zombiezen.com/go/sqlite/sqlitemigration"
)
Expand All @@ -34,27 +33,18 @@ var migrations string
var Pool *sqlitemigration.Pool
var MigrationsPool *sqlitemigration.Pool

var authorizers = map[*sqlite.Conn]*Authorizer{}

// CreatePool creates a new connection pool for the database and the migrations database.
func CreatePool(options *Options) {
if Pool != nil {
return
}

authorizers = make(map[*sqlite.Conn]*Authorizer)
amu := sync.Mutex{}

Pool = sqlitemigration.NewPool(options.DbFilename, sqlitemigration.Schema{}, sqlitemigration.Options{
Flags: sqlite.OpenReadWrite | sqlite.OpenCreate | sqlite.OpenWAL,
PoolSize: runtime.NumCPU() * 2,
PrepareConn: func(conn *sqlite.Conn) (err error) {
auth := &Authorizer{}

amu.Lock()
authorizers[conn] = auth
amu.Unlock()

err = conn.SetAuthorizer(auth)
if err != nil {
return
Expand Down
Loading
Loading