Skip to content

Commit 78b91e8

Browse files
committed
feat(receiver): aws-like report log for invocations
1 parent 1a9cbcb commit 78b91e8

File tree

4 files changed

+259
-2
lines changed

4 files changed

+259
-2
lines changed

collector/internal/telemetryapi/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ const (
2424
PlatformInitStart EventType = Platform + ".initStart"
2525
// PlatformInitRuntimeDone is used when function initialization ended.
2626
PlatformInitRuntimeDone EventType = Platform + ".initRuntimeDone"
27+
// PlatformReport is used when a report of function invocation is received.
28+
PlatformReport EventType = Platform + ".report"
2729
// Function invocation started.
2830
PlatformStart EventType = Platform + ".start"
2931
// The runtime finished processing an event with either success or failure.

collector/receiver/telemetryapireceiver/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type Config struct {
2323
extensionID string
2424
Port int `mapstructure:"port"`
2525
Types []string `mapstructure:"types"`
26+
LogReport bool `mapstructure:"log_report"`
2627
}
2728

2829
// Validate validates the configuration by checking for missing or invalid fields

collector/receiver/telemetryapireceiver/receiver.go

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,16 @@ import (
4141
"github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi"
4242
)
4343

44-
const initialQueueSize = 5
45-
const scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi"
44+
const (
45+
initialQueueSize = 5
46+
scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi"
47+
48+
logReportFmt = "REPORT RequestId: %s Duration: %.2f ms Billed Duration: %d ms Memory Size: %d MB Max Memory Used: %d MB"
49+
metricBilledDurationMs = "billedDurationMs"
50+
metricDurationMs = "durationMs"
51+
metricMaxMemoryUsedMB = "maxMemoryUsedMB"
52+
metricMemorySizeMB = "memorySizeMB"
53+
)
4654

4755
type telemetryAPIReceiver struct {
4856
httpServer *http.Server
@@ -57,6 +65,7 @@ type telemetryAPIReceiver struct {
5765
types []telemetryapi.EventType
5866
resource pcommon.Resource
5967
currentFaasInvocationID string
68+
logReport bool
6069
}
6170

6271
func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) error {
@@ -242,12 +251,69 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) {
242251
}
243252
} else if el.Type == string(telemetryapi.PlatformRuntimeDone) {
244253
r.currentFaasInvocationID = ""
254+
} else if el.Type == string(telemetryapi.PlatformReport) && r.logReport {
255+
if record, ok := el.Record.(map[string]interface{}); ok {
256+
if logRecord := createReportLogRecord(&scopeLog, record); logRecord != nil {
257+
logRecord.Attributes().PutStr("type", el.Type)
258+
if t, err := time.Parse(time.RFC3339, el.Time); err == nil {
259+
logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t))
260+
logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now()))
261+
} else {
262+
continue
263+
}
264+
}
265+
}
245266
}
246267
}
247268
}
248269
return log, nil
249270
}
250271

