diff --git a/src/pkg/clouds/aws/ecs/logs.go b/src/pkg/clouds/aws/ecs/logs.go index 12f6e731d..1ae30952e 100644 --- a/src/pkg/clouds/aws/ecs/logs.go +++ b/src/pkg/clouds/aws/ecs/logs.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "strings" - "sync" "time" "github.com/DefangLabs/defang/src/pkg/clouds/aws" @@ -29,39 +28,40 @@ func getLogGroupIdentifier(arnOrId string) string { func QueryAndTailLogGroups(ctx context.Context, start, end time.Time, logGroups ...LogGroupInput) (LiveTailStream, error) { ctx, cancel := context.WithCancel(ctx) - e := &eventStream{ - cancel: cancel, - ch: make(chan types.StartLiveTailResponseStream), - } + errCh := make(chan error) - // We must close the channel when all log groups are done - var wg sync.WaitGroup - var err error + var eventCh chan LogEvent for _, lgi := range logGroups { - var es LiveTailStream - es, err = QueryAndTailLogGroup(ctx, lgi, start, end) + es, err := QueryAndTailLogGroup(ctx, lgi, start, end) if err != nil { - break // abort if there is any fatal error + cancel() + return nil, err } - wg.Add(1) - go func() { - defer es.Close() - defer wg.Done() - // FIXME: this should *merge* the events from all log groups - e.err = e.pipeEvents(ctx, es) - }() + newCh := LiveTailStreamToChannel(ctx, es, errCh) + eventCh = mergeLogEventChan(eventCh, newCh) } - go func() { - wg.Wait() - close(e.ch) - }() - - if err != nil { - cancel() // abort any goroutines (caller won't call Close) - return nil, err + e := &eventStream{ + cancel: cancel, + ch: make(chan types.StartLiveTailResponseStream), } + go func() { + defer close(e.ch) + for { + select { + case event := <-eventCh: + e.ch <- &types.StartLiveTailResponseStreamMemberSessionUpdate{ + Value: types.LiveTailSessionUpdate{SessionResults: []types.LiveTailSessionLogEvent{event}}, + } + case err := <-errCh: + e.err = err + return // defered close of e.ch will unblock the caller to pick up the error + case <-ctx.Done(): + return + } + } + }() return e, nil } diff --git a/src/pkg/clouds/aws/ecs/stream.go b/src/pkg/clouds/aws/ecs/stream.go index 1ef92730b..9beaaf079 100644 --- a/src/pkg/clouds/aws/ecs/stream.go +++ b/src/pkg/clouds/aws/ecs/stream.go @@ -143,3 +143,42 @@ func (es *eventStream) pipeEvents(ctx context.Context, tailStream LiveTailStream } } } + +func LiveTailStreamToChannel(ctx context.Context, tailStream LiveTailStream, errCh chan<- error) chan LogEvent { + eventCh := make(chan LogEvent) + go func() { + defer close(eventCh) + defer tailStream.Close() + for { + // Double select to make sure context cancellation is not blocked by either the receive or send + // See: https://stackoverflow.com/questions/60030756/what-does-it-mean-when-one-channel-uses-two-arrows-to-write-to-another-channel + select { + case stream := <-tailStream.Events(): // blocking + if err := tailStream.Err(); err != nil { + errCh <- err + return + } + if stream == nil { + continue + } + events, err := GetLogEvents(stream) + if err != nil { + errCh <- err + return + } + for _, event := range events { + select { + case eventCh <- event: + case <-ctx.Done(): + errCh <- ctx.Err() + return + } + } + case <-ctx.Done(): // blocking + errCh <- ctx.Err() + return + } + } + }() + return eventCh +} diff --git a/src/protos/io/defang/v1/fabric.proto b/src/protos/io/defang/v1/fabric.proto index c0183d3a7..13ca97997 100644 --- a/src/protos/io/defang/v1/fabric.proto +++ b/src/protos/io/defang/v1/fabric.proto @@ -245,7 +245,6 @@ message GenerateFilesRequest { bool agree_tos = 3; bool training_opt_out = 4; // only valid for Pro users string model_id = 5; // only valid for Pro users - } message File {