Skip to content

Commit e84f009

Browse files
committed
support --until end time
1 parent 721b0c7 commit e84f009

File tree

5 files changed

+48
-29
lines changed

5 files changed

+48
-29
lines changed

src/pkg/cli/client/byoc/aws/byoc.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -709,7 +709,14 @@ func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client
709709
if len(req.Services) == 1 {
710710
service = req.Services[0]
711711
}
712-
eventStream, err = ecs.TailLogGroups(ctx, req.Since.AsTime(), b.getLogGroupInputs(etag, req.Project, service, req.Pattern, logType)...)
712+
var start, end time.Time
713+
if req.Since.IsValid() {
714+
start = req.Since.AsTime()
715+
}
716+
if req.Until.IsValid() {
717+
end = req.Until.AsTime()
718+
}
719+
eventStream, err = ecs.QueryAndTailLogGroups(ctx, start, end, b.getLogGroupInputs(etag, req.Project, service, req.Pattern, logType)...)
713720
taskArn = b.cdTaskArn
714721
}
715722
if err != nil {

src/pkg/clouds/aws/ecs/logs.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func getLogGroupIdentifier(arnOrId string) string {
2525
return strings.TrimSuffix(arnOrId, ":*")
2626
}
2727

28-
func TailLogGroups(ctx context.Context, start time.Time, logGroups ...LogGroupInput) (EventStream, error) {
28+
func QueryAndTailLogGroups(ctx context.Context, start, end time.Time, logGroups ...LogGroupInput) (EventStream, error) {
2929
ctx, cancel := context.WithCancel(ctx)
3030

3131
e := &eventStream{
@@ -34,13 +34,14 @@ func TailLogGroups(ctx context.Context, start time.Time, logGroups ...LogGroupIn
3434
}
3535

3636
for _, lgi := range logGroups {
37-
es, err := QueryAndTailLogGroup(ctx, lgi, start)
37+
es, err := QueryAndTailLogGroup(ctx, lgi, start, end)
3838
if err != nil {
3939
cancel()
4040
return nil, err
4141
}
4242
go func() {
4343
defer es.Close()
44+
// FIXME: this should *merge* the events from all log groups
4445
e.pipeEvents(ctx, es)
4546
}()
4647
}
@@ -56,7 +57,7 @@ type LogGroupInput struct {
5657
LogEventFilterPattern string
5758
}
5859

59-
func TailLogGroup(ctx context.Context, input LogGroupInput) (EventStream, error) {
60+
func TailLogGroup(ctx context.Context, input LogGroupInput) (cloudwatchlogs.StartLiveTailResponseStreamReader, error) {
6061
var pattern *string
6162
if input.LogEventFilterPattern != "" {
6263
pattern = &input.LogEventFilterPattern
@@ -172,13 +173,7 @@ func newCloudWatchLogsClient(ctx context.Context, region aws.Region) (*cloudwatc
172173
type LogEvent = types.LiveTailSessionLogEvent
173174

174175
// EventStream is an interface that represents a stream of events from a call to StartLiveTail.
175-
type EventStream interface {
176-
Close() error
177-
Events() <-chan types.StartLiveTailResponseStream
178-
Err() error
179-
}
180-
181-
var _ EventStream = (*cloudwatchlogs.StartLiveTailEventStream)(nil)
176+
type EventStream = cloudwatchlogs.StartLiveTailResponseStreamReader
182177

183178
func GetLogEvents(e types.StartLiveTailResponseStream) ([]LogEvent, error) {
184179
switch ev := e.(type) {

src/pkg/clouds/aws/ecs/merge.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package ecs
22

33
// Inspired by https://dev.to/vinaygo/concurrency-merge-sort-using-channels-and-goroutines-in-golang-35f7
4-
func mergech[T any](left chan T, right chan T, c chan T, less func(T, T) bool) {
4+
func Mergech[T any](left chan T, right chan T, c chan T, less func(T, T) bool) {
55
defer close(c)
66
val, ok := <-left
77
val2, ok2 := <-right
@@ -32,7 +32,7 @@ func mergeLogEventChan(left, right chan LogEvent) chan LogEvent {
3232
return left
3333
}
3434
out := make(chan LogEvent)
35-
go mergech(left, right, out, func(i1, i2 LogEvent) bool {
35+
go Mergech(left, right, out, func(i1, i2 LogEvent) bool {
3636
return *i1.Timestamp < *i2.Timestamp
3737
})
3838
return out

src/pkg/clouds/aws/ecs/stream.go

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,44 +5,58 @@ import (
55
"errors"
66
"time"
77

8+
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
89
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
910
)
1011

1112
// QueryAndTailLogGroup queries the log group from the give start time and initiates a Live Tail session.
1213
// This function also handles the case where the log group does not exist yet.
1314
// The caller should call `Close()` on the returned EventStream when done.
14-
func QueryAndTailLogGroup(ctx context.Context, lgi LogGroupInput, start time.Time) (EventStream, error) {
15+
func QueryAndTailLogGroup(ctx context.Context, lgi LogGroupInput, start, end time.Time) (EventStream, error) {
1516
ctx, cancel := context.WithCancel(ctx)
1617

1718
es := &eventStream{
1819
cancel: cancel,
1920
ch: make(chan types.StartLiveTailResponseStream),
2021
}
2122

22-
// First call TailLogGroup once to check if the log group exists or we have another error
23-
eventStream, err := TailLogGroup(ctx, lgi)
24-
if err != nil {
25-
var resourceNotFound *types.ResourceNotFoundException
26-
if !errors.As(err, &resourceNotFound) {
27-
return nil, err
23+
doTail := !end.IsZero()
24+
25+
var tailStream cloudwatchlogs.StartLiveTailResponseStreamReader
26+
if doTail {
27+
// First call TailLogGroup once to check if the log group exists or we have another error
28+
var err error
29+
tailStream, err = TailLogGroup(ctx, lgi)
30+
if err != nil {
31+
var resourceNotFound *types.ResourceNotFoundException
32+
if !errors.As(err, &resourceNotFound) {
33+
return nil, err
34+
}
35+
// Doesn't exist yet, continue to poll for it
2836
}
2937
}
3038

3139
// Start goroutine to wait for the log group to be created and then tail it
3240
go func() {
3341
defer close(es.ch)
3442

35-
if eventStream == nil {
43+
if doTail {
3644
// If the log group does not exist yet, poll until it does
37-
eventStream, err = pollTailLogGroup(ctx, lgi)
38-
if err != nil {
39-
es.err = err
40-
return
45+
if tailStream == nil {
46+
var err error
47+
tailStream, err = pollTailLogGroup(ctx, lgi)
48+
if err != nil {
49+
es.err = err
50+
return
51+
}
4152
}
53+
defer tailStream.Close()
4254
}
43-
defer eventStream.Close()
4455

4556
if !start.IsZero() {
57+
if end.IsZero() {
58+
end = time.Now()
59+
}
4660
// Query the logs between the start time and now
4761
if err := QueryLogGroup(ctx, lgi, start, time.Now(), func(events []LogEvent) error {
4862
es.ch <- &types.StartLiveTailResponseStreamMemberSessionUpdate{
@@ -55,14 +69,17 @@ func QueryAndTailLogGroup(ctx context.Context, lgi LogGroupInput, start time.Tim
5569
}
5670
}
5771

58-
es.pipeEvents(ctx, eventStream)
72+
if doTail {
73+
// Pipe the events from the tail stream to the internal channel
74+
es.pipeEvents(ctx, tailStream)
75+
}
5976
}()
6077

6178
return es, nil
6279
}
6380

6481
// pollTailLogGroup polls the log group and starts the Live Tail session once it's available
65-
func pollTailLogGroup(ctx context.Context, lgi LogGroupInput) (EventStream, error) {
82+
func pollTailLogGroup(ctx context.Context, lgi LogGroupInput) (cloudwatchlogs.StartLiveTailResponseStreamReader, error) {
6683
ticker := time.NewTicker(time.Second)
6784
defer ticker.Stop()
6885

src/pkg/clouds/aws/ecs/stream_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ func TestPendingStream(t *testing.T) {
1515

1616
ps, _ := QueryAndTailLogGroup(context.Background(), LogGroupInput{
1717
LogGroupARN: "arn:aws:logs:us-west-2:532501343364:log-group:/ecs/lio/logss:*",
18-
}, time.Now().Add(-time.Minute))
18+
}, time.Now().Add(-time.Minute), time.Time{})
1919

2020
go func() {
2121
time.Sleep(5 * time.Second)

0 commit comments

Comments
 (0)