Skip to content

Commit a52dee4

Browse files
committed
refactor wip
1 parent 523bbde commit a52dee4

File tree

8 files changed

+384
-382
lines changed

8 files changed

+384
-382
lines changed

src/pkg/cli/client/byoc/aws/byoc.go

Lines changed: 30 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -597,57 +597,57 @@ func (b *ByocAws) CreateUploadURL(ctx context.Context, req *defangv1.UploadURLRe
597597
}
598598

599599
func (b *ByocAws) Query(ctx context.Context, req *defangv1.DebugRequest) error {
600+
// tailRequest := &defangv1.TailRequest{
601+
// Etag: req.Etag,
602+
// Project: req.Project,
603+
// Services: req.Services,
604+
// Since: req.Since,
605+
// Until: req.Until,
606+
// }
607+
600608
// The LogStreamNamePrefix filter can only be used with one service name
601609
var service string
602610
if len(req.Services) == 1 {
603611
service = req.Services[0]
604612
}
605613

606-
since := b.cdStart // TODO: get start time from req.Etag
607-
if since.IsZero() {
608-
since = time.Now().Add(-time.Hour) // TODO: get start time from req
614+
start := b.cdStart // TODO: get start time from req.Etag
615+
if req.Since.IsValid() {
616+
start = req.Since.AsTime()
617+
} else if start.IsZero() {
618+
start = time.Now().Add(-time.Hour)
619+
}
620+
621+
end := time.Now()
622+
if req.Until.IsValid() {
623+
end = req.Until.AsTime()
609624
}
610625

611-
// get stack information
626+
// get stack information (for log group ARN)
612627
err := b.driver.FillOutputs(ctx)
613628
if err != nil {
614629
return AnnotateAwsError(err)
615630
}
616631

617-
const maxQuerySizePerLogGroup = 128 * 1024 // 128KB (to stay well below the 1MB gRPC payload limit)
618-
619632
// Gather logs from the CD task, kaniko, ECS events, and all services
620-
sb := strings.Builder{}
621-
var evtsChan chan ecs.LogEvent
622-
errsChan := make(chan error)
623-
for _, lgi := range b.getLogGroupInputs(req.Etag, req.Project, service, "", logs.LogTypeAll) {
624-
lgEvtChan := make(chan ecs.LogEvent)
625-
// Start a go routine for each log group
626-
go func(lgi ecs.LogGroupInput) {
627-
if err := ecs.Query(ctx, lgi, since, time.Now(), func(logEvents []ecs.LogEvent) error {
628-
for _, event := range logEvents {
629-
lgEvtChan <- event
630-
}
631-
return nil
632-
}); err != nil {
633-
errsChan <- err
634-
}
635-
close(lgEvtChan)
636-
}(lgi)
637-
evtsChan = mergeLogEventChan(evtsChan, lgEvtChan) // Merge sort the log events based on timestamp
633+
evtsChan, errsChan := ecs.QueryLogGroups(ctx, start, end, b.getLogGroupInputs(req.Etag, req.Project, service, "", logs.LogTypeAll)...)
634+
if evtsChan == nil {
635+
return <-errsChan
638636
}
639637

638+
const maxQuerySizePerLogGroup = 128 * 1024 // 128KB per LogGroup (to stay well below the 1MB gRPC payload limit)
639+
640+
sb := strings.Builder{}
640641
loop:
641642
for {
642643
select {
643-
case evt, ok := <-evtsChan:
644+
case event, ok := <-evtsChan:
644645
if !ok {
645646
break loop
646647
}
647-
parseECSEventRecords := strings.HasSuffix(*evt.LogGroupIdentifier, "/ecs")
648-
msg := term.StripAnsi(*evt.Message)
648+
parseECSEventRecords := strings.HasSuffix(*event.LogGroupIdentifier, "/ecs")
649649
if parseECSEventRecords {
650-
if event, err := ecs.ParseECSEvent([]byte(msg)); err == nil {
650+
if event, err := ecs.ParseECSEvent([]byte(*event.Message)); err == nil {
651651
// TODO: once we know the AWS deploymentId from TaskStateChangeEvent detail.startedBy, we can do a 2nd query to filter by deploymentId
652652
if event.Etag() != "" && req.Etag != "" && event.Etag() != req.Etag {
653653
continue
@@ -662,9 +662,10 @@ loop:
662662
continue
663663
}
664664
}
665+
msg := term.StripAnsi(*event.Message)
665666
sb.WriteString(msg)
666667
sb.WriteByte('\n')
667-
if sb.Len() > maxQuerySizePerLogGroup {
668+
if sb.Len() > maxQuerySizePerLogGroup { // FIXME: this limit was supposed to be per LogGroup
668669
term.Warn("Query result was truncated")
669670
break loop
670671
}
@@ -677,44 +678,6 @@ loop:
677678
return nil
678679
}
679680

680-
// Inspired by https://dev.to/vinaygo/concurrency-merge-sort-using-channels-and-goroutines-in-golang-35f7
681-
func mergech[T any](left chan T, right chan T, c chan T, less func(T, T) bool) {
682-
defer close(c)
683-
val, ok := <-left
684-
val2, ok2 := <-right
685-
for ok && ok2 {
686-
if less(val, val2) {
687-
c <- val
688-
val, ok = <-left
689-
} else {
690-
c <- val2
691-
val2, ok2 = <-right
692-
}
693-
}
694-
for ok {
695-
c <- val
696-
val, ok = <-left
697-
}
698-
for ok2 {
699-
c <- val2
700-
val2, ok2 = <-right
701-
}
702-
}
703-
704-
func mergeLogEventChan(left, right chan ecs.LogEvent) chan ecs.LogEvent {
705-
if left == nil {
706-
return right
707-
}
708-
if right == nil {
709-
return left
710-
}
711-
out := make(chan ecs.LogEvent)
712-
go mergech(left, right, out, func(i1, i2 ecs.LogEvent) bool {
713-
return *i1.Timestamp < *i2.Timestamp
714-
})
715-
return out
716-
}
717-
718681
func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client.ServerStream[defangv1.TailResponse], error) {
719682
// FillOutputs is needed to get the CD task ARN or the LogGroup ARNs
720683
if err := b.driver.FillOutputs(ctx); err != nil {

src/pkg/cli/client/client.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,6 @@ import (
99
composeTypes "github.com/compose-spec/compose-go/v2/types"
1010
)
1111

12-
type ServerStream[Res any] interface {
13-
Close() error
14-
Receive() bool
15-
Msg() *Res
16-
Err() error
17-
}
18-
1912
type ProjectLoader interface {
2013
LoadProjectName(context.Context) (string, error)
2114
LoadProject(context.Context) (*composeTypes.Project, error)

src/pkg/cli/client/provider.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,13 @@ type PrepareDomainDelegationResponse struct {
119119
DelegationSetId string
120120
}
121121

122+
type ServerStream[Res any] interface {
123+
Close() error
124+
Receive() bool
125+
Msg() *Res
126+
Err() error
127+
}
128+
122129
type Provider interface {
123130
AccountInfo(context.Context) (AccountInfo, error)
124131
BootstrapCommand(context.Context, BootstrapCommandRequest) (types.ETag, error)

0 commit comments

Comments
 (0)