Skip to content

Commit 6a58940

Browse files
committed
Merge remote-tracking branch 'origin/edw/sort-log-query-by-time' into lio/log-until
2 parents 8d1be6e + 980a4dc commit 6a58940

File tree

1 file changed

+80
-25
lines changed

1 file changed

+80
-25
lines changed

src/pkg/cli/client/byoc/aws/byoc.go

Lines changed: 80 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -618,44 +618,99 @@ func (b *ByocAws) Query(ctx context.Context, req *defangv1.DebugRequest) error {
618618

619619
// Gather logs from the CD task, kaniko, ECS events, and all services
620620
sb := strings.Builder{}
621+
var evtsChan chan ecs.LogEvent
622+
errsChan := make(chan error)
621623
for _, lgi := range b.getLogGroupInputs(req.Etag, req.Project, service, "", logs.LogTypeAll) {
622-
parseECSEventRecords := strings.HasSuffix(lgi.LogGroupARN, "/ecs")
623-
if err := ecs.Query(ctx, lgi, since, time.Now(), func(logEvents []ecs.LogEvent) error {
624-
for _, event := range logEvents {
625-
msg := term.StripAnsi(*event.Message)
626-
if parseECSEventRecords {
627-
if event, err := ecs.ParseECSEvent([]byte(msg)); err == nil {
628-
// TODO: once we know the AWS deploymentId from TaskStateChangeEvent detail.startedBy, we can do a 2nd query to filter by deploymentId
629-
if event.Etag() != "" && req.Etag != "" && event.Etag() != req.Etag {
630-
continue
631-
}
632-
if event.Service() != "" && len(req.Services) > 0 && !slices.Contains(req.Services, event.Service()) {
633-
continue
634-
}
635-
// This matches the status messages in the Defang Playground Loki logs
636-
sb.WriteString("status=")
637-
sb.WriteString(event.Status())
638-
sb.WriteByte('\n')
624+
lgEvtChan := make(chan ecs.LogEvent)
625+
// Start a go routine for each log group
626+
go func(lgi ecs.LogGroupInput) {
627+
if err := ecs.Query(ctx, lgi, since, time.Now(), func(logEvents []ecs.LogEvent) error {
628+
for _, event := range logEvents {
629+
lgEvtChan <- event
630+
}
631+
return nil
632+
}); err != nil {
633+
errsChan <- err
634+
}
635+
close(lgEvtChan)
636+
}(lgi)
637+
evtsChan = mergeLogEventChan(evtsChan, lgEvtChan) // Merge sort the log events base one timestamp
638+
}
639+
640+
loop:
641+
for {
642+
select {
643+
case evt, ok := <-evtsChan:
644+
if !ok {
645+
break loop
646+
}
647+
parseECSEventRecords := strings.HasSuffix(*evt.LogGroupIdentifier, "/ecs")
648+
msg := term.StripAnsi(*evt.Message)
649+
if parseECSEventRecords {
650+
if event, err := ecs.ParseECSEvent([]byte(msg)); err == nil {
651+
// TODO: once we know the AWS deploymentId from TaskStateChangeEvent detail.startedBy, we can do a 2nd query to filter by deploymentId
652+
if event.Etag() != "" && req.Etag != "" && event.Etag() != req.Etag {
639653
continue
640654
}
641-
}
642-
sb.WriteString(msg)
643-
sb.WriteByte('\n')
644-
if sb.Len() > maxQuerySizePerLogGroup {
645-
return errors.New("query result was truncated")
655+
if event.Service() != "" && len(req.Services) > 0 && !slices.Contains(req.Services, event.Service()) {
656+
continue
657+
}
658+
// This matches the status messages in the Defang Playground Loki logs
659+
sb.WriteString("status=")
660+
sb.WriteString(event.Status())
661+
sb.WriteByte('\n')
662+
continue
646663
}
647664
}
648-
return nil
649-
}); err != nil {
665+
sb.WriteString(msg)
666+
sb.WriteByte('\n')
667+
if sb.Len() > maxQuerySizePerLogGroup {
668+
term.Warn("Query result was truncated")
669+
break loop
670+
}
671+
case err := <-errsChan:
650672
term.Warn("CloudWatch query error:", AnnotateAwsError(err))
651673
// continue reading other log groups
652674
}
653675
}
654-
655676
req.Logs = sb.String()
656677
return nil
657678
}
658679

680+
// Inspired by https://dev.to/vinaygo/concurrency-merge-sort-using-channels-and-goroutines-in-golang-35f7
681+
func mergeLogEventChan(a, b chan ecs.LogEvent) chan ecs.LogEvent {
682+
if a == nil {
683+
return b
684+
}
685+
if b == nil {
686+
return a
687+
}
688+
out := make(chan ecs.LogEvent)
689+
go func() {
690+
defer close(out)
691+
valA, okA := <-a
692+
valB, okB := <-b
693+
for okA && okB {
694+
if *valA.Timestamp < *valB.Timestamp {
695+
out <- valA
696+
valA, okA = <-a
697+
} else {
698+
out <- valB
699+
valB, okB = <-b
700+
}
701+
}
702+
for okA {
703+
out <- valA
704+
valA, okA = <-a
705+
}
706+
for okB {
707+
out <- valB
708+
valB, okB = <-b
709+
}
710+
}()
711+
return out
712+
}
713+
659714
func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client.ServerStream[defangv1.TailResponse], error) {
660715
// FillOutputs is needed to get the CD task ARN or the LogGroup ARNs
661716
if err := b.driver.FillOutputs(ctx); err != nil {

0 commit comments

Comments
 (0)