Skip to content

Commit 2f02664

Browse files
wesm and claude committed
Persist RFC822 Message-ID for cross-sync IMAP dedup; harden EML export
IMAP composite IDs (mailbox|uid) change when messages move between mailboxes across syncs. Without persistent dedup, a message moved from All Mail to Trash would be re-imported as a duplicate.

Add an rfc822_message_id column to the messages table, populated from the parsed MIME Message-ID header during ingest. For IMAP sources, check this column before persisting to skip messages already imported under a different composite ID. Dedup skips are counted as skipped (not errors) via the errDuplicateRFC822 sentinel.

Harden sanitizeEMLFilename with filepath.Base to ensure the output is a plain filename with no directory components, guarding against IMAP mailbox names containing path separators.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 518feaf commit 2f02664

File tree

6 files changed

+119
-11
lines changed

6 files changed

+119
-11
lines changed

cmd/msgvault/cmd/export_eml.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cmd
22

33
import (
44
"fmt"
5+
"path/filepath"
56
"strconv"
67
"strings"
78

@@ -72,7 +73,11 @@ func sanitizeEMLFilename(sourceMessageID string) string {
7273
}
7374
return r
7475
}, sourceMessageID)
75-
if safe == "" {
76+
// Ensure the result is a plain filename with no directory
77+
// components, guarding against IMAP mailbox names with
78+
// path separators or traversal sequences.
79+
safe = filepath.Base(safe)
80+
if safe == "" || safe == "." {
7681
safe = "message"
7782
}
7883
return safe + ".eml"

internal/store/messages.go

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ type Message struct {
4545
ConversationID int64
4646
SourceID int64
4747
SourceMessageID string
48-
MessageType string // "email"
48+
RFC822MessageID sql.NullString // RFC822 Message-ID header for cross-mailbox dedup
49+
MessageType string // "email"
4950
SentAt sql.NullTime
5051
ReceivedAt sql.NullTime
5152
InternalDate sql.NullTime
@@ -85,6 +86,20 @@ func (s *Store) MessageExistsBatch(sourceID int64, sourceMessageIDs []string) (m
8586
return result, nil
8687
}
8788

89+
// MessageExistsByRFC822ID checks if a message with the given
90+
// RFC822 Message-ID already exists for this source.
91+
func (s *Store) MessageExistsByRFC822ID(
92+
sourceID int64, rfc822ID string,
93+
) (bool, error) {
94+
var count int
95+
err := s.db.QueryRow(
96+
`SELECT COUNT(*) FROM messages
97+
WHERE source_id = ? AND rfc822_message_id = ?`,
98+
sourceID, rfc822ID,
99+
).Scan(&count)
100+
return count > 0, err
101+
}
102+
88103
// MessageExistsWithRawBatch checks which message IDs already exist in the database
89104
// and have raw MIME data stored.
90105
// Returns a map of source_message_id -> internal message_id.
@@ -144,13 +159,15 @@ func (s *Store) EnsureConversation(sourceID int64, sourceConversationID, title s
144159

145160
const upsertMessageSQL = `
146161
INSERT INTO messages (
147-
conversation_id, source_id, source_message_id, message_type,
162+
conversation_id, source_id, source_message_id,
163+
rfc822_message_id, message_type,
148164
sent_at, received_at, internal_date, sender_id, is_from_me,
149165
subject, snippet, size_estimate,
150166
has_attachments, attachment_count, archived_at
151-
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'))
167+
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'))
152168
ON CONFLICT(source_id, source_message_id) DO UPDATE SET
153169
conversation_id = excluded.conversation_id,
170+
rfc822_message_id = excluded.rfc822_message_id,
154171
sent_at = excluded.sent_at,
155172
received_at = excluded.received_at,
156173
internal_date = excluded.internal_date,
@@ -169,7 +186,8 @@ func (s *Store) UpsertMessage(msg *Message) (int64, error) {
169186

170187
func upsertMessage(q querier, msg *Message) (int64, error) {
171188
args := []any{
172-
msg.ConversationID, msg.SourceID, msg.SourceMessageID, msg.MessageType,
189+
msg.ConversationID, msg.SourceID, msg.SourceMessageID,
190+
msg.RFC822MessageID, msg.MessageType,
173191
msg.SentAt, msg.ReceivedAt, msg.InternalDate, msg.SenderID, msg.IsFromMe,
174192
msg.Subject, msg.Snippet, msg.SizeEstimate,
175193
msg.HasAttachments, msg.AttachmentCount,

internal/store/schema.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,9 @@ CREATE TABLE IF NOT EXISTS messages (
108108
-- Platform-specific ID for dedup
109109
source_message_id TEXT,
110110

111+
-- RFC822 Message-ID for cross-mailbox dedup (IMAP)
112+
rfc822_message_id TEXT,
113+
111114
-- Message classification
112115
message_type TEXT NOT NULL, -- 'email', 'imessage', 'sms', 'mms', 'rcs', 'whatsapp'
113116

internal/store/store.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -226,12 +226,20 @@ func (s *Store) InitSchema() error {
226226
return fmt.Errorf("execute schema.sql: %w", err)
227227
}
228228

229-
// Migration: add sync_config column to sources for databases created before
230-
// IMAP support was added. SQLite returns "duplicate column name" if the
231-
// column already exists, which we treat as success.
232-
if _, err := s.db.Exec(`ALTER TABLE sources ADD COLUMN sync_config JSON`); err != nil {
233-
if !isSQLiteError(err, "duplicate column name") {
234-
return fmt.Errorf("migrate schema (sync_config): %w", err)
229+
// Migrations: add columns for databases created before these features.
230+
// SQLite returns "duplicate column name" if the column already exists,
231+
// which we treat as success.
232+
for _, m := range []struct {
233+
sql string
234+
desc string
235+
}{
236+
{`ALTER TABLE sources ADD COLUMN sync_config JSON`, "sync_config"},
237+
{`ALTER TABLE messages ADD COLUMN rfc822_message_id TEXT`, "rfc822_message_id"},
238+
} {
239+
if _, err := s.db.Exec(m.sql); err != nil {
240+
if !isSQLiteError(err, "duplicate column name") {
241+
return fmt.Errorf("migrate schema (%s): %w", m.desc, err)
242+
}
235243
}
236244
}
237245

internal/sync/sync.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,10 @@ func (s *Syncer) processBatch(ctx context.Context, sourceID int64, listResp *gma
212212

213213
threadID := threadIDs[newIDs[i]]
214214
if err := s.ingestMessage(ctx, sourceID, raw, threadID, labelMap); err != nil {
215+
if errors.Is(err, errDuplicateRFC822) {
216+
result.skipped++
217+
continue
218+
}
215219
s.logger.Warn("failed to ingest message", "id", raw.ID, "error", err)
216220
checkpoint.ErrorsCount++
217221
continue
@@ -511,10 +515,17 @@ func (s *Syncer) parseToModel(sourceID int64, raw *gmail.RawMessage, threadID st
511515
}
512516

513517
// Build message record
518+
rfc822ID := sql.NullString{}
519+
if parsed.MessageID != "" {
520+
rfc822ID = sql.NullString{
521+
String: parsed.MessageID, Valid: true,
522+
}
523+
}
514524
msg := &store.Message{
515525
ConversationID: conversationID,
516526
SourceID: sourceID,
517527
SourceMessageID: raw.ID,
528+
RFC822MessageID: rfc822ID,
518529
MessageType: "email",
519530
SenderID: senderID,
520531
Subject: sql.NullString{String: subject, Valid: subject != ""},
@@ -634,13 +645,35 @@ func (s *Syncer) persistMessage(data *messageData, labelMap map[string]int64) er
634645
return nil
635646
}
636647

648+
// errDuplicateRFC822 signals that a message was skipped because
649+
// another message with the same RFC822 Message-ID already exists
650+
// for this source. Used for cross-sync dedup on IMAP where
651+
// composite IDs change when messages move between mailboxes.
652+
var errDuplicateRFC822 = errors.New("duplicate RFC822 Message-ID")
653+
637654
// ingestMessage parses and stores a single message.
638655
func (s *Syncer) ingestMessage(ctx context.Context, sourceID int64, raw *gmail.RawMessage, threadID string, labelMap map[string]int64) error {
639656
data, err := s.parseToModel(sourceID, raw, threadID)
640657
if err != nil {
641658
return err
642659
}
643660

661+
// For IMAP sources, check if a message with the same RFC822
662+
// Message-ID already exists under a different composite ID.
663+
// This handles messages that moved between mailboxes across
664+
// syncs (e.g. All Mail → Trash changes the mailbox|uid key).
665+
if s.opts.SourceType == "imap" &&
666+
data.message.RFC822MessageID.Valid {
667+
exists, err := s.store.MessageExistsByRFC822ID(
668+
sourceID, data.message.RFC822MessageID.String)
669+
if err != nil {
670+
return fmt.Errorf("check rfc822 dedup: %w", err)
671+
}
672+
if exists {
673+
return errDuplicateRFC822
674+
}
675+
}
676+
644677
return s.persistMessage(data, labelMap)
645678
}
646679

internal/sync/sync_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1686,6 +1686,47 @@ func TestIMAPThreading(t *testing.T) {
16861686
}
16871687
}
16881688

1689+
// TestIMAPCrossSyncDedup verifies that a message imported from one mailbox
1690+
// is not re-imported when it appears under a different mailbox|uid on a
1691+
// subsequent sync (e.g. moved from All Mail to Trash).
1692+
func TestIMAPCrossSyncDedup(t *testing.T) {
1693+
env := newTestEnv(t)
1694+
env.SetOptions(t, func(o *Options) {
1695+
o.SourceType = "imap"
1696+
})
1697+
1698+
msg := testemail.NewMessage().
1699+
Subject("Dedup test").
1700+
Header("Message-ID", "<dedup@example.com>").
1701+
Body("Same message, different mailbox.").
1702+
Bytes()
1703+
1704+
// First sync: message is in All Mail
1705+
env.Mock.Profile.MessagesTotal = 1
1706+
env.Mock.Profile.HistoryID = 100
1707+
env.Mock.AddMessage("AllMail|42", msg, []string{"AllMail"})
1708+
summary := runFullSync(t, env)
1709+
assertSummary(t, summary, WantSummary{Added: intPtr(1)})
1710+
1711+
// Second sync: message moved to Trash (different composite ID)
1712+
delete(env.Mock.Messages, "AllMail|42")
1713+
env.Mock.AddMessage("Trash|99", msg, []string{"Trash"})
1714+
summary = runFullSync(t, env)
1715+
// Should be skipped via RFC822 Message-ID dedup, not re-imported
1716+
assertSummary(t, summary, WantSummary{Added: intPtr(0)})
1717+
1718+
// Only one message should exist in the database
1719+
var count int
1720+
err := env.Store.DB().QueryRow(
1721+
`SELECT COUNT(*) FROM messages`).Scan(&count)
1722+
if err != nil {
1723+
t.Fatalf("count messages: %v", err)
1724+
}
1725+
if count != 1 {
1726+
t.Errorf("expected 1 message, got %d (duplicate imported)", count)
1727+
}
1728+
}
1729+
16891730
// TestIncrementalSyncLabelRemovedWithMissingRaw verifies that removing a label
16901731
// from a message whose raw MIME data is missing still succeeds. The label-removal
16911732
// path operates on the message_labels table directly and never touches raw data.

0 commit comments

Comments
 (0)