Skip to content

Commit 67eba16

Browse files
authored
feat(genkit-tools/telemetry-server): Add a new otlp endpoint to support nested traces (#3796)
1 parent efc0eba commit 67eba16

File tree

3 files changed

+432
-2
lines changed

3 files changed

+432
-2
lines changed

genkit-tools/common/src/eval/evaluate.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,8 @@ async function gatherEvalInput(params: {
491491

492492
// Only the last collected trace to be used for evaluation.
493493
const traceId = traceIds.at(-1)!;
494+
// Sleep to let traces persist.
495+
await new Promise((resolve) => setTimeout(resolve, 2000));
494496
const trace = await manager.getTrace({
495497
traceId,
496498
});

genkit-tools/telemetry-server/src/index.ts

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,41 @@ export async function startTelemetryServer(params: {
9191
}
9292
});
9393

94+
api.post(
95+
'/api/otlp/:parentTraceId/:parentSpanId',
96+
async (request, response) => {
97+
try {
98+
const { parentTraceId, parentSpanId } = request.params;
99+
100+
if (!request.body.resourceSpans?.length) {
101+
// Acknowledge and ignore empty payloads.
102+
response.status(200).json({});
103+
return;
104+
}
105+
const traces = traceDataFromOtlp(request.body);
106+
for (const traceData of traces) {
107+
traceData.traceId = parentTraceId;
108+
for (const span of Object.values(traceData.spans)) {
109+
span.attributes['genkit:otlp-traceId'] = span.traceId;
110+
span.traceId = parentTraceId;
111+
if (!span.parentSpanId) {
112+
span.parentSpanId = parentSpanId;
113+
}
114+
}
115+
await params.traceStore.save(parentTraceId, traceData);
116+
}
117+
response.status(200).json({});
118+
} catch (err) {
119+
logger.error(`Error processing OTLP payload: ${err}`);
120+
response.status(500).json({
121+
code: 13, // INTERNAL
122+
message:
123+
'An internal error occurred while processing the OTLP payload.',
124+
});
125+
}
126+
}
127+
);
128+
94129
api.post('/api/otlp', async (request, response) => {
95130
try {
96131
if (!request.body.resourceSpans?.length) {
@@ -99,8 +134,7 @@ export async function startTelemetryServer(params: {
99134
return;
100135
}
101136
const traces = traceDataFromOtlp(request.body);
102-
for (const trace of traces) {
103-
const traceData = TraceDataSchema.parse(trace);
137+
for (const traceData of traces) {
104138
await params.traceStore.save(traceData.traceId, traceData);
105139
}
106140
response.status(200).json({});

0 commit comments

Comments
 (0)