Skip to content

Commit 23cf81e

Browse files
committed
Backfill: add multi-workspace backfill, buffer-first storage, and result aggregation
- Add --all-workspaces and --workspaces flags to backfill run command - Auto-discover and handle multiple workspace log paths; allow filtering by workspace IDs - Process each discovered path and aggregate BackfillResult across workspaces - Buffer events first during backfill for reliable SQLite storage; best-effort SendEvent afterward - Improve logging for discovery, per-workspace processing, and send/buffer failures - Add .gitignore entry for SQLite DB files - Update spec/README to document parsing, buffering, and multi-workspace changes
1 parent 1e79ebb commit 23cf81e

File tree

4 files changed

+249
-47
lines changed

4 files changed

+249
-47
lines changed

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,4 +170,7 @@ tmp/
170170
.turbo
171171

172172
# Playwright
173-
.playwright-mcp
173+
.playwright-mcp
174+
175+
# SQLite database files
176+
*.db

packages/collector-go/cmd/collector/main.go

Lines changed: 84 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"os"
77
"os/signal"
8+
"strings"
89
"syscall"
910
"time"
1011

@@ -304,6 +305,8 @@ var backfillRunCmd = &cobra.Command{
304305
toDate, _ := cmd.Flags().GetString("to")
305306
dryRun, _ := cmd.Flags().GetBool("dry-run")
306307
days, _ := cmd.Flags().GetInt("days")
308+
allWorkspaces, _ := cmd.Flags().GetBool("all-workspaces")
309+
specificWorkspaces, _ := cmd.Flags().GetStringSlice("workspaces")
307310

308311
// Parse dates
309312
var from, to time.Time
@@ -383,19 +386,47 @@ var backfillRunCmd = &cobra.Command{
383386
return fmt.Errorf("no log path specified")
384387
}
385388

386-
// If log path is "auto", discover it
389+
// If log path is "auto", discover paths
390+
var logPaths []string
387391
if logPath == "auto" {
388-
log.Infof("Auto-discovering log path for %s...", agentName)
392+
log.Infof("Auto-discovering log paths for %s...", agentName)
389393
discovered, err := watcher.DiscoverAgentLogs(agentName)
390394
if err != nil {
391395
return fmt.Errorf("failed to discover logs for %s: %w", agentName, err)
392396
}
393397
if len(discovered) == 0 {
394398
return fmt.Errorf("no logs found for agent %s", agentName)
395399
}
396-
// Use first discovered log path
397-
logPath = discovered[0].Path
398-
log.Infof("Using discovered log path: %s", logPath)
400+
401+
// Filter by workspace IDs if specified
402+
if len(specificWorkspaces) > 0 {
403+
log.Infof("Filtering for workspaces: %v", specificWorkspaces)
404+
for _, d := range discovered {
405+
for _, wsID := range specificWorkspaces {
406+
if strings.Contains(d.Path, wsID) {
407+
logPaths = append(logPaths, d.Path)
408+
break
409+
}
410+
}
411+
}
412+
if len(logPaths) == 0 {
413+
return fmt.Errorf("no logs found for specified workspaces")
414+
}
415+
} else if allWorkspaces {
416+
// Process all discovered workspaces
417+
log.Infof("Processing all %d discovered workspaces", len(discovered))
418+
for _, d := range discovered {
419+
logPaths = append(logPaths, d.Path)
420+
}
421+
} else {
422+
// Default: use first discovered path (backward compatibility)
423+
logPaths = []string{discovered[0].Path}
424+
log.Infof("Using discovered log path: %s", logPaths[0])
425+
log.Infof("Hint: Use --all-workspaces to process all %d workspaces", len(discovered))
426+
}
427+
} else {
428+
// Use specified log path
429+
logPaths = []string{logPath}
399430
}
400431

401432
// Progress callback
@@ -412,32 +443,59 @@ var backfillRunCmd = &cobra.Command{
412443
)
413444
}
414445

415-
// Run backfill
446+
// Run backfill for each log path
416447
ctx := context.Background()
417448
adapterName := mapAgentName(agentName)
418-
bfConfig := backfill.BackfillConfig{
419-
AgentName: adapterName,
420-
LogPath: logPath,
421-
FromDate: from,
422-
ToDate: to,
423-
DryRun: dryRun,
424-
BatchSize: 100,
425-
ProgressCB: progressFunc,
426-
}
427449

