Skip to content

Commit 46e143c

Browse files
committed
Merge branch 'main' into edw-byoc-account-in-domain
2 parents e6eb4c6 + b8d5e38 commit 46e143c

19 files changed

+1046
-154
lines changed

src/pkg/cli/client/byoc/aws/byoc.go

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
"os"
1515
"sort"
1616
"strings"
17-
"time"
17+
"sync"
1818

1919
"github.com/DefangLabs/defang/src/pkg"
2020
"github.com/DefangLabs/defang/src/pkg/cli/client"
@@ -43,6 +43,9 @@ type ByocAws struct {
4343
cdTasks map[string]ecs.TaskArn
4444
driver *cfn.AwsEcs
4545
publicNatIps []string
46+
47+
ecsEventHandlers []ECSEventHandler
48+
handlersLock sync.RWMutex
4649
}
4750

4851
var _ client.Client = (*ByocAws)(nil)
@@ -464,8 +467,6 @@ func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client
464467
return nil, err
465468
}
466469

467-
ctx, cancel := context.WithCancelCause(ctx)
468-
469470
etag := req.Etag
470471
// if etag == "" && req.Service == "cd" {
471472
// etag = awsecs.GetTaskID(b.cdTaskArn); TODO: find the last CD task
@@ -505,16 +506,7 @@ func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client
505506
return nil, annotateAwsError(err)
506507
}
507508

508-
if taskArn != nil {
509-
go func() {
510-
if err := ecs.WaitForTask(ctx, taskArn, 3*time.Second); err != nil {
511-
time.Sleep(time.Second) // make sure we got all the logs from the task before cancelling
512-
cancel(err)
513-
}
514-
}()
515-
}
516-
517-
return newByocServerStream(ctx, eventStream, etag, req.GetServices()), nil
509+
return newByocServerStream(ctx, eventStream, etag, req.GetServices(), b), nil
518510
}
519511

520512
// This function was copied from Fabric controller and slightly modified to work with BYOC
@@ -792,6 +784,32 @@ func ensure(cond bool, msg string) {
792784
}
793785
}
794786

