Skip to content

Commit a99296b

Browse files
committed
update logging
1 parent 0c0e36a commit a99296b

File tree

3 files changed

+505
-13
lines changed

3 files changed

+505
-13
lines changed

collector/internal/telemetryapi/types.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,29 @@ const (
2424
PlatformInitStart EventType = Platform + ".initStart"
2525
// PlatformInitRuntimeDone is used when function initialization ended.
2626
PlatformInitRuntimeDone EventType = Platform + ".initRuntimeDone"
27-
// PlatformReport is used when a report of function invocation is received.
28-
PlatformReport EventType = Platform + ".report"
29-
// Function invocation started.
27+
// PlatformInitReport is used when a report of function initialization is received.
28+
PlatformInitReport EventType = Platform + ".initReport"
29+
// PlatformStart is used when function invocation started.
3030
PlatformStart EventType = Platform + ".start"
31-
// The runtime finished processing an event with either success or failure.
31+
// PlatformRuntimeDone is used when the runtime finished processing an event with either success or failure.
3232
PlatformRuntimeDone EventType = Platform + ".runtimeDone"
33+
// PlatformReport is used when a report of function invocation is received.
34+
PlatformReport EventType = Platform + ".report"
35+
// PlatformRestoreStart is used when runtime restore started.
36+
PlatformRestoreStart EventType = Platform + ".restoreStart"
37+
// PlatformRestoreRuntimeDone is used when runtime restore completed.
38+
PlatformRestoreRuntimeDone EventType = Platform + ".restoreRuntimeDone"
39+
// PlatformRestoreReport is used when a report of runtime restore is received.
40+
PlatformRestoreReport EventType = Platform + ".restoreReport"
41+
// PlatformExtension is used for extension state events.
42+
PlatformExtension EventType = Platform + ".extension"
43+
// PlatformTelemetrySubscription is used when the extension subscribed to the Telemetry API.
44+
PlatformTelemetrySubscription EventType = Platform + ".telemetrySubscription"
45+
// PlatformLogsDropped is used when Lambda dropped log entries.
46+
PlatformLogsDropped EventType = Platform + ".logsDropped"
3347
// Function is used to receive log events emitted by the function
3448
Function EventType = "function"
35-
// Extension is used is to receive log events emitted by the extension
49+
// Extension is used to receive log events emitted by the extension
3650
Extension EventType = "extension"
3751
)
3852

collector/receiver/telemetryapireceiver/receiver.go

Lines changed: 105 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,20 @@ import (
4242
)
4343

4444
const (
45-
initialQueueSize = 5
46-
scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi"
47-
logReportFmt = "REPORT RequestId: %s Duration: %.2f ms Billed Duration: %.0f ms Memory Size: %.0f MB Max Memory Used: %.0f MB"
45+
initialQueueSize = 5
46+
scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi"
47+
platformReportLogFmt = "REPORT RequestId: %s Duration: %.2f ms Billed Duration: %.0f ms Memory Size: %.0f MB Max Memory Used: %.0f MB"
48+
platformStartLogFmt = "START RequestId: %s Version: %s"
49+
platformRuntimeDoneLogFmt = "END RequestId: %s Version: %s"
50+
platformInitStartLogFmt = "INIT_START Runtime Version: %s Runtime Version ARN: %s"
51+
platformInitRuntimeDoneLogFmt = "INIT_RUNTIME_DONE Status: %s"
52+
platformInitReportLogFmt = "INIT_REPORT Initialization Type: %s Phase: %s Status: %s Duration: %.2f ms"
53+
platformRestoreStartLogFmt = "RESTORE_START Runtime Version: %s Runtime Version ARN: %s"
54+
platformRestoreRuntimeDoneLogFmt = "RESTORE_RUNTIME_DONE Status: %s"
55+
platformRestoreReportLogFmt = "RESTORE_REPORT Status: %s Duration: %.2f ms"
56+
platformTelemetrySubscriptionLogFmt = "TELEMETRY: %s Subscribed Types: %v"
57+
platformExtensionLogFmt = "EXTENSION Name: %s State: %s Events: %v"
58+
platformLogsDroppedLogFmt = "LOGS_DROPPED DroppedRecords: %d DroppedBytes: %d Reason: %s"
4859
)
4960

5061
type telemetryAPIReceiver struct {
@@ -59,6 +70,7 @@ type telemetryAPIReceiver struct {
5970
port int
6071
types []telemetryapi.EventType
6172
resource pcommon.Resource
73+
faasFunctionVersion string
6274
currentFaasInvocationID string
6375
logReport bool
6476
}
@@ -243,10 +255,17 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) {
243255
logRecord.SetSeverityText(logRecord.SeverityNumber().String())
244256
}
245257

