Skip to content

Commit c99f6f4

Browse files
committed
support --until end time
1 parent 721b0c7 commit c99f6f4

File tree

13 files changed

+68
-43
lines changed

13 files changed

+68
-43
lines changed

src/pkg/cli/client/byoc/aws/byoc.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -596,7 +596,7 @@ func (b *ByocAws) CreateUploadURL(ctx context.Context, req *defangv1.UploadURLRe
596596
}, nil
597597
}
598598

599-
func (b *ByocAws) Query(ctx context.Context, req *defangv1.DebugRequest) error {
599+
func (b *ByocAws) QueryForDebug(ctx context.Context, req *defangv1.DebugRequest) error {
600600
// tailRequest := &defangv1.TailRequest{
601601
// Etag: req.Etag,
602602
// Project: req.Project,
@@ -695,11 +695,11 @@ func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client
695695
// * Etag, service: tail that task/service
696696
var err error
697697
var taskArn ecs.TaskArn
698-
var eventStream ecs.EventStream
698+
var tailStream ecs.LiveTailStream
699699
stopWhenCDTaskDone := false
700700
logType := logs.LogType(req.LogType)
701701
if etag != "" && !pkg.IsValidRandomID(etag) { // Assume invalid "etag" is a task ID
702-
eventStream, err = b.driver.TailTaskID(ctx, etag)
702+
tailStream, err = b.driver.TailTaskID(ctx, etag)
703703
taskArn, _ = b.driver.GetTaskArn(etag)
704704
term.Debugf("Tailing task %s", *taskArn)
705705
etag = "" // no need to filter by etag
@@ -709,7 +709,14 @@ func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client
709709
if len(req.Services) == 1 {
710710
service = req.Services[0]
711711
}
712-
eventStream, err = ecs.TailLogGroups(ctx, req.Since.AsTime(), b.getLogGroupInputs(etag, req.Project, service, req.Pattern, logType)...)
712+
var start, end time.Time
713+
if req.Since.IsValid() {
714+
start = req.Since.AsTime()
715+
}
716+
if req.Until.IsValid() {
717+
end = req.Until.AsTime()
718+
}
719+
tailStream, err = ecs.QueryAndTailLogGroups(ctx, start, end, b.getLogGroupInputs(etag, req.Project, service, req.Pattern, logType)...)
713720
taskArn = b.cdTaskArn
714721
}
715722
if err != nil {
@@ -731,7 +738,7 @@ func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client
731738
}()
732739
}
733740

734-
return newByocServerStream(ctx, eventStream, etag, req.GetServices(), b), nil
741+
return newByocServerStream(ctx, tailStream, etag, req.GetServices(), b), nil
735742
}
736743

