Skip to content

Commit 7d98ba0

Browse files
wesmclaude
and committed
fix: return structured staleness from cacheNeedsBuild
Replace early-return + reason-string-parsing with a cacheStaleness struct that collects all signals (new messages, deletions, missing tables) before returning. This fixes a mixed add+delete sync where the new-messages check short-circuited the deletion check, causing an incremental rebuild that left deleted rows in parquet shards. The FullRebuild flag is set whenever deletions are detected or the cache is missing/empty, so callers no longer need to parse the human-readable Reason string. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent dab3423 commit 7d98ba0

File tree

3 files changed

+87
-57
lines changed

3 files changed

+87
-57
lines changed

cmd/msgvault/cmd/build_cache_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1730,12 +1730,12 @@ func TestCacheNeedsBuild(t *testing.T) {
17301730

17311731
tt.setup(t, dbPath, analyticsDir)
17321732

1733-
gotBuild, gotReason := cacheNeedsBuild(dbPath, analyticsDir)
1734-
if gotBuild != tt.wantBuild {
1735-
t.Errorf("cacheNeedsBuild() build = %v, want %v (reason: %q)", gotBuild, tt.wantBuild, gotReason)
1733+
got := cacheNeedsBuild(dbPath, analyticsDir)
1734+
if got.NeedsBuild != tt.wantBuild {
1735+
t.Errorf("cacheNeedsBuild() build = %v, want %v (reason: %q)", got.NeedsBuild, tt.wantBuild, got.Reason)
17361736
}
1737-
if tt.wantReason != "" && gotReason != tt.wantReason {
1738-
t.Errorf("cacheNeedsBuild() reason = %q, want %q", gotReason, tt.wantReason)
1737+
if tt.wantReason != "" && got.Reason != tt.wantReason {
1738+
t.Errorf("cacheNeedsBuild() reason = %q, want %q", got.Reason, tt.wantReason)
17391739
}
17401740
})
17411741
}

cmd/msgvault/cmd/serve.go

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"os"
1010
"os/signal"
1111
"strconv"
12-
"strings"
1312
"syscall"
1413
"time"
1514

