Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions src/cmd/cli/command/compose.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ func makeComposeLogsCmd() *cobra.Command {
var utc, _ = cmd.Flags().GetBool("utc")
var verbose, _ = cmd.Flags().GetBool("verbose")
var filter, _ = cmd.Flags().GetString("filter")
var until, _ = cmd.Flags().GetString("until")

if !cmd.Flags().Changed("verbose") {
verbose = true // default verbose for explicit tail command
Expand All @@ -432,17 +433,27 @@ func makeComposeLogsCmd() *cobra.Command {
os.Setenv("TZ", "") // used by Go's "time" package, see https://pkg.go.dev/time#Location
}

ts, err := cli.ParseTimeOrDuration(since, time.Now())
now := time.Now()
sinceTs, err := cli.ParseTimeOrDuration(since, now)
if err != nil {
return fmt.Errorf("invalid duration or time: %w", err)
return fmt.Errorf("invalid 'since' duration or time: %w", err)
}
sinceTs = sinceTs.UTC()
untilTs, err := cli.ParseTimeOrDuration(until, now)
if err != nil {
return fmt.Errorf("invalid 'until' duration or time: %w", err)
}
untilTs = untilTs.UTC()

ts = ts.UTC()
sinceStr := ""
if pkg.IsValidTime(ts) {
sinceStr = " since " + ts.Format(time.RFC3339Nano) + " "
rangeStr := ""
if pkg.IsValidTime(sinceTs) {
rangeStr = " since " + sinceTs.Format(time.RFC3339Nano)
}
term.Infof("Showing logs%s; press Ctrl+C to stop:", sinceStr)
if pkg.IsValidTime(untilTs) {
rangeStr += " until " + untilTs.Format(time.RFC3339Nano)
}
term.Infof("Showing logs%s; press Ctrl+C to stop:", rangeStr)

services := args
if len(name) > 0 {
services = append(args, strings.Split(name, ",")...) // backwards compat
Expand All @@ -465,7 +476,8 @@ func makeComposeLogsCmd() *cobra.Command {
LogType: logType,
Raw: raw,
Services: services,
Since: ts,
Since: sinceTs,
Until: untilTs,
Verbose: verbose,
}

Expand All @@ -478,7 +490,8 @@ func makeComposeLogsCmd() *cobra.Command {
logsCmd.Flags().Bool("follow", false, "follow log output") // NOTE: -f is already used by --file
logsCmd.Flags().MarkHidden("follow") // TODO: implement this
logsCmd.Flags().BoolP("raw", "r", false, "show raw (unparsed) logs")
logsCmd.Flags().StringP("since", "S", "", "show logs since duration/time")
logsCmd.Flags().String("since", "", "show logs since duration/time")
logsCmd.Flags().String("until", "", "show logs until duration/time")
logsCmd.Flags().Bool("utc", false, "show logs in UTC timezone (ie. TZ=UTC)")
logsCmd.Flags().Var(&logType, "type", fmt.Sprintf(`show logs of type; one of %v`, logs.AllLogTypes))
logsCmd.Flags().String("filter", "", "only show logs containing given text; case-insensitive")
Expand Down
105 changes: 67 additions & 38 deletions src/pkg/cli/client/byoc/aws/byoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,67 +596,89 @@ func (b *ByocAws) CreateUploadURL(ctx context.Context, req *defangv1.UploadURLRe
}, nil
}

func (b *ByocAws) Query(ctx context.Context, req *defangv1.DebugRequest) error {
func (b *ByocAws) QueryForDebug(ctx context.Context, req *defangv1.DebugRequest) error {
// tailRequest := &defangv1.TailRequest{
// Etag: req.Etag,
// Project: req.Project,
// Services: req.Services,
// Since: req.Since,
// Until: req.Until,
// }

// The LogStreamNamePrefix filter can only be used with one service name
var service string
if len(req.Services) == 1 {
service = req.Services[0]
}

since := b.cdStart // TODO: get start time from req.Etag
if since.IsZero() {
since = time.Now().Add(-time.Hour) // TODO: get start time from req
start := b.cdStart // TODO: get start time from req.Etag
if req.Since.IsValid() {
start = req.Since.AsTime()
} else if start.IsZero() {
start = time.Now().Add(-time.Hour)
}

// get stack information
end := time.Now()
if req.Until.IsValid() {
end = req.Until.AsTime()
}

// get stack information (for log group ARN)
err := b.driver.FillOutputs(ctx)
if err != nil {
return AnnotateAwsError(err)
}

const maxQuerySizePerLogGroup = 128 * 1024 // 128KB (to stay well below the 1MB gRPC payload limit)

// Gather logs from the CD task, kaniko, ECS events, and all services
evtsChan, errsChan := ecs.QueryLogGroups(ctx, start, end, b.getLogGroupInputs(req.Etag, req.Project, service, "", logs.LogTypeAll)...)
if evtsChan == nil {
return <-errsChan
}

const maxQuerySizePerLogGroup = 128 * 1024 // 128KB per LogGroup (to stay well below the 1MB gRPC payload limit)

sb := strings.Builder{}
for _, lgi := range b.getLogGroupInputs(req.Etag, req.Project, service, "", logs.LogTypeAll) {
parseECSEventRecords := strings.HasSuffix(lgi.LogGroupARN, "/ecs")
if err := ecs.Query(ctx, lgi, since, time.Now(), func(logEvents []ecs.LogEvent) error {
for _, event := range logEvents {
msg := term.StripAnsi(*event.Message)
if parseECSEventRecords {
if event, err := ecs.ParseECSEvent([]byte(msg)); err == nil {
// TODO: once we know the AWS deploymentId from TaskStateChangeEvent detail.startedBy, we can do a 2nd query to filter by deploymentId
if event.Etag() != "" && req.Etag != "" && event.Etag() != req.Etag {
continue
}
if event.Service() != "" && len(req.Services) > 0 && !slices.Contains(req.Services, event.Service()) {
continue
}
// This matches the status messages in the Defang Playground Loki logs
sb.WriteString("status=")
sb.WriteString(event.Status())
sb.WriteByte('\n')
loop:
for {
select {
case event, ok := <-evtsChan:
if !ok {
break loop
}
parseECSEventRecords := strings.HasSuffix(*event.LogGroupIdentifier, "/ecs")
if parseECSEventRecords {
if event, err := ecs.ParseECSEvent([]byte(*event.Message)); err == nil {
// TODO: once we know the AWS deploymentId from TaskStateChangeEvent detail.startedBy, we can do a 2nd query to filter by deploymentId
if event.Etag() != "" && req.Etag != "" && event.Etag() != req.Etag {
continue
}
}
sb.WriteString(msg)
sb.WriteByte('\n')
if sb.Len() > maxQuerySizePerLogGroup {
return errors.New("query result was truncated")
if event.Service() != "" && len(req.Services) > 0 && !slices.Contains(req.Services, event.Service()) {
continue
}
// This matches the status messages in the Defang Playground Loki logs
sb.WriteString("status=")
sb.WriteString(event.Status())
sb.WriteByte('\n')
continue
}
}
return nil
}); err != nil {
msg := term.StripAnsi(*event.Message)
sb.WriteString(msg)
sb.WriteByte('\n')
if sb.Len() > maxQuerySizePerLogGroup { // FIXME: this limit was supposed to be per LogGroup
term.Warn("Query result was truncated")
break loop
}
case err := <-errsChan:
term.Warn("CloudWatch query error:", AnnotateAwsError(err))
// continue reading other log groups
}
}

req.Logs = sb.String()
return nil
}

func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client.ServerStream[defangv1.TailResponse], error) {
func (b *ByocAws) QueryLogs(ctx context.Context, req *defangv1.TailRequest) (client.ServerStream[defangv1.TailResponse], error) {
// FillOutputs is needed to get the CD task ARN or the LogGroup ARNs
if err := b.driver.FillOutputs(ctx); err != nil {
return nil, AnnotateAwsError(err)
Expand All @@ -673,11 +695,11 @@ func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client
// * Etag, service: tail that task/service
var err error
var taskArn ecs.TaskArn
var eventStream ecs.EventStream
var tailStream ecs.LiveTailStream
stopWhenCDTaskDone := false
logType := logs.LogType(req.LogType)
if etag != "" && !pkg.IsValidRandomID(etag) { // Assume invalid "etag" is a task ID
eventStream, err = b.driver.TailTaskID(ctx, etag)
tailStream, err = b.driver.TailTaskID(ctx, etag)
taskArn, _ = b.driver.GetTaskArn(etag)
term.Debugf("Tailing task %s", *taskArn)
etag = "" // no need to filter by etag
Expand All @@ -687,7 +709,14 @@ func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client
if len(req.Services) == 1 {
service = req.Services[0]
}
eventStream, err = ecs.TailLogGroups(ctx, req.Since.AsTime(), b.getLogGroupInputs(etag, req.Project, service, req.Pattern, logType)...)
var start, end time.Time
if req.Since.IsValid() {
start = req.Since.AsTime()
}
if req.Until.IsValid() {
end = req.Until.AsTime()
}
tailStream, err = ecs.QueryAndTailLogGroups(ctx, start, end, b.getLogGroupInputs(etag, req.Project, service, req.Pattern, logType)...)
taskArn = b.cdTaskArn
}
if err != nil {
Expand All @@ -709,7 +738,7 @@ func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client
}()
}

return newByocServerStream(ctx, eventStream, etag, req.GetServices(), b), nil
return newByocServerStream(ctx, tailStream, etag, req.GetServices(), b), nil
}

func (b *ByocAws) makeLogGroupARN(name string) string {
Expand Down
2 changes: 1 addition & 1 deletion src/pkg/cli/client/byoc/aws/byoc_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestDeploy(t *testing.T) {
func TestTail(t *testing.T) {
b := NewByocProvider(ctx, "TestTail")

ss, err := b.Follow(context.Background(), &defangv1.TailRequest{Project: "byoc_integration_test"})
ss, err := b.QueryLogs(context.Background(), &defangv1.TailRequest{Project: "byoc_integration_test"})
if err != nil {
// the only acceptable error is "unauthorized"
if connect.CodeOf(err) == connect.CodeUnauthenticated {
Expand Down
4 changes: 2 additions & 2 deletions src/pkg/cli/client/byoc/aws/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ type byocServerStream struct {
etag string
response *defangv1.TailResponse
services []string
stream ecs.EventStream
stream ecs.LiveTailStream

ecsEventsHandler ECSEventHandler
}

func newByocServerStream(ctx context.Context, stream ecs.EventStream, etag string, services []string, ecsEventHandler ECSEventHandler) *byocServerStream {
func newByocServerStream(ctx context.Context, stream ecs.LiveTailStream, etag string, services []string, ecsEventHandler ECSEventHandler) *byocServerStream {
return &byocServerStream{
ctx: ctx,
etag: etag,
Expand Down
4 changes: 2 additions & 2 deletions src/pkg/cli/client/byoc/do/byoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func (b *ByocDo) PutConfig(ctx context.Context, config *defangv1.PutConfigReques
return err
}

func (b *ByocDo) Follow(ctx context.Context, req *defangv1.TailRequest) (client.ServerStream[defangv1.TailResponse], error) {
func (b *ByocDo) QueryLogs(ctx context.Context, req *defangv1.TailRequest) (client.ServerStream[defangv1.TailResponse], error) {
var appID, deploymentID string

if req.Etag != "" && req.Etag == b.cdEtag {
Expand Down Expand Up @@ -598,7 +598,7 @@ func (s *subscribeStream) Close() error {
return nil
}

func (b *ByocDo) Query(ctx context.Context, req *defangv1.DebugRequest) error {
func (b *ByocDo) QueryForDebug(ctx context.Context, req *defangv1.DebugRequest) error {
return client.ErrNotImplemented("AI debugging is not yet supported for DO BYOC")
}

Expand Down
4 changes: 2 additions & 2 deletions src/pkg/cli/client/byoc/gcp/byoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ func (b *ByocGcp) Subscribe(ctx context.Context, req *defangv1.SubscribeRequest)
return subscribeStream, nil
}

func (b *ByocGcp) Follow(ctx context.Context, req *defangv1.TailRequest) (client.ServerStream[defangv1.TailResponse], error) {
func (b *ByocGcp) QueryLogs(ctx context.Context, req *defangv1.TailRequest) (client.ServerStream[defangv1.TailResponse], error) {
if b.cdExecution != "" && req.Etag == b.cdExecution { // Only follow CD log, we need to subscribe to cd activities to detect when the job is done
subscribeStream, err := NewSubscribeStream(ctx, b.driver)
if err != nil {
Expand Down Expand Up @@ -735,7 +735,7 @@ func (b *ByocGcp) query(ctx context.Context, query string) ([]*loggingpb.LogEntr
return entries, nil
}

func (b *ByocGcp) Query(ctx context.Context, req *defangv1.DebugRequest) error {
func (b *ByocGcp) QueryForDebug(ctx context.Context, req *defangv1.DebugRequest) error {
logEntries, err := b.query(ctx, b.createDeploymentLogQuery(req))
if err != nil {
return annotateGcpError(err)
Expand Down
7 changes: 0 additions & 7 deletions src/pkg/cli/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,6 @@ import (
composeTypes "github.com/compose-spec/compose-go/v2/types"
)

type ServerStream[Res any] interface {
Close() error
Receive() bool
Msg() *Res
Err() error
}

type ProjectLoader interface {
LoadProjectName(context.Context) (string, error)
LoadProject(context.Context) (*composeTypes.Project, error)
Expand Down
4 changes: 2 additions & 2 deletions src/pkg/cli/client/playground.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (g *PlaygroundProvider) Subscribe(ctx context.Context, req *defangv1.Subscr
return g.GetController().Subscribe(ctx, connect.NewRequest(req))
}

func (g *PlaygroundProvider) Follow(ctx context.Context, req *defangv1.TailRequest) (ServerStream[defangv1.TailResponse], error) {
func (g *PlaygroundProvider) QueryLogs(ctx context.Context, req *defangv1.TailRequest) (ServerStream[defangv1.TailResponse], error) {
return g.GetController().Tail(ctx, connect.NewRequest(req))
}

Expand Down Expand Up @@ -121,7 +121,7 @@ func (g *PlaygroundProvider) AccountInfo(ctx context.Context) (AccountInfo, erro
return PlaygroundAccountInfo{}, nil
}

func (g *PlaygroundProvider) Query(ctx context.Context, req *defangv1.DebugRequest) error {
func (g *PlaygroundProvider) QueryForDebug(ctx context.Context, req *defangv1.DebugRequest) error {
return nil
}

Expand Down
11 changes: 9 additions & 2 deletions src/pkg/cli/client/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ type PrepareDomainDelegationResponse struct {
DelegationSetId string
}

type ServerStream[Res any] interface {
Close() error
Receive() bool
Msg() *Res
Err() error
}

type Provider interface {
AccountInfo(context.Context) (AccountInfo, error)
BootstrapCommand(context.Context, BootstrapCommandRequest) (types.ETag, error)
Expand All @@ -130,12 +137,12 @@ type Provider interface {
Deploy(context.Context, *defangv1.DeployRequest) (*defangv1.DeployResponse, error)
DelayBeforeRetry(context.Context) error
Destroy(context.Context, *defangv1.DestroyRequest) (types.ETag, error)
Follow(context.Context, *defangv1.TailRequest) (ServerStream[defangv1.TailResponse], error)
QueryLogs(context.Context, *defangv1.TailRequest) (ServerStream[defangv1.TailResponse], error)
GetProjectUpdate(context.Context, string) (*defangv1.ProjectUpdate, error)
GetService(context.Context, *defangv1.GetRequest) (*defangv1.ServiceInfo, error)
GetServices(context.Context, *defangv1.GetServicesRequest) (*defangv1.GetServicesResponse, error)
ListConfig(context.Context, *defangv1.ListConfigsRequest) (*defangv1.Secrets, error)
Query(context.Context, *defangv1.DebugRequest) error
QueryForDebug(context.Context, *defangv1.DebugRequest) error
Preview(context.Context, *defangv1.DeployRequest) (*defangv1.DeployResponse, error)
PutConfig(context.Context, *defangv1.PutConfigRequest) error
RemoteProjectName(context.Context) (string, error)
Expand Down
12 changes: 9 additions & 3 deletions src/pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/AlecAivazis/survey/v2"
"github.com/DefangLabs/defang/src/pkg"
"github.com/DefangLabs/defang/src/pkg/cli/client"
"github.com/DefangLabs/defang/src/pkg/cli/compose"
"github.com/DefangLabs/defang/src/pkg/term"
Expand All @@ -35,6 +36,7 @@ type DebugConfig struct {
Project *compose.Project
Provider client.Provider
Since time.Time
Until time.Time
}

func InteractiveDebugDeployment(ctx context.Context, client client.FabricClient, debugConfig DebugConfig) error {
Expand Down Expand Up @@ -95,18 +97,22 @@ func DebugDeployment(ctx context.Context, client client.FabricClient, debugConfi
return ErrDryRun
}

var sinceTime *timestamppb.Timestamp = nil
if !debugConfig.Since.IsZero() {
var sinceTime, untilTime *timestamppb.Timestamp
if pkg.IsValidTime(debugConfig.Since) {
sinceTime = timestamppb.New(debugConfig.Since)
}
if pkg.IsValidTime(debugConfig.Until) {
untilTime = timestamppb.New(debugConfig.Until)
}
req := defangv1.DebugRequest{
Etag: debugConfig.Etag,
Files: files,
Project: debugConfig.Project.Name,
Services: debugConfig.FailedServices,
Since: sinceTime,
Until: untilTime,
}
err := debugConfig.Provider.Query(ctx, &req)
err := debugConfig.Provider.QueryForDebug(ctx, &req)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion src/pkg/cli/debug_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type MustHaveProjectNameQueryProvider struct {
client.Provider
}

func (m MustHaveProjectNameQueryProvider) Query(ctx context.Context, req *defangv1.DebugRequest) error {
func (m MustHaveProjectNameQueryProvider) QueryForDebug(ctx context.Context, req *defangv1.DebugRequest) error {
if req.Project == "" {
return errors.New("project name is missing")
}
Expand Down
Loading
Loading