Skip to content

Commit 639008c

Browse files
committed
initial changes to export telemetry api logs
1 parent 67bd18f commit 639008c

File tree

1 file changed

+67
-79
lines changed

1 file changed

+67
-79
lines changed

collector/receiver/telemetryapireceiver/receiver.go

Lines changed: 67 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,15 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ
188188
slice = nil
189189
}
190190

191+
func (r *telemetryAPIReceiver) getRecordRequestId(record map[string]interface{}) string {
192+
if requestId, ok := record["requestId"].(string); ok {
193+
return requestId
194+
} else if r.currentFaasInvocationID != "" {
195+
return r.currentFaasInvocationID
196+
}
197+
return ""
198+
}
199+
191200
func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) {
192201
log := plog.NewLogs()
193202
resourceLog := log.ResourceLogs().AppendEmpty()
@@ -196,92 +205,83 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) {
196205
scopeLog.Scope().SetName(scopeName)
197206
for _, el := range slice {
198207
r.logger.Debug(fmt.Sprintf("Event: %s", el.Type), zap.Any("event", el))
199-
if el.Type == string(telemetryapi.Function) || el.Type == string(telemetryapi.Extension) {
200-
logRecord := scopeLog.LogRecords().AppendEmpty()
201-
logRecord.Attributes().PutStr("type", el.Type)
202-
if t, err := time.Parse(time.RFC3339, el.Time); err == nil {
203-
logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t))
204-
logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now()))
205-
} else {
206-
r.logger.Error("error parsing time", zap.Error(err))
207-
return plog.Logs{}, err
208-
}
209-
if record, ok := el.Record.(map[string]interface{}); ok {
210-
// in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function
211-
if timestamp, ok := record["timestamp"].(string); ok {
212-
if t, err := time.Parse(time.RFC3339, timestamp); err == nil {
213-
logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t))
214-
} else {
215-
r.logger.Error("error parsing time", zap.Error(err))
216-
return plog.Logs{}, err
217-
}
218-
}
219-
if level, ok := record["level"].(string); ok {
220-
logRecord.SetSeverityNumber(severityTextToNumber(strings.ToUpper(level)))
221-
logRecord.SetSeverityText(logRecord.SeverityNumber().String())
222-
}
223-
if requestId, ok := record["requestId"].(string); ok {
224-
logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId)
225-
} else if r.currentFaasInvocationID != "" {
226-
logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, r.currentFaasInvocationID)
227-
}
228-
if line, ok := record["message"].(string); ok {
229-
logRecord.Body().SetStr(line)
230-
}
231-
} else {
232-
if r.currentFaasInvocationID != "" {
233-
logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, r.currentFaasInvocationID)
234-
}
235-
// in plain text https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function
236-
if line, ok := el.Record.(string); ok {
237-
logRecord.Body().SetStr(line)
208+
logRecord := scopeLog.LogRecords().AppendEmpty()
209+
logRecord.Attributes().PutStr("type", el.Type)
210+
if t, err := time.Parse(time.RFC3339, el.Time); err == nil {
211+
logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t))
212+
logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now()))
213+
} else {
214+
r.logger.Error("error parsing time", zap.Error(err))
215+
return plog.Logs{}, err
216+
}
217+
if record, ok := el.Record.(map[string]interface{}); ok {
218+
// in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function
219+
if timestamp, ok := record["timestamp"].(string); ok {
220+
if t, err := time.Parse(time.RFC3339, timestamp); err == nil {
221+
logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t))
222+
} else {
223+
r.logger.Error("error parsing time", zap.Error(err))
224+
return plog.Logs{}, err
238225
}
239226
}
240-
} else { // platform events, if subscribed to
241-
if el.Type == string(telemetryapi.PlatformStart) {
242-
if record, ok := el.Record.(map[string]interface{}); ok {
243-
if requestId, ok := record["requestId"].(string); ok {
244-
r.currentFaasInvocationID = requestId
245-
}
227+
if level, ok := record["level"].(string); ok {
228+
logRecord.SetSeverityNumber(severityTextToNumber(strings.ToUpper(level)))
229+
logRecord.SetSeverityText(logRecord.SeverityNumber().String())
230+
}
231+
232+
requestId := r.getRecordRequestId(record)
233+
if requestId != "" {
234+
logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId)
235+
236+
// If this is the first event in the invocation with a request id (typically "platform.start"),
237+
// set the current invocation id to this request id.
238+
if r.currentFaasInvocationID == "" {
239+
r.currentFaasInvocationID = requestId
246240
}
247-
} else if el.Type == string(telemetryapi.PlatformRuntimeDone) {
248-
r.currentFaasInvocationID = ""
249-
} else if el.Type == string(telemetryapi.PlatformReport) && r.logReport {
250-
if record, ok := el.Record.(map[string]interface{}); ok {
251-
if logRecord := createReportLogRecord(&scopeLog, record); logRecord != nil {
252-
logRecord.Attributes().PutStr("type", el.Type)
253-
if t, err := time.Parse(time.RFC3339, el.Time); err == nil {
254-
logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t))
255-
logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now()))
256-
}
257-
}
241+
}
242+
243+
if line, ok := record["message"].(string); ok {
244+
logRecord.Body().SetStr(line)
245+
} else if el.Type == string(telemetryapi.PlatformReport) {
246+
platformReportMessage := createPlatformReportMessage(requestId, record)
247+
if platformReportMessage != "" {
248+
logRecord.Body().SetStr(platformReportMessage)
258249
}
259250
}
251+
} else {
252+
if r.currentFaasInvocationID != "" {
253+
logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, r.currentFaasInvocationID)
254+
}
255+
// in plain text https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function
256+
if line, ok := el.Record.(string); ok {
257+
logRecord.Body().SetStr(line)
258+
}
259+
}
260+
if el.Type == string(telemetryapi.PlatformRuntimeDone) {
261+
r.currentFaasInvocationID = ""
260262
}
261263
}
262264
return log, nil
263265
}
264266

