Skip to content

Commit 0b68ef8

Browse files
committed
add invocation ids for plain text logs
1 parent d9546e5 commit 0b68ef8

File tree

3 files changed

+67
-14
lines changed

3 files changed

+67
-14
lines changed

collector/internal/telemetryapi/types.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ const (
2424
PlatformInitStart EventType = Platform + ".initStart"
2525
// PlatformInitRuntimeDone is used when function initialization ended.
2626
PlatformInitRuntimeDone EventType = Platform + ".initRuntimeDone"
27+
// Function invocation started.
28+
PlatformStart EventType = Platform + ".start"
29+
// The runtime finished processing an event with either success or failure.
30+
PlatformRuntimeDone EventType = Platform + ".runtimeDone"
2731
// Function is used to receive log events emitted by the function
2832
Function EventType = "function"
2933
// Extension is used is to receive log events emitted by the extension

collector/receiver/telemetryapireceiver/receiver.go

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,17 +45,18 @@ const initialQueueSize = 5
4545
const scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi"
4646

4747
type telemetryAPIReceiver struct {
48-
httpServer *http.Server
49-
logger *zap.Logger
50-
queue *queue.Queue // queue is a synchronous queue and is used to put the received log events to be dispatched later
51-
nextTraces consumer.Traces
52-
nextLogs consumer.Logs
53-
lastPlatformStartTime string
54-
lastPlatformEndTime string
55-
extensionID string
56-
port int
57-
types []telemetryapi.EventType
58-
resource pcommon.Resource
48+
httpServer *http.Server
49+
logger *zap.Logger
50+
queue *queue.Queue // queue is a synchronous queue and is used to put the received log events to be dispatched later
51+
nextTraces consumer.Traces
52+
nextLogs consumer.Logs
53+
lastPlatformStartTime string
54+
lastPlatformEndTime string
55+
extensionID string
56+
port int
57+
types []telemetryapi.EventType
58+
resource pcommon.Resource
59+
currentFaasInvocationID string
5960
}
6061

6162
func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) error {
@@ -217,16 +218,31 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) {
217218
}
218219
if requestId, ok := record["requestId"].(string); ok {
219220
logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId)
221+
} else if r.currentFaasInvocationID != "" {
222+
logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, r.currentFaasInvocationID)
220223
}
221224
if line, ok := record["message"].(string); ok {
222225
logRecord.Body().SetStr(line)
223226
}
224227
} else {
228+
if r.currentFaasInvocationID != "" {
229+
logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, r.currentFaasInvocationID)
230+
}
225231
// in plain text https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function
226232
if line, ok := el.Record.(string); ok {
227233
logRecord.Body().SetStr(line)
228234
}
229235
}
236+
} else { // platform events, if subscribed to
237+
if el.Type == string(telemetryapi.PlatformStart) {
238+
if record, ok := el.Record.(map[string]interface{}); ok {
239+
if requestId, ok := record["requestId"].(string); ok {
240+
r.currentFaasInvocationID = requestId
241+
}
242+
}
243+
} else if el.Type == string(telemetryapi.PlatformRuntimeDone) {
244+
r.currentFaasInvocationID = ""
245+
}
230246
}
231247
}
232248
return log, nil

collector/receiver/telemetryapireceiver/receiver_test.go

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,37 @@ func TestCreateLogs(t *testing.T) {
221221
expectedSeverityNumber: plog.SeverityNumberUnspecified,
222222
expectError: false,
223223
},
224+
{
225+
desc: "function text with requestId",
226+
slice: []event{
227+
{
228+
Time: "2022-10-12T00:03:50.000Z",
229+
Type: "platform.start",
230+
Record: map[string]any{
231+
"requestId": "34472c47-5ff0-4df5-a9ad-03776afa5473",
232+
},
233+
},
234+
{
235+
Time: "2022-10-12T00:03:50.000Z",
236+
Type: "function",
237+
Record: "[INFO] Hello world, I am an extension!",
238+
},
239+
{
240+
Time: "2022-10-12T00:03:50.000Z",
241+
Type: "platform.runtimeDone",
242+
Record: map[string]any{},
243+
},
244+
},
245+
expectedLogRecords: 1,
246+
expectedType: "function",
247+
expectedTimestamp: "2022-10-12T00:03:50.000Z",
248+
expectedBody: "[INFO] Hello world, I am an extension!",
249+
expectedContainsRequestId: true,
250+
expectedRequestId: "34472c47-5ff0-4df5-a9ad-03776afa5473",
251+
expectedSeverityText: "",
252+
expectedSeverityNumber: plog.SeverityNumberUnspecified,
253+
expectError: false,
254+
},
224255
{
225256
desc: "function json",
226257
slice: []event{
@@ -351,9 +382,11 @@ func TestCreateLogs(t *testing.T) {
351382
desc: "platform.start anything",
352383
slice: []event{
353384
{
354-
Time: "2022-10-12T00:03:50.000Z",
355-
Type: "platform.start",
356-
Record: map[string]any{},
385+
Time: "2022-10-12T00:03:50.000Z",
386+
Type: "platform.start",
387+
Record: map[string]any{
388+
"requestId": "test-id",
389+
},
357390
},
358391
},
359392
expectedLogRecords: 0,

0 commit comments

Comments
 (0)