Skip to content

Commit a7d5a9c

Browse files
committed
Sample
1 parent 391cd2c commit a7d5a9c

File tree

2 files changed

+218
-4
lines changed

2 files changed

+218
-4
lines changed

tools/pgimport/README.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# pgimport
2+
3+
Migrates ntfy data from SQLite to PostgreSQL.
4+
5+
## Build
6+
7+
```bash
8+
go build -o pgimport ./tools/pgimport/
9+
```
10+
11+
## Usage
12+
13+
```bash
14+
# Using CLI flags
15+
pgimport \
16+
--database-url "postgres://user:pass@host:5432/ntfy?sslmode=require" \
17+
--cache-file /var/cache/ntfy/cache.db \
18+
--auth-file /var/lib/ntfy/user.db \
19+
--web-push-file /var/lib/ntfy/webpush.db
20+
21+
# Using server.yml (flags override config values)
22+
pgimport --config /etc/ntfy/server.yml
23+
```
24+
25+
## Prerequisites
26+
27+
- PostgreSQL schema must already be set up (run ntfy with `database-url` once)
28+
- ntfy must not be running during the import
29+
- All three SQLite files are optional; only the ones specified will be imported
30+
31+
## Notes
32+
33+
- The tool is idempotent and safe to re-run
34+
- After importing, row counts and content are verified against the SQLite sources
35+
- Invalid UTF-8 in messages is replaced with the Unicode replacement character

tools/pgimport/main.go

Lines changed: 183 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -658,10 +658,35 @@ func verifyUsers(sqliteFile string, pgDB *sql.DB, failed *bool) error {
658658
defer sqlDB.Close()
659659

660660
verifyCount(sqlDB, pgDB, "tier", `SELECT COUNT(*) FROM tier`, `SELECT COUNT(*) FROM tier`, failed)
661+
verifyContent(sqlDB, pgDB, "tier",
662+
`SELECT id, code, name FROM tier ORDER BY id`,
663+
`SELECT id, code, name FROM tier ORDER BY id COLLATE "C"`,
664+
failed)
665+
661666
verifyCount(sqlDB, pgDB, "user", `SELECT COUNT(*) FROM user`, `SELECT COUNT(*) FROM "user"`, failed)
667+
verifyContent(sqlDB, pgDB, "user",
668+
`SELECT id, user, role, sync_topic FROM user ORDER BY id`,
669+
`SELECT id, user_name, role, sync_topic FROM "user" ORDER BY id COLLATE "C"`,
670+
failed)
671+
662672
verifyCount(sqlDB, pgDB, "user_access", `SELECT COUNT(*) FROM user_access a JOIN user u ON u.id = a.user_id`, `SELECT COUNT(*) FROM user_access`, failed)
673+
verifyContent(sqlDB, pgDB, "user_access",
674+
`SELECT a.user_id, a.topic FROM user_access a JOIN user u ON u.id = a.user_id ORDER BY a.user_id, a.topic`,
675+
`SELECT user_id, topic FROM user_access ORDER BY user_id COLLATE "C", topic COLLATE "C"`,
676+
failed)
677+
663678
verifyCount(sqlDB, pgDB, "user_token", `SELECT COUNT(*) FROM user_token t JOIN user u ON u.id = t.user_id`, `SELECT COUNT(*) FROM user_token`, failed)
679+
verifyContent(sqlDB, pgDB, "user_token",
680+
`SELECT t.user_id, t.token, t.label FROM user_token t JOIN user u ON u.id = t.user_id ORDER BY t.user_id, t.token`,
681+
`SELECT user_id, token, label FROM user_token ORDER BY user_id COLLATE "C", token COLLATE "C"`,
682+
failed)
683+
664684
verifyCount(sqlDB, pgDB, "user_phone", `SELECT COUNT(*) FROM user_phone p JOIN user u ON u.id = p.user_id`, `SELECT COUNT(*) FROM user_phone`, failed)
685+
verifyContent(sqlDB, pgDB, "user_phone",
686+
`SELECT p.user_id, p.phone_number FROM user_phone p JOIN user u ON u.id = p.user_id ORDER BY p.user_id, p.phone_number`,
687+
`SELECT user_id, phone_number FROM user_phone ORDER BY user_id COLLATE "C", phone_number COLLATE "C"`,
688+
failed)
689+
665690
return nil
666691
}
667692

