Skip to content

Commit e5126b7

Browse files
authored
feat(receiver): aws-like report log for invocations (#2011)
* feat(receiver): aws-like report log for invocations * add init duration * use types * fix tests * remove continue
1 parent 8682d99 commit e5126b7

File tree

4 files changed

+327
-2
lines changed

4 files changed

+327
-2
lines changed

collector/internal/telemetryapi/types.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ const (
2424
PlatformInitStart EventType = Platform + ".initStart"
2525
// PlatformInitRuntimeDone is used when function initialization ended.
2626
PlatformInitRuntimeDone EventType = Platform + ".initRuntimeDone"
27+
// PlatformReport is used when a report of function invocation is received.
28+
PlatformReport EventType = Platform + ".report"
2729
// Function invocation started.
2830
PlatformStart EventType = Platform + ".start"
2931
// The runtime finished processing an event with either success or failure.
@@ -95,3 +97,15 @@ type Event struct {
9597
Type string `json:"type"`
9698
Record map[string]any `json:"record"`
9799
}
100+
101+
// MetricType represents the type of metric in the platform.report event
102+
// see https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#ReportMetrics
103+
type MetricType string
104+
105+
const (
106+
MetricBilledDurationMs MetricType = "billedDurationMs"
107+
MetricDurationMs MetricType = "durationMs"
108+
MetricMaxMemoryUsedMB MetricType = "maxMemoryUsedMB"
109+
MetricMemorySizeMB MetricType = "memorySizeMB"
110+
MetricInitDurationMs MetricType = "initDurationMs"
111+
)

collector/receiver/telemetryapireceiver/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type Config struct {
2323
extensionID string
2424
Port int `mapstructure:"port"`
2525
Types []string `mapstructure:"types"`
26+
LogReport bool `mapstructure:"log_report"`
2627
}
2728

2829
// Validate validates the configuration by checking for missing or invalid fields

collector/receiver/telemetryapireceiver/receiver.go

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,11 @@ import (
4141
"github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi"
4242
)
4343

44-
const initialQueueSize = 5
45-
const scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi"
44+
const (
45+
initialQueueSize = 5
46+
scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi"
47+
logReportFmt = "REPORT RequestId: %s Duration: %.2f ms Billed Duration: %.0f ms Memory Size: %.0f MB Max Memory Used: %.0f MB"
48+
)
4649

4750
type telemetryAPIReceiver struct {
4851
httpServer *http.Server
@@ -57,6 +60,7 @@ type telemetryAPIReceiver struct {
5760
types []telemetryapi.EventType
5861
resource pcommon.Resource
5962
currentFaasInvocationID string
63+
logReport bool
6064
}
6165

6266
func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) error {
@@ -242,12 +246,79 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) {
242246
}
243247
} else if el.Type == string(telemetryapi.PlatformRuntimeDone) {
244248
r.currentFaasInvocationID = ""
249+
} else if el.Type == string(telemetryapi.PlatformReport) && r.logReport {
250+
if record, ok := el.Record.(map[string]interface{}); ok {
251+
if logRecord := createReportLogRecord(&scopeLog, record); logRecord != nil {
252+
logRecord.Attributes().PutStr("type", el.Type)
253+
if t, err := time.Parse(time.RFC3339, el.Time); err == nil {
254+
logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t))
255+
logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now()))
256+
}
257+
}
258+
}
245259
}
246260
}
247261
}
248262
return log, nil
249263
}
250264

265+
// createReportLogRecord creates a log record for the platform.report event
266+
// returns the log record if successful, otherwise nil
267+
func createReportLogRecord(scopeLog *plog.ScopeLogs, record map[string]interface{}) *plog.LogRecord {
268+
// gathering metrics
269+
metrics, ok := record["metrics"].(map[string]interface{})
270+
if !ok {
271+
return nil
272+
}
273+
var durationMs, billedDurationMs, memorySizeMB, maxMemoryUsedMB float64
274+
if durationMs, ok = metrics[string(telemetryapi.MetricDurationMs)].(float64); !ok {
275+
return nil
276+
}
277+
if billedDurationMs, ok = metrics[string(telemetryapi.MetricBilledDurationMs)].(float64); !ok {
278+
return nil
279+
}
280+
if memorySizeMB, ok = metrics[string(telemetryapi.MetricMemorySizeMB)].(float64); !ok {
281+
return nil
282+
}
283+
if maxMemoryUsedMB, ok = metrics[string(telemetryapi.MetricMaxMemoryUsedMB)].(float64); !ok {
284+
return nil
285+
}
286+
287+
// optionally gather information about cold start time
288+
var initDurationMs float64
289+
if initDurationMsVal, exists := metrics[string(telemetryapi.MetricInitDurationMs)]; exists {
290+
if val, ok := initDurationMsVal.(float64); ok {
291+
initDurationMs = val
292+
}
293+
}
294+
295+
// gathering requestId
296+
requestId := ""
297+
if requestId, ok = record["requestId"].(string); !ok {
298+
return nil
299+
}
300+
301+
// we have all information available, we can create the log record
302+
logRecord := scopeLog.LogRecords().AppendEmpty()
303+
logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId)
304+
305+
// building the body of the log record, optionally adding the init duration
306+
body := fmt.Sprintf(
307+
logReportFmt,
308+
requestId,
309+
durationMs,
310+
billedDurationMs,
311+
memorySizeMB,
312+
maxMemoryUsedMB,
313+
)
314+
if initDurationMs > 0 {
315+
body += fmt.Sprintf(" Init Duration: %.2f ms", initDurationMs)
316+
}
317+
logRecord.Body().SetStr(body)
318+
319+
return &logRecord
320+
}
321+
251322
func severityTextToNumber(severityText string) plog.SeverityNumber {
252323
mapping := map[string]plog.SeverityNumber{
253324
"TRACE": plog.SeverityNumberTrace,
@@ -366,6 +437,7 @@ func newTelemetryAPIReceiver(
366437
port: cfg.Port,
367438
types: subscribedTypes,
368439
resource: r,
440+
logReport: cfg.LogReport,
369441
}, nil
370442
}
371443

collector/receiver/telemetryapireceiver/receiver_test.go

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,244 @@ func TestCreateLogs(t *testing.T) {
516516
}
517517
}
518518