737744
func (b *ByocAws) makeLogGroupARN(name string) string {

src/pkg/cli/client/byoc/aws/stream.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ type byocServerStream struct {
2424
etag string
2525
response *defangv1.TailResponse
2626
services []string
27-
stream ecs.EventStream
27+
stream ecs.LiveTailStream
2828

2929
ecsEventsHandler ECSEventHandler
3030
}
3131

32-
func newByocServerStream(ctx context.Context, stream ecs.EventStream, etag string, services []string, ecsEventHandler ECSEventHandler) *byocServerStream {
32+
func newByocServerStream(ctx context.Context, stream ecs.LiveTailStream, etag string, services []string, ecsEventHandler ECSEventHandler) *byocServerStream {
3333
return &byocServerStream{
3434
ctx: ctx,
3535
etag: etag,

src/pkg/cli/client/byoc/do/byoc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -598,7 +598,7 @@ func (s *subscribeStream) Close() error {
598598
return nil
599599
}
600600

601-
func (b *ByocDo) Query(ctx context.Context, req *defangv1.DebugRequest) error {
601+
func (b *ByocDo) QueryForDebug(ctx context.Context, req *defangv1.DebugRequest) error {
602602
return client.ErrNotImplemented("AI debugging is not yet supported for DO BYOC")
603603
}
604604

src/pkg/cli/client/byoc/gcp/byoc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -735,7 +735,7 @@ func (b *ByocGcp) query(ctx context.Context, query string) ([]*loggingpb.LogEntr
735735
return entries, nil
736736
}
737737

738-
func (b *ByocGcp) Query(ctx context.Context, req *defangv1.DebugRequest) error {
738+
func (b *ByocGcp) QueryForDebug(ctx context.Context, req *defangv1.DebugRequest) error {
739739
logEntries, err := b.query(ctx, b.createDeploymentLogQuery(req))
740740
if err != nil {
741741
return annotateGcpError(err)

src/pkg/cli/client/playground.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func (g *PlaygroundProvider) AccountInfo(ctx context.Context) (AccountInfo, erro
121121
return PlaygroundAccountInfo{}, nil
122122
}
123123

124-
func (g *PlaygroundProvider) Query(ctx context.Context, req *defangv1.DebugRequest) error {
124+
func (g *PlaygroundProvider) QueryForDebug(ctx context.Context, req *defangv1.DebugRequest) error {
125125
return nil
126126
}
127127

src/pkg/cli/client/provider.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ type Provider interface {
142142
GetService(context.Context, *defangv1.GetRequest) (*defangv1.ServiceInfo, error)
143143
GetServices(context.Context, *defangv1.GetServicesRequest) (*defangv1.GetServicesResponse, error)
144144
ListConfig(context.Context, *defangv1.ListConfigsRequest) (*defangv1.Secrets, error)
145-
Query(context.Context, *defangv1.DebugRequest) error
145+
QueryForDebug(context.Context, *defangv1.DebugRequest) error
146146
Preview(context.Context, *defangv1.DeployRequest) (*defangv1.DeployResponse, error)
147147
PutConfig(context.Context, *defangv1.PutConfigRequest) error
148148
RemoteProjectName(context.Context) (string, error)

src/pkg/cli/debug.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func DebugDeployment(ctx context.Context, client client.FabricClient, debugConfi
112112
Since: sinceTime,
113113
Until: untilTime,
114114
}
115-
err := debugConfig.Provider.Query(ctx, &req)
115+
err := debugConfig.Provider.QueryForDebug(ctx, &req)
116116
if err != nil {
117117
return err
118118
}

src/pkg/cli/debug_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ type MustHaveProjectNameQueryProvider struct {
4141
client.Provider
4242
}
4343

44-
func (m MustHaveProjectNameQueryProvider) Query(ctx context.Context, req *defangv1.DebugRequest) error {
44+
func (m MustHaveProjectNameQueryProvider) QueryForDebug(ctx context.Context, req *defangv1.DebugRequest) error {
4545
if req.Project == "" {
4646
return errors.New("project name is missing")
4747
}

src/pkg/clouds/aws/ecs/logs.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func getLogGroupIdentifier(arnOrId string) string {
2525
return strings.TrimSuffix(arnOrId, ":*")
2626
}
2727

28-
func TailLogGroups(ctx context.Context, start time.Time, logGroups ...LogGroupInput) (EventStream, error) {
28+
func QueryAndTailLogGroups(ctx context.Context, start, end time.Time, logGroups ...LogGroupInput) (LiveTailStream, error) {
2929
ctx, cancel := context.WithCancel(ctx)
3030

3131
e := &eventStream{
@@ -34,13 +34,14 @@ func TailLogGroups(ctx context.Context, start time.Time, logGroups ...LogGroupIn
3434
}
3535

3636
for _, lgi := range logGroups {
37-
es, err := QueryAndTailLogGroup(ctx, lgi, start)
37+
es, err := QueryAndTailLogGroup(ctx, lgi, start, end)
3838
if err != nil {
3939
cancel()
4040
return nil, err
4141
}
4242
go func() {
4343
defer es.Close()
44+
// FIXME: this should *merge* the events from all log groups
4445
e.pipeEvents(ctx, es)
4546
}()
4647
}
@@ -56,7 +57,7 @@ type LogGroupInput struct {
5657
LogEventFilterPattern string
5758
}
5859

59-
func TailLogGroup(ctx context.Context, input LogGroupInput) (EventStream, error) {
60+
func TailLogGroup(ctx context.Context, input LogGroupInput) (LiveTailStream, error) {
6061
var pattern *string
6162
if input.LogEventFilterPattern != "" {
6263
pattern = &input.LogEventFilterPattern
@@ -171,14 +172,15 @@ func newCloudWatchLogsClient(ctx context.Context, region aws.Region) (*cloudwatc
171172

172173
type LogEvent = types.LiveTailSessionLogEvent
173174

174-
// EventStream is an interface that represents a stream of events from a call to StartLiveTail.
175-
type EventStream interface {
175+
// EventStream is a generic interface that represents a stream of events
176+
type EventStream[T any] interface {
177+
Events() <-chan T
176178
Close() error
177-
Events() <-chan types.StartLiveTailResponseStream
178179
Err() error
179180
}
180181

181-
var _ EventStream = (*cloudwatchlogs.StartLiveTailEventStream)(nil)
182+
// Deprecated: LiveTailStream is a stream of events from a call to StartLiveTail
183+
type LiveTailStream = EventStream[types.StartLiveTailResponseStream]
182184

183185
func GetLogEvents(e types.StartLiveTailResponseStream) ([]LogEvent, error) {
184186
switch ev := e.(type) {

src/pkg/clouds/aws/ecs/merge.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package ecs
22

33
// Inspired by https://dev.to/vinaygo/concurrency-merge-sort-using-channels-and-goroutines-in-golang-35f7
4-
func mergech[T any](left chan T, right chan T, c chan T, less func(T, T) bool) {
4+
func Mergech[T any](left chan T, right chan T, c chan T, less func(T, T) bool) {
55
defer close(c)
66
val, ok := <-left
77
val2, ok2 := <-right
@@ -32,7 +32,7 @@ func mergeLogEventChan(left, right chan LogEvent) chan LogEvent {
3232
return left
3333
}
3434
out := make(chan LogEvent)
35-
go mergech(left, right, out, func(i1, i2 LogEvent) bool {
35+
go Mergech(left, right, out, func(i1, i2 LogEvent) bool {
3636
return *i1.Timestamp < *i2.Timestamp
3737
})
3838
return out

0 commit comments

Comments
 (0)