Skip to content

Commit 5aaddd4

Browse files
committed
start working on making this usable
1 parent 4ea35df commit 5aaddd4

File tree

6 files changed

+162
-36
lines changed

6 files changed

+162
-36
lines changed

atlas/authorization.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,16 @@ package atlas
2121
import (
2222
"go.uber.org/zap"
2323
"slices"
24+
"strings"
2425
"sync"
2526
"time"
2627
"zombiezen.com/go/sqlite"
2728
)
2829

2930
type Authorizer struct {
30-
boundTime time.Duration
31-
mu sync.RWMutex
31+
boundTime time.Duration
32+
mu sync.RWMutex
33+
LastTables map[string]struct{}
3234
}
3335

3436
var writeOps []sqlite.OpType = []sqlite.OpType{
@@ -74,6 +76,7 @@ func (a *Authorizer) Reset() {
7476
defer a.mu.Unlock()
7577

7678
a.boundTime = 0
79+
a.LastTables = make(map[string]struct{})
7780
}
7881

7982
func (a *Authorizer) isJournalModeChange(action sqlite.Action) bool {
@@ -91,6 +94,11 @@ func (a *Authorizer) isAtlasChange(action sqlite.Action) bool {
9194
func (a *Authorizer) Authorize(action sqlite.Action) sqlite.AuthResult {
9295
Logger.Info("Auth", zap.Any("action", action), zap.String("table", action.Table()))
9396

97+
if action.Table() != "" && action.Database() != "" && action.Table() != "sqlite_master" {
98+
name := strings.ToUpper(action.Database()) + "." + strings.ToUpper(action.Table())
99+
a.LastTables[name] = struct{}{}
100+
}
101+
94102
if a.isJournalModeChange(action) || a.isAtlasChange(action) {
95103
return sqlite.AuthResultDeny
96104
}

atlas/consensus/server.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ func (s *Server) applyMigration(migrations []*Migration, commitConn *sqlite.Conn
388388
return nil
389389
}
390390

391-
func constructCurrentNode() *Node {
391+
func ConstructCurrentNode() *Node {
392392
return &Node{
393393
Id: atlas.CurrentOptions.ServerId,
394394
Address: atlas.CurrentOptions.AdvertiseAddress,
@@ -515,7 +515,7 @@ VALUES (:id, :address, :port, :region, 1, current_timestamp, 0)`, conn, false, a
515515
}
516516

517517
mreq := &WriteMigrationRequest{
518-
Sender: constructCurrentNode(),
518+
Sender: ConstructCurrentNode(),
519519
Migration: migration,
520520
}
521521

@@ -635,7 +635,7 @@ func SendGossip(ctx context.Context, req *GossipMigration, conn *sqlite.Conn) er
635635
Table: req.GetTable(),
636636
PreviousMigration: req.GetPreviousMigration(),
637637
Ttl: req.GetTtl() - 1,
638-
Sender: constructCurrentNode(),
638+
Sender: ConstructCurrentNode(),
639639
}
640640

641641
wg := sync.WaitGroup{}

atlas/pool.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
_ "embed"
2424
"runtime"
2525
"strings"
26-
"sync"
2726
"zombiezen.com/go/sqlite"
2827
"zombiezen.com/go/sqlite/sqlitemigration"
2928
)
@@ -34,27 +33,18 @@ var migrations string
3433
var Pool *sqlitemigration.Pool
3534
var MigrationsPool *sqlitemigration.Pool
3635

37-
var authorizers = map[*sqlite.Conn]*Authorizer{}
38-
3936
// CreatePool creates a new connection pool for the database and the migrations database.
4037
func CreatePool(options *Options) {
4138
if Pool != nil {
4239
return
4340
}
4441

45-
authorizers = make(map[*sqlite.Conn]*Authorizer)
46-
amu := sync.Mutex{}
47-
4842
Pool = sqlitemigration.NewPool(options.DbFilename, sqlitemigration.Schema{}, sqlitemigration.Options{
4943
Flags: sqlite.OpenReadWrite | sqlite.OpenCreate | sqlite.OpenWAL,
5044
PoolSize: runtime.NumCPU() * 2,
5145
PrepareConn: func(conn *sqlite.Conn) (err error) {
5246
auth := &Authorizer{}
5347

54-
amu.Lock()
55-
authorizers[conn] = auth
56-
amu.Unlock()
57-
5848
err = conn.SetAuthorizer(auth)
5949
if err != nil {
6050
return

atlas/socket/handler.go

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ type Socket struct {
4848
streams []*sqlite.Stmt
4949
principals []*consensus.Principal
5050
timeout time.Duration
51+
authorizer *atlas.Authorizer
5152
}
5253

5354
func (s *Socket) Cleanup() {
@@ -160,6 +161,9 @@ func (s *Socket) rollbackAutoTransaction(ctx context.Context, err error) error {
160161
}
161162

162163
func (s *Socket) setTimeout(t time.Duration) error {
164+
if t == 0 {
165+
return s.conn.SetDeadline(time.Time{})
166+
}
163167
return s.conn.SetDeadline(time.Now().Add(t))
164168
}
165169

@@ -247,10 +251,21 @@ ready:
247251
}
248252
defer atlas.Pool.Put(s.sql)
249253

254+
s.authorizer = &atlas.Authorizer{}
255+
256+
err = s.sql.SetAuthorizer(s.authorizer)
257+
if err != nil {
258+
atlas.Logger.Error("Error setting authorizer", zap.Error(err))
259+
return
260+
}
261+
defer s.sql.SetAuthorizer(nil)
262+
250263
if err = s.setTimeout(s.timeout); err != nil {
251264
return
252265
}
253266

267+
var changes []*consensus.Migration
268+
254269
for {
255270
select {
256271
case <-ctx.Done():
@@ -283,7 +298,30 @@ ready:
283298
_, err = s.PerformExecute(ctx, cmd)
284299
goto handleError
285300
case "QUERY":
286-
_, err = s.PerformQuery(ctx, cmd)
301+
var q *Query
302+
q, err = s.PerformQuery(ctx, cmd)
303+
if err != nil {
304+
goto handleError
305+
}
306+
if len(q.tables) > 0 {
307+
// we need ownership over all tables
308+
for _, table := range q.tables {
309+
qm := consensus.GetDefaultQuorumManager(ctx)
310+
var qu consensus.Quorum
311+
qu, err = qm.GetQuorum(ctx, table.GetName())
312+
if err != nil {
313+
err = makeFatal(err)
314+
goto handleError
315+
}
316+
317+
result, err = qu.StealTableOwnership(ctx, &consensus.StealTableOwnershipRequest{
318+
Sender: consensus.ConstructCurrentNode(),
319+
Reason: consensus.StealReason_schemaReason,
320+
Table: nil,
321+
})
322+
}
323+
}
324+
287325
goto handleError
288326
case "FINALIZE":
289327
err = s.PerformFinalize(cmd)

atlas/socket/query.go

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,33 +19,55 @@
1919
package socket
2020

2121
import (
22+
"github.com/bottledcode/atlas-db/atlas"
2223
"github.com/bottledcode/atlas-db/atlas/commands"
24+
"github.com/bottledcode/atlas-db/atlas/consensus"
25+
"github.com/bottledcode/atlas-db/atlas/operations"
26+
"go.uber.org/zap"
2327
"zombiezen.com/go/sqlite"
2428
)
2529

2630
type Query struct {
27-
stmt *sqlite.Stmt
28-
query *commands.SqlCommand
31+
stmt *sqlite.Stmt
32+
query *commands.SqlCommand
33+
tables []*consensus.Table
2934
}
3035

31-
func ParseQuery(cmd *commands.CommandString) (*Query, error) {
36+
func ParseQuery(cmd *commands.CommandString) (query *Query, err error) {
3237
if err := cmd.CheckMinLen(2); err != nil {
3338
return nil, err
3439
}
3540

3641
q := cmd.From(1)
3742

38-
return &Query{
43+
query = &Query{
3944
query: q,
4045
stmt: nil,
41-
}, nil
46+
}
47+
48+
return query, nil
4249
}
4350

4451
func (q *Query) Handle(s *Socket) (err error) {
52+
s.authorizer.Reset()
4553
q.stmt, _, err = s.sql.PrepareTransient(q.query.Raw())
4654
if err != nil {
4755
return makeFatal(err)
4856
}
57+
tables := s.authorizer.LastTables
58+
atlas.Logger.Info("tables", zap.Any("tables", tables))
59+
60+
if !q.query.IsQueryReadOnly() {
61+
first, _ := q.query.SelectNormalizedCommand(0)
62+
if first == "CREATE" {
63+
q.tables, err = operations.CreateTable(q.query)
64+
if err != nil {
65+
return nil
66+
}
67+
}
68+
69+
// todo: handle alters
70+
}
4971

5072
s.streams = append(s.streams, q.stmt)
5173
return nil

docs/reference/commands.md

Lines changed: 83 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -122,59 +122,127 @@ rules.
122122
```
123123
Prepares a query for execution, associating it with an identifier.
124124

125+
Returns: OK or ERROR
126+
125127
2. **Execute a Prepared Query:**
126128
```
127129
EXECUTE [ID]\r\n
128130
```
129131
Executes a previously prepared query.
130132

133+
Returns: Stream + Metadata + OK
134+
131135
3. **Perform a One-Off Query:**
132136
```
133137
QUERY [RAW QUERY]\r\n
134138
```
135139
Executes a single query without preparation.
136140

141+
Returns: Stream + Metadata + OK
142+
137143
4. **Remove a Prepared Query:**
138144
```
139145
FINALIZE [ID]\r\n
140146
```
141147
Removes a previously prepared query from memory.
142148

149+
Returns: OK or ERROR
150+
143151
5. **Bind a Value to a Prepared Query:**
144152
```
145153
BIND [ID] [PARAM] [TYPE] [VALUE]\r\n
146154
```
147155
Binds a value to a parameter in a prepared query.
148156

149-
6. **Transaction Commands:**
157+
Returns: OK or ERROR
158+
159+
6. **Begin:**
160+
161+
```
162+
BEGIN\r\n
163+
```
164+
Starts a readonly transaction.
165+
166+
Returns: OK or ERROR
167+
168+
7. **BEGIN IMMEDIATE**
169+
```
170+
BEGIN IMMEDIATE WITH [TABLE (table_name, other table)] [VIEW (view_name, other_view)] [TRIGGER (trigger_name)]\r\n
171+
```
172+
Starts a writable transaction with write access to the declared objects.
173+
Note that if any tables require a principal, the principal must be declared before starting the transaction.
174+
175+
Returns: OK or ERROR
176+
177+
8. **COMMIT**
150178
```
151-
BEGIN IMMEDIATE\r\n
152179
COMMIT\r\n
180+
```
181+
Commits the current transaction.
182+
183+
Returns: OK or ERROR
184+
185+
9. **ROLLBACK**
186+
```
153187
ROLLBACK\r\n
188+
```
189+
Rollback the current transaction.
190+
191+
Returns: OK or ERROR
192+
193+
10. **SAVEPOINT**
194+
```
154195
SAVEPOINT [name]\r\n
155-
RELEASE [name]\r\n
156196
```
157-
Manages database transactions, including savepoints for partial rollbacks.
197+
Create a savepoint in the current transaction.
198+
199+
Returns: OK or ERROR
158200

159-
7. **Pragmas:**
201+
11. **RELEASE**
160202
```
161-
PRAGMA [name]=[VALUE]\r\n
162-
PRAGMA [name]\r\n
203+
RELEASE [name]\r\n
163204
```
164-
Configures or queries node-specific settings.
205+
Release a savepoint in the current transaction.
206+
207+
12. **Pragmas:**
208+
```
209+
PRAGMA [name]=[VALUE]\r\n
210+
PRAGMA [name]\r\n
211+
```
212+
Configures or queries node-specific settings.
213+
214+
13. **Principle Command:**
215+
```
216+
PRINCIPLE [principle_name] [id]\r\n
217+
```
218+
Sets the principle context for row-level security.
219+
A valid principle must be established before interacting with tables that use row-level security.
220+
It cannot be used in a transaction.
221+
222+
Returns: OK or ERROR
165223
166-
8. **Principle Command:**
224+
14. **Scroll Command:**
225+
```
226+
SCROLL [StreamID] [Count]\r\n
227+
```
228+
Fetches a specific number of rows from a stream identified by `StreamID`.
229+
230+
RETURNS: ROWS + METADATA + OK
231+
15. **RESET:**
167232
```
168-
PRINCIPLE [principle_name] [id]\r\n
233+
RESET [ID]\r\n
169234
```
170-
Sets the principle context for row-level security. A valid principle must be established before interacting with
171-
tables that use row-level security.
235+
Resets a prepared query, so it can be run again.
236+
237+
Returns: OK or ERROR
172238
173-
9. **Scroll Command:**
239+
16. **CLEAR BINDINGS:**
174240
```
175-
SCROLL [StreamID] [Count]\r\n
241+
CLEARBINDINGS [ID]\r\n
176242
```
177-
Fetches a specific number of rows from a stream identified by `StreamID`.
243+
Clears the bindings for a prepared query.
244+
245+
Returns: OK or ERROR
178246
179247
---
180248

0 commit comments

Comments
 (0)