Skip to content

Commit d9064f6

Browse files
committed
firewalldb: add kvstores SQL implementation
1 parent 1f3592d commit d9064f6

File tree

7 files changed

+548
-5
lines changed

7 files changed

+548
-5
lines changed

firewalldb/kvstores_sql.go

Lines changed: 397 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,397 @@
1+
package firewalldb
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"database/sql"
7+
"errors"
8+
"fmt"
9+
10+
"github.com/lightninglabs/lightning-terminal/db"
11+
"github.com/lightninglabs/lightning-terminal/db/sqlc"
12+
"github.com/lightninglabs/lightning-terminal/session"
13+
"github.com/lightningnetwork/lnd/fn"
14+
)
15+
16+
// SQLSessionQueries is a subset of the sqlc.Queries interface that can be used
17+
// to interact with the sessions tables.
18+
type SQLSessionQueries interface {
19+
GetSessionIDByAlias(ctx context.Context, legacyID []byte) (int64, error)
20+
}
21+
22+
type SQLKVStoreQueries interface {
23+
SQLSessionQueries
24+
25+
DeleteFeatureKVStoreRecord(ctx context.Context, arg sqlc.DeleteFeatureKVStoreRecordParams) error
26+
DeleteGlobalKVStoreRecord(ctx context.Context, arg sqlc.DeleteGlobalKVStoreRecordParams) error
27+
DeleteSessionKVStoreRecord(ctx context.Context, arg sqlc.DeleteSessionKVStoreRecordParams) error
28+
GetFeatureKVStoreRecord(ctx context.Context, arg sqlc.GetFeatureKVStoreRecordParams) ([]byte, error)
29+
GetGlobalKVStoreRecord(ctx context.Context, arg sqlc.GetGlobalKVStoreRecordParams) ([]byte, error)
30+
GetSessionKVStoreRecord(ctx context.Context, arg sqlc.GetSessionKVStoreRecordParams) ([]byte, error)
31+
UpdateFeatureKVStoreRecord(ctx context.Context, arg sqlc.UpdateFeatureKVStoreRecordParams) error
32+
UpdateGlobalKVStoreRecord(ctx context.Context, arg sqlc.UpdateGlobalKVStoreRecordParams) error
33+
UpdateSessionKVStoreRecord(ctx context.Context, arg sqlc.UpdateSessionKVStoreRecordParams) error
34+
InsertKVStoreRecord(ctx context.Context, arg sqlc.InsertKVStoreRecordParams) error
35+
DeleteAllTempKVStores(ctx context.Context) error
36+
GetOrInsertFeatureID(ctx context.Context, name string) (int64, error)
37+
GetOrInsertRuleID(ctx context.Context, name string) (int64, error)
38+
GetFeatureID(ctx context.Context, name string) (int64, error)
39+
GetRuleID(ctx context.Context, name string) (int64, error)
40+
}
41+
42+
func (s *SQLDB) DeleteTempKVStores(ctx context.Context) error {
43+
var writeTxOpts db.QueriesTxOptions
44+
45+
return s.db.ExecTx(ctx, &writeTxOpts, func(tx SQLQueries) error {
46+
return tx.DeleteAllTempKVStores(ctx)
47+
})
48+
}
49+
50+
func (s *SQLDB) GetKVStores(rule string, groupAlias session.ID,
51+
feature string) KVStores {
52+
53+
return &sqlExecutor[KVStoreTx]{
54+
db: s.db,
55+
wrapTx: func(queries SQLQueries) KVStoreTx {
56+
return &sqlKVStoresSQLTx{
57+
queries: queries,
58+
db: &kvStoreSQLDB{
59+
SQLDB: s,
60+
groupAlias: groupAlias,
61+
rule: rule,
62+
feature: feature,
63+
},
64+
}
65+
},
66+
}
67+
}
68+
69+
type kvStoreSQLDB struct {
70+
*SQLDB
71+
groupAlias session.ID
72+
rule string
73+
feature string
74+
}
75+
76+
type sqlKVStoresSQLTx struct {
77+
db *kvStoreSQLDB
78+
queries SQLKVStoreQueries
79+
}
80+
81+
type sqlKVStore struct {
82+
*sqlKVStoresSQLTx
83+
84+
params *sqlKVStoreParams
85+
}
86+
87+
func (s *sqlKVStoresSQLTx) Global() KVStore {
88+
return &sqlKVStore{
89+
sqlKVStoresSQLTx: s,
90+
params: &sqlKVStoreParams{
91+
perm: true,
92+
ruleName: s.db.rule,
93+
},
94+
}
95+
}
96+
97+
func (s *sqlKVStoresSQLTx) Local() KVStore {
98+
var featureName fn.Option[string]
99+
if s.db.feature != "" {
100+
featureName = fn.Some(s.db.feature)
101+
}
102+
103+
return &sqlKVStore{
104+
sqlKVStoresSQLTx: s,
105+
params: &sqlKVStoreParams{
106+
perm: true,
107+
ruleName: s.db.rule,
108+
sessionID: fn.Some(s.db.groupAlias),
109+
featureName: featureName,
110+
},
111+
}
112+
}
113+
114+
func (s *sqlKVStoresSQLTx) GlobalTemp() KVStore {
115+
return &sqlKVStore{
116+
sqlKVStoresSQLTx: s,
117+
params: &sqlKVStoreParams{
118+
perm: false,
119+
ruleName: s.db.rule,
120+
},
121+
}
122+
}
123+
124+
func (s *sqlKVStoresSQLTx) LocalTemp() KVStore {
125+
var featureName fn.Option[string]
126+
if s.db.feature != "" {
127+
featureName = fn.Some(s.db.feature)
128+
}
129+
130+
return &sqlKVStore{
131+
sqlKVStoresSQLTx: s,
132+
params: &sqlKVStoreParams{
133+
perm: false,
134+
ruleName: s.db.rule,
135+
sessionID: fn.Some(s.db.groupAlias),
136+
featureName: featureName,
137+
},
138+
}
139+
}
140+
141+
var _ KVStoreTx = (*sqlKVStoresSQLTx)(nil)
142+
143+
type sqlKVStoreParams struct {
144+
perm bool
145+
ruleName string
146+
sessionID fn.Option[session.ID]
147+
featureName fn.Option[string]
148+
}
149+
150+
func (s *sqlKVStore) genNamespaceFields(ctx context.Context,
151+
readOnly bool) (int64, sql.NullInt64, sql.NullInt64, error) {
152+
153+
var (
154+
sessionID sql.NullInt64
155+
featureID sql.NullInt64
156+
ruleID int64
157+
err error
158+
)
159+
160+
s.params.sessionID.WhenSome(func(id session.ID) {
161+
var groupID int64
162+
groupID, err = s.queries.GetSessionIDByAlias(ctx, id[:])
163+
if errors.Is(err, sql.ErrNoRows) {
164+
err = session.ErrUnknownGroup
165+
166+
return
167+
} else if err != nil {
168+
return
169+
}
170+
171+
sessionID = sql.NullInt64{
172+
Int64: groupID,
173+
Valid: true,
174+
}
175+
})
176+
if err != nil {
177+
return ruleID, sessionID, featureID, err
178+
}
179+
180+
if readOnly {
181+
ruleID, err = s.queries.GetRuleID(ctx, s.params.ruleName)
182+
if err != nil {
183+
return 0, sessionID, featureID,
184+
fmt.Errorf("unable to get rule ID: %w", err)
185+
}
186+
} else {
187+
ruleID, err = s.queries.GetOrInsertRuleID(
188+
ctx, s.params.ruleName,
189+
)
190+
if err != nil {
191+
return 0, sessionID, featureID,
192+
fmt.Errorf("unable to get rule ID: %w", err)
193+
}
194+
}
195+
196+
s.params.featureName.WhenSome(func(feature string) {
197+
var id int64
198+
if readOnly {
199+
id, err = s.queries.GetFeatureID(ctx, feature)
200+
if err != nil {
201+
return
202+
}
203+
} else {
204+
id, err = s.queries.GetOrInsertFeatureID(ctx, feature)
205+
if err != nil {
206+
return
207+
}
208+
}
209+
210+
featureID = sql.NullInt64{
211+
Int64: id,
212+
Valid: true,
213+
}
214+
})
215+
216+
return ruleID, sessionID, featureID, err
217+
}
218+
219+
func (s *sqlKVStore) Get(ctx context.Context, key string) ([]byte, error) {
220+
value, err := s.get(ctx, key)
221+
if errors.Is(err, sql.ErrNoRows) ||
222+
errors.Is(err, session.ErrUnknownGroup) {
223+
224+
return nil, nil
225+
} else if err != nil {
226+
return nil, err
227+
}
228+
229+
return value, nil
230+
}
231+
232+
func (s *sqlKVStore) get(ctx context.Context, key string) ([]byte, error) {
233+
ruleID, sessionID, featureID, err := s.genNamespaceFields(ctx, true)
234+
if err != nil {
235+
return nil, err
236+
}
237+
238+
switch {
239+
case sessionID.Valid && featureID.Valid:
240+
return s.queries.GetFeatureKVStoreRecord(
241+
ctx, sqlc.GetFeatureKVStoreRecordParams{
242+
Key: key,
243+
Perm: s.params.perm,
244+
SessionID: sessionID,
245+
RuleID: ruleID,
246+
FeatureID: featureID,
247+
},
248+
)
249+
250+
case sessionID.Valid:
251+
return s.queries.GetSessionKVStoreRecord(
252+
ctx, sqlc.GetSessionKVStoreRecordParams{
253+
Key: key,
254+
Perm: s.params.perm,
255+
SessionID: sessionID,
256+
RuleID: ruleID,
257+
},
258+
)
259+
260+
case featureID.Valid:
261+
return nil, fmt.Errorf("a global feature kv store is " +
262+
"not currently supported")
263+
default:
264+
return s.queries.GetGlobalKVStoreRecord(
265+
ctx, sqlc.GetGlobalKVStoreRecordParams{
266+
Key: key,
267+
Perm: s.params.perm,
268+
RuleID: ruleID,
269+
},
270+
)
271+
}
272+
}
273+
274+
func (s *sqlKVStore) Set(ctx context.Context, key string, value []byte) error {
275+
ruleID, sessionID, featureID, err := s.genNamespaceFields(ctx, false)
276+
if err != nil {
277+
return err
278+
}
279+
280+
// We first need to figure out if we are inserting a new record or
281+
// updating an existing one. So first do a GET with the same set of
282+
// params.
283+
oldValue, err := s.get(ctx, key)
284+
if err != nil && !errors.Is(err, sql.ErrNoRows) {
285+
return err
286+
}
287+
288+
// No such entry. Add new record.
289+
if errors.Is(err, sql.ErrNoRows) {
290+
return s.queries.InsertKVStoreRecord(
291+
ctx, sqlc.InsertKVStoreRecordParams{
292+
EntryKey: key,
293+
Value: value,
294+
Perm: s.params.perm,
295+
RuleID: ruleID,
296+
SessionID: sessionID,
297+
FeatureID: featureID,
298+
},
299+
)
300+
}
301+
302+
// If an entry exists but the value has not changed, there is nothing
303+
// left to do.
304+
if bytes.Equal(oldValue, value) {
305+
return nil
306+
}
307+
308+
// Otherwise, the key exists but the value needs to be updated.
309+
switch {
310+
case sessionID.Valid && featureID.Valid:
311+
return s.queries.UpdateFeatureKVStoreRecord(
312+
ctx, sqlc.UpdateFeatureKVStoreRecordParams{
313+
Key: key,
314+
Value: value,
315+
Perm: s.params.perm,
316+
SessionID: sessionID,
317+
RuleID: ruleID,
318+
FeatureID: featureID,
319+
},
320+
)
321+
322+
case sessionID.Valid:
323+
return s.queries.UpdateSessionKVStoreRecord(
324+
ctx, sqlc.UpdateSessionKVStoreRecordParams{
325+
Key: key,
326+
Value: value,
327+
Perm: s.params.perm,
328+
SessionID: sessionID,
329+
RuleID: ruleID,
330+
},
331+
)
332+
333+
case featureID.Valid:
334+
return fmt.Errorf("a global feature kv store is " +
335+
"not currently supported")
336+
default:
337+
return s.queries.UpdateGlobalKVStoreRecord(
338+
ctx, sqlc.UpdateGlobalKVStoreRecordParams{
339+
Key: key,
340+
Value: value,
341+
Perm: s.params.perm,
342+
RuleID: ruleID,
343+
},
344+
)
345+
}
346+
}
347+
348+
func (s *sqlKVStore) Del(ctx context.Context, key string) error {
349+
// Note: we pass in true here for "read-only" since because this is a
350+
// Delete, if the record does not exist, we don't need to create one.
351+
// But no need to error out if it doesn't exist.
352+
ruleID, sessionID, featureID, err := s.genNamespaceFields(ctx, true)
353+
if errors.Is(err, sql.ErrNoRows) ||
354+
errors.Is(err, session.ErrUnknownGroup) {
355+
356+
return nil
357+
} else if err != nil {
358+
return err
359+
}
360+
361+
switch {
362+
case sessionID.Valid && featureID.Valid:
363+
return s.queries.DeleteFeatureKVStoreRecord(
364+
ctx, sqlc.DeleteFeatureKVStoreRecordParams{
365+
Key: key,
366+
Perm: s.params.perm,
367+
SessionID: sessionID,
368+
RuleID: ruleID,
369+
FeatureID: featureID,
370+
},
371+
)
372+
373+
case sessionID.Valid:
374+
return s.queries.DeleteSessionKVStoreRecord(
375+
ctx, sqlc.DeleteSessionKVStoreRecordParams{
376+
Key: key,
377+
Perm: s.params.perm,
378+
SessionID: sessionID,
379+
RuleID: ruleID,
380+
},
381+
)
382+
383+
case featureID.Valid:
384+
return fmt.Errorf("a global feature kv store is " +
385+
"not currently supported")
386+
default:
387+
return s.queries.DeleteGlobalKVStoreRecord(
388+
ctx, sqlc.DeleteGlobalKVStoreRecordParams{
389+
Key: key,
390+
Perm: s.params.perm,
391+
RuleID: ruleID,
392+
},
393+
)
394+
}
395+
}
396+
397+
var _ KVStore = (*sqlKVStore)(nil)

0 commit comments

Comments
 (0)