272+
// createReportLogRecord creates a log record for the platform.report event
273+
// returns the log record if successful, otherwise nil
274+
func createReportLogRecord(scopeLog *plog.ScopeLogs, record map[string]interface{}) *plog.LogRecord {
275+
// gathering metrics
276+
metrics, ok := record["metrics"].(map[string]interface{})
277+
if !ok {
278+
return nil
279+
}
280+
var durationMs float64
281+
var billedDurationMs, memorySizeMB, maxMemoryUsedMB int
282+
if durationMs, ok = metrics[metricDurationMs].(float64); !ok {
283+
return nil
284+
}
285+
if billedDurationMs, ok = metrics[metricBilledDurationMs].(int); !ok {
286+
return nil
287+
}
288+
if memorySizeMB, ok = metrics[metricMemorySizeMB].(int); !ok {
289+
return nil
290+
}
291+
if maxMemoryUsedMB, ok = metrics[metricMaxMemoryUsedMB].(int); !ok {
292+
return nil
293+
}
294+
295+
// gathering requestId
296+
requestId := ""
297+
if requestId, ok = record["requestId"].(string); !ok {
298+
return nil
299+
}
300+
301+
// we have all information available, we can create the log record
302+
logRecord := scopeLog.LogRecords().AppendEmpty()
303+
logRecord.Body().SetStr(
304+
fmt.Sprintf(
305+
logReportFmt,
306+
requestId,
307+
durationMs,
308+
billedDurationMs,
309+
memorySizeMB,
310+
maxMemoryUsedMB,
311+
),
312+
)
313+
314+
return &logRecord
315+
}
316+
251317
func severityTextToNumber(severityText string) plog.SeverityNumber {
252318
mapping := map[string]plog.SeverityNumber{
253319
"TRACE": plog.SeverityNumberTrace,
@@ -366,6 +432,7 @@ func newTelemetryAPIReceiver(
366432
port: cfg.Port,
367433
types: subscribedTypes,
368434
resource: r,
435+
logReport: cfg.LogReport,
369436
}, nil
370437
}
371438

collector/receiver/telemetryapireceiver/receiver_test.go

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,193 @@ func TestCreateLogs(t *testing.T) {
516516
}
517517
}
518518