@@ -89,8 +88,8 @@ func runServe(cmd *cobra.Command, args []string) error {
8988
// otherwise fall back to SQLite so remote endpoints still work.
9089
analyticsDir := cfg.AnalyticsDir()
9190
var engine query.Engine
92-
needsBuild, reason := cacheNeedsBuild(dbPath, analyticsDir)
93-
if !needsBuild && query.HasCompleteParquetData(analyticsDir) {
91+
staleness := cacheNeedsBuild(dbPath, analyticsDir)
92+
if !staleness.NeedsBuild && query.HasCompleteParquetData(analyticsDir) {
9493
duckEngine, engineErr := query.NewDuckDBEngine(
9594
analyticsDir, dbPath, s.DB(),
9695
)
@@ -102,9 +101,9 @@ func runServe(cmd *cobra.Command, args []string) error {
102101
engine = duckEngine
103102
}
104103
} else {
105-
if reason != "" {
104+
if staleness.Reason != "" {
106105
logger.Info("parquet cache not usable, using SQLite engine",
107-
"reason", reason)
106+
"reason", staleness.Reason)
108107
} else {
109108
logger.Info("parquet cache not built - using SQLite engine (run 'msgvault build-cache' for faster aggregates)")
110109
}
@@ -318,19 +317,15 @@ func runScheduledSync(ctx context.Context, email string, s *store.Store, oauthMg
318317
"duration", time.Since(startTime),
319318
)
320319

321-
// Rebuild cache if stale (covers new messages, deletions, and
322-
// label updates — not just additions).
320+
// Rebuild cache if stale (covers new messages and deletions).
323321
dbPath := cfg.DatabaseDSN()
324322
analyticsDir := cfg.AnalyticsDir()
325-
if needsBuild, reason := cacheNeedsBuild(dbPath, analyticsDir); needsBuild {
326-
// Deletions require a full rebuild because incremental builds
327-
// only export rows with id > lastMessageID and cannot update
328-
// or remove existing rows in parquet shards.
329-
fullRebuild := strings.Contains(reason, "deletions")
323+
if staleness := cacheNeedsBuild(dbPath, analyticsDir); staleness.NeedsBuild {
330324
logger.Info("rebuilding cache after sync",
331-
"email", email, "reason", reason,
332-
"full_rebuild", fullRebuild)
333-
result, err := buildCache(dbPath, analyticsDir, fullRebuild)
325+
"email", email, "reason", staleness.Reason,
326+
"full_rebuild", staleness.FullRebuild)
327+
result, err := buildCache(
328+
dbPath, analyticsDir, staleness.FullRebuild)
334329
if err != nil {
335330
logger.Error("cache build failed", "error", err)
336331
// Don't fail the sync for cache build errors

cmd/msgvault/cmd/tui.go

Lines changed: 72 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"os"
77
"path/filepath"
8+
"strings"
89

910
tea "github.com/charmbracelet/bubbletea"
1011
"github.com/spf13/cobra"
@@ -102,10 +103,10 @@ Remote Mode:
102103

103104
// Check if cache needs to be built/updated (unless forcing SQL or skipping)
104105
if !forceSQL && !skipCacheBuild {
105-
needsBuild, reason := cacheNeedsBuild(dbPath, analyticsDir)
106-
if needsBuild {
107-
fmt.Printf("Building analytics cache (%s)...\n", reason)
108-
result, err := buildCache(dbPath, analyticsDir, true)
106+
staleness := cacheNeedsBuild(dbPath, analyticsDir)
107+
if staleness.NeedsBuild {
108+
fmt.Printf("Building analytics cache (%s)...\n", staleness.Reason)
109+
result, err := buildCache(dbPath, analyticsDir, staleness.FullRebuild)
109110
if err != nil {
110111
fmt.Fprintf(os.Stderr, "Warning: Failed to build cache: %v\n", err)
111112
fmt.Fprintf(os.Stderr, "Falling back to SQLite (may be slow for large archives)\n")
@@ -157,9 +158,19 @@ Remote Mode:
157158
},
158159
}
159160

160-
// cacheNeedsBuild checks if the analytics cache needs to be built or updated.
161-
// Returns (needsBuild, reason) where reason describes why.
162-
func cacheNeedsBuild(dbPath, analyticsDir string) (bool, string) {
161+
// cacheStaleness describes why the analytics cache needs a rebuild.
162+
type cacheStaleness struct {
163+
NeedsBuild bool
164+
HasNew bool // new messages since last build
165+
HasDeleted bool // deletions since last build
166+
FullRebuild bool // must rewrite all shards (not incremental)
167+
Reason string
168+
}
169+
170+
// cacheNeedsBuild checks if the analytics cache needs to be built or
171+
// updated. Collects all staleness signals before returning so that
172+
// e.g. a mixed add+delete sync correctly reports both.
173+
func cacheNeedsBuild(dbPath, analyticsDir string) cacheStaleness {
163174
messagesDir := filepath.Join(analyticsDir, "messages")
164175
stateFile := filepath.Join(analyticsDir, "_last_sync.json")
165176

@@ -169,22 +180,31 @@ func cacheNeedsBuild(dbPath, analyticsDir string) (bool, string) {
169180
data, err := os.ReadFile(stateFile)
170181
if err != nil {
171182
if !hasParquetData {
172-
return true, "no cache exists"
183+
return cacheStaleness{
184+
NeedsBuild: true, FullRebuild: true,
185+
Reason: "no cache exists",
186+
}
187+
}
188+
return cacheStaleness{
189+
NeedsBuild: true, FullRebuild: true,
190+
Reason: "no sync state found",
173191
}
174-
return true, "no sync state found"
175192
}
176193

177194
var state syncState
178195
if err := json.Unmarshal(data, &state); err != nil {
179-
return true, "invalid sync state"
196+
return cacheStaleness{
197+
NeedsBuild: true, FullRebuild: true,
198+
Reason: "invalid sync state",
199+
}
180200
}
181201

182-
// Check if SQLite has newer messages
183-
// We need to query SQLite directly to check max message ID
184202
db, err := store.Open(dbPath)
185203
if err != nil {
186-
// Can't open DB to check - force rebuild to be safe
187-
return true, "cannot verify cache status"
204+
return cacheStaleness{
205+
NeedsBuild: true, FullRebuild: true,
206+
Reason: "cannot verify cache status",
207+
}
188208
}
189209
defer db.Close()
190210

@@ -194,59 +214,74 @@ func cacheNeedsBuild(dbPath, analyticsDir string) (bool, string) {
194214
WHERE deleted_from_source_at IS NULL AND sent_at IS NOT NULL
195215
`).Scan(&maxID)
196216
if err != nil {
197-
// Can't query - force rebuild to be safe
198-
return true, "cannot verify cache status"
217+
return cacheStaleness{
218+
NeedsBuild: true, FullRebuild: true,
219+
Reason: "cannot verify cache status",
220+
}
199221
}
200222

201-
// Zero-message accounts never produce message parquet files, so
202-
// HasParquetData returns false even after a successful cache build.
203-
// If sync state and DB both agree there are 0 messages, no build needed.
204223
if maxID == 0 && state.LastMessageID == 0 {
205-
return false, ""
224+
return cacheStaleness{}
206225
}
207226

208227
if !hasParquetData {
209-
return true, "no cache exists"
228+
return cacheStaleness{
229+
NeedsBuild: true, FullRebuild: true,
230+
Reason: "no cache exists",
231+
}
210232
}
211233

234+
// Collect staleness signals without short-circuiting so a mixed
235+
// add+delete sync correctly triggers a full rebuild.
236+
var reasons []string
237+
result := cacheStaleness{}
238+
212239
if maxID > state.LastMessageID {
213240
newCount := maxID - state.LastMessageID
214-
return true, fmt.Sprintf("%d new messages", newCount)
241+
result.HasNew = true
242+
reasons = append(reasons,
243+
fmt.Sprintf("%d new messages", newCount))
215244
}
216245

217-
// Check if any messages were soft-deleted after the last cache build.
218-
// Incremental builds don't rewrite existing rows, so deletions that
219-
// occur after the build leave stale (non-deleted) rows in Parquet.
220-
// Format as "YYYY-MM-DD HH:MM:SS" to match datetime('now') values.
221-
var deletedSinceBuild int64
222246
syncAtStr := state.LastSyncAt.UTC().Format("2006-01-02 15:04:05")
247+
var deletedSinceBuild int64
223248
err = db.DB().QueryRow(`
224249
SELECT COUNT(*) FROM messages
225250
WHERE deleted_from_source_at IS NOT NULL
226251
AND deleted_from_source_at >= ?
227252
`, syncAtStr).Scan(&deletedSinceBuild)
228253
if err != nil {
229-
return true, "cannot verify deletion state"
254+
return cacheStaleness{
255+
NeedsBuild: true, FullRebuild: true,
256+
Reason: "cannot verify deletion state",
257+
}
230258
}
231259
if deletedSinceBuild > 0 {
232-
return true, fmt.Sprintf(
233-
"%d deletions since last cache build",
234-
deletedSinceBuild,
235-
)
260+
result.HasDeleted = true
261+
result.FullRebuild = true
262+
reasons = append(reasons,
263+
fmt.Sprintf("%d deletions", deletedSinceBuild))
236264
}
237265

238266
// Check if parquet files actually exist (directory might be empty)
239-
files, _ := filepath.Glob(filepath.Join(messagesDir, "*", "*.parquet"))
267+
files, _ := filepath.Glob(
268+
filepath.Join(messagesDir, "*", "*.parquet"))
240269
if len(files) == 0 {
241-
return true, "cache directory empty"
270+
result.FullRebuild = true
271+
reasons = append(reasons, "cache directory empty")
242272
}
243273

244-
// Check for required parquet tables (e.g. conversations added in a newer version)
245274
if missingRequiredParquet(analyticsDir) {
246-
return true, "cache missing required tables"
275+
result.FullRebuild = true
276+
reasons = append(reasons, "cache missing required tables")
277+
}
278+
279+
if len(reasons) > 0 {
280+
result.NeedsBuild = true
281+
result.Reason = strings.Join(reasons, "; ")
247282
}
248283

249-
return false, ""
284+
return result
250285
}
251286

252287
func init() {

0 commit comments

Comments (0)