Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions src/cmd/cli/command/compose.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ func makeComposeLogsCmd() *cobra.Command {
var utc, _ = cmd.Flags().GetBool("utc")
var verbose, _ = cmd.Flags().GetBool("verbose")
var filter, _ = cmd.Flags().GetString("filter")
var until, _ = cmd.Flags().GetString("until")

if !cmd.Flags().Changed("verbose") {
verbose = true // default verbose for explicit tail command
Expand All @@ -432,17 +433,27 @@ func makeComposeLogsCmd() *cobra.Command {
os.Setenv("TZ", "") // used by Go's "time" package, see https://pkg.go.dev/time#Location
}

ts, err := cli.ParseTimeOrDuration(since, time.Now())
now := time.Now()
sinceTs, err := cli.ParseTimeOrDuration(since, now)
if err != nil {
return fmt.Errorf("invalid duration or time: %w", err)
return fmt.Errorf("invalid 'since' duration or time: %w", err)
}
sinceTs = sinceTs.UTC()
untilTs, err := cli.ParseTimeOrDuration(until, now)
if err != nil {
return fmt.Errorf("invalid 'until' duration or time: %w", err)
}
untilTs = untilTs.UTC()

ts = ts.UTC()
sinceStr := ""
if pkg.IsValidTime(ts) {
sinceStr = " since " + ts.Format(time.RFC3339Nano) + " "
rangeStr := ""
if pkg.IsValidTime(sinceTs) {
rangeStr = " since " + sinceTs.Format(time.RFC3339Nano)
}
term.Infof("Showing logs%s; press Ctrl+C to stop:", sinceStr)
if pkg.IsValidTime(untilTs) {
rangeStr += " until " + untilTs.Format(time.RFC3339Nano)
}
term.Infof("Showing logs%s; press Ctrl+C to stop:", rangeStr)

services := args
if len(name) > 0 {
services = append(args, strings.Split(name, ",")...) // backwards compat
Expand All @@ -465,7 +476,8 @@ func makeComposeLogsCmd() *cobra.Command {
LogType: logType,
Raw: raw,
Services: services,
Since: ts,
Since: sinceTs,
Until: untilTs,
Verbose: verbose,
}

Expand All @@ -478,7 +490,8 @@ func makeComposeLogsCmd() *cobra.Command {
logsCmd.Flags().Bool("follow", false, "follow log output") // NOTE: -f is already used by --file
logsCmd.Flags().MarkHidden("follow") // TODO: implement this
logsCmd.Flags().BoolP("raw", "r", false, "show raw (unparsed) logs")
logsCmd.Flags().StringP("since", "S", "", "show logs since duration/time")
logsCmd.Flags().String("since", "", "show logs since duration/time")
logsCmd.Flags().String("until", "", "show logs until duration/time")
logsCmd.Flags().Bool("utc", false, "show logs in UTC timezone (ie. TZ=UTC)")
logsCmd.Flags().Var(&logType, "type", fmt.Sprintf(`show logs of type; one of %v`, logs.AllLogTypes))
logsCmd.Flags().String("filter", "", "only show logs containing given text; case-insensitive")
Expand Down
105 changes: 67 additions & 38 deletions src/pkg/cli/client/byoc/aws/byoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,67 +596,89 @@ func (b *ByocAws) CreateUploadURL(ctx context.Context, req *defangv1.UploadURLRe
}, nil
}

func (b *ByocAws) Query(ctx context.Context, req *defangv1.DebugRequest) error {
func (b *ByocAws) QueryForDebug(ctx context.Context, req *defangv1.DebugRequest) error {
// tailRequest := &defangv1.TailRequest{
// Etag: req.Etag,
// Project: req.Project,
// Services: req.Services,
// Since: req.Since,
// Until: req.Until,
// }

// The LogStreamNamePrefix filter can only be used with one service name
var service string
if len(req.Services) == 1 {
service = req.Services[0]
}

since := b.cdStart // TODO: get start time from req.Etag
if since.IsZero() {
since = time.Now().Add(-time.Hour) // TODO: get start time from req
start := b.cdStart // TODO: get start time from req.Etag
if req.Since.IsValid() {
start = req.Since.AsTime()
} else if start.IsZero() {
start = time.Now().Add(-time.Hour)
}

// get stack information
end := time.Now()
if req.Until.IsValid() {
end = req.Until.AsTime()
}

// get stack information (for log group ARN)
err := b.driver.FillOutputs(ctx)
if err != nil {
return AnnotateAwsError(err)
}

const maxQuerySizePerLogGroup = 128 * 1024 // 128KB (to stay well below the 1MB gRPC payload limit)

// Gather logs from the CD task, kaniko, ECS events, and all services
evtsChan, errsChan := ecs.QueryLogGroups(ctx, start, end, b.getLogGroupInputs(req.Etag, req.Project, service, "", logs.LogTypeAll)...)
if evtsChan == nil {
return <-errsChan
}

const maxQuerySizePerLogGroup = 128 * 1024 // 128KB per LogGroup (to stay well below the 1MB gRPC payload limit)

sb := strings.Builder{}
for _, lgi := range b.getLogGroupInputs(req.Etag, req.Project, service, "", logs.LogTypeAll) {
parseECSEventRecords := strings.HasSuffix(lgi.LogGroupARN, "/ecs")
if err := ecs.Query(ctx, lgi, since, time.Now(), func(logEvents []ecs.LogEvent) error {
for _, event := range logEvents {
msg := term.StripAnsi(*event.Message)
if parseECSEventRecords {
if event, err := ecs.ParseECSEvent([]byte(msg)); err == nil {
// TODO: once we know the AWS deploymentId from TaskStateChangeEvent detail.startedBy, we can do a 2nd query to filter by deploymentId
if event.Etag() != "" && req.Etag != "" && event.Etag() != req.Etag {
continue
}
if event.Service() != "" && len(req.Services) > 0 && !slices.Contains(req.Services, event.Service()) {
continue
}
// This matches the status messages in the Defang Playground Loki logs
sb.WriteString("status=")
sb.WriteString(event.Status())
sb.WriteByte('\n')
loop:
for {
select {
case event, ok := <-evtsChan:
if !ok {
break loop
}
parseECSEventRecords := strings.HasSuffix(*event.LogGroupIdentifier, "/ecs")
if parseECSEventRecords {
if event, err := ecs.ParseECSEvent([]byte(*event.Message)); err == nil {
// TODO: once we know the AWS deploymentId from TaskStateChangeEvent detail.startedBy, we can do a 2nd query to filter by deploymentId
if event.Etag() != "" && req.Etag != "" && event.Etag() != req.Etag {
continue
}
}
sb.WriteString(msg)
sb.WriteByte('\n')
if sb.Len() > maxQuerySizePerLogGroup {
return errors.New("query result was truncated")
if event.Service() != "" && len(req.Services) > 0 && !slices.Contains(req.Services, event.Service()) {
continue
}
// This matches the status messages in the Defang Playground Loki logs
sb.WriteString("status=")
sb.WriteString(event.Status())
sb.WriteByte('\n')
continue
}
}
return nil
}); err != nil {
msg := term.StripAnsi(*event.Message)
sb.WriteString(msg)
sb.WriteByte('\n')
if sb.Len() > maxQuerySizePerLogGroup { // FIXME: this limit was supposed to be per LogGroup
term.Warn("Query result was truncated")
break loop
}
case err := <-errsChan:
term.Warn("CloudWatch query error:", AnnotateAwsError(err))
// continue reading other log groups
}
}

req.Logs = sb.String()
return nil
}

func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client.ServerStream[defangv1.TailResponse], error) {
func (b *ByocAws) QueryLogs(ctx context.Context, req *defangv1.TailRequest) (client.ServerStream[defangv1.TailResponse], error) {
// FillOutputs is needed to get the CD task ARN or the LogGroup ARNs
if err := b.driver.FillOutputs(ctx); err != nil {
return nil, AnnotateAwsError(err)
Expand All @@ -673,11 +695,11 @@ func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client
// * Etag, service: tail that task/service
var err error
var taskArn ecs.TaskArn
var eventStream ecs.EventStream
var tailStream ecs.LiveTailStream
stopWhenCDTaskDone := false
logType := logs.LogType(req.LogType)
if etag != "" && !pkg.IsValidRandomID(etag) { // Assume invalid "etag" is a task ID
eventStream, err = b.driver.TailTaskID(ctx, etag)
tailStream, err = b.driver.TailTaskID(ctx, etag)
taskArn, _ = b.driver.GetTaskArn(etag)
term.Debugf("Tailing task %s", *taskArn)
etag = "" // no need to filter by etag
Expand All @@ -687,7 +709,14 @@ func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client
if len(req.Services) == 1 {
service = req.Services[0]
}
eventStream, err = ecs.TailLogGroups(ctx, req.Since.AsTime(), b.getLogGroupInputs(etag, req.Project, service, req.Pattern, logType)...)
var start, end time.Time
if req.Since.IsValid() {
start = req.Since.AsTime()
}
if req.Until.IsValid() {
end = req.Until.AsTime()
}
tailStream, err = ecs.QueryAndTailLogGroups(ctx, start, end, b.getLogGroupInputs(etag, req.Project, service, req.Pattern, logType)...)
taskArn = b.cdTaskArn
}
if err != nil {
Expand All @@ -709,7 +738,7 @@ func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client
}()
}

return newByocServerStream(ctx, eventStream, etag, req.GetServices(), b), nil
return newByocServerStream(ctx, tailStream, etag, req.GetServices(), b), nil
}

func (b *ByocAws) makeLogGroupARN(name string) string {
Expand Down
2 changes: 1 addition & 1 deletion src/pkg/cli/client/byoc/aws/byoc_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestDeploy(t *testing.T) {
func TestTail(t *testing.T) {
b := NewByocProvider(ctx, "TestTail")

ss, err := b.Follow(context.Background(), &defangv1.TailRequest{Project: "byoc_integration_test"})
ss, err := b.QueryLogs(context.Background(), &defangv1.TailRequest{Project: "byoc_integration_test"})
if err != nil {
// the only acceptable error is "unauthorized"
if connect.CodeOf(err) == connect.CodeUnauthenticated {
Expand Down
4 changes: 2 additions & 2 deletions src/pkg/cli/client/byoc/aws/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ type byocServerStream struct {
etag string
response *defangv1.TailResponse
services []string
stream ecs.EventStream
stream ecs.LiveTailStream

ecsEventsHandler ECSEventHandler
}

func newByocServerStream(ctx context.Context, stream ecs.EventStream, etag string, services []string, ecsEventHandler ECSEventHandler) *byocServerStream {
func newByocServerStream(ctx context.Context, stream ecs.LiveTailStream, etag string, services []string, ecsEventHandler ECSEventHandler) *byocServerStream {
return &byocServerStream{
ctx: ctx,
etag: etag,
Expand Down
4 changes: 2 additions & 2 deletions src/pkg/cli/client/byoc/do/byoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func (b *ByocDo) PutConfig(ctx context.Context, config *defangv1.PutConfigReques
return err
}

func (b *ByocDo) Follow(ctx context.Context, req *defangv1.TailRequest) (client.ServerStream[defangv1.TailResponse], error) {
func (b *ByocDo) QueryLogs(ctx context.Context, req *defangv1.TailRequest) (client.ServerStream[defangv1.TailResponse], error) {
var appID, deploymentID string

if req.Etag != "" && req.Etag == b.cdEtag {
Expand Down Expand Up @@ -598,7 +598,7 @@ func (s *subscribeStream) Close() error {
return nil
}

func (b *ByocDo) Query(ctx context.Context, req *defangv1.DebugRequest) error {
func (b *ByocDo) QueryForDebug(ctx context.Context, req *defangv1.DebugRequest) error {
return client.ErrNotImplemented("AI debugging is not yet supported for DO BYOC")
}

Expand Down
23 changes: 16 additions & 7 deletions src/pkg/cli/client/byoc/gcp/byoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,15 +480,15 @@ func (b *ByocGcp) Subscribe(ctx context.Context, req *defangv1.SubscribeRequest)
return subscribeStream, nil
}

func (b *ByocGcp) Follow(ctx context.Context, req *defangv1.TailRequest) (client.ServerStream[defangv1.TailResponse], error) {
func (b *ByocGcp) QueryLogs(ctx context.Context, req *defangv1.TailRequest) (client.ServerStream[defangv1.TailResponse], error) {
if b.cdExecution != "" && req.Etag == b.cdExecution { // Only follow CD log, we need to subscribe to cd activities to detect when the job is done
subscribeStream, err := NewSubscribeStream(ctx, b.driver)
if err != nil {
return nil, err
}
subscribeStream.AddJobExecutionUpdate(path.Base(b.cdExecution))
var since time.Time
if req.Since != nil {
if req.Since.IsValid() {
since = req.Since.AsTime()
}
subscribeStream.Start(since)
Expand Down Expand Up @@ -520,10 +520,14 @@ func (b *ByocGcp) Follow(ctx context.Context, req *defangv1.TailRequest) (client
}

startTime := time.Now()
if req.Since != nil {
if req.Since.IsValid() {
startTime = req.Since.AsTime()
}
if req.Since != nil || req.Etag != "" {
var endTime time.Time
if req.Until.IsValid() {
endTime = req.Until.AsTime()
}
if req.Since.IsValid() || req.Etag != "" {
execName := path.Base(b.cdExecution)
if execName == "." {
execName = ""
Expand All @@ -541,6 +545,7 @@ func (b *ByocGcp) Follow(ctx context.Context, req *defangv1.TailRequest) (client
logStream.AddServiceLog(req.Project, etag, req.Services) // Service logs
}
logStream.AddSince(startTime)
logStream.AddUntil(endTime)
logStream.AddFilter(req.Pattern)
}
logStream.Start(startTime)
Expand Down Expand Up @@ -648,10 +653,13 @@ func (b *ByocGcp) PutConfig(ctx context.Context, req *defangv1.PutConfigRequest)
}

func (b *ByocGcp) createDeploymentLogQuery(req *defangv1.DebugRequest) string {
var since time.Time
if req.Since != nil {
var since, until time.Time
if req.Since.IsValid() {
since = req.Since.AsTime()
}
if req.Until.IsValid() {
until = req.Until.AsTime()
}
query := NewLogQuery(b.driver.ProjectId)
if b.cdExecution != "" {
query.AddJobExecutionQuery(path.Base(b.cdExecution))
Expand All @@ -662,6 +670,7 @@ func (b *ByocGcp) createDeploymentLogQuery(req *defangv1.DebugRequest) string {
query.AddServiceLogQuery(req.Project, req.Etag, req.Services) // Cloudrun service logs
query.AddCloudBuildLogQuery(req.Project, req.Etag, req.Services) // CloudBuild logs
query.AddSince(since)
query.AddUntil(until)

// Service status updates
query.AddJobStatusUpdateRequestQuery(req.Project, req.Etag, req.Services)
Expand Down Expand Up @@ -735,7 +744,7 @@ func (b *ByocGcp) query(ctx context.Context, query string) ([]*loggingpb.LogEntr
return entries, nil
}

func (b *ByocGcp) Query(ctx context.Context, req *defangv1.DebugRequest) error {
func (b *ByocGcp) QueryForDebug(ctx context.Context, req *defangv1.DebugRequest) error {
logEntries, err := b.query(ctx, b.createDeploymentLogQuery(req))
if err != nil {
return annotateGcpError(err)
Expand Down
7 changes: 7 additions & 0 deletions src/pkg/cli/client/byoc/gcp/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,13 @@ func (q *Query) AddSince(since time.Time) {
q.baseQuery += fmt.Sprintf(` AND (timestamp >= %q)`, since.UTC().Format(time.RFC3339Nano))
}

func (q *Query) AddUntil(until time.Time) {
if until.IsZero() || until.Unix() <= 0 {
return
}
q.baseQuery += fmt.Sprintf(` AND (timestamp <= %q)`, until.UTC().Format(time.RFC3339Nano))
}

func (q *Query) AddFilter(filter string) {
if filter == "" {
return
Expand Down
4 changes: 4 additions & 0 deletions src/pkg/cli/client/byoc/gcp/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ func (s *LogStream) AddSince(start time.Time) {
s.query.AddSince(start)
}

func (s *LogStream) AddUntil(end time.Time) {
s.query.AddUntil(end)
}

func (s *LogStream) AddFilter(filter string) {
s.query.AddFilter(filter)
}
Expand Down
7 changes: 0 additions & 7 deletions src/pkg/cli/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,6 @@ import (
composeTypes "github.com/compose-spec/compose-go/v2/types"
)

type ServerStream[Res any] interface {
Close() error
Receive() bool
Msg() *Res
Err() error
}

type ProjectLoader interface {
LoadProjectName(context.Context) (string, error)
LoadProject(context.Context) (*composeTypes.Project, error)
Expand Down
Loading
Loading