Skip to content

Commit b8d5e38

Browse files
edwardrfedw-defanglionello
authored
Implement Subscribe API call for BYOC (#613)
* Implement Subscribe API call for BYOC also fix collection stream error handling * Add subscribe test and output whole message in one call in tail * Remove Errs() method from collection stream Keep all channel logic inside collection stream code * Abstract ECS event parsing * Fix event filtering, use defer to make sure term clean up * Restructure tail log message construction * Simplify append Co-authored-by: Lio李歐 <[email protected]> * Revert "Simplify append" This reverts commit ba38ee9. --------- Co-authored-by: Edward J <[email protected]> Co-authored-by: Lio李歐 <[email protected]>
1 parent 2a0552a commit b8d5e38

19 files changed

+1046
-154
lines changed

src/pkg/cli/client/byoc/aws/byoc.go

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"os"
1313
"sort"
1414
"strings"
15-
"time"
15+
"sync"
1616

1717
"github.com/DefangLabs/defang/src/pkg"
1818
"github.com/DefangLabs/defang/src/pkg/cli/client"
@@ -41,6 +41,9 @@ type ByocAws struct {
4141
cdTasks map[string]ecs.TaskArn
4242
driver *cfn.AwsEcs
4343
publicNatIps []string
44+
45+
ecsEventHandlers []ECSEventHandler
46+
handlersLock sync.RWMutex
4447
}
4548

4649
var _ client.Client = (*ByocAws)(nil)
@@ -452,8 +455,6 @@ func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client
452455
return nil, err
453456
}
454457

455-
ctx, cancel := context.WithCancelCause(ctx)
456-
457458
etag := req.Etag
458459
// if etag == "" && req.Service == "cd" {
459460
// etag = awsecs.GetTaskID(b.cdTaskArn); TODO: find the last CD task
@@ -493,16 +494,7 @@ func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client
493494
return nil, annotateAwsError(err)
494495
}
495496

496-
if taskArn != nil {
497-
go func() {
498-
if err := ecs.WaitForTask(ctx, taskArn, 3*time.Second); err != nil {
499-
time.Sleep(time.Second) // make sure we got all the logs from the task before cancelling
500-
cancel(err)
501-
}
502-
}()
503-
}
504-
505-
return newByocServerStream(ctx, eventStream, etag, req.GetServices()), nil
497+
return newByocServerStream(ctx, eventStream, etag, req.GetServices(), b), nil
506498
}
507499

508500
// This function was copied from Fabric controller and slightly modified to work with BYOC
@@ -781,6 +773,32 @@ func ensure(cond bool, msg string) {
781773
}
782774
}
783775

