Skip to content

Commit 600d3d8

Browse files
committed
feat: add WhatsApp import from decrypted backup (#136)
Add `import --type whatsapp` command for importing messages from a decrypted WhatsApp msgstore.db into the msgvault unified schema. New package internal/whatsapp/: - Reads msgstore.db as read-only SQLite - Maps WhatsApp schema to msgvault tables (conversations, participants, messages, attachments, reactions, reply threading) - Batch processing (1000 msgs/batch) with checkpoint/resume - Optional --contacts for vCard name resolution (update-only, no creation) - Optional --media-dir for content-addressed media storage - Imports: text, images, video, audio, voice notes, documents, stickers, GIFs, reactions, replies, group participants - Skips: system messages, calls, location shares, contacts, polls Security: - Media path traversal defense (sanitize, reject absolute/.. paths, boundary check against mediaDir) - Streaming hash + copy for media (no io.ReadAll, 100MB max) - E.164 phone validation (reject non-numeric JIDs, 4-15 digit range) - File permissions 0600/0750 for attachments - Per-chat reply map scoping to bound memory Query engine updates: - Sender filters in DuckDB and SQLite check both message_recipients (email path) and direct sender_id (WhatsApp/chat path) - Phone number included in sender predicates for from:+447... queries - MatchesEmpty filters account for sender_id to avoid false positives - MCP handler routes non-email from values to display name matching - Parquet cache extended with sender_id, message_type, attachment_count, phone_number, title columns - Cache schema versioning to force rebuild on column layout changes Store additions: - EnsureConversationWithType (parameterized conversation_type) - EnsureParticipantByPhone (E.164 validated, with identifier row) - UpdateParticipantDisplayNameByPhone (update-only for contacts) - EnsureConversationParticipant, UpsertReaction, UpsertMessageRawWithFormat Tested with 1.19M messages (13.5k conversations, April 2016-present).
1 parent f38166f commit 600d3d8

File tree

18 files changed

+2620
-90
lines changed

18 files changed

+2620
-90
lines changed

cmd/msgvault/cmd/build_cache.go

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,17 @@ var fullRebuild bool
2727
// files (_last_sync.json, parquet directories) can corrupt the cache.
2828
var buildCacheMu sync.Mutex
2929

30+
// cacheSchemaVersion tracks the Parquet schema layout. Bump this whenever
31+
// columns are added/removed/renamed in the COPY queries below so that
32+
// incremental builds automatically trigger a full rebuild instead of
33+
// producing Parquet files with mismatched schemas.
34+
const cacheSchemaVersion = 2 // v2: added sender_id, message_type, attachment_count, phone_number, title
35+
3036
// syncState tracks the last exported message ID for incremental updates.
3137
type syncState struct {
3238
LastMessageID int64 `json:"last_message_id"`
3339
LastSyncAt time.Time `json:"last_sync_at"`
40+
SchemaVersion int `json:"schema_version,omitempty"`
3441
}
3542

3643
var buildCacheCmd = &cobra.Command{
@@ -101,8 +108,16 @@ func buildCache(dbPath, analyticsDir string, fullRebuild bool) (*buildResult, er
101108
if data, err := os.ReadFile(stateFile); err == nil {
102109
var state syncState
103110
if json.Unmarshal(data, &state) == nil {
104-
lastMessageID = state.LastMessageID
105-
fmt.Printf("Incremental export from message_id > %d\n", lastMessageID)
111+
if state.SchemaVersion != cacheSchemaVersion {
112+
// Schema has changed — force a full rebuild.
113+
fmt.Printf("Cache schema version mismatch (have v%d, need v%d). Forcing full rebuild.\n",
114+
state.SchemaVersion, cacheSchemaVersion)
115+
fullRebuild = true
116+
lastMessageID = 0
117+
} else {
118+
lastMessageID = state.LastMessageID
119+
fmt.Printf("Incremental export from message_id > %d\n", lastMessageID)
120+
}
106121
}
107122
}
108123
}
@@ -231,7 +246,10 @@ func buildCache(dbPath, analyticsDir string, fullRebuild bool) (*buildResult, er
231246
m.sent_at,
232247
m.size_estimate,
233248
m.has_attachments,
249+
COALESCE(TRY_CAST(m.attachment_count AS INTEGER), 0) as attachment_count,
234250
m.deleted_from_source_at,
251+
m.sender_id,
252+
COALESCE(TRY_CAST(m.message_type AS VARCHAR), '') as message_type,
235253
CAST(EXTRACT(YEAR FROM m.sent_at) AS INTEGER) as year,
236254
CAST(EXTRACT(MONTH FROM m.sent_at) AS INTEGER) as month
237255
FROM sqlite_db.messages m
@@ -321,7 +339,8 @@ func buildCache(dbPath, analyticsDir string, fullRebuild bool) (*buildResult, er
321339
id,
322340
COALESCE(TRY_CAST(email_address AS VARCHAR), '') as email_address,
323341
COALESCE(TRY_CAST(domain AS VARCHAR), '') as domain,
324-
COALESCE(TRY_CAST(display_name AS VARCHAR), '') as display_name
342+
COALESCE(TRY_CAST(display_name AS VARCHAR), '') as display_name,
343+
COALESCE(TRY_CAST(phone_number AS VARCHAR), '') as phone_number
325344
FROM sqlite_db.participants
326345
) TO '%s/participants.parquet' (
327346
FORMAT PARQUET,
@@ -372,7 +391,8 @@ func buildCache(dbPath, analyticsDir string, fullRebuild bool) (*buildResult, er
372391
COPY (
373392
SELECT
374393
id,
375-
COALESCE(TRY_CAST(source_conversation_id AS VARCHAR), '') as source_conversation_id
394+
COALESCE(TRY_CAST(source_conversation_id AS VARCHAR), '') as source_conversation_id,
395+
COALESCE(TRY_CAST(title AS VARCHAR), '') as title
376396
FROM sqlite_db.conversations
377397
) TO '%s/conversations.parquet' (
378398
FORMAT PARQUET,
@@ -391,10 +411,11 @@ func buildCache(dbPath, analyticsDir string, fullRebuild bool) (*buildResult, er
391411
exportedCount = 0
392412
}
393413

394-
// Save sync state
414+
// Save sync state with schema version for compatibility detection.
395415
state := syncState{
396416
LastMessageID: maxID,
397417
LastSyncAt: time.Now(),
418+
SchemaVersion: cacheSchemaVersion,
398419
}
399420
stateData, _ := json.Marshal(state)
400421
if err := os.WriteFile(stateFile, stateData, 0644); err != nil {
@@ -592,15 +613,15 @@ func setupSQLiteSource(duckDB *sql.DB, dbPath string) (cleanup func(), err error
592613
query string
593614
typeOverrides string // DuckDB types parameter for read_csv_auto (empty = infer all)
594615
}{
595-
{"messages", "SELECT id, source_id, source_message_id, conversation_id, subject, snippet, sent_at, size_estimate, has_attachments, deleted_from_source_at FROM messages WHERE sent_at IS NOT NULL",
616+
{"messages", "SELECT id, source_id, source_message_id, conversation_id, subject, snippet, sent_at, size_estimate, has_attachments, attachment_count, deleted_from_source_at, sender_id, message_type FROM messages WHERE sent_at IS NOT NULL",
596617
"types={'sent_at': 'TIMESTAMP', 'deleted_from_source_at': 'TIMESTAMP'}"},
597618
{"message_recipients", "SELECT message_id, participant_id, recipient_type, display_name FROM message_recipients", ""},
598619
{"message_labels", "SELECT message_id, label_id FROM message_labels", ""},
599620
{"attachments", "SELECT message_id, size, filename FROM attachments", ""},
600-
{"participants", "SELECT id, email_address, domain, display_name FROM participants", ""},
621+
{"participants", "SELECT id, email_address, domain, display_name, phone_number FROM participants", ""},
601622
{"labels", "SELECT id, name FROM labels", ""},
602623
{"sources", "SELECT id, identifier FROM sources", ""},
603-
{"conversations", "SELECT id, source_conversation_id FROM conversations", ""},
624+
{"conversations", "SELECT id, source_conversation_id, title FROM conversations", ""},
604625
}
605626

606627
for _, t := range tables {

cmd/msgvault/cmd/import.go

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
package cmd
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"os/signal"
8+
"strings"
9+
"syscall"
10+
"time"
11+
12+
"github.com/spf13/cobra"
13+
"github.com/wesm/msgvault/internal/store"
14+
"github.com/wesm/msgvault/internal/whatsapp"
15+
)
16+
17+
var (
18+
importType string
19+
importPhone string
20+
importMediaDir string
21+
importContacts string
22+
importLimit int
23+
importDisplayName string
24+
)
25+
26+
var importCmd = &cobra.Command{
27+
Use: "import [path]",
28+
Short: "Import messages from external sources",
29+
Long: `Import messages from external message databases.
30+
31+
Currently supported types:
32+
whatsapp Import from a decrypted WhatsApp msgstore.db
33+
34+
Examples:
35+
msgvault import --type whatsapp --phone "+447700900000" /path/to/msgstore.db
36+
msgvault import --type whatsapp --phone "+447700900000" --contacts ~/contacts.vcf /path/to/msgstore.db
37+
msgvault import --type whatsapp --phone "+447700900000" --media-dir /path/to/Media /path/to/msgstore.db`,
38+
Args: cobra.ExactArgs(1),
39+
RunE: func(cmd *cobra.Command, args []string) error {
40+
if err := MustBeLocal("import"); err != nil {
41+
return err
42+
}
43+
44+
sourcePath := args[0]
45+
46+
// Validate source file exists.
47+
if _, err := os.Stat(sourcePath); err != nil {
48+
return fmt.Errorf("source file not found: %w", err)
49+
}
50+
51+
switch strings.ToLower(importType) {
52+
case "whatsapp":
53+
return runWhatsAppImport(cmd, sourcePath)
54+
default:
55+
return fmt.Errorf("unsupported import type %q (supported: whatsapp)", importType)
56+
}
57+
},
58+
}
59+
60+
func runWhatsAppImport(cmd *cobra.Command, sourcePath string) error {
61+
// Validate phone number.
62+
if importPhone == "" {
63+
return fmt.Errorf("--phone is required for WhatsApp import (E.164 format, e.g., +447700900000)")
64+
}
65+
if !strings.HasPrefix(importPhone, "+") {
66+
return fmt.Errorf("phone number must be in E.164 format (starting with +), got %q", importPhone)
67+
}
68+
69+
// Validate media dir if provided.
70+
if importMediaDir != "" {
71+
if info, err := os.Stat(importMediaDir); err != nil || !info.IsDir() {
72+
return fmt.Errorf("media directory not found or not a directory: %s", importMediaDir)
73+
}
74+
}
75+
76+
// Open database.
77+
dbPath := cfg.DatabaseDSN()
78+
s, err := store.Open(dbPath)
79+
if err != nil {
80+
return fmt.Errorf("open database: %w", err)
81+
}
82+
defer s.Close()
83+
84+
if err := s.InitSchema(); err != nil {
85+
return fmt.Errorf("init schema: %w", err)
86+
}
87+
88+
// Set up context with cancellation.
89+
ctx, cancel := context.WithCancel(cmd.Context())
90+
defer cancel()
91+
92+
// Handle Ctrl+C gracefully.
93+
sigChan := make(chan os.Signal, 1)
94+
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
95+
go func() {
96+
<-sigChan
97+
fmt.Println("\nInterrupted. Saving checkpoint...")
98+
cancel()
99+
}()
100+
101+
// Build import options.
102+
opts := whatsapp.DefaultOptions()
103+
opts.Phone = importPhone
104+
opts.DisplayName = importDisplayName
105+
opts.MediaDir = importMediaDir
106+
opts.AttachmentsDir = cfg.AttachmentsDir()
107+
opts.Limit = importLimit
108+
109+
// Create importer with CLI progress.
110+
progress := &ImportCLIProgress{}
111+
importer := whatsapp.NewImporter(s, progress)
112+
113+
fmt.Printf("Importing WhatsApp messages from %s\n", sourcePath)
114+
fmt.Printf("Phone: %s\n", importPhone)
115+
if importMediaDir != "" {
116+
fmt.Printf("Media: %s\n", importMediaDir)
117+
}
118+
if importLimit > 0 {
119+
fmt.Printf("Limit: %d messages\n", importLimit)
120+
}
121+
fmt.Println()
122+
123+
summary, err := importer.Import(ctx, sourcePath, opts)
124+
if err != nil {
125+
if ctx.Err() != nil {
126+
fmt.Println("\nImport interrupted. Run again to continue.")
127+
return nil
128+
}
129+
return fmt.Errorf("import failed: %w", err)
130+
}
131+
132+
// Import contacts if provided.
133+
if importContacts != "" {
134+
fmt.Printf("\nImporting contacts from %s...\n", importContacts)
135+
matched, total, err := whatsapp.ImportContacts(s, importContacts)
136+
if err != nil {
137+
fmt.Printf("Warning: contact import failed: %v\n", err)
138+
} else {
139+
fmt.Printf(" Contacts: %d in file, %d phone numbers matched to participants\n", total, matched)
140+
}
141+
}
142+
143+
// Print summary.
144+
fmt.Println()
145+
fmt.Println("Import complete!")
146+
fmt.Printf(" Duration: %s\n", summary.Duration.Round(time.Second))
147+
fmt.Printf(" Chats: %d\n", summary.ChatsProcessed)
148+
fmt.Printf(" Messages: %d processed, %d added, %d skipped\n",
149+
summary.MessagesProcessed, summary.MessagesAdded, summary.MessagesSkipped)
150+
fmt.Printf(" Participants: %d\n", summary.Participants)
151+
fmt.Printf(" Reactions: %d\n", summary.ReactionsAdded)
152+
fmt.Printf(" Attachments: %d found", summary.AttachmentsFound)
153+
if summary.MediaCopied > 0 {
154+
fmt.Printf(", %d files copied", summary.MediaCopied)
155+
}
156+
fmt.Println()
157+
if summary.Errors > 0 {
158+
fmt.Printf(" Errors: %d\n", summary.Errors)
159+
}
160+
161+
if summary.MessagesAdded > 0 {
162+
rate := float64(summary.MessagesAdded) / summary.Duration.Seconds()
163+
fmt.Printf(" Rate: %.0f messages/sec\n", rate)
164+
}
165+
166+
return nil
167+
}
168+
169+
// ImportCLIProgress implements whatsapp.ImportProgress for terminal output.
170+
type ImportCLIProgress struct {
171+
startTime time.Time
172+
lastPrint time.Time
173+
currentChat string
174+
}
175+
176+
func (p *ImportCLIProgress) OnStart() {
177+
p.startTime = time.Now()
178+
p.lastPrint = time.Now()
179+
}
180+
181+
func (p *ImportCLIProgress) OnChatStart(chatJID, chatTitle string, messageCount int) {
182+
p.currentChat = chatTitle
183+
// Don't print every chat start — too noisy for 13k+ chats.
184+
}
185+
186+
func (p *ImportCLIProgress) OnProgress(processed, added, skipped int64) {
187+
// Throttle output to every 2 seconds.
188+
if time.Since(p.lastPrint) < 2*time.Second {
189+
return
190+
}
191+
p.lastPrint = time.Now()
192+
193+
elapsed := time.Since(p.startTime)
194+
rate := 0.0
195+
if elapsed.Seconds() >= 1 {
196+
rate = float64(added) / elapsed.Seconds()
197+
}
198+
199+
elapsedStr := formatDuration(elapsed)
200+
201+
chatStr := ""
202+
if p.currentChat != "" {
203+
// Truncate long chat names.
204+
name := p.currentChat
205+
if len(name) > 30 {
206+
name = name[:27] + "..."
207+
}
208+
chatStr = fmt.Sprintf(" | Chat: %s", name)
209+
}
210+
211+
fmt.Printf("\r Processed: %d | Added: %d | Skipped: %d | Rate: %.0f/s | Elapsed: %s%s ",
212+
processed, added, skipped, rate, elapsedStr, chatStr)
213+
}
214+
215+
func (p *ImportCLIProgress) OnChatComplete(chatJID string, messagesAdded int64) {
216+
// Quiet — progress line shows the aggregate.
217+
}
218+
219+
func (p *ImportCLIProgress) OnComplete(summary *whatsapp.ImportSummary) {
220+
fmt.Println() // Clear the progress line.
221+
}
222+
223+
func (p *ImportCLIProgress) OnError(err error) {
224+
fmt.Printf("\nWarning: %v\n", err)
225+
}
226+
227+
func init() {
228+
importCmd.Flags().StringVar(&importType, "type", "", "import source type (required: whatsapp)")
229+
importCmd.Flags().StringVar(&importPhone, "phone", "", "your phone number in E.164 format (required for whatsapp)")
230+
importCmd.Flags().StringVar(&importMediaDir, "media-dir", "", "path to decrypted Media folder (optional)")
231+
importCmd.Flags().StringVar(&importContacts, "contacts", "", "path to contacts .vcf file for name resolution (optional)")
232+
importCmd.Flags().IntVar(&importLimit, "limit", 0, "limit number of messages (for testing)")
233+
importCmd.Flags().StringVar(&importDisplayName, "display-name", "", "display name for the phone owner")
234+
_ = importCmd.MarkFlagRequired("type")
235+
rootCmd.AddCommand(importCmd)
236+
}

0 commit comments

Comments
 (0)