795-
func (b *ByocAws) Subscribe(context.Context, *defangv1.SubscribeRequest) (client.ServerStream[defangv1.SubscribeResponse], error) {
796-
return nil, client.ErrNotImplemented("not yet implemented for BYOC; please use the AWS ECS dashboard") // FIXME: implement this for BYOC
787+
type ECSEventHandler interface {
788+
HandleECSEvent(evt ecs.Event)
789+
}
790+
791+
func (b *ByocAws) Subscribe(ctx context.Context, req *defangv1.SubscribeRequest) (client.ServerStream[defangv1.SubscribeResponse], error) {
792+
s := &byocSubscribeServerStream{
793+
services: req.Services,
794+
etag: req.Etag,
795+
ctx: ctx,
796+
797+
ch: make(chan *defangv1.SubscribeResponse),
798+
}
799+
b.AddEcsEventHandler(s)
800+
return s, nil
801+
}
802+
803+
func (b *ByocAws) HandleECSEvent(evt ecs.Event) {
804+
b.handlersLock.RLock()
805+
defer b.handlersLock.RUnlock()
806+
for _, handler := range b.ecsEventHandlers {
807+
handler.HandleECSEvent(evt)
808+
}
809+
}
810+
811+
func (b *ByocAws) AddEcsEventHandler(handler ECSEventHandler) {
812+
b.handlersLock.Lock()
813+
defer b.handlersLock.Unlock()
814+
b.ecsEventHandlers = append(b.ecsEventHandlers, handler)
797815
}

src/pkg/cli/client/byoc/aws/byoc_test.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,20 @@
11
package aws
22

33
import (
4+
"bufio"
5+
"bytes"
46
"context"
7+
"embed"
8+
"encoding/json"
9+
"io"
10+
"path"
511
"regexp"
12+
"strings"
13+
"sync"
614
"testing"
715

816
"github.com/DefangLabs/defang/src/pkg/cli/client"
17+
"github.com/DefangLabs/defang/src/pkg/clouds/aws/ecs"
918
"github.com/DefangLabs/defang/src/pkg/types"
1019
defangv1 "github.com/DefangLabs/defang/src/protos/io/defang/v1"
1120
compose "github.com/compose-spec/compose-go/v2/types"
@@ -75,3 +84,83 @@ func (f FakeLoader) LoadProject(ctx context.Context) (*compose.Project, error) {
7584
func (f FakeLoader) LoadProjectName(ctx context.Context) (string, error) {
7685
return f.ProjectName, nil
7786
}
87+
88+
//go:embed test_ecs_events/*.json
89+
var testDir embed.FS
90+
91+
//go:embed test_ecs_events/*.events
92+
var expectedDir embed.FS
93+
94+
func TestSubscribe(t *testing.T) {
95+
t.Skip("Pending test")
96+
tests, err := testDir.ReadDir("test_ecs_events")
97+
if err != nil {
98+
t.Fatalf("failed to load ecs events test files: %v", err)
99+
}
100+
for _, tt := range tests {
101+
t.Run(tt.Name(), func(t *testing.T) {
102+
start := strings.LastIndex(tt.Name(), "-")
103+
end := strings.LastIndex(tt.Name(), ".")
104+
if start == -1 || end == -1 {
105+
t.Fatalf("cannot find etag from invalid test file name: %s", tt.Name())
106+
}
107+
name := tt.Name()[:start]
108+
etag := tt.Name()[start+1 : end]
109+
110+
byoc := &ByocAws{}
111+
112+
resp, err := byoc.Subscribe(context.Background(), &defangv1.SubscribeRequest{
113+
Etag: etag,
114+
Services: []string{"api", "web"},
115+
})
116+
if err != nil {
117+
t.Fatalf("Subscribe() failed: %v", err)
118+
}
119+
120+
var wg sync.WaitGroup
121+
wg.Add(1)
122+
go func() {
123+
defer wg.Done()
124+
125+
filename := path.Join("test_ecs_events", name+".events")
126+
ef, _ := expectedDir.ReadFile(filename)
127+
dec := json.NewDecoder(bytes.NewReader(ef))
128+
129+
for {
130+
if !resp.Receive() {
131+
if resp.Err() != nil {
132+
t.Errorf("Receive() failed: %v", resp.Err())
133+
}
134+
break
135+
}
136+
msg := resp.Msg()
137+
var expected defangv1.SubscribeResponse
138+
if err := dec.Decode(&expected); err == io.EOF {
139+
t.Errorf("unexpected message: %v", msg)
140+
} else if err != nil {
141+
t.Errorf("error unmarshaling expected ECS event: %v", err)
142+
} else if msg.Name != expected.Name || msg.Status != expected.Status || msg.State != expected.State {
143+
t.Errorf("expected message-, got+\n-%v\n+%v", &expected, msg)
144+
}
145+
}
146+
}()
147+
148+
data, err := testDir.ReadFile(path.Join("test_ecs_events", tt.Name()))
149+
if err != nil {
150+
t.Fatalf("failed to read test file: %v", err)
151+
}
152+
lines := bufio.NewScanner(bytes.NewReader(data))
153+
for lines.Scan() {
154+
ecsEvt, err := ecs.ParseECSEvent([]byte(lines.Text()))
155+
if err != nil {
156+
t.Fatalf("error parsing ECS event: %v", err)
157+
}
158+
159+
byoc.HandleECSEvent(ecsEvt)
160+
}
161+
resp.Close()
162+
163+
wg.Wait()
164+
})
165+
}
166+
}

src/pkg/cli/client/byoc/aws/stream.go

Lines changed: 34 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@ package aws
33
import (
44
"context"
55
"encoding/json"
6-
"fmt"
76
"io"
8-
"path"
97
"strings"
108
"time"
119

@@ -23,25 +21,22 @@ import (
2321
type byocServerStream struct {
2422
ctx context.Context
2523
err error
26-
errCh <-chan error
2724
etag string
2825
response *defangv1.TailResponse
2926
services []string
3027
stream ecs.EventStream
31-
}
3228

33-
func newByocServerStream(ctx context.Context, stream ecs.EventStream, etag string, services []string) *byocServerStream {
34-
var errCh <-chan error
35-
if errch, ok := stream.(hasErrCh); ok {
36-
errCh = errch.Errs()
37-
}
29+
ecsEventsHandler ECSEventHandler
30+
}
3831

32+
func newByocServerStream(ctx context.Context, stream ecs.EventStream, etag string, services []string, ecsEventHandler ECSEventHandler) *byocServerStream {
3933
return &byocServerStream{
4034
ctx: ctx,
41-
errCh: errCh,
4235
etag: etag,
4336
stream: stream,
4437
services: services,
38+
39+
ecsEventsHandler: ecsEventHandler,
4540
}
4641
}
4742

@@ -62,24 +57,20 @@ func (bs *byocServerStream) Msg() *defangv1.TailResponse {
6257
return bs.response
6358
}
6459

65-
type hasErrCh interface {
66-
Errs() <-chan error
67-
}
68-
6960
func (bs *byocServerStream) Receive() bool {
7061
var evts []ecs.LogEvent
7162
select {
7263
case e := <-bs.stream.Events(): // blocking
64+
if bs.stream.Err() != nil {
65+
bs.err = bs.stream.Err()
66+
return false
67+
}
7368
var err error
7469
evts, err = ecs.GetLogEvents(e)
7570
if err != nil {
7671
bs.err = err
7772
return false
7873
}
79-
case err := <-bs.errCh: // blocking (if not nil)
80-
bs.err = err
81-
return false // abort on first error?
82-
8374
case <-bs.ctx.Done(): // blocking (if not nil)
8475
bs.err = context.Cause(bs.ctx)
8576
return false
@@ -132,19 +123,21 @@ func (bs *byocServerStream) parseEvents(events []ecs.LogEvent) (*defangv1.TailRe
132123
}
133124
} else if strings.HasSuffix(*event.LogGroupIdentifier, "/ecs") || strings.HasSuffix(*event.LogGroupIdentifier, "/ecs:*") {
134125
parseECSEventRecords = true
126+
response.Etag = bs.etag
127+
response.Service = "ecs"
135128
}
136129

137130
// Client-side filtering
138131
if bs.etag != "" && bs.etag != response.Etag {
139132
return nil, nil // TODO: filter these out using the AWS StartLiveTail API
140133
}
141134

142-
if len(bs.services) > 0 && !pkg.Contains(bs.services, bs.response.GetService()) {
135+
if len(bs.services) > 0 && !pkg.Contains(bs.services, response.GetService()) {
143136
return nil, nil // TODO: filter these out using the AWS StartLiveTail API
144137
}
145138

146-
entries := make([]*defangv1.LogEntry, len(events))
147-
for i, event := range events {
139+
entries := make([]*defangv1.LogEntry, 0, len(events))
140+
for _, event := range events {
148141
entry := &defangv1.LogEntry{
149142
Message: *event.Message,
150143
Stderr: false,
@@ -162,81 +155,36 @@ func (bs *byocServerStream) parseEvents(events []ecs.LogEvent) (*defangv1.TailRe
162155
}
163156
} else if parseECSEventRecords {
164157
var err error
165-
if err = parseECSEventRecord(event, entry); err != nil {
158+
if err = bs.parseECSEventRecord(event, entry); err != nil {
166159
term.Debugf("error parsing ECS event, output raw event log: %v", err)
167160
}
168161
} else if response.Service == "cd" && strings.HasPrefix(entry.Message, " ** ") {
169162
entry.Stderr = true
170163
}
171-
entries[i] = entry
164+
if entry.Etag != "" && bs.etag != "" && entry.Etag != bs.etag {
165+
continue
166+
}
167+
if entry.Service != "" && len(bs.services) > 0 && !pkg.Contains(bs.services, entry.Service) {
168+
continue
169+
}
170+
entries = append(entries, entry)
171+
}
172+
if len(entries) == 0 {
173+
return nil, nil
172174
}
173175
response.Entries = entries
174176
return &response, nil
175177
}
176178

177-
func parseECSEventRecord(event ecs.LogEvent, entry *defangv1.LogEntry) error {
178-
var ecsEvt ecs.Event
179-
if err := json.Unmarshal([]byte(*event.Message), &ecsEvt); err != nil {
180-
return fmt.Errorf("error unmarshaling ECS event: %w", err)
181-
}
182-
183-
var buf strings.Builder
184-
fmt.Fprintf(&buf, "%s ", ecsEvt.DetailType)
185-
if len(ecsEvt.Resources) > 0 {
186-
fmt.Fprintf(&buf, "%s ", path.Base(ecsEvt.Resources[0]))
187-
}
188-
switch ecsEvt.DetailType {
189-
case "ECS Task State Change":
190-
var detail ecs.ECSTaskStateChange
191-
if err := json.Unmarshal(ecsEvt.Detail, &detail); err != nil {
192-
return fmt.Errorf("error unmarshaling ECS task state change: %w", err)
193-
}
194-
195-
// Container name is in the format of "service_etag"
196-
if len(detail.Containers) < 1 {
197-
return fmt.Errorf("error parsing ECS task state change: missing containers section")
198-
}
199-
i := strings.LastIndex(detail.Containers[0].Name, "_")
200-
if i < 0 {
201-
return fmt.Errorf("error parsing ECS task state change: invalid container name %q", detail.Containers[0].Name)
202-
}
203-
entry.Service = detail.Containers[0].Name[:i]
204-
entry.Etag = detail.Containers[0].Name[i+1:]
205-
entry.Host = path.Base(ecsEvt.Resources[0])
206-
fmt.Fprintf(&buf, "%s %s", path.Base(detail.ClusterArn), detail.LastStatus)
207-
if detail.StoppedReason != "" {
208-
fmt.Fprintf(&buf, " : %s", detail.StoppedReason)
209-
}
210-
case "ECS Service Action", "ECS Deployment State Change": // pretty much the same JSON structure for both
211-
var detail ecs.ECSDeploymentStateChange
212-
if err := json.Unmarshal(ecsEvt.Detail, &detail); err != nil {
213-
return fmt.Errorf("error unmarshaling ECS service/deployment event: %v", err)
214-
}
215-
ecsSvcName := path.Base(ecsEvt.Resources[0])
216-
// TODO: etag is not available at service and deployment level, find a possible correlation, possibly task definition revision using the deploymentId
217-
snStart := strings.LastIndex(ecsSvcName, "_") // ecsSvcName is in the format "project_service-random", our validation does not allow '_' in service names
218-
snEnd := strings.LastIndex(ecsSvcName, "-")
219-
if snStart < 0 || snEnd < 0 || snStart >= snEnd {
220-
return fmt.Errorf("error parsing ECS service action: invalid service name %q", ecsEvt.Resources[0])
221-
}
222-
entry.Service = ecsSvcName[snStart+1 : snEnd]
223-
entry.Host = detail.DeploymentId
224-
fmt.Fprintf(&buf, "%s", detail.EventName)
225-
if detail.Reason != "" {
226-
fmt.Fprintf(&buf, " : %s", detail.Reason)
227-
}
228-
default:
229-
entry.Service = "ecs"
230-
if len(ecsEvt.Resources) > 0 {
231-
entry.Host = path.Base(ecsEvt.Resources[0])
232-
}
233-
// Print the unrecogonalized ECS event detail in prettry JSON format if possible
234-
raw, err := json.MarshalIndent(ecsEvt.Detail, "", " ")
235-
if err != nil {
236-
raw = []byte(ecsEvt.Detail)
237-
}
238-
fmt.Fprintf(&buf, "\n%s", raw)
179+
func (bs *byocServerStream) parseECSEventRecord(event ecs.LogEvent, entry *defangv1.LogEntry) error {
180+
evt, err := ecs.ParseECSEvent([]byte(*event.Message))
181+
if err != nil {
182+
return err
239183
}
240-
entry.Message = buf.String()
184+
bs.ecsEventsHandler.HandleECSEvent(evt)
185+
entry.Service = evt.Service()
186+
entry.Etag = evt.Etag()
187+
entry.Host = evt.Host()
188+
entry.Message = evt.Status()
241189
return nil
242190
}

0 commit comments

Comments
 (0)