Skip to content

Commit 980a4dc

Browse files
committed
Use channel merge to sort query log events by time
1 parent 781d168 commit 980a4dc

File tree

1 file changed

+80
-25
lines changed

1 file changed

+80
-25
lines changed

src/pkg/cli/client/byoc/aws/byoc.go

Lines changed: 80 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -623,44 +623,99 @@ func (b *ByocAws) Query(ctx context.Context, req *defangv1.DebugRequest) error {
623623

624624
// Gather logs from the CD task, kaniko, ECS events, and all services
625625
sb := strings.Builder{}
626+
var evtsChan chan ecs.LogEvent
627+
errsChan := make(chan error)
626628
for _, lgi := range b.getLogGroupInputs(req.Etag, req.Project, service, "", logs.LogTypeAll) {
627-
parseECSEventRecords := strings.HasSuffix(lgi.LogGroupARN, "/ecs")
628-
if err := ecs.Query(ctx, lgi, since, time.Now(), func(logEvents []ecs.LogEvent) error {
629-
for _, event := range logEvents {
630-
msg := term.StripAnsi(*event.Message)
631-
if parseECSEventRecords {
632-
if event, err := ecs.ParseECSEvent([]byte(msg)); err == nil {
633-
// TODO: once we know the AWS deploymentId from TaskStateChangeEvent detail.startedBy, we can do a 2nd query to filter by deploymentId
634-
if event.Etag() != "" && req.Etag != "" && event.Etag() != req.Etag {
635-
continue
636-
}
637-
if event.Service() != "" && len(req.Services) > 0 && !slices.Contains(req.Services, event.Service()) {
638-
continue
639-
}
640-
// This matches the status messages in the Defang Playground Loki logs
641-
sb.WriteString("status=")
642-
sb.WriteString(event.Status())
643-
sb.WriteByte('\n')
629+
lgEvtChan := make(chan ecs.LogEvent)
630+
// Start a go routine for each log group
631+
go func(lgi ecs.LogGroupInput) {
632+
if err := ecs.Query(ctx, lgi, since, time.Now(), func(logEvents []ecs.LogEvent) error {
633+
for _, event := range logEvents {
634+
lgEvtChan <- event
635+
}
636+
return nil
637+
}); err != nil {
638+
errsChan <- err
639+
}
640+
close(lgEvtChan)
641+
}(lgi)
642+
evtsChan = mergeLogEventChan(evtsChan, lgEvtChan) // Merge sort the log events base one timestamp
643+
}
644+
645+
loop:
646+
for {
647+
select {
648+
case evt, ok := <-evtsChan:
649+
if !ok {
650+
break loop
651+
}
652+
parseECSEventRecords := strings.HasSuffix(*evt.LogGroupIdentifier, "/ecs")
653+
msg := term.StripAnsi(*evt.Message)
654+
if parseECSEventRecords {
655+
if event, err := ecs.ParseECSEvent([]byte(msg)); err == nil {
656+
// TODO: once we know the AWS deploymentId from TaskStateChangeEvent detail.startedBy, we can do a 2nd query to filter by deploymentId
657+
if event.Etag() != "" && req.Etag != "" && event.Etag() != req.Etag {
644658
continue
645659
}
646-
}
647-
sb.WriteString(msg)
648-
sb.WriteByte('\n')
649-
if sb.Len() > maxQuerySizePerLogGroup {
650-
return errors.New("query result was truncated")
660+
if event.Service() != "" && len(req.Services) > 0 && !slices.Contains(req.Services, event.Service()) {
661+
continue
662+
}
663+
// This matches the status messages in the Defang Playground Loki logs
664+
sb.WriteString("status=")
665+
sb.WriteString(event.Status())
666+
sb.WriteByte('\n')
667+
continue
651668
}
652669
}
653-
return nil
654-
}); err != nil {
670+
sb.WriteString(msg)
671+
sb.WriteByte('\n')
672+
if sb.Len() > maxQuerySizePerLogGroup {
673+
term.Warn("Query result was truncated")
674+
break loop
675+
}
676+
case err := <-errsChan:
655677
term.Warn("CloudWatch query error:", AnnotateAwsError(err))
656678
// continue reading other log groups
657679
}
658680
}
659-
660681
req.Logs = sb.String()
661682
return nil
662683
}
663684

685+
// Inspired by https://dev.to/vinaygo/concurrency-merge-sort-using-channels-and-goroutines-in-golang-35f7
686+
func mergeLogEventChan(a, b chan ecs.LogEvent) chan ecs.LogEvent {
687+
if a == nil {
688+
return b
689+
}
690+
if b == nil {
691+
return a
692+
}
693+
out := make(chan ecs.LogEvent)
694+
go func() {
695+
defer close(out)
696+
valA, okA := <-a
697+
valB, okB := <-b
698+
for okA && okB {
699+
if *valA.Timestamp < *valB.Timestamp {
700+
out <- valA
701+
valA, okA = <-a
702+
} else {
703+
out <- valB
704+
valB, okB = <-b
705+
}
706+
}
707+
for okA {
708+
out <- valA
709+
valA, okA = <-a
710+
}
711+
for okB {
712+
out <- valB
713+
valB, okB = <-b
714+
}
715+
}()
716+
return out
717+
}
718+
664719
func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client.ServerStream[defangv1.TailResponse], error) {
665720
// FillOutputs is needed to get the CD task ARN or the LogGroup ARNs
666721
if err := b.driver.FillOutputs(ctx); err != nil {

0 commit comments

Comments
 (0)