428-
result, err := manager.Backfill(ctx, bfConfig)
429-
if err != nil {
430-
return fmt.Errorf("backfill failed: %w", err)
450+
// Aggregate results
451+
totalResult := &backfill.BackfillResult{}
452+
overallStart := time.Now()
453+
454+
for i, logPath := range logPaths {
455+
if len(logPaths) > 1 {
456+
fmt.Printf("\n[%d/%d] Processing: %s\n", i+1, len(logPaths), logPath)
457+
}
458+
459+
bfConfig := backfill.BackfillConfig{
460+
AgentName: adapterName,
461+
LogPath: logPath,
462+
FromDate: from,
463+
ToDate: to,
464+
DryRun: dryRun,
465+
BatchSize: 100,
466+
ProgressCB: progressFunc,
467+
}
468+
469+
result, err := manager.Backfill(ctx, bfConfig)
470+
if err != nil {
471+
log.Warnf("Failed to process %s: %v", logPath, err)
472+
totalResult.ErrorEvents++
473+
continue
474+
}
475+
476+
// Aggregate results
477+
totalResult.TotalEvents += result.TotalEvents
478+
totalResult.ProcessedEvents += result.ProcessedEvents
479+
totalResult.SkippedEvents += result.SkippedEvents
480+
totalResult.ErrorEvents += result.ErrorEvents
481+
totalResult.BytesProcessed += result.BytesProcessed
431482
}
432483

484+
totalResult.Duration = time.Since(overallStart)
485+
433486
// Print summary
434487
fmt.Println("\n\n✓ Backfill completed")
435-
fmt.Printf("Duration: %s\n", result.Duration)
436-
fmt.Printf("Events processed: %d\n", result.ProcessedEvents)
437-
fmt.Printf("Events skipped: %d (duplicates)\n", result.SkippedEvents)
438-
fmt.Printf("Errors: %d\n", result.ErrorEvents)
439-
fmt.Printf("Throughput: %.1f events/sec\n", float64(result.ProcessedEvents)/result.Duration.Seconds())
440-
fmt.Printf("Data processed: %.2f MB\n", float64(result.BytesProcessed)/(1024*1024))
488+
if len(logPaths) > 1 {
489+
fmt.Printf("Workspaces processed: %d\n", len(logPaths))
490+
}
491+
fmt.Printf("Duration: %s\n", totalResult.Duration)
492+
fmt.Printf("Events processed: %d\n", totalResult.ProcessedEvents)
493+
fmt.Printf("Events skipped: %d (duplicates)\n", totalResult.SkippedEvents)
494+
fmt.Printf("Errors: %d\n", totalResult.ErrorEvents)
495+
if totalResult.Duration.Seconds() > 0 {
496+
fmt.Printf("Throughput: %.1f events/sec\n", float64(totalResult.ProcessedEvents)/totalResult.Duration.Seconds())
497+
}
498+
fmt.Printf("Data processed: %.2f MB\n", float64(totalResult.BytesProcessed)/(1024*1024))
441499

442500
return nil
443501
},
@@ -542,6 +600,8 @@ func init() {
542600
backfillRunCmd.Flags().StringP("to", "t", "", "End date (YYYY-MM-DD)")
543601
backfillRunCmd.Flags().IntP("days", "d", 0, "Backfill last N days (alternative to from/to)")
544602
backfillRunCmd.Flags().Bool("dry-run", false, "Preview without processing")
603+
backfillRunCmd.Flags().Bool("all-workspaces", false, "Process all discovered workspaces")
604+
backfillRunCmd.Flags().StringSlice("workspaces", []string{}, "Specific workspace IDs to process (comma-separated)")
545605

546606
// Backfill status flags
547607
backfillStatusCmd.Flags().StringP("agent", "a", "", "Agent name to check")

packages/collector-go/internal/backfill/backfill.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -546,12 +546,19 @@ func (bm *BackfillManager) backfillFileLineByLine(ctx context.Context, config Ba
546546
// processBatch sends a batch of events to the client and buffer
547547
func (bm *BackfillManager) processBatch(ctx context.Context, batch []*types.AgentEvent) error {
548548
for _, event := range batch {
549-
// Try to send immediately
549+
// For backfill operations, buffer events first for reliable storage
550+
// The buffer will be processed by the normal collector sync mechanism
551+
if err := bm.buffer.Store(event); err != nil {
552+
bm.log.Warnf("Failed to buffer event: %v", err)
553+
// Continue to try sending directly as fallback
554+
}
555+
556+
// Also try to send immediately if backend is available
557+
// This is best-effort and failures are acceptable since we've buffered
550558
if err := bm.client.SendEvent(event); err != nil {
551-
// Buffer if send fails
552-
if err := bm.buffer.Store(event); err != nil {
553-
return fmt.Errorf("failed to buffer event: %w", err)
554-
}
559+
// SendEvent currently always returns nil, so this won't catch async send failures
560+
// But we keep it for future compatibility
561+
bm.log.Debugf("Failed to queue event for sending: %v", err)
555562
}
556563
}
557564
return nil

0 commit comments

Comments
 (0)