@@ -673,6 +698,7 @@ func verifyMessages(sqliteFile string, pgDB *sql.DB, failed *bool) error {
673698
defer sqlDB.Close()
674699

675700
verifyCount(sqlDB, pgDB, "messages", `SELECT COUNT(*) FROM messages`, `SELECT COUNT(*) FROM message`, failed)
701+
verifySampledMessages(sqlDB, pgDB, failed)
676702
return nil
677703
}
678704

@@ -684,26 +710,179 @@ func verifyWebPush(sqliteFile string, pgDB *sql.DB, failed *bool) error {
684710
defer sqlDB.Close()
685711

686712
verifyCount(sqlDB, pgDB, "subscription", `SELECT COUNT(*) FROM subscription`, `SELECT COUNT(*) FROM webpush_subscription`, failed)
713+
verifyContent(sqlDB, pgDB, "subscription",
714+
`SELECT id, endpoint, key_auth, key_p256dh, user_id FROM subscription ORDER BY id`,
715+
`SELECT id, endpoint, key_auth, key_p256dh, user_id FROM webpush_subscription ORDER BY id COLLATE "C"`,
716+
failed)
717+
687718
verifyCount(sqlDB, pgDB, "subscription_topic", `SELECT COUNT(*) FROM subscription_topic`, `SELECT COUNT(*) FROM webpush_subscription_topic`, failed)
719+
verifyContent(sqlDB, pgDB, "subscription_topic",
720+
`SELECT subscription_id, topic FROM subscription_topic ORDER BY subscription_id, topic`,
721+
`SELECT subscription_id, topic FROM webpush_subscription_topic ORDER BY subscription_id COLLATE "C", topic COLLATE "C"`,
722+
failed)
723+
688724
return nil
689725
}
690726

