Skip to content

Commit 7ca3fff

Browse files
author
Marvin Zhang
committed
feat: Enhance historical data synchronization with progress display and dynamic workspace detection
1 parent 4435018 commit 7ca3fff

File tree

3 files changed

+338
-14
lines changed

3 files changed

+338
-14
lines changed

cmd/devlog/main.go

Lines changed: 172 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"os"
77
"os/signal"
8+
"path/filepath"
89
"strings"
910
"syscall"
1011
"time"
@@ -198,12 +199,23 @@ watching. Use --no-history to skip historical sync (for power users).`,
198199

199200
totalSynced := 0
200201
totalSkipped := 0
202+
totalSources := 0
203+
currentSource := 0
204+
205+
// Count total sources first
206+
for _, logs := range discovered {
207+
totalSources += len(logs)
208+
}
209+
210+
syncStartTime := time.Now()
201211

202212
for agentName, logs := range discovered {
203213
adapterName := mapAgentName(agentName)
204214

205215
for _, logInfo := range logs {
206-
log.Infof("Syncing historical data for %s: %s", agentName, logInfo.Path)
216+
currentSource++
217+
// Show progress
218+
fmt.Printf("\r🔄 Syncing [%d/%d]: %s...", currentSource, totalSources, filepath.Base(filepath.Dir(logInfo.Path)))
207219

208220
bfConfig := backfill.BackfillConfig{
209221
AgentName: adapterName,
@@ -224,7 +236,10 @@ watching. Use --no-history to skip historical sync (for power users).`,
224236
}
225237
}
226238

