Skip to content

Commit bcaabd3

Browse files
authored
refactor(aws): clean-up logs parsing (#1458)
1 parent 40cf4c9 commit bcaabd3

File tree

1 file changed

+35
-32
lines changed

1 file changed

+35
-32
lines changed

src/pkg/cli/client/byoc/aws/stream.go

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -71,54 +71,57 @@ func (bs *byocServerStream) Receive() bool {
7171
}
7272

7373
func (bs *byocServerStream) parseEvents(events []ecs.LogEvent) *defangv1.TailResponse {
74-
var response defangv1.TailResponse
7574
if len(events) == 0 {
7675
// The original gRPC/connect server stream would never send an empty response.
7776
// We could loop around the select, but returning an empty response updates the spinner.
7877
return nil
7978
}
79+
var response defangv1.TailResponse
8080
parseFirelensRecords := false
8181
parseECSEventRecords := false
82-
// Get the Etag/Host/Service from the first event (should be the same for all events in this batch)
83-
event := events[0]
84-
if parts := strings.Split(*event.LogStreamName, "/"); len(parts) == 3 {
85-
if strings.Contains(*event.LogGroupIdentifier, ":"+byoc.CdTaskPrefix) {
86-
// These events are from the CD task: "crun/main/taskID" stream; we should detect stdout/stderr
87-
response.Etag = bs.etag // pass the etag filter below, but we already filtered the tail by taskID
88-
response.Host = "pulumi"
89-
response.Service = "cd"
90-
} else {
91-
// These events are from an awslogs service task: "tenant/service_etag/taskID" stream
92-
response.Host = parts[2] // TODO: figure out actual hostname/IP
93-
parts = strings.Split(parts[1], "_")
94-
if len(parts) != 2 || !pkg.IsValidRandomID(parts[1]) {
95-
// skip, ignore sidecar logs (like route53-sidecar or fluentbit)
96-
return nil
97-
}
98-
service, etag := parts[0], parts[1]
99-
response.Etag = etag
100-
response.Service = service
101-
}
102-
} else if strings.Contains(*event.LogStreamName, "-firelens-") {
103-
// These events are from the Firelens sidecar; try to parse the JSON
82+
// Get the Etag/Host/Service from the first entry (should be the same for all events in this batch)
83+
first := events[0]
84+
switch {
85+
case strings.HasSuffix(*first.LogGroupIdentifier, "/ecs"):
86+
// ECS lifecycle events. LogStreams: "f0b805a8-fa74-3212-b6ce-a981c011d337"
87+
parseECSEventRecords = true
88+
case strings.Contains(*first.LogGroupIdentifier, ":"+byoc.CdTaskPrefix):
89+
// These events are from the CD task: "crun/main/<taskID>" stream; we should detect stdout/stderr
90+
// LogStreams: "crun/main/0f2a8ccde0374239bdd04f5e07d8c523"
91+
response.Host = "pulumi"
92+
response.Service = "cd"
93+
case strings.HasSuffix(*first.LogGroupIdentifier, "/builds") && strings.Contains(*first.LogStreamName, "-firelens-"):
94+
// These events are from the Firelens sidecar "<service>/<kaniko>-firelens-<taskID>"; try to parse the JSON
95+
// LogStreams: "app-image/kaniko-firelens-babe6cdb246b4c10b5b7093bb294e6c7"
10496
var record logs.FirelensMessage
105-
if err := json.Unmarshal([]byte(*event.Message), &record); err == nil {
97+
if err := json.Unmarshal([]byte(*first.Message), &record); err == nil {
10698
response.Etag = record.Etag
10799
response.Host = record.Host
108-
response.Service = record.ContainerName // TODO: could be service_etag
100+
response.Service = record.ContainerName // TODO: ContainerName could be service_etag
109101
parseFirelensRecords = true
102+
break
110103
}
111-
} else if strings.HasSuffix(*event.LogGroupIdentifier, "/ecs") || strings.HasSuffix(*event.LogGroupIdentifier, "/ecs:*") {
112-
parseECSEventRecords = true
113-
response.Etag = bs.etag
114-
response.Service = "ecs"
104+
fallthrough
105+
default:
106+
if parts := strings.Split(*first.LogStreamName, "/"); len(parts) == 3 {
107+
// These events are from an awslogs ECS task: "<tenant>/<service>_<etag>/<taskID>" stream
108+
// LogStreams: "app/app_hg2xsgvsldqk/198f58c08c734bda924edc516f93b2d5"
109+
response.Host = parts[2] // TODO: figure out actual hostname/IP for Task ID
110+
underscore := strings.LastIndexByte(parts[1], '_')
111+
if etag := parts[1][underscore+1:]; pkg.IsValidRandomID(etag) {
112+
response.Service = parts[1][:underscore]
113+
response.Etag = etag
114+
break
115+
}
116+
}
117+
term.Debugf("unrecognized log stream format: %s", *first.LogStreamName)
118+
return nil // skip, ignore sidecar logs (like route53-sidecar or fluentbit)
115119
}
116120

117-
// Client-side filtering
118-
if bs.etag != "" && bs.etag != response.Etag {
121+
// Client-side filtering on etag and service (if provided)
122+
if response.Etag != "" && bs.etag != "" && bs.etag != response.Etag {
119123
return nil // TODO: filter these out using the AWS StartLiveTail API
120124
}
121-
122125
if len(bs.services) > 0 && !slices.Contains(bs.services, response.GetService()) {
123126
return nil // TODO: filter these out using the AWS StartLiveTail API
124127
}

0 commit comments

Comments
 (0)