519+
func TestCreateLogsWithLogReport(t *testing.T) {
520+
t.Parallel()
521+
522+
testCases := []struct {
523+
desc string
524+
slice []event
525+
logReport bool
526+
expectedLogRecords int
527+
expectedType string
528+
expectedTimestamp string
529+
expectedBody string
530+
expectError bool
531+
}{
532+
{
533+
desc: "platform.report with logReport enabled - valid metrics",
534+
slice: []event{
535+
{
536+
Time: "2022-10-12T00:03:50.000Z",
537+
Type: "platform.report",
538+
Record: map[string]any{
539+
"requestId": "test-request-id-123",
540+
"metrics": map[string]any{
541+
"durationMs": 123.45,
542+
"billedDurationMs": 124,
543+
"memorySizeMB": 512,
544+
"maxMemoryUsedMB": 256,
545+
},
546+
},
547+
},
548+
},
549+
logReport: true,
550+
expectedLogRecords: 1,
551+
expectedType: "platform.report",
552+
expectedTimestamp: "2022-10-12T00:03:50.000Z",
553+
expectedBody: "REPORT RequestId: test-request-id-123 Duration: 123.45 ms Billed Duration: 124 ms Memory Size: 512 MB Max Memory Used: 256 MB",
554+
expectError: false,
555+
},
556+
{
557+
desc: "platform.report with logReport disabled",
558+
slice: []event{
559+
{
560+
Time: "2022-10-12T00:03:50.000Z",
561+
Type: "platform.report",
562+
Record: map[string]any{
563+
"requestId": "test-request-id-123",
564+
"metrics": map[string]any{
565+
"durationMs": 123.45,
566+
"billedDurationMs": 124,
567+
"memorySizeMB": 512,
568+
"maxMemoryUsedMB": 256,
569+
},
570+
},
571+
},
572+
},
573+
logReport: false,
574+
expectedLogRecords: 0,
575+
expectError: false,
576+
},
577+
{
578+
desc: "platform.report with logReport enabled - missing requestId",
579+
slice: []event{
580+
{
581+
Time: "2022-10-12T00:03:50.000Z",
582+
Type: "platform.report",
583+
Record: map[string]any{
584+
"metrics": map[string]any{
585+
"durationMs": 123.45,
586+
"billedDurationMs": 124,
587+
"memorySizeMB": 512,
588+
"maxMemoryUsedMB": 256,
589+
},
590+
},
591+
},
592+
},
593+
logReport: false,
594+
expectedLogRecords: 0,
595+
expectError: false,
596+
},
597+
{
598+
desc: "platform.report with logReport enabled - invalid timestamp",
599+
slice: []event{
600+
{
601+
Time: "invalid-timestamp",
602+
Type: "platform.report",
603+
Record: map[string]any{
604+
"requestId": "test-request-id-123",
605+
"metrics": map[string]any{
606+
"durationMs": 123.45,
607+
"billedDurationMs": 124,
608+
"memorySizeMB": 512,
609+
"maxMemoryUsedMB": 256,
610+
},
611+
},
612+
},
613+
},
614+
logReport: false,
615+
expectedLogRecords: 0,
616+
expectError: false,
617+
},
618+
{
619+
desc: "platform.report with logReport enabled - missing metrics",
620+
slice: []event{
621+
{
622+
Time: "2022-10-12T00:03:50.000Z",
623+
Type: "platform.report",
624+
Record: map[string]any{
625+
"requestId": "test-request-id-123",
626+
},
627+
},
628+
},
629+
logReport: false,
630+
expectedLogRecords: 0,
631+
expectError: false,
632+
},
633+
{
634+
desc: "platform.report with logReport enabled - invalid metrics format",
635+
slice: []event{
636+
{
637+
Time: "2022-10-12T00:03:50.000Z",
638+
Type: "platform.report",
639+
Record: map[string]any{
640+
"requestId": "test-request-id-123",
641+
"metrics": map[string]any{
642+
"durationMs": "invalid",
643+
"billedDurationMs": 124,
644+
"memorySizeMB": 512,
645+
"maxMemoryUsedMB": 256,
646+
},
647+
},
648+
},
649+
},
650+
logReport: false,
651+
expectedLogRecords: 0,
652+
expectError: false,
653+
},
654+
{
655+
desc: "platform.report with logReport enabled - record not a map",
656+
slice: []event{
657+
{
658+
Time: "2022-10-12T00:03:50.000Z",
659+
Type: "platform.report",
660+
Record: "invalid record format",
661+
},
662+
},
663+
logReport: true,
664+
expectedLogRecords: 0,
665+
expectError: false,
666+
},
667+
}
668+
for _, tc := range testCases {
669+
t.Run(tc.desc, func(t *testing.T) {
670+
r, err := newTelemetryAPIReceiver(
671+
&Config{LogReport: tc.logReport},
672+
receivertest.NewNopSettings(Type),
673+
)
674+
require.NoError(t, err)
675+
log, err := r.createLogs(tc.slice)
676+
if tc.expectError {
677+
require.Error(t, err)
678+
} else {
679+
require.NoError(t, err)
680+
require.Equal(t, 1, log.ResourceLogs().Len())
681+
resourceLog := log.ResourceLogs().At(0)
682+
require.Equal(t, 1, resourceLog.ScopeLogs().Len())
683+
scopeLog := resourceLog.ScopeLogs().At(0)
684+
require.Equal(t, scopeName, scopeLog.Scope().Name())
685+
require.Equal(t, tc.expectedLogRecords, scopeLog.LogRecords().Len())
686+
if scopeLog.LogRecords().Len() > 0 {
687+
logRecord := scopeLog.LogRecords().At(0)
688+
attr, ok := logRecord.Attributes().Get("type")
689+
require.True(t, ok)
690+
require.Equal(t, tc.expectedType, attr.Str())
691+
if tc.expectedTimestamp != "" {
692+
expectedTime, err := time.Parse(time.RFC3339, tc.expectedTimestamp)
693+
require.NoError(t, err)
694+
require.Equal(t, pcommon.NewTimestampFromTime(expectedTime), logRecord.Timestamp())
695+
} else {
696+
// For invalid timestamps, no timestamp should be set (zero value)
697+
require.Equal(t, pcommon.Timestamp(0), logRecord.Timestamp())
698+
}
699+
require.Equal(t, tc.expectedBody, logRecord.Body().Str())
700+
}
701+
}
702+
})
703+
}
704+
}
705+
519706
func TestSeverityTextToNumber(t *testing.T) {
520707
t.Parallel()
521708

0 commit comments

Comments
 (0)