784-
func (b *ByocAws) Subscribe(context.Context, *defangv1.SubscribeRequest) (client.ServerStream[defangv1.SubscribeResponse], error) {
785-
return nil, client.ErrNotImplemented("not yet implemented for BYOC; please use the AWS ECS dashboard") // FIXME: implement this for BYOC
776+
type ECSEventHandler interface {
777+
HandleECSEvent(evt ecs.Event)
778+
}
779+
780+
func (b *ByocAws) Subscribe(ctx context.Context, req *defangv1.SubscribeRequest) (client.ServerStream[defangv1.SubscribeResponse], error) {
781+
s := &byocSubscribeServerStream{
782+
services: req.Services,
783+
etag: req.Etag,
784+
ctx: ctx,
785+
786+
ch: make(chan *defangv1.SubscribeResponse),
787+
}
788+
b.AddEcsEventHandler(s)
789+
return s, nil
790+
}
791+
792+
func (b *ByocAws) HandleECSEvent(evt ecs.Event) {
793+
b.handlersLock.RLock()
794+
defer b.handlersLock.RUnlock()
795+
for _, handler := range b.ecsEventHandlers {
796+
handler.HandleECSEvent(evt)
797+
}
798+
}
799+
800+
func (b *ByocAws) AddEcsEventHandler(handler ECSEventHandler) {
801+
b.handlersLock.Lock()
802+
defer b.handlersLock.Unlock()
803+
b.ecsEventHandlers = append(b.ecsEventHandlers, handler)
786804
}

src/pkg/cli/client/byoc/aws/byoc_test.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,19 @@
11
package aws
22

33
import (
4+
"bufio"
5+
"bytes"
46
"context"
7+
"embed"
8+
"encoding/json"
9+
"io"
10+
"path"
11+
"strings"
12+
"sync"
513
"testing"
614

715
"github.com/DefangLabs/defang/src/pkg/cli/client"
16+
"github.com/DefangLabs/defang/src/pkg/clouds/aws/ecs"
817
"github.com/DefangLabs/defang/src/pkg/types"
918
defangv1 "github.com/DefangLabs/defang/src/protos/io/defang/v1"
1019
compose "github.com/compose-spec/compose-go/v2/types"
@@ -74,3 +83,83 @@ func (f FakeLoader) LoadProject(ctx context.Context) (*compose.Project, error) {
7483
func (f FakeLoader) LoadProjectName(ctx context.Context) (string, error) {
7584
return f.ProjectName, nil
7685
}
86+
87+
//go:embed test_ecs_events/*.json
88+
var testDir embed.FS
89+
90+
//go:embed test_ecs_events/*.events
91+
var expectedDir embed.FS
92+
93+
func TestSubscribe(t *testing.T) {
94+
t.Skip("Pending test")
95+
tests, err := testDir.ReadDir("test_ecs_events")
96+
if err != nil {
97+
t.Fatalf("failed to load ecs events test files: %v", err)
98+
}
99+
for _, tt := range tests {
100+
t.Run(tt.Name(), func(t *testing.T) {
101+
start := strings.LastIndex(tt.Name(), "-")
102+
end := strings.LastIndex(tt.Name(), ".")
103+
if start == -1 || end == -1 {
104+
t.Fatalf("cannot find etag from invalid test file name: %s", tt.Name())
105+
}
106+
name := tt.Name()[:start]
107+
etag := tt.Name()[start+1 : end]
108+
109+
byoc := &ByocAws{}
110+
111+
resp, err := byoc.Subscribe(context.Background(), &defangv1.SubscribeRequest{
112+
Etag: etag,
113+
Services: []string{"api", "web"},
114+
})
115+
if err != nil {
116+
t.Fatalf("Subscribe() failed: %v", err)
117+
}
118+
119+
var wg sync.WaitGroup
120+
wg.Add(1)
121+
go func() {
122+
defer wg.Done()
123+
124+
filename := path.Join("test_ecs_events", name+".events")
125+
ef, _ := expectedDir.ReadFile(filename)
126+
dec := json.NewDecoder(bytes.NewReader(ef))
127+
128+
for {
129+
if !resp.Receive() {
130+
if resp.Err() != nil {
131+
t.Errorf("Receive() failed: %v", resp.Err())
132+
}
133+
break
134+
}
135+
msg := resp.Msg()
136+
var expected defangv1.SubscribeResponse
137+
if err := dec.Decode(&expected); err == io.EOF {
138+
t.Errorf("unexpected message: %v", msg)
139+
} else if err != nil {
140+
t.Errorf("error unmarshaling expected ECS event: %v", err)
141+
} else if msg.Name != expected.Name || msg.Status != expected.Status || msg.State != expected.State {
142+
t.Errorf("expected message-, got+\n-%v\n+%v", &expected, msg)
143+
}
144+
}
145+
}()
146+
147+
data, err := testDir.ReadFile(path.Join("test_ecs_events", tt.Name()))
148+
if err != nil {
149+
t.Fatalf("failed to read test file: %v", err)
150+
}
151+
lines := bufio.NewScanner(bytes.NewReader(data))
152+
for lines.Scan() {
153+
ecsEvt, err := ecs.ParseECSEvent([]byte(lines.Text()))
154+
if err != nil {
155+
t.Fatalf("error parsing ECS event: %v", err)
156+
}
157+
158+
byoc.HandleECSEvent(ecsEvt)
159+
}
160+
resp.Close()
161+
162+
wg.Wait()
163+
})
164+
}
165+
}

src/pkg/cli/client/byoc/aws/stream.go

Lines changed: 34 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@ package aws
33
import (
44
"context"
55
"encoding/json"
6-
"fmt"
76
"io"
8-
"path"
97
"strings"
108
"time"
119

@@ -23,25 +21,22 @@ import (
2321
type byocServerStream struct {
2422
ctx context.Context
2523
err error
26-
errCh <-chan error
2724
etag string
2825
response *defangv1.TailResponse
2926
services []string
3027
stream ecs.EventStream
31-
}
3228

33-
func newByocServerStream(ctx context.Context, stream ecs.EventStream, etag string, services []string) *byocServerStream {
34-
var errCh <-chan error
35-
if errch, ok := stream.(hasErrCh); ok {
36-
errCh = errch.Errs()
37-
}
29+
ecsEventsHandler ECSEventHandler
30+
}
3831

32+
func newByocServerStream(ctx context.Context, stream ecs.EventStream, etag string, services []string, ecsEventHandler ECSEventHandler) *byocServerStream {
3933
return &byocServerStream{
4034
ctx: ctx,
41-
errCh: errCh,
4235
etag: etag,
4336
stream: stream,
4437
services: services,
38+
39+
ecsEventsHandler: ecsEventHandler,
4540
}
4641
}
4742

@@ -62,24 +57,20 @@ func (bs *byocServerStream) Msg() *defangv1.TailResponse {
6257
return bs.response
6358
}
6459

65-
type hasErrCh interface {
66-
Errs() <-chan error
67-
}
68-
6960
func (bs *byocServerStream) Receive() bool {
7061
var evts []ecs.LogEvent
7162
select {
7263
case e := <-bs.stream.Events(): // blocking
64+
if bs.stream.Err() != nil {
65+
bs.err = bs.stream.Err()
66+
return false
67+
}
7368
var err error
7469
evts, err = ecs.GetLogEvents(e)
7570
if err != nil {
7671
bs.err = err
7772
return false
7873
}
79-
case err := <-bs.errCh: // blocking (if not nil)
80-
bs.err = err
81-
return false // abort on first error?
82-
8374
case <-bs.ctx.Done(): // blocking (if not nil)
8475
bs.err = context.Cause(bs.ctx)
8576
return false
@@ -132,19 +123,21 @@ func (bs *byocServerStream) parseEvents(events []ecs.LogEvent) (*defangv1.TailRe
132123
}
133124
} else if strings.HasSuffix(*event.LogGroupIdentifier, "/ecs") || strings.HasSuffix(*event.LogGroupIdentifier, "/ecs:*") {
134125
parseECSEventRecords = true
126+
response.Etag = bs.etag
127+
response.Service = "ecs"
135128
}
136129

137130
// Client-side filtering
138131
if bs.etag != "" && bs.etag != response.Etag {
139132
return nil, nil // TODO: filter these out using the AWS StartLiveTail API
140133
}
141134

142-
if len(bs.services) > 0 && !pkg.Contains(bs.services, bs.response.GetService()) {
135+
if len(bs.services) > 0 && !pkg.Contains(bs.services, response.GetService()) {
143136
return nil, nil // TODO: filter these out using the AWS StartLiveTail API
144137
}
145138

146-
entries := make([]*defangv1.LogEntry, len(events))
147-
for i, event := range events {
139+
entries := make([]*defangv1.LogEntry, 0, len(events))
140+
for _, event := range events {
148141
entry := &defangv1.LogEntry{
149142
Message: *event.Message,
150143
Stderr: false,
@@ -162,81 +155,36 @@ func (bs *byocServerStream) parseEvents(events []ecs.LogEvent) (*defangv1.TailRe
162155
}
163156
} else if parseECSEventRecords {
164157
var err error
165-
if err = parseECSEventRecord(event, entry); err != nil {
158+
if err = bs.parseECSEventRecord(event, entry); err != nil {
166159
term.Debugf("error parsing ECS event, output raw event log: %v", err)
167160
}
168161
} else if response.Service == "cd" && strings.HasPrefix(entry.Message, " ** ") {
169162
entry.Stderr = true
170163
}
171-
entries[i] = entry
164+
if entry.Etag != "" && bs.etag != "" && entry.Etag != bs.etag {
165+
continue
166+
}
167+
if entry.Service != "" && len(bs.services) > 0 && !pkg.Contains(bs.services, entry.Service) {
168+
continue
169+
}
170+
entries = append(entries, entry)
171+
}
172+
if len(entries) == 0 {
173+
return nil, nil
172174
}
173175
response.Entries = entries
174176
return &response, nil
175177
}
176178

177-
func parseECSEventRecord(event ecs.LogEvent, entry *defangv1.LogEntry) error {
178-
var ecsEvt ecs.Event
179-
if err := json.Unmarshal([]byte(*event.Message), &ecsEvt); err != nil {
180-
return fmt.Errorf("error unmarshaling ECS event: %w", err)
181-
}
182-
183-
var buf strings.Builder
184-
fmt.Fprintf(&buf, "%s ", ecsEvt.DetailType)
185-
if len(ecsEvt.Resources) > 0 {
186-
fmt.Fprintf(&buf, "%s ", path.Base(ecsEvt.Resources[0]))
187-
}
188-
switch ecsEvt.DetailType {
189-
case "ECS Task State Change":
190-
var detail ecs.ECSTaskStateChange
191-
if err := json.Unmarshal(ecsEvt.Detail, &detail); err != nil {
192-
return fmt.Errorf("error unmarshaling ECS task state change: %w", err)
193-
}
194-
195-
// Container name is in the format of "service_etag"
196-
if len(detail.Containers) < 1 {
197-
return fmt.Errorf("error parsing ECS task state change: missing containers section")
198-
}
199-
i := strings.LastIndex(detail.Containers[0].Name, "_")
200-
if i < 0 {
201-
return fmt.Errorf("error parsing ECS task state change: invalid container name %q", detail.Containers[0].Name)
202-
}
203-
entry.Service = detail.Containers[0].Name[:i]
204-
entry.Etag = detail.Containers[0].Name[i+1:]
205-
entry.Host = path.Base(ecsEvt.Resources[0])
206-
fmt.Fprintf(&buf, "%s %s", path.Base(detail.ClusterArn), detail.LastStatus)
207-
if detail.StoppedReason != "" {
208-
fmt.Fprintf(&buf, " : %s", detail.StoppedReason)
209-
}
210-
case "ECS Service Action", "ECS Deployment State Change": // pretty much the same JSON structure for both
211-
var detail ecs.ECSDeploymentStateChange
212-
if err := json.Unmarshal(ecsEvt.Detail, &detail); err != nil {
213-
return fmt.Errorf("error unmarshaling ECS service/deployment event: %v", err)
214-
}
215-
ecsSvcName := path.Base(ecsEvt.Resources[0])
216-
// TODO: etag is not available at service and deployment level, find a possible correlation, possibly task definition revision using the deploymentId
217-
snStart := strings.LastIndex(ecsSvcName, "_") // ecsSvcName is in the format "project_service-random", our validation does not allow '_' in service names
218-
snEnd := strings.LastIndex(ecsSvcName, "-")
219-
if snStart < 0 || snEnd < 0 || snStart >= snEnd {
220-
return fmt.Errorf("error parsing ECS service action: invalid service name %q", ecsEvt.Resources[0])
221-
}
222-
entry.Service = ecsSvcName[snStart+1 : snEnd]
223-
entry.Host = detail.DeploymentId
224-
fmt.Fprintf(&buf, "%s", detail.EventName)
225-
if detail.Reason != "" {
226-
fmt.Fprintf(&buf, " : %s", detail.Reason)
227-
}
228-
default:
229-
entry.Service = "ecs"
230-
if len(ecsEvt.Resources) > 0 {
231-
entry.Host = path.Base(ecsEvt.Resources[0])
232-
}
233-
// Print the unrecogonalized ECS event detail in prettry JSON format if possible
234-
raw, err := json.MarshalIndent(ecsEvt.Detail, "", " ")
235-
if err != nil {
236-
raw = []byte(ecsEvt.Detail)
237-
}
238-
fmt.Fprintf(&buf, "\n%s", raw)
179+
func (bs *byocServerStream) parseECSEventRecord(event ecs.LogEvent, entry *defangv1.LogEntry) error {
180+
evt, err := ecs.ParseECSEvent([]byte(*event.Message))
181+
if err != nil {
182+
return err
239183
}
240-
entry.Message = buf.String()
184+
bs.ecsEventsHandler.HandleECSEvent(evt)
185+
entry.Service = evt.Service()
186+
entry.Etag = evt.Etag()
187+
entry.Host = evt.Host()
188+
entry.Message = evt.Status()
241189
return nil
242190
}

0 commit comments

Comments
 (0)