519+
func TestCreateLogsWithLogReport(t *testing.T) {
520+
t.Parallel()
521+
522+
testCases := []struct {
523+
desc string
524+
slice []event
525+
logReport bool
526+
expectedLogRecords int
527+
expectedType string
528+
expectedTimestamp string
529+
expectedBody string
530+
expectedAttributes map[string]interface{}
531+
expectError bool
532+
}{
533+
{
534+
desc: "platform.report with logReport enabled - valid metrics",
535+
slice: []event{
536+
{
537+
Time: "2022-10-12T00:03:50.000Z",
538+
Type: "platform.report",
539+
Record: map[string]any{
540+
"requestId": "test-request-id-123",
541+
"metrics": map[string]any{
542+
"durationMs": 123.45,
543+
"billedDurationMs": float64(124),
544+
"memorySizeMB": float64(512),
545+
"maxMemoryUsedMB": float64(256),
546+
},
547+
},
548+
},
549+
},
550+
logReport: true,
551+
expectedLogRecords: 1,
552+
expectedType: "platform.report",
553+
expectedTimestamp: "2022-10-12T00:03:50.000Z",
554+
expectedBody: "REPORT RequestId: test-request-id-123 Duration: 123.45 ms Billed Duration: 124 ms Memory Size: 512 MB Max Memory Used: 256 MB",
555+
expectError: false,
556+
},
557+
{
558+
desc: "platform.report with logReport disabled",
559+
slice: []event{
560+
{
561+
Time: "2022-10-12T00:03:50.000Z",
562+
Type: "platform.report",
563+
Record: map[string]any{
564+
"requestId": "test-request-id-123",
565+
"metrics": map[string]any{
566+
"durationMs": 123.45,
567+
"billedDurationMs": 124,
568+
"memorySizeMB": 512,
569+
"maxMemoryUsedMB": 256,
570+
},
571+
},
572+
},
573+
},
574+
logReport: false,
575+
expectedLogRecords: 0,
576+
expectError: false,
577+
},
578+
{
579+
desc: "platform.report with logReport enabled - missing requestId",
580+
slice: []event{
581+
{
582+
Time: "2022-10-12T00:03:50.000Z",
583+
Type: "platform.report",
584+
Record: map[string]any{
585+
"metrics": map[string]any{
586+
"durationMs": 123.45,
587+
"billedDurationMs": 124,
588+
"memorySizeMB": 512,
589+
"maxMemoryUsedMB": 256,
590+
},
591+
},
592+
},
593+
},
594+
logReport: false,
595+
expectedLogRecords: 0,
596+
expectError: false,
597+
},
598+
{
599+
desc: "platform.report with logReport enabled - invalid timestamp",
600+
slice: []event{
601+
{
602+
Time: "invalid-timestamp",
603+
Type: "platform.report",
604+
Record: map[string]any{
605+
"requestId": "test-request-id-123",
606+
"metrics": map[string]any{
607+
"durationMs": 123.45,
608+
"billedDurationMs": 124,
609+
"memorySizeMB": 512,
610+
"maxMemoryUsedMB": 256,
611+
},
612+
},
613+
},
614+
},
615+
logReport: false,
616+
expectedLogRecords: 0,
617+
expectError: false,
618+
},
619+
{
620+
desc: "platform.report with logReport enabled - missing metrics",
621+
slice: []event{
622+
{
623+
Time: "2022-10-12T00:03:50.000Z",
624+
Type: "platform.report",
625+
Record: map[string]any{
626+
"requestId": "test-request-id-123",
627+
},
628+
},
629+
},
630+
logReport: false,
631+
expectedLogRecords: 0,
632+
expectError: false,
633+
},
634+
{
635+
desc: "platform.report with logReport enabled - invalid metrics format",
636+
slice: []event{
637+
{
638+
Time: "2022-10-12T00:03:50.000Z",
639+
Type: "platform.report",
640+
Record: map[string]any{
641+
"requestId": "test-request-id-123",
642+
"metrics": map[string]any{
643+
"durationMs": "invalid",
644+
"billedDurationMs": 124,
645+
"memorySizeMB": 512,
646+
"maxMemoryUsedMB": 256,
647+
},
648+
},
649+
},
650+
},
651+
logReport: false,
652+
expectedLogRecords: 0,
653+
expectError: false,
654+
},
655+
{
656+
desc: "platform.report with logReport enabled - record not a map",
657+
slice: []event{
658+
{
659+
Time: "2022-10-12T00:03:50.000Z",
660+
Type: "platform.report",
661+
Record: "invalid record format",
662+
},
663+
},
664+
logReport: true,
665+
expectedLogRecords: 0,
666+
expectError: false,
667+
},
668+
{
669+
desc: "platform.report with logReport enabled - with initDurationMs",
670+
slice: []event{
671+
{
672+
Time: "2022-10-12T00:03:50.000Z",
673+
Type: "platform.report",
674+
Record: map[string]any{
675+
"requestId": "test-request-id-123",
676+
"metrics": map[string]any{
677+
"durationMs": 123.45,
678+
"billedDurationMs": 124.0,
679+
"memorySizeMB": 512.0,
680+
"maxMemoryUsedMB": 256.0,
681+
"initDurationMs": 50.5,
682+
},
683+
},
684+
},
685+
},
686+
logReport: true,
687+
expectedLogRecords: 1,
688+
expectedType: "platform.report",
689+
expectedTimestamp: "2022-10-12T00:03:50.000Z",
690+
expectedBody: "REPORT RequestId: test-request-id-123 Duration: 123.45 ms Billed Duration: 124 ms Memory Size: 512 MB Max Memory Used: 256 MB Init Duration: 50.50 ms",
691+
expectError: false,
692+
},
693+
{
694+
desc: "platform.report with logReport enabled - with invalid initDurationMs type",
695+
slice: []event{
696+
{
697+
Time: "2022-10-12T00:03:50.000Z",
698+
Type: "platform.report",
699+
Record: map[string]any{
700+
"requestId": "test-request-id-123",
701+
"metrics": map[string]any{
702+
"durationMs": 123.45,
703+
"billedDurationMs": 124.0,
704+
"memorySizeMB": 512.0,
705+
"maxMemoryUsedMB": 256.0,
706+
"initDurationMs": "invalid-string",
707+
},
708+
},
709+
},
710+
},
711+
logReport: true,
712+
expectedLogRecords: 1,
713+
expectedType: "platform.report",
714+
expectedTimestamp: "2022-10-12T00:03:50.000Z",
715+
expectedBody: "REPORT RequestId: test-request-id-123 Duration: 123.45 ms Billed Duration: 124 ms Memory Size: 512 MB Max Memory Used: 256 MB",
716+
expectError: false,
717+
},
718+
}
719+
for _, tc := range testCases {
720+
t.Run(tc.desc, func(t *testing.T) {
721+
r, err := newTelemetryAPIReceiver(
722+
&Config{LogReport: tc.logReport},
723+
receivertest.NewNopSettings(Type),
724+
)
725+
require.NoError(t, err)
726+
log, err := r.createLogs(tc.slice)
727+
if tc.expectError {
728+
require.Error(t, err)
729+
} else {
730+
require.NoError(t, err)
731+
require.Equal(t, 1, log.ResourceLogs().Len())
732+
resourceLog := log.ResourceLogs().At(0)
733+
require.Equal(t, 1, resourceLog.ScopeLogs().Len())
734+
scopeLog := resourceLog.ScopeLogs().At(0)
735+
require.Equal(t, scopeName, scopeLog.Scope().Name())
736+
require.Equal(t, tc.expectedLogRecords, scopeLog.LogRecords().Len())
737+
if scopeLog.LogRecords().Len() > 0 {
738+
logRecord := scopeLog.LogRecords().At(0)
739+
attr, ok := logRecord.Attributes().Get("type")
740+
require.True(t, ok)
741+
require.Equal(t, tc.expectedType, attr.Str())
742+
if tc.expectedTimestamp != "" {
743+
expectedTime, err := time.Parse(time.RFC3339, tc.expectedTimestamp)
744+
require.NoError(t, err)
745+
require.Equal(t, pcommon.NewTimestampFromTime(expectedTime), logRecord.Timestamp())
746+
} else {
747+
// For invalid timestamps, no timestamp should be set (zero value)
748+
require.Equal(t, pcommon.Timestamp(0), logRecord.Timestamp())
749+
}
750+
require.Equal(t, tc.expectedBody, logRecord.Body().Str())
751+
}
752+
}
753+
})
754+
}
755+
}
756+
519757
func TestSeverityTextToNumber(t *testing.T) {
520758
t.Parallel()
521759

0 commit comments

Comments
 (0)