Skip to content

Commit c732979

Browse files
committed
accounts: add migration code from kvdb to SQL
This commit introduces the migration logic for transitioning the accounts store from kvdb to SQL. Note that as of this commit, the migration is not yet triggered by any production code, i.e. only tests execute the migration logic.
1 parent 6030f65 commit c732979

File tree

3 files changed

+615
-2
lines changed

3 files changed

+615
-2
lines changed

accounts/sql_migration.go

Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
package accounts
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"errors"
7+
"fmt"
8+
"math"
9+
"reflect"
10+
"time"
11+
12+
"github.com/davecgh/go-spew/spew"
13+
"github.com/lightninglabs/lightning-terminal/db/sqlc"
14+
"github.com/lightningnetwork/lnd/lntypes"
15+
"github.com/pmezard/go-difflib/difflib"
16+
)
17+
18+
var (
19+
// ErrMigrationMismatch is returned when the migrated account does not
20+
// match the original account.
21+
ErrMigrationMismatch = fmt.Errorf("migrated account does not match " +
22+
"original account")
23+
)
24+
25+
// MigrateAccountStoreToSQL runs the migration of all accounts and indices from
26+
// the KV database to the SQL database. The migration is done in a single
27+
// transaction to ensure that all accounts are migrated or none at all.
28+
func MigrateAccountStoreToSQL(ctx context.Context, kvStore *BoltStore,
29+
tx SQLQueries) error {
30+
31+
log.Infof("Starting migration of the KV accounts store to SQL")
32+
33+
err := migrateAccountsToSQL(ctx, kvStore, tx)
34+
if err != nil {
35+
return fmt.Errorf("unsuccessful migration of accounts to "+
36+
"SQL: %w", err)
37+
}
38+
39+
err = migrateAccountsIndicesToSQL(ctx, kvStore, tx)
40+
if err != nil {
41+
return fmt.Errorf("unsuccessful migration of account indices "+
42+
"to SQL: %w", err)
43+
}
44+
45+
return nil
46+
}
47+
48+
// migrateAccountsToSQL runs the migration of all accounts from the KV database
49+
// to the SQL database. The migration is done in a single transaction to ensure
50+
// that all accounts are migrated or none at all.
51+
func migrateAccountsToSQL(ctx context.Context, kvStore *BoltStore,
52+
tx SQLQueries) error {
53+
54+
log.Infof("Starting migration of accounts from KV to SQL")
55+
56+
kvAccounts, err := kvStore.Accounts(ctx)
57+
if err != nil {
58+
return err
59+
}
60+
61+
for _, kvAccount := range kvAccounts {
62+
migratedAccountID, err := migrateSingleAccountToSQL(
63+
ctx, tx, kvAccount,
64+
)
65+
if err != nil {
66+
return fmt.Errorf("unable to migrate account(%v): %w",
67+
kvAccount.ID, err)
68+
}
69+
70+
migratedAccount, err := getAndMarshalAccount(
71+
ctx, tx, migratedAccountID,
72+
)
73+
if err != nil {
74+
return fmt.Errorf("unable to fetch migrated "+
75+
"account(%v): %w", kvAccount.ID, err)
76+
}
77+
78+
overrideAccountTimeZone(kvAccount)
79+
overrideAccountTimeZone(migratedAccount)
80+
81+
if !reflect.DeepEqual(kvAccount, migratedAccount) {
82+
diff := difflib.UnifiedDiff{
83+
A: difflib.SplitLines(
84+
spew.Sdump(kvAccount),
85+
),
86+
B: difflib.SplitLines(
87+
spew.Sdump(migratedAccount),
88+
),
89+
FromFile: "Expected",
90+
FromDate: "",
91+
ToFile: "Actual",
92+
ToDate: "",
93+
Context: 3,
94+
}
95+
diffText, _ := difflib.GetUnifiedDiffString(diff)
96+
97+
return fmt.Errorf("%w: %v.\n%v", ErrMigrationMismatch,
98+
kvAccount.ID, diffText)
99+
}
100+
}
101+
102+
log.Infof("All accounts migrated from KV to SQL. Total number of "+
103+
"accounts migrated: %d", len(kvAccounts))
104+
105+
return nil
106+
}
107+
108+
// migrateSingleAccountToSQL runs the migration for a single account from the
109+
// KV database to the SQL database.
110+
func migrateSingleAccountToSQL(ctx context.Context,
111+
tx SQLQueries, account *OffChainBalanceAccount) (int64, error) {
112+
113+
insertAccountParams, err := makeInsertAccountParams(account)
114+
if err != nil {
115+
return 0, err
116+
}
117+
118+
sqlId, err := tx.InsertAccount(ctx, insertAccountParams)
119+
if err != nil {
120+
return 0, err
121+
}
122+
123+
for hash := range account.Invoices {
124+
addInvoiceParams := makeAddAccountInvoiceParams(sqlId, hash)
125+
126+
err = tx.AddAccountInvoice(ctx, addInvoiceParams)
127+
if err != nil {
128+
return sqlId, err
129+
}
130+
}
131+
132+
for hash, paymentEntry := range account.Payments {
133+
upsertPaymentParams := makeUpsertAccountPaymentParams(
134+
sqlId, hash, paymentEntry,
135+
)
136+
137+
err = tx.UpsertAccountPayment(ctx, upsertPaymentParams)
138+
if err != nil {
139+
return sqlId, err
140+
}
141+
}
142+
143+
return sqlId, nil
144+
}
145+
146+
// migrateAccountsIndicesToSQL runs the migration for the account indices from
147+
// the KV database to the SQL database.
148+
func migrateAccountsIndicesToSQL(ctx context.Context, kvStore *BoltStore,
149+
tx SQLQueries) error {
150+
151+
log.Infof("Starting migration of accounts indices from KV to SQL")
152+
153+
addIndex, settleIndex, err := kvStore.LastIndexes(ctx)
154+
if errors.Is(err, ErrNoInvoiceIndexKnown) {
155+
log.Infof("No indices found in KV store, skipping migration")
156+
return nil
157+
} else if err != nil {
158+
return err
159+
}
160+
161+
setAddIndexParams, err := makeSetAccountIndexParams(
162+
addIndex, addIndexName,
163+
)
164+
if err != nil {
165+
return err
166+
}
167+
168+
err = tx.SetAccountIndex(ctx, setAddIndexParams)
169+
if err != nil {
170+
return err
171+
}
172+
173+
setSettleIndexParams, err := makeSetAccountIndexParams(
174+
settleIndex, settleIndexName,
175+
)
176+
if err != nil {
177+
return err
178+
}
179+
180+
err = tx.SetAccountIndex(ctx, setSettleIndexParams)
181+
if err != nil {
182+
return err
183+
}
184+
185+
log.Infof("Successfully migratated accounts indices from KV to SQL")
186+
187+
return nil
188+
}
189+
190+
// overrideAccountTimeZone overrides the time zone of the account to the local
191+
// time zone and chops off the nanosecond part for comparison. This is needed
192+
// because KV database stores times as-is which as an unwanted side effect would
193+
// fail migration due to time comparison expecting both the original and
194+
// migrated accounts to be in the same local time zone and in microsecond
195+
// precision. Note that PostgresSQL stores times in microsecond precision while
196+
// SQLite can store times in nanosecond precision if using TEXT storage class.
197+
func overrideAccountTimeZone(account *OffChainBalanceAccount) {
198+
fixTime := func(t time.Time) time.Time {
199+
return t.In(time.Local).Truncate(time.Microsecond)
200+
}
201+
202+
if !account.ExpirationDate.IsZero() {
203+
account.ExpirationDate = fixTime(account.ExpirationDate)
204+
}
205+
206+
if !account.LastUpdate.IsZero() {
207+
account.LastUpdate = fixTime(account.LastUpdate)
208+
}
209+
}
210+
211+
func makeInsertAccountParams(account *OffChainBalanceAccount) (
212+
sqlc.InsertAccountParams, error) {
213+
214+
accountAlias, err := account.ID.ToInt64()
215+
if err != nil {
216+
return sqlc.InsertAccountParams{}, err
217+
}
218+
219+
return sqlc.InsertAccountParams{
220+
Type: int16(account.Type),
221+
InitialBalanceMsat: int64(account.InitialBalance),
222+
CurrentBalanceMsat: account.CurrentBalance,
223+
LastUpdated: account.LastUpdate.UTC(),
224+
Alias: accountAlias,
225+
Expiration: account.ExpirationDate.UTC(),
226+
Label: sql.NullString{
227+
String: account.Label,
228+
Valid: len(account.Label) > 0,
229+
},
230+
}, nil
231+
}
232+
233+
func makeAddAccountInvoiceParams(sqlID int64,
234+
hash lntypes.Hash) sqlc.AddAccountInvoiceParams {
235+
236+
return sqlc.AddAccountInvoiceParams{
237+
AccountID: sqlID,
238+
Hash: hash[:],
239+
}
240+
}
241+
242+
func makeUpsertAccountPaymentParams(sqlID int64, hash lntypes.Hash,
243+
entry *PaymentEntry) sqlc.UpsertAccountPaymentParams {
244+
245+
return sqlc.UpsertAccountPaymentParams{
246+
AccountID: sqlID,
247+
Hash: hash[:],
248+
Status: int16(entry.Status),
249+
FullAmountMsat: int64(entry.FullAmount),
250+
}
251+
}
252+
253+
func makeSetAccountIndexParams(indexValue uint64,
254+
indexName string) (sqlc.SetAccountIndexParams, error) {
255+
256+
if indexValue > math.MaxInt64 {
257+
return sqlc.SetAccountIndexParams{}, fmt.Errorf("%s:%v is "+
258+
"above max int64 value", indexName, indexValue)
259+
}
260+
261+
return sqlc.SetAccountIndexParams{
262+
Name: indexName,
263+
Value: int64(indexValue),
264+
}, nil
265+
}

0 commit comments

Comments
 (0)