246-
if el.Type == string(telemetryapi.PlatformReport) {
247-
platformReportMessage := createPlatformReportMessage(requestId, record)
248-
if platformReportMessage != "" {
249-
logRecord.Body().SetStr(platformReportMessage)
258+
if strings.HasPrefix(el.Type, platform) {
259+
if el.Type == string(telemetryapi.PlatformInitStart) {
260+
functionVersion, _ := record["functionVersion"].(string)
261+
if functionVersion != "" {
262+
r.faasFunctionVersion = functionVersion
263+
}
264+
}
265+
266+
message := createPlatformMessage(requestId, r.faasFunctionVersion, el.Type, record)
267+
if message != "" {
268+
logRecord.Body().SetStr(message)
250269
}
251270
} else if line, ok := record["message"].(string); ok {
252271
logRecord.Body().SetStr(line)
@@ -267,6 +286,84 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) {
267286
return log, nil
268287
}
269288

289+
func createPlatformMessage(requestId string, functionVersion string, eventType string, record map[string]interface{}) string {
290+
switch eventType {
291+
case string(telemetryapi.PlatformStart):
292+
if requestId != "" && functionVersion != "" {
293+
return fmt.Sprintf(platformStartLogFmt, requestId, functionVersion)
294+
}
295+
case string(telemetryapi.PlatformRuntimeDone):
296+
if requestId != "" && functionVersion != "" {
297+
return fmt.Sprintf(platformRuntimeDoneLogFmt, requestId, functionVersion)
298+
}
299+
case string(telemetryapi.PlatformReport):
300+
return createPlatformReportMessage(requestId, record)
301+
case string(telemetryapi.PlatformInitStart):
302+
runtimeVersion, _ := record["runtimeVersion"].(string)
303+
runtimeVersionArn, _ := record["runtimeVersionArn"].(string)
304+
if runtimeVersion != "" || runtimeVersionArn != "" {
305+
return fmt.Sprintf(platformInitStartLogFmt, runtimeVersion, runtimeVersionArn)
306+
}
307+
case string(telemetryapi.PlatformInitRuntimeDone):
308+
status, _ := record["status"].(string)
309+
if status != "" {
310+
return fmt.Sprintf(platformInitRuntimeDoneLogFmt, status)
311+
}
312+
case string(telemetryapi.PlatformInitReport):
313+
initType, _ := record["initializationType"].(string)
314+
phase, _ := record["phase"].(string)
315+
status, _ := record["status"].(string)
316+
var durationMs float64
317+
if metrics, ok := record["metrics"].(map[string]interface{}); ok {
318+
durationMs, _ = metrics["durationMs"].(float64)
319+
}
320+
if initType != "" || phase != "" || status != "" || durationMs != 0 {
321+
return fmt.Sprintf(platformInitReportLogFmt, initType, phase, status, durationMs)
322+
}
323+
case string(telemetryapi.PlatformRestoreStart):
324+
runtimeVersion, _ := record["runtimeVersion"].(string)
325+
runtimeVersionArn, _ := record["runtimeVersionArn"].(string)
326+
if runtimeVersion != "" || runtimeVersionArn != "" {
327+
return fmt.Sprintf(platformRestoreStartLogFmt, runtimeVersion, runtimeVersionArn)
328+
}
329+
case string(telemetryapi.PlatformRestoreRuntimeDone):
330+
status, _ := record["status"].(string)
331+
if status != "" {
332+
return fmt.Sprintf(platformRestoreRuntimeDoneLogFmt, status)
333+
}
334+
case string(telemetryapi.PlatformRestoreReport):
335+
status, _ := record["status"].(string)
336+
var durationMs float64
337+
if metrics, ok := record["metrics"].(map[string]interface{}); ok {
338+
durationMs, _ = metrics["durationMs"].(float64)
339+
}
340+
if status != "" && durationMs != 0 {
341+
return fmt.Sprintf(platformRestoreReportLogFmt, status, durationMs)
342+
}
343+
case string(telemetryapi.PlatformTelemetrySubscription):
344+
name, _ := record["name"].(string)
345+
types, _ := record["types"].([]interface{})
346+
if name != "" {
347+
return fmt.Sprintf(platformTelemetrySubscriptionLogFmt, name, types)
348+
}
349+
case string(telemetryapi.PlatformExtension):
350+
name, _ := record["name"].(string)
351+
state, _ := record["state"].(string)
352+
events, _ := record["events"].([]interface{})
353+
if name != "" {
354+
return fmt.Sprintf(platformExtensionLogFmt, name, state, events)
355+
}
356+
case string(telemetryapi.PlatformLogsDropped):
357+
droppedRecords, _ := record["droppedRecords"].(int64)
358+
droppedBytes, _ := record["droppedBytes"].(int64)
359+
reason, _ := record["reason"].(string)
360+
if reason != "" {
361+
return fmt.Sprintf(platformLogsDroppedLogFmt, int(droppedRecords), int(droppedBytes), reason)
362+
}
363+
}
364+
return ""
365+
}
366+
270367
func createPlatformReportMessage(requestId string, record map[string]interface{}) string {
271368
// gathering metrics
272369
metrics, ok := record["metrics"].(map[string]interface{})
@@ -296,7 +393,7 @@ func createPlatformReportMessage(requestId string, record map[string]interface{}
296393
}
297394

298395
message := fmt.Sprintf(
299-
logReportFmt,
396+
platformReportLogFmt,
300397
requestId,
301398
durationMs,
302399
billedDurationMs,

0 commit comments

Comments
 (0)