265-
// createReportLogRecord creates a log record for the platform.report event
266-
// returns the log record if successful, otherwise nil
267-
func createReportLogRecord(scopeLog *plog.ScopeLogs, record map[string]interface{}) *plog.LogRecord {
267+
func createPlatformReportMessage(requestId string, record map[string]interface{}) string {
268268
// gathering metrics
269269
metrics, ok := record["metrics"].(map[string]interface{})
270270
if !ok {
271-
return nil
271+
return ""
272272
}
273273
var durationMs, billedDurationMs, memorySizeMB, maxMemoryUsedMB float64
274274
if durationMs, ok = metrics[string(telemetryapi.MetricDurationMs)].(float64); !ok {
275-
return nil
275+
return ""
276276
}
277277
if billedDurationMs, ok = metrics[string(telemetryapi.MetricBilledDurationMs)].(float64); !ok {
278-
return nil
278+
return ""
279279
}
280280
if memorySizeMB, ok = metrics[string(telemetryapi.MetricMemorySizeMB)].(float64); !ok {
281-
return nil
281+
return ""
282282
}
283283
if maxMemoryUsedMB, ok = metrics[string(telemetryapi.MetricMaxMemoryUsedMB)].(float64); !ok {
284-
return nil
284+
return ""
285285
}
286286

287287
// optionally gather information about cold start time
@@ -292,18 +292,7 @@ func createReportLogRecord(scopeLog *plog.ScopeLogs, record map[string]interface
292292
}
293293
}
294294

295-
// gathering requestId
296-
requestId := ""
297-
if requestId, ok = record["requestId"].(string); !ok {
298-
return nil
299-
}
300-
301-
// we have all information available, we can create the log record
302-
logRecord := scopeLog.LogRecords().AppendEmpty()
303-
logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId)
304-
305-
// building the body of the log record, optionally adding the init duration
306-
body := fmt.Sprintf(
295+
message := fmt.Sprintf(
307296
logReportFmt,
308297
requestId,
309298
durationMs,
@@ -312,11 +301,10 @@ func createReportLogRecord(scopeLog *plog.ScopeLogs, record map[string]interface
312301
maxMemoryUsedMB,
313302
)
314303
if initDurationMs > 0 {
315-
body += fmt.Sprintf(" Init Duration: %.2f ms", initDurationMs)
304+
message += fmt.Sprintf(" Init Duration: %.2f ms", initDurationMs)
316305
}
317-
logRecord.Body().SetStr(body)
318306

319-
return &logRecord
307+
return message
320308
}
321309

322310
func severityTextToNumber(severityText string) plog.SeverityNumber {

0 commit comments

Comments
 (0)