Skip to content

Commit ad28a85

Browse files
lionelloedw-defang
andauthored
Add --until for BYOC (#1042)
* Use channel merge to sort query log events by time * wip * refactor wip * First check if LogGroup exists * fix * support --until end time * Fix: --until should not tail * Fix for EOF case * PR comments * feat: --until for GCP --------- Co-authored-by: Edward J <[email protected]>
1 parent f6400a9 commit ad28a85

File tree

24 files changed

+1192
-910
lines changed

24 files changed

+1192
-910
lines changed

src/cmd/cli/command/compose.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,7 @@ func makeComposeLogsCmd() *cobra.Command {
423423
var utc, _ = cmd.Flags().GetBool("utc")
424424
var verbose, _ = cmd.Flags().GetBool("verbose")
425425
var filter, _ = cmd.Flags().GetString("filter")
426+
var until, _ = cmd.Flags().GetString("until")
426427

427428
if !cmd.Flags().Changed("verbose") {
428429
verbose = true // default verbose for explicit tail command
@@ -432,17 +433,27 @@ func makeComposeLogsCmd() *cobra.Command {
432433
os.Setenv("TZ", "") // used by Go's "time" package, see https://pkg.go.dev/time#Location
433434
}
434435

435-
ts, err := cli.ParseTimeOrDuration(since, time.Now())
436+
now := time.Now()
437+
sinceTs, err := cli.ParseTimeOrDuration(since, now)
436438
if err != nil {
437-
return fmt.Errorf("invalid duration or time: %w", err)
439+
return fmt.Errorf("invalid 'since' duration or time: %w", err)
438440
}
441+
sinceTs = sinceTs.UTC()
442+
untilTs, err := cli.ParseTimeOrDuration(until, now)
443+
if err != nil {
444+
return fmt.Errorf("invalid 'until' duration or time: %w", err)
445+
}
446+
untilTs = untilTs.UTC()
439447

440-
ts = ts.UTC()
441-
sinceStr := ""
442-
if pkg.IsValidTime(ts) {
443-
sinceStr = " since " + ts.Format(time.RFC3339Nano) + " "
448+
rangeStr := ""
449+
if pkg.IsValidTime(sinceTs) {
450+
rangeStr = " since " + sinceTs.Format(time.RFC3339Nano)
444451
}
445-
term.Infof("Showing logs%s; press Ctrl+C to stop:", sinceStr)
452+
if pkg.IsValidTime(untilTs) {
453+
rangeStr += " until " + untilTs.Format(time.RFC3339Nano)
454+
}
455+
term.Infof("Showing logs%s; press Ctrl+C to stop:", rangeStr)
456+
446457
services := args
447458
if len(name) > 0 {
448459
services = append(args, strings.Split(name, ",")...) // backwards compat
@@ -465,7 +476,8 @@ func makeComposeLogsCmd() *cobra.Command {
465476
LogType: logType,
466477
Raw: raw,
467478
Services: services,
468-
Since: ts,
479+
Since: sinceTs,
480+
Until: untilTs,
469481
Verbose: verbose,
470482
}
471483

@@ -478,7 +490,8 @@ func makeComposeLogsCmd() *cobra.Command {
478490
logsCmd.Flags().Bool("follow", false, "follow log output") // NOTE: -f is already used by --file
479491
logsCmd.Flags().MarkHidden("follow") // TODO: implement this
480492
logsCmd.Flags().BoolP("raw", "r", false, "show raw (unparsed) logs")
481-
logsCmd.Flags().StringP("since", "S", "", "show logs since duration/time")
493+
logsCmd.Flags().String("since", "", "show logs since duration/time")
494+
logsCmd.Flags().String("until", "", "show logs until duration/time")
482495
logsCmd.Flags().Bool("utc", false, "show logs in UTC timezone (ie. TZ=UTC)")
483496
logsCmd.Flags().Var(&logType, "type", fmt.Sprintf(`show logs of type; one of %v`, logs.AllLogTypes))
484497
logsCmd.Flags().String("filter", "", "only show logs containing given text; case-insensitive")

src/pkg/cli/client/byoc/aws/byoc.go

Lines changed: 67 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -596,67 +596,89 @@ func (b *ByocAws) CreateUploadURL(ctx context.Context, req *defangv1.UploadURLRe
596596
}, nil
597597
}
598598

599-
func (b *ByocAws) Query(ctx context.Context, req *defangv1.DebugRequest) error {
599+
func (b *ByocAws) QueryForDebug(ctx context.Context, req *defangv1.DebugRequest) error {
600+
// tailRequest := &defangv1.TailRequest{
601+
// Etag: req.Etag,
602+
// Project: req.Project,
603+
// Services: req.Services,
604+
// Since: req.Since,
605+
// Until: req.Until,
606+
// }
607+
600608
// The LogStreamNamePrefix filter can only be used with one service name
601609
var service string
602610
if len(req.Services) == 1 {
603611
service = req.Services[0]
604612
}
605613

606-
since := b.cdStart // TODO: get start time from req.Etag
607-
if since.IsZero() {
608-
since = time.Now().Add(-time.Hour) // TODO: get start time from req
614+
start := b.cdStart // TODO: get start time from req.Etag
615+
if req.Since.IsValid() {
616+
start = req.Since.AsTime()
617+
} else if start.IsZero() {
618+
start = time.Now().Add(-time.Hour)
609619
}
610620

611-
// get stack information
621+
end := time.Now()
622+
if req.Until.IsValid() {
623+
end = req.Until.AsTime()
624+
}
625+
626+
// get stack information (for log group ARN)
612627
err := b.driver.FillOutputs(ctx)
613628
if err != nil {
614629
return AnnotateAwsError(err)
615630
}
616631

617-
const maxQuerySizePerLogGroup = 128 * 1024 // 128KB (to stay well below the 1MB gRPC payload limit)
618-
619632
// Gather logs from the CD task, kaniko, ECS events, and all services
633+
evtsChan, errsChan := ecs.QueryLogGroups(ctx, start, end, b.getLogGroupInputs(req.Etag, req.Project, service, "", logs.LogTypeAll)...)
634+
if evtsChan == nil {
635+
return <-errsChan
636+
}
637+
638+
const maxQuerySizePerLogGroup = 128 * 1024 // 128KB per LogGroup (to stay well below the 1MB gRPC payload limit)
639+
620640
sb := strings.Builder{}
621-
for _, lgi := range b.getLogGroupInputs(req.Etag, req.Project, service, "", logs.LogTypeAll) {
622-
parseECSEventRecords := strings.HasSuffix(lgi.LogGroupARN, "/ecs")
623-
if err := ecs.Query(ctx, lgi, since, time.Now(), func(logEvents []ecs.LogEvent) error {
624-
for _, event := range logEvents {
625-
msg := term.StripAnsi(*event.Message)
626-
if parseECSEventRecords {
627-
if event, err := ecs.ParseECSEvent([]byte(msg)); err == nil {
628-
// TODO: once we know the AWS deploymentId from TaskStateChangeEvent detail.startedBy, we can do a 2nd query to filter by deploymentId
629-
if event.Etag() != "" && req.Etag != "" && event.Etag() != req.Etag {
630-
continue
631-
}
632-
if event.Service() != "" && len(req.Services) > 0 && !slices.Contains(req.Services, event.Service()) {
633-
continue
634-
}
635-
// This matches the status messages in the Defang Playground Loki logs
636-
sb.WriteString("status=")
637-
sb.WriteString(event.Status())
638-
sb.WriteByte('\n')
641+
loop:
642+
for {
643+
select {
644+
case event, ok := <-evtsChan:
645+
if !ok {
646+
break loop
647+
}
648+
parseECSEventRecords := strings.HasSuffix(*event.LogGroupIdentifier, "/ecs")
649+
if parseECSEventRecords {
650+
if event, err := ecs.ParseECSEvent([]byte(*event.Message)); err == nil {
651+
// TODO: once we know the AWS deploymentId from TaskStateChangeEvent detail.startedBy, we can do a 2nd query to filter by deploymentId
652+
if event.Etag() != "" && req.Etag != "" && event.Etag() != req.Etag {
639653
continue
640654
}
641-
}
642-
sb.WriteString(msg)
643-
sb.WriteByte('\n')
644-
if sb.Len() > maxQuerySizePerLogGroup {
645-
return errors.New("query result was truncated")
655+
if event.Service() != "" && len(req.Services) > 0 && !slices.Contains(req.Services, event.Service()) {
656+
continue
657+
}
658+
// This matches the status messages in the Defang Playground Loki logs
659+
sb.WriteString("status=")
660+
sb.WriteString(event.Status())
661+
sb.WriteByte('\n')
662+
continue
646663
}
647664
}
648-
return nil
649-
}); err != nil {
665+
msg := term.StripAnsi(*event.Message)
666+
sb.WriteString(msg)
667+
sb.WriteByte('\n')
668+
if sb.Len() > maxQuerySizePerLogGroup { // FIXME: this limit was supposed to be per LogGroup
669+
term.Warn("Query result was truncated")
670+
break loop
671+
}
672+
case err := <-errsChan:
650673
term.Warn("CloudWatch query error:", AnnotateAwsError(err))
651674
// continue reading other log groups
652675
}
653676
}
654-
655677
req.Logs = sb.String()
656678
return nil
657679
}
658680

659-
func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client.ServerStream[defangv1.TailResponse], error) {
681+
func (b *ByocAws) QueryLogs(ctx context.Context, req *defangv1.TailRequest) (client.ServerStream[defangv1.TailResponse], error) {
660682
// FillOutputs is needed to get the CD task ARN or the LogGroup ARNs
661683
if err := b.driver.FillOutputs(ctx); err != nil {
662684
return nil, AnnotateAwsError(err)
@@ -673,11 +695,11 @@ func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client
673695
// * Etag, service: tail that task/service
674696
var err error
675697
var taskArn ecs.TaskArn
676-
var eventStream ecs.EventStream
698+
var tailStream ecs.LiveTailStream
677699
stopWhenCDTaskDone := false
678700
logType := logs.LogType(req.LogType)
679701
if etag != "" && !pkg.IsValidRandomID(etag) { // Assume invalid "etag" is a task ID
680-
eventStream, err = b.driver.TailTaskID(ctx, etag)
702+
tailStream, err = b.driver.TailTaskID(ctx, etag)
681703
taskArn, _ = b.driver.GetTaskArn(etag)
682704
term.Debugf("Tailing task %s", *taskArn)
683705
etag = "" // no need to filter by etag
@@ -687,7 +709,14 @@ func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client
687709
if len(req.Services) == 1 {
688710
service = req.Services[0]
689711
}
690-
eventStream, err = ecs.TailLogGroups(ctx, req.Since.AsTime(), b.getLogGroupInputs(etag, req.Project, service, req.Pattern, logType)...)
712+
var start, end time.Time
713+
if req.Since.IsValid() {
714+
start = req.Since.AsTime()
715+
}
716+
if req.Until.IsValid() {
717+
end = req.Until.AsTime()
718+
}
719+
tailStream, err = ecs.QueryAndTailLogGroups(ctx, start, end, b.getLogGroupInputs(etag, req.Project, service, req.Pattern, logType)...)
691720
taskArn = b.cdTaskArn
692721
}
693722
if err != nil {
@@ -709,7 +738,7 @@ func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client
709738
}()
710739
}
711740

712-
return newByocServerStream(ctx, eventStream, etag, req.GetServices(), b), nil
741+
return newByocServerStream(ctx, tailStream, etag, req.GetServices(), b), nil
713742
}
714743

715744
func (b *ByocAws) makeLogGroupARN(name string) string {

src/pkg/cli/client/byoc/aws/byoc_integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func TestDeploy(t *testing.T) {
4040
func TestTail(t *testing.T) {
4141
b := NewByocProvider(ctx, "TestTail")
4242

43-
ss, err := b.Follow(context.Background(), &defangv1.TailRequest{Project: "byoc_integration_test"})
43+
ss, err := b.QueryLogs(context.Background(), &defangv1.TailRequest{Project: "byoc_integration_test"})
4444
if err != nil {
4545
// the only acceptable error is "unauthorized"
4646
if connect.CodeOf(err) == connect.CodeUnauthenticated {

src/pkg/cli/client/byoc/aws/stream.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ type byocServerStream struct {
2424
etag string
2525
response *defangv1.TailResponse
2626
services []string
27-
stream ecs.EventStream
27+
stream ecs.LiveTailStream
2828

2929
ecsEventsHandler ECSEventHandler
3030
}
3131

32-
func newByocServerStream(ctx context.Context, stream ecs.EventStream, etag string, services []string, ecsEventHandler ECSEventHandler) *byocServerStream {
32+
func newByocServerStream(ctx context.Context, stream ecs.LiveTailStream, etag string, services []string, ecsEventHandler ECSEventHandler) *byocServerStream {
3333
return &byocServerStream{
3434
ctx: ctx,
3535
etag: etag,

src/pkg/cli/client/byoc/do/byoc.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ func (b *ByocDo) PutConfig(ctx context.Context, config *defangv1.PutConfigReques
369369
return err
370370
}
371371

372-
func (b *ByocDo) Follow(ctx context.Context, req *defangv1.TailRequest) (client.ServerStream[defangv1.TailResponse], error) {
372+
func (b *ByocDo) QueryLogs(ctx context.Context, req *defangv1.TailRequest) (client.ServerStream[defangv1.TailResponse], error) {
373373
var appID, deploymentID string
374374

375375
if req.Etag != "" && req.Etag == b.cdEtag {
@@ -598,7 +598,7 @@ func (s *subscribeStream) Close() error {
598598
return nil
599599
}
600600

601-
func (b *ByocDo) Query(ctx context.Context, req *defangv1.DebugRequest) error {
601+
func (b *ByocDo) QueryForDebug(ctx context.Context, req *defangv1.DebugRequest) error {
602602
return client.ErrNotImplemented("AI debugging is not yet supported for DO BYOC")
603603
}
604604

src/pkg/cli/client/byoc/gcp/byoc.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -480,15 +480,15 @@ func (b *ByocGcp) Subscribe(ctx context.Context, req *defangv1.SubscribeRequest)
480480
return subscribeStream, nil
481481
}
482482

483-
func (b *ByocGcp) Follow(ctx context.Context, req *defangv1.TailRequest) (client.ServerStream[defangv1.TailResponse], error) {
483+
func (b *ByocGcp) QueryLogs(ctx context.Context, req *defangv1.TailRequest) (client.ServerStream[defangv1.TailResponse], error) {
484484
if b.cdExecution != "" && req.Etag == b.cdExecution { // Only follow CD log, we need to subscribe to cd activities to detect when the job is done
485485
subscribeStream, err := NewSubscribeStream(ctx, b.driver)
486486
if err != nil {
487487
return nil, err
488488
}
489489
subscribeStream.AddJobExecutionUpdate(path.Base(b.cdExecution))
490490
var since time.Time
491-
if req.Since != nil {
491+
if req.Since.IsValid() {
492492
since = req.Since.AsTime()
493493
}
494494
subscribeStream.Start(since)
@@ -520,10 +520,14 @@ func (b *ByocGcp) Follow(ctx context.Context, req *defangv1.TailRequest) (client
520520
}
521521

522522
startTime := time.Now()
523-
if req.Since != nil {
523+
if req.Since.IsValid() {
524524
startTime = req.Since.AsTime()
525525
}
526-
if req.Since != nil || req.Etag != "" {
526+
var endTime time.Time
527+
if req.Until.IsValid() {
528+
endTime = req.Until.AsTime()
529+
}
530+
if req.Since.IsValid() || req.Etag != "" {
527531
execName := path.Base(b.cdExecution)
528532
if execName == "." {
529533
execName = ""
@@ -541,6 +545,7 @@ func (b *ByocGcp) Follow(ctx context.Context, req *defangv1.TailRequest) (client
541545
logStream.AddServiceLog(req.Project, etag, req.Services) // Service logs
542546
}
543547
logStream.AddSince(startTime)
548+
logStream.AddUntil(endTime)
544549
logStream.AddFilter(req.Pattern)
545550
}
546551
logStream.Start(startTime)
@@ -648,10 +653,13 @@ func (b *ByocGcp) PutConfig(ctx context.Context, req *defangv1.PutConfigRequest)
648653
}
649654

650655
func (b *ByocGcp) createDeploymentLogQuery(req *defangv1.DebugRequest) string {
651-
var since time.Time
652-
if req.Since != nil {
656+
var since, until time.Time
657+
if req.Since.IsValid() {
653658
since = req.Since.AsTime()
654659
}
660+
if req.Until.IsValid() {
661+
until = req.Until.AsTime()
662+
}
655663
query := NewLogQuery(b.driver.ProjectId)
656664
if b.cdExecution != "" {
657665
query.AddJobExecutionQuery(path.Base(b.cdExecution))
@@ -662,6 +670,7 @@ func (b *ByocGcp) createDeploymentLogQuery(req *defangv1.DebugRequest) string {
662670
query.AddServiceLogQuery(req.Project, req.Etag, req.Services) // Cloudrun service logs
663671
query.AddCloudBuildLogQuery(req.Project, req.Etag, req.Services) // CloudBuild logs
664672
query.AddSince(since)
673+
query.AddUntil(until)
665674

666675
// Service status updates
667676
query.AddJobStatusUpdateRequestQuery(req.Project, req.Etag, req.Services)
@@ -735,7 +744,7 @@ func (b *ByocGcp) query(ctx context.Context, query string) ([]*loggingpb.LogEntr
735744
return entries, nil
736745
}
737746

738-
func (b *ByocGcp) Query(ctx context.Context, req *defangv1.DebugRequest) error {
747+
func (b *ByocGcp) QueryForDebug(ctx context.Context, req *defangv1.DebugRequest) error {
739748
logEntries, err := b.query(ctx, b.createDeploymentLogQuery(req))
740749
if err != nil {
741750
return annotateGcpError(err)

src/pkg/cli/client/byoc/gcp/query.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,13 @@ func (q *Query) AddSince(since time.Time) {
271271
q.baseQuery += fmt.Sprintf(` AND (timestamp >= %q)`, since.UTC().Format(time.RFC3339Nano))
272272
}
273273

274+
func (q *Query) AddUntil(until time.Time) {
275+
if until.IsZero() || until.Unix() <= 0 {
276+
return
277+
}
278+
q.baseQuery += fmt.Sprintf(` AND (timestamp <= %q)`, until.UTC().Format(time.RFC3339Nano))
279+
}
280+
274281
func (q *Query) AddFilter(filter string) {
275282
if filter == "" {
276283
return

src/pkg/cli/client/byoc/gcp/stream.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,10 @@ func (s *LogStream) AddSince(start time.Time) {
206206
s.query.AddSince(start)
207207
}
208208

209+
func (s *LogStream) AddUntil(end time.Time) {
210+
s.query.AddUntil(end)
211+
}
212+
209213
func (s *LogStream) AddFilter(filter string) {
210214
s.query.AddFilter(filter)
211215
}

src/pkg/cli/client/client.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,6 @@ import (
99
composeTypes "github.com/compose-spec/compose-go/v2/types"
1010
)
1111

12-
type ServerStream[Res any] interface {
13-
Close() error
14-
Receive() bool
15-
Msg() *Res
16-
Err() error
17-
}
18-
1912
type ProjectLoader interface {
2013
LoadProjectName(context.Context) (string, error)
2114
LoadProject(context.Context) (*composeTypes.Project, error)

0 commit comments

Comments
 (0)