227-
log.Infof("Historical sync complete: %d events synced, %d skipped (already synced)", totalSynced, totalSkipped)
239+
syncDuration := time.Since(syncStartTime)
240+
fmt.Println() // New line after progress
241+
log.Infof("✅ Historical sync complete in %s: %d events synced, %d skipped (already synced)",
242+
syncDuration.Round(time.Millisecond), totalSynced, totalSkipped)
228243
}
229244
} else {
230245
log.Info("Skipping historical sync (--no-history flag)")
@@ -244,8 +259,25 @@ watching. Use --no-history to skip historical sync (for power users).`,
244259
log.Warnf("Failed to watch %s: %v", logInfo.Path, err)
245260
}
246261
}
262+
263+
// Also watch parent directories for new workspace detection
264+
var parentPaths []string
265+
for _, logInfo := range logs {
266+
parentPaths = append(parentPaths, logInfo.Path)
267+
}
268+
if err := fileWatcher.WatchParentDirs(parentPaths, adapterInstance); err != nil {
269+
log.Warnf("Failed to watch parent dirs for %s: %v", agentName, err)
270+
}
247271
}
248272

273+
// Start dynamic workspace discovery (check every 60 seconds for new workspaces)
274+
fileWatcher.StartDynamicDiscovery(60*time.Second, func(path string, adapter adapters.AgentAdapter) {
275+
log.Infof("New workspace discovered: %s", path)
276+
if err := fileWatcher.Watch(path, adapter); err != nil {
277+
log.Warnf("Failed to watch new workspace %s: %v", path, err)
278+
}
279+
})
280+
249281
// Process events from watcher to client
250282
go func() {
251283
for {
@@ -340,11 +372,144 @@ var versionCmd = &cobra.Command{
340372

341373
var statusCmd = &cobra.Command{
342374
Use: "status",
343-
Short: "Check collector status",
344-
Run: func(cmd *cobra.Command, args []string) {
345-
fmt.Println("Checking collector status...")
346-
// TODO: Connect to health check endpoint
347-
fmt.Println("Status: Not implemented yet")
375+
Short: "Check collector status and sync state",
376+
Long: `Display the current status of the collector, including:
377+
- Configuration status
378+
- Backend connectivity
379+
- Sync state for all discovered agents
380+
- Buffer state (pending events)`,
381+
RunE: func(cmd *cobra.Command, args []string) error {
382+
fmt.Println("📊 Devlog Collector Status")
383+
fmt.Println("==========================")
384+
fmt.Println()
385+
386+
// Load configuration
387+
cfg, err := config.LoadConfig(configPath)
388+
if err != nil {
389+
fmt.Printf("⚠️ Configuration: Failed to load (%v)\n", err)
390+
} else {
391+
fmt.Printf("✅ Configuration: Loaded from %s\n", configPath)
392+
fmt.Printf(" Backend URL: %s\n", cfg.BackendURL)
393+
fmt.Printf(" Project ID: %s\n", cfg.ProjectID)
394+
}
395+
fmt.Println()
396+
397+
// Check backend connectivity
398+
if cfg != nil {
399+
batchInterval, _ := cfg.GetBatchInterval()
400+
clientConfig := client.Config{
401+
BaseURL: cfg.BackendURL,
402+
APIKey: cfg.APIKey,
403+
BatchSize: cfg.Collection.BatchSize,
404+
BatchDelay: batchInterval,
405+
MaxRetries: cfg.Collection.MaxRetries,
406+
Logger: log,
407+
}
408+
apiClient := client.NewClient(clientConfig)
409+
if err := apiClient.HealthCheck(); err != nil {
410+
fmt.Printf("❌ Backend: Unreachable (%v)\n", err)
411+
} else {
412+
fmt.Printf("✅ Backend: Connected\n")
413+
}
414+
}
415+
fmt.Println()
416+
417+
// Check buffer state
418+
if cfg != nil {
419+
bufferConfig := buffer.Config{
420+
DBPath: cfg.Buffer.DBPath,
421+
MaxSize: cfg.Buffer.MaxSize,
422+
Logger: log,
423+
}
424+
buf, err := buffer.NewBuffer(bufferConfig)
425+
if err == nil {
426+
defer buf.Close()
427+
count, _ := buf.Count()
428+
if count > 0 {
429+
fmt.Printf("📦 Buffer: %d events pending\n", count)
430+
} else {
431+
fmt.Printf("📦 Buffer: Empty (all events synced)\n")
432+
}
433+
}
434+
}
435+
fmt.Println()
436+
437+
// Discover all agent logs and show sync status
438+
fmt.Println("🤖 Agents:")
439+
discovered, err := watcher.DiscoverAllAgentLogs()
440+
if err != nil {
441+
fmt.Printf(" ⚠️ Failed to discover agents: %v\n", err)
442+
return nil
443+
}
444+
445+
if len(discovered) == 0 {
446+
fmt.Println(" No agent logs discovered")
447+
return nil
448+
}
449+
450+
// Get sync state from backfill manager
451+
var manager *backfill.BackfillManager
452+
if cfg != nil {
453+
backfillConfig := backfill.Config{
454+
StateDBPath: cfg.Buffer.DBPath,
455+
Logger: log,
456+
}
457+
manager, err = backfill.NewBackfillManager(backfillConfig)
458+
if err != nil {
459+
log.Debugf("Failed to create backfill manager: %v", err)
460+
} else {
461+
defer manager.Close()
462+
}
463+
}
464+
465+
for agentName, logs := range discovered {
466+
adapterName := mapAgentName(agentName)
467+
fmt.Printf("\n %s (%d log sources)\n", agentName, len(logs))
468+
469+
if manager == nil {
470+
for _, logInfo := range logs {
471+
fmt.Printf(" 📁 %s\n", logInfo.Path)
472+
}
473+
continue
474+
}
475+
476+
states, err := manager.Status(adapterName)
477+
if err != nil {
478+
continue
479+
}
480+
481+
stateMap := make(map[string]*backfill.BackfillState)
482+
for _, state := range states {
483+
stateMap[state.LogFilePath] = state
484+
}
485+
486+
for _, logInfo := range logs {
487+
state, exists := stateMap[logInfo.Path]
488+
if !exists {
489+
fmt.Printf(" 📁 %s - ⏳ pending\n", filepath.Base(filepath.Dir(logInfo.Path)))
490+
continue
491+
}
492+
493+
switch state.Status {
494+
case backfill.StatusCompleted:
495+
fmt.Printf(" 📁 %s - ✅ synced (%d events)\n",
496+
filepath.Base(filepath.Dir(logInfo.Path)), state.TotalEventsProcessed)
497+
case backfill.StatusInProgress:
498+
fmt.Printf(" 📁 %s - 🔄 syncing (%d events)\n",
499+
filepath.Base(filepath.Dir(logInfo.Path)), state.TotalEventsProcessed)
500+
case backfill.StatusFailed:
501+
fmt.Printf(" 📁 %s - ❌ failed: %s\n",
502+
filepath.Base(filepath.Dir(logInfo.Path)), state.ErrorMessage)
503+
default:
504+
fmt.Printf(" 📁 %s - ⏳ pending\n", filepath.Base(filepath.Dir(logInfo.Path)))
505+
}
506+
}
507+
}
508+
509+
fmt.Println()
510+
fmt.Println("💡 Tip: Run 'devlog-collector start' to begin collecting events")
511+
512+
return nil
348513
},
349514
}
350515

0 commit comments

Comments
 (0)