691727
func verifyCount(sqlDB, pgDB *sql.DB, table, sqliteQuery, pgQuery string, failed *bool) {
692728
var sqliteCount, pgCount int64
693729
if err := sqlDB.QueryRow(sqliteQuery).Scan(&sqliteCount); err != nil {
694-
fmt.Printf(" %-20s ERROR reading SQLite: %s\n", table, err)
730+
fmt.Printf(" %-25s count ERROR reading SQLite: %s\n", table, err)
695731
*failed = true
696732
return
697733
}
698734
if err := pgDB.QueryRow(pgQuery).Scan(&pgCount); err != nil {
699-
fmt.Printf(" %-20s ERROR reading PostgreSQL: %s\n", table, err)
735+
fmt.Printf(" %-25s count ERROR reading PostgreSQL: %s\n", table, err)
700736
*failed = true
701737
return
702738
}
703739
if sqliteCount == pgCount {
704-
fmt.Printf(" %-20s OK (%d rows)\n", table, pgCount)
740+
fmt.Printf(" %-25s count OK (%d rows)\n", table, pgCount)
705741
} else {
706-
fmt.Printf(" %-20s MISMATCH: SQLite=%d, PostgreSQL=%d\n", table, sqliteCount, pgCount)
742+
fmt.Printf(" %-25s count MISMATCH: SQLite=%d, PostgreSQL=%d\n", table, sqliteCount, pgCount)
743+
*failed = true
744+
}
745+
}
746+
747+
func verifyContent(sqlDB, pgDB *sql.DB, table, sqliteQuery, pgQuery string, failed *bool) {
748+
sqliteRows, err := sqlDB.Query(sqliteQuery)
749+
if err != nil {
750+
fmt.Printf(" %-25s content ERROR reading SQLite: %s\n", table, err)
751+
*failed = true
752+
return
753+
}
754+
defer sqliteRows.Close()
755+
756+
pgRows, err := pgDB.Query(pgQuery)
757+
if err != nil {
758+
fmt.Printf(" %-25s content ERROR reading PostgreSQL: %s\n", table, err)
759+
*failed = true
760+
return
761+
}
762+
defer pgRows.Close()
763+
764+
cols, err := sqliteRows.Columns()
765+
if err != nil {
766+
fmt.Printf(" %-25s content ERROR reading columns: %s\n", table, err)
707767
*failed = true
768+
return
769+
}
770+
numCols := len(cols)
771+
772+
rowNum := 0
773+
mismatches := 0
774+
for sqliteRows.Next() {
775+
rowNum++
776+
if !pgRows.Next() {
777+
fmt.Printf(" %-25s content MISMATCH: PostgreSQL has fewer rows (at row %d)\n", table, rowNum)
778+
*failed = true
779+
return
780+
}
781+
sqliteVals := makeStringSlice(numCols)
782+
pgVals := makeStringSlice(numCols)
783+
if err := sqliteRows.Scan(sqliteVals...); err != nil {
784+
fmt.Printf(" %-25s content ERROR scanning SQLite row %d: %s\n", table, rowNum, err)
785+
*failed = true
786+
return
787+
}
788+
if err := pgRows.Scan(pgVals...); err != nil {
789+
fmt.Printf(" %-25s content ERROR scanning PostgreSQL row %d: %s\n", table, rowNum, err)
790+
*failed = true
791+
return
792+
}
793+
for i := 0; i < numCols; i++ {
794+
sv := *(sqliteVals[i].(*sql.NullString))
795+
pv := *(pgVals[i].(*sql.NullString))
796+
if sv != pv {
797+
mismatches++
798+
if mismatches <= 3 {
799+
fmt.Printf(" %-25s content MISMATCH at row %d, col %s: SQLite=%q, PostgreSQL=%q\n", table, rowNum, cols[i], sv.String, pv.String)
800+
}
801+
}
802+
}
803+
}
804+
if pgRows.Next() {
805+
fmt.Printf(" %-25s content MISMATCH: PostgreSQL has more rows than SQLite\n", table)
806+
*failed = true
807+
return
808+
}
809+
if mismatches > 0 {
810+
if mismatches > 3 {
811+
fmt.Printf(" %-25s content ... and %d more mismatches\n", table, mismatches-3)
812+
}
813+
*failed = true
814+
} else {
815+
fmt.Printf(" %-25s content OK\n", table)
816+
}
817+
}
818+
819+
func verifySampledMessages(sqlDB, pgDB *sql.DB, failed *bool) {
820+
rows, err := sqlDB.Query(`SELECT mid, topic, time, message, title, tags, priority FROM messages ORDER BY mid`)
821+
if err != nil {
822+
fmt.Printf(" %-25s content ERROR reading SQLite: %s\n", "messages (sampled)", err)
823+
*failed = true
824+
return
825+
}
826+
defer rows.Close()
827+
828+
rowNum := 0
829+
checked := 0
830+
mismatches := 0
831+
for rows.Next() {
832+
rowNum++
833+
var mid, topic, message, title, tags string
834+
var msgTime int64
835+
var priority int
836+
if err := rows.Scan(&mid, &topic, &msgTime, &message, &title, &tags, &priority); err != nil {
837+
fmt.Printf(" %-25s content ERROR scanning SQLite row %d: %s\n", "messages (sampled)", rowNum, err)
838+
*failed = true
839+
return
840+
}
841+
if rowNum%100 != 1 {
842+
continue
843+
}
844+
checked++
845+
var pgTopic, pgMessage, pgTitle, pgTags string
846+
var pgTime int64
847+
var pgPriority int
848+
err := pgDB.QueryRow(`SELECT topic, time, message, title, tags, priority FROM message WHERE mid = $1`, mid).
849+
Scan(&pgTopic, &pgTime, &pgMessage, &pgTitle, &pgTags, &pgPriority)
850+
if err == sql.ErrNoRows {
851+
mismatches++
852+
if mismatches <= 3 {
853+
fmt.Printf(" %-25s content MISMATCH: mid=%s not found in PostgreSQL\n", "messages (sampled)", mid)
854+
}
855+
continue
856+
} else if err != nil {
857+
fmt.Printf(" %-25s content ERROR querying PostgreSQL for mid=%s: %s\n", "messages (sampled)", mid, err)
858+
*failed = true
859+
return
860+
}
861+
topic = toUTF8(topic)
862+
message = toUTF8(message)
863+
title = toUTF8(title)
864+
tags = toUTF8(tags)
865+
if topic != pgTopic || msgTime != pgTime || message != pgMessage || title != pgTitle || tags != pgTags || priority != pgPriority {
866+
mismatches++
867+
if mismatches <= 3 {
868+
fmt.Printf(" %-25s content MISMATCH at mid=%s\n", "messages (sampled)", mid)
869+
}
870+
}
871+
}
872+
if mismatches > 0 {
873+
if mismatches > 3 {
874+
fmt.Printf(" %-25s content ... and %d more mismatches\n", "messages (sampled)", mismatches-3)
875+
}
876+
*failed = true
877+
} else {
878+
fmt.Printf(" %-25s content OK (%d samples checked)\n", "messages (sampled)", checked)
879+
}
880+
}
881+
882+
func makeStringSlice(n int) []any {
883+
vals := make([]any, n)
884+
for i := range vals {
885+
vals[i] = &sql.NullString{}
708886
}
887+
return vals
709888
}

0 commit comments

Comments
 (0)