Skip to content

Commit 1e79ebb

Browse files
committed
Backfill: use file-based parsing for Copilot JSON sessions; log parse errors; init client before hierarchy cache
- Detect parsing mode (shouldUseFileParsing) and choose file-based parsing for Copilot .json session files. - Add backfillFileWhole to parse entire JSON session files via adapter.ParseLogFile and process events in batches. - Keep existing line-by-line parsing for NDJSON/text formats (backfillFileLineByLine). - Add verbose parse error logging: emit first N parse errors with sample data and warn when remaining errors are suppressed. - Fix initialization order in backfill command: start API client before creating hierarchy cache and pass apiClient into NewHierarchyCache. Files changed: - packages/collector-go/internal/backfill/backfill.go - packages/collector-go/cmd/collector/main.go
1 parent 4c5a481 commit 1e79ebb

File tree

3 files changed

+315
-96
lines changed

3 files changed

+315
-96
lines changed

packages/collector-go/cmd/collector/main.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -325,10 +325,7 @@ var backfillRunCmd = &cobra.Command{
325325
to = time.Now()
326326
}
327327

328-
// Initialize components
329-
hiererchyCache := hierarchy.NewHierarchyCache(nil, log)
330-
registry := adapters.DefaultRegistry(cfg.ProjectID, hiererchyCache, log)
331-
328+
// Initialize buffer
332329
bufferConfig := buffer.Config{
333330
DBPath: cfg.Buffer.DBPath,
334331
MaxSize: cfg.Buffer.MaxSize,
@@ -340,6 +337,7 @@ var backfillRunCmd = &cobra.Command{
340337
}
341338
defer buf.Close()
342339

340+
// Initialize API client
343341
batchInterval, _ := cfg.GetBatchInterval()
344342
clientConfig := client.Config{
345343
BaseURL: cfg.BackendURL,
@@ -353,6 +351,10 @@ var backfillRunCmd = &cobra.Command{
353351
apiClient.Start()
354352
defer apiClient.Stop()
355353

354+
// Initialize hierarchy cache and adapters (needs client)
355+
hiererchyCache := hierarchy.NewHierarchyCache(apiClient, log)
356+
registry := adapters.DefaultRegistry(cfg.ProjectID, hiererchyCache, log)
357+
356358
// Create backfill manager
357359
backfillConfig := backfill.Config{
358360
Registry: registry,

packages/collector-go/internal/backfill/backfill.go

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,155 @@ func (bm *BackfillManager) backfillFile(ctx context.Context, config BackfillConf
203203
return nil, fmt.Errorf("failed to save state: %w", err)
204204
}
205205

206+
// Determine if we should use file-based or line-based parsing
207+
// Try ParseLogFile first - if adapter doesn't support it, fall back to line-based
208+
useFileParsing := bm.shouldUseFileParsing(adapter, filePath)
209+
210+
if useFileParsing {
211+
return bm.backfillFileWhole(ctx, config, adapter, filePath, state)
212+
}
213+
return bm.backfillFileLineByLine(ctx, config, adapter, filePath, state)
214+
}
215+
216+
// shouldUseFileParsing determines if we should parse the entire file at once
217+
func (bm *BackfillManager) shouldUseFileParsing(adapter adapters.AgentAdapter, filePath string) bool {
218+
// For Copilot chat sessions (JSON files), use file parsing
219+
ext := filepath.Ext(filePath)
220+
adapterName := adapter.Name()
221+
222+
// Copilot uses JSON session files - must use file parsing
223+
if adapterName == "github-copilot" && ext == ".json" {
224+
return true
225+
}
226+
227+
// Other adapters with .jsonl or .ndjson use line parsing
228+
return false
229+
}
230+
231+
// backfillFileWhole parses an entire log file at once (for structured formats like JSON)
232+
func (bm *BackfillManager) backfillFileWhole(ctx context.Context, config BackfillConfig, adapter adapters.AgentAdapter, filePath string, state *BackfillState) (*BackfillResult, error) {
233+
bm.log.Infof("Using file-based parsing for %s", filepath.Base(filePath))
234+
235+
// Get file size for progress tracking
236+
fileInfo, err := os.Stat(filePath)
237+
if err != nil {
238+
state.Status = StatusFailed
239+
state.ErrorMessage = err.Error()
240+
bm.stateStore.Save(state)
241+
return nil, fmt.Errorf("failed to stat file: %w", err)
242+
}
243+
totalBytes := fileInfo.Size()
244+
245+
// Parse entire file
246+
events, err := adapter.ParseLogFile(filePath)
247+
if err != nil {
248+
state.Status = StatusFailed
249+
state.ErrorMessage = fmt.Sprintf("parse error: %v", err)
250+
bm.stateStore.Save(state)
251+
bm.log.Errorf("Failed to parse %s: %v", filepath.Base(filePath), err)
252+
return nil, fmt.Errorf("failed to parse file: %w", err)
253+
}
254+
255+
bm.log.Infof("Parsed %d events from %s", len(events), filepath.Base(filePath))
256+
257+
// Initialize result
258+
result := &BackfillResult{
259+
TotalEvents: len(events),
260+
BytesProcessed: totalBytes,
261+
}
262+
263+
// Filter by date range
264+
var filteredEvents []*types.AgentEvent
265+
for _, event := range events {
266+
// Check context cancellation
267+
select {
268+
case <-ctx.Done():
269+
state.Status = StatusPaused
270+
state.TotalEventsProcessed = result.ProcessedEvents
271+
bm.stateStore.Save(state)
272+
return result, ctx.Err()
273+
default:
274+
}
275+
276+
// Filter by date range
277+
if !config.FromDate.IsZero() && event.Timestamp.Before(config.FromDate) {
278+
result.SkippedEvents++
279+
continue
280+
}
281+
if !config.ToDate.IsZero() && event.Timestamp.After(config.ToDate) {
282+
result.SkippedEvents++
283+
continue
284+
}
285+
286+
// Check for duplicate
287+
if bm.isDuplicate(event) {
288+
result.SkippedEvents++
289+
continue
290+
}
291+
292+
filteredEvents = append(filteredEvents, event)
293+
}
294+
295+
bm.log.Infof("Filtered to %d events (skipped %d)", len(filteredEvents), result.SkippedEvents)
296+
297+
// Process events in batches
298+
if config.BatchSize == 0 {
299+
config.BatchSize = 100
300+
}
301+
302+
for i := 0; i < len(filteredEvents); i += config.BatchSize {
303+
end := i + config.BatchSize
304+
if end > len(filteredEvents) {
305+
end = len(filteredEvents)
306+
}
307+
batch := filteredEvents[i:end]
308+
309+
if !config.DryRun {
310+
if err := bm.processBatch(ctx, batch); err != nil {
311+
bm.log.Warnf("Failed to process batch: %v", err)
312+
result.ErrorEvents += len(batch)
313+
} else {
314+
result.ProcessedEvents += len(batch)
315+
}
316+
} else {
317+
result.ProcessedEvents += len(batch)
318+
}
319+
320+
// Report progress
321+
if config.ProgressCB != nil {
322+
progress := Progress{
323+
AgentName: config.AgentName,
324+
FilePath: filePath,
325+
BytesProcessed: totalBytes * int64(end) / int64(len(filteredEvents)),
326+
TotalBytes: totalBytes,
327+
EventsProcessed: result.ProcessedEvents,
328+
Percentage: float64(end) / float64(len(filteredEvents)) * 100,
329+
}
330+
config.ProgressCB(progress)
331+
}
332+
}
333+
334+
// Mark as completed
335+
now := time.Now()
336+
state.Status = StatusCompleted
337+
state.CompletedAt = &now
338+
state.LastByteOffset = totalBytes
339+
state.TotalEventsProcessed = result.ProcessedEvents
340+
if len(events) > 0 {
341+
state.LastTimestamp = &events[len(events)-1].Timestamp
342+
}
343+
344+
if err := bm.stateStore.Save(state); err != nil {
345+
bm.log.Warnf("Failed to save final state: %v", err)
346+
}
347+
348+
return result, nil
349+
}
350+
351+
// backfillFileLineByLine processes a log file line by line (for NDJSON/text formats)
352+
func (bm *BackfillManager) backfillFileLineByLine(ctx context.Context, config BackfillConfig, adapter adapters.AgentAdapter, filePath string, state *BackfillState) (*BackfillResult, error) {
353+
bm.log.Infof("Using line-based parsing for %s", filepath.Base(filePath))
354+
206355
// Open file
207356
file, err := os.Open(filePath)
208357
if err != nil {
@@ -243,6 +392,8 @@ func (bm *BackfillManager) backfillFile(ctx context.Context, config BackfillConf
243392
batch := make([]*types.AgentEvent, 0, config.BatchSize)
244393
currentOffset := state.LastByteOffset
245394
lastProgressUpdate := time.Now()
395+
errorCount := 0
396+
maxErrorsToLog := 10
246397

247398
// Process lines
248399
lineNum := 0
@@ -266,6 +417,15 @@ func (bm *BackfillManager) backfillFile(ctx context.Context, config BackfillConf
266417
event, err := adapter.ParseLogLine(line)
267418
if err != nil {
268419
result.ErrorEvents++
420+
// Log first N errors with sample data for debugging
421+
if errorCount < maxErrorsToLog {
422+
sampleLine := line
423+
if len(sampleLine) > 200 {
424+
sampleLine = sampleLine[:200] + "..."
425+
}
426+
bm.log.Errorf("Parse error on line %d: %v | Sample: %s", lineNum, err, sampleLine)
427+
}
428+
errorCount++
269429
currentOffset += lineBytes
270430
continue
271431
}
@@ -355,6 +515,11 @@ func (bm *BackfillManager) backfillFile(ctx context.Context, config BackfillConf
355515
}
356516
}
357517

518+
// Log error summary if we stopped logging
519+
if errorCount > maxErrorsToLog {
520+
bm.log.Warnf("Suppressed %d additional parse errors", errorCount-maxErrorsToLog)
521+
}
522+
358523
// Check for scanner errors
359524
if err := scanner.Err(); err != nil {
360525
state.Status = StatusFailed

0 commit comments

Comments
 (0)