
Commit 6ca835b

Extract LLM attributes from any OTEL convention, not just GenAI
## Summary

The span ingestion pipeline only recognized `gen_ai.*` attribute keys when extracting LLM-specific fields into promoted ClickHouse columns. Spans arriving via OpenInference, OpenLLMetry, the Vercel AI SDK, or the OpenAI Agents SDK had their LLM columns left empty despite carrying equivalent data under different keys and vocabularies.

This introduces a multi-convention extraction layer with two components: **scalar attribute resolvers** and **content payload parsers**.

### Scalar attribute resolvers (`resolvers.ts`)

Each promoted column is resolved from a priority-ordered list of convention-specific candidates. The first candidate that returns a value wins. Value translation is applied where conventions use different vocabularies.

| Column | GenAI current | GenAI deprecated / OpenLLMetry | OpenInference | Vercel AI SDK |
|---|---|---|---|---|
| `operation` | `gen_ai.operation.name` | `llm.request.type` (maps `completion`→`text_completion`, `embedding`→`embeddings`, etc.) | `openinference.span.kind` (maps `LLM`→`chat`, `EMBEDDING`→`embeddings`, `TOOL`→`execute_tool`, etc.) | `ai.operationId` (maps `ai.generateText`→`chat`, `ai.toolCall`→`execute_tool`, etc.) |
| `provider` | `gen_ai.provider.name` | `gen_ai.system` (aliases `bedrock`→`aws.bedrock`, `gemini`→`gcp.gemini`, `mistral`→`mistral_ai`, etc.) | `llm.system` (aliases `mistralai`→`mistral_ai`, `xai`→`x_ai`, `vertexai`→`gcp.vertex_ai`) | `ai.model.provider` (strips `.chat`/`.messages`/`.responses` suffixes, aliases `google.generative-ai`→`gcp.gemini`, `amazon-bedrock`→`aws.bedrock`) |
| `model` | `gen_ai.request.model` | same | `llm.model_name`, `embedding.model_name`, `reranker.model_name` | `ai.model.id` |
| `response_model` | `gen_ai.response.model` | same | `llm.model_name` (no request/response distinction) | `ai.response.model` |
| `tokens_input` | `gen_ai.usage.input_tokens` | `gen_ai.usage.prompt_tokens` | `llm.token_count.prompt` | `ai.usage.promptTokens` |
| `tokens_output` | `gen_ai.usage.output_tokens` | `gen_ai.usage.completion_tokens` | `llm.token_count.completion` | `ai.usage.completionTokens` |
| `tokens_cache_read` | `gen_ai.usage.cache_read.input_tokens` | same | `llm.token_count.prompt_details.cache_read` | — |
| `tokens_cache_create` | `gen_ai.usage.cache_creation.input_tokens` | same | `llm.token_count.prompt_details.cache_write` | — |
| `tokens_reasoning` | `gen_ai.usage.reasoning_tokens` | same | `llm.token_count.completion_details.reasoning` | — |
| `response_id` | `gen_ai.response.id` | same | — | `ai.response.id` |
| `finish_reasons` | `gen_ai.response.finish_reasons` (string[]) | same | — | `ai.response.finishReason` (singular string, wrapped to array; `tool-calls`→`tool_calls`, `content-filter`→`content_filter`) |
| `session_id` | `gen_ai.conversation.id` | same | `session.id` | — |
| `cost_*_microcents` | — | `gen_ai.usage.cost` (total only, USD float→microcents) | `llm.cost.prompt`, `llm.cost.completion`, `llm.cost.total` (USD float→microcents) | — |

OpenAI Agents SDK spans are handled implicitly: when bridged to OTEL via the official instrumentor, they emit GenAI convention attributes.

### Content payload parsers (`content/`)

LLM message payloads use fundamentally different storage structures across conventions, so each gets a dedicated parser with sentinel-based detection:

- **GenAI current** (sentinel: `gen_ai.input.messages` or `gen_ai.output.messages`): Parses structured/JSON messages already in the GenAI parts-based format. Extracts `gen_ai.system_instructions` and `gen_ai.tool.definitions` as dedicated attributes.
- **GenAI deprecated / OpenLLMetry** (sentinel: `gen_ai.prompt` or `gen_ai.completion`): Parses flat JSON strings containing `{role, content}` message arrays. Translates to GenAI format via `rosetta-ai` auto-detection. Extracts `llm.request.functions` for tool definitions.
- **OpenInference** (sentinel: `llm.input_messages.*` prefix or `openinference.span.kind`): Reassembles flattened indexed span attributes (`llm.input_messages.{i}.message.role`, `.content`, `.tool_calls.{j}.tool_call.function.name`, etc.) by scanning, grouping by index, and sorting. Reconstructs `llm.tools.{i}.tool.json_schema` for tool definitions. Translates reassembled messages via `rosetta-ai`.
- **Vercel AI SDK** (sentinel: `ai.prompt` or `ai.prompt.messages`): Handles both top-level spans (`ai.prompt` JSON with `system` + `messages` fields) and call-level spans (`ai.prompt.messages` JSON array). Reconstructs output from split `ai.response.text` + `ai.response.toolCalls`. Parses the `ai.prompt.tools` string array for tool definitions. Translates via `rosetta-ai` with explicit `Provider.VercelAI`.

All raw span attributes remain in the dynamic `attr_*` maps regardless of whether they were also extracted to promoted columns.
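The priority-ordered resolution described above can be sketched roughly as follows. The types, names, and the `resolveFirst` helper here are illustrative assumptions, not the actual `resolvers.ts` API:

```typescript
// Minimal sketch of priority-ordered candidate resolution with vocabulary
// translation. First candidate that yields a value wins.
type KeyValue = { key: string; value?: { stringValue?: string } }

type Candidate = {
  key: string
  // Optional vocabulary translation (e.g. OpenLLMetry "completion" -> "text_completion")
  translate?: (raw: string) => string | undefined
}

function resolveFirst(attrs: readonly KeyValue[], candidates: readonly Candidate[]): string | undefined {
  for (const c of candidates) {
    const raw = attrs.find((a) => a.key === c.key)?.value?.stringValue
    if (raw === undefined) continue
    const value = c.translate ? c.translate(raw) : raw
    if (value !== undefined) return value
  }
  return undefined
}

// `operation` candidates: the GenAI-current key outranks the deprecated one.
const operationCandidates: Candidate[] = [
  { key: "gen_ai.operation.name" },
  {
    key: "llm.request.type",
    translate: (v) =>
      ({ completion: "text_completion", embedding: "embeddings", chat: "chat" } as Record<string, string>)[v],
  },
]

const attrs: KeyValue[] = [{ key: "llm.request.type", value: { stringValue: "completion" } }]
console.log(resolveFirst(attrs, operationCandidates)) // "text_completion"
```

Because candidates are ordered, a span that carries both `gen_ai.operation.name` and `llm.request.type` resolves from the current convention and ignores the deprecated key.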
1 parent a3b33ce commit 6ca835b

File tree: 10 files changed, 896 additions and 79 deletions


apps/ingest/src/routes/traces.ts

Lines changed: 6 additions & 0 deletions

@@ -1,5 +1,6 @@
 import { OrganizationId, ProjectId, putInDisk } from "@domain/shared"
 import type { OtlpExportTraceServiceRequest } from "@domain/spans"
+import { validateOtlpCompliance } from "@domain/spans"
 import { Effect } from "effect"
 import type { Hono } from "hono"
 import { getSpanIngestionQueue, getStorageDisk } from "../clients.ts"
@@ -32,6 +33,11 @@ export const registerTracesRoute = ({ app }: TracesRouteContext) => {
       return c.json({})
     }
 
+    const validationError = validateOtlpCompliance(request)
+    if (validationError) {
+      return c.json({ error: `Non-compliant OTLP payload: ${validationError}` }, 400)
+    }
+
     const organizationId = c.get("organizationId")
     const projectId = c.get("projectId")
     const apiKeyId = c.get("apiKeyId")
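`validate.ts` itself is not shown in this commit view. As a hedged illustration only, a structural compliance check that returns an error string, or `undefined` when compliant (matching how the route above consumes it), might look like:

```typescript
// Guess at the shape of a compliance check; NOT the real validateOtlpCompliance.
// The only contract visible from the route is: string on failure, undefined on success.
type OtlpExportTraceServiceRequest = { resourceSpans?: unknown }

function validateOtlpCompliance(request: OtlpExportTraceServiceRequest): string | undefined {
  if (!Array.isArray(request.resourceSpans)) {
    return "resourceSpans must be an array"
  }
  return undefined
}

console.log(validateOtlpCompliance({ resourceSpans: [] })) // undefined (compliant)
console.log(validateOtlpCompliance({})) // "resourceSpans must be an array"
```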

packages/domain/spans/src/index.ts

Lines changed: 2 additions & 1 deletion

@@ -5,4 +5,5 @@ export type { TraceListOptions, TraceRepository } from "./ports/trace-repository
 
 export type { TransformContext } from "./otlp/transform.ts"
 export { transformOtlpToSpans } from "./otlp/transform.ts"
-export type { OtlpExportTraceServiceRequest } from "./otlp/types.ts"
+export type { OtlpExportTraceServiceRequest, OtlpSpan } from "./otlp/types.ts"
+export { validateOtlpCompliance } from "./otlp/validate.ts"
Lines changed: 44 additions & 0 deletions

/**
 * Content parser for OTEL GenAI semantic convention v1.37+.
 *
 * Attributes:
 *   gen_ai.input.messages      — structured object or JSON string (parts-based GenAI format)
 *   gen_ai.output.messages     — same
 *   gen_ai.system_instructions — structured array of parts or JSON string
 *   gen_ai.tool.definitions    — structured array or JSON string
 */
import type { GenAIMessage } from "rosetta-ai"
import type { OtlpKeyValue } from "../types.ts"
import type { ParsedContent } from "./index.ts"

function extractJsonAttr(attrs: readonly OtlpKeyValue[], key: string): unknown {
  const kv = attrs.find((a) => a.key === key)
  if (!kv?.value) return undefined
  if (kv.value.stringValue) {
    try {
      return JSON.parse(kv.value.stringValue)
    } catch {
      return undefined
    }
  }
  return undefined
}

function parseMessages(attrs: readonly OtlpKeyValue[], key: string): GenAIMessage[] {
  const raw = extractJsonAttr(attrs, key)
  if (!Array.isArray(raw)) return []
  return raw as GenAIMessage[]
}

export function parseGenAICurrent(attrs: readonly OtlpKeyValue[]): ParsedContent {
  const inputMessages = parseMessages(attrs, "gen_ai.input.messages")
  const outputMessages = parseMessages(attrs, "gen_ai.output.messages")

  const systemRaw = extractJsonAttr(attrs, "gen_ai.system_instructions")
  const systemInstructions = systemRaw ? JSON.stringify(systemRaw) : ""

  const toolsRaw = extractJsonAttr(attrs, "gen_ai.tool.definitions")
  const toolDefinitions = toolsRaw ? JSON.stringify(toolsRaw) : ""

  return { inputMessages, outputMessages, systemInstructions, toolDefinitions }
}
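For illustration, here is how the JSON-string path of `extractJsonAttr` behaves on a GenAI-current span attribute. The `OtlpKeyValue` type is stubbed locally, and the sample message shape is an assumption about the parts-based format:

```typescript
// Self-contained demo of the JSON-string parsing path in the parser above.
type OtlpKeyValue = { key: string; value?: { stringValue?: string } }

function extractJsonAttr(attrs: readonly OtlpKeyValue[], key: string): unknown {
  const kv = attrs.find((a) => a.key === key)
  if (!kv?.value?.stringValue) return undefined
  try {
    return JSON.parse(kv.value.stringValue)
  } catch {
    // Malformed JSON is swallowed rather than failing ingestion.
    return undefined
  }
}

const attrs: OtlpKeyValue[] = [
  {
    key: "gen_ai.input.messages",
    value: {
      stringValue: JSON.stringify([
        { role: "user", parts: [{ type: "text", content: "Hello" }] },
      ]),
    },
  },
]

const messages = extractJsonAttr(attrs, "gen_ai.input.messages")
console.log(Array.isArray(messages) ? messages.length : 0) // 1
```

Note the deliberate leniency: an unparseable string yields `undefined`, so the span still ingests with empty promoted content rather than being dropped.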
Lines changed: 59 additions & 0 deletions

/**
 * Content parser for OTEL GenAI deprecated convention (pre-v1.37) and OpenLLMetry.
 *
 * Both use the same attribute keys:
 *   gen_ai.prompt     — single JSON string with array of {role, content} messages
 *   gen_ai.completion — single JSON string with array of {role, content} messages
 *
 * The JSON content is in provider-native format (typically OpenAI-style {role, content}).
 * We use rosetta-ai's auto-detection to translate to GenAI format.
 *
 * OpenLLMetry additionally defines:
 *   llm.request.functions — JSON string array of function/tool definitions
 */
import { safeTranslate } from "rosetta-ai"
import type { GenAIMessage } from "rosetta-ai"
import type { OtlpKeyValue } from "../types.ts"
import type { ParsedContent } from "./index.ts"

function parseJsonString(attrs: readonly OtlpKeyValue[], key: string): unknown {
  const kv = attrs.find((a) => a.key === key)
  if (!kv?.value?.stringValue) return undefined
  try {
    return JSON.parse(kv.value.stringValue)
  } catch {
    return undefined
  }
}

function translateMessages(raw: unknown, direction: "input" | "output"): GenAIMessage[] {
  if (!Array.isArray(raw) || raw.length === 0) return []
  const result = safeTranslate(raw, { direction })
  if (result.error) return []
  return result.messages as GenAIMessage[]
}

export function parseGenAIDeprecated(attrs: readonly OtlpKeyValue[]): ParsedContent {
  const promptRaw = parseJsonString(attrs, "gen_ai.prompt")
  const completionRaw = parseJsonString(attrs, "gen_ai.completion")

  let inputMessages: GenAIMessage[] = []
  let systemInstructions = ""

  if (Array.isArray(promptRaw) && promptRaw.length > 0) {
    const result = safeTranslate(promptRaw, { direction: "input" })
    if (!result.error) {
      inputMessages = result.messages as GenAIMessage[]
      if (result.system) {
        systemInstructions = JSON.stringify(result.system)
      }
    }
  }

  const outputMessages = translateMessages(completionRaw, "output")

  const functionsRaw = parseJsonString(attrs, "llm.request.functions")
  const toolDefinitions = Array.isArray(functionsRaw) ? JSON.stringify(functionsRaw) : ""

  return { inputMessages, outputMessages, systemInstructions, toolDefinitions }
}
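The translation itself is delegated to `rosetta-ai`, which is external to this diff. As a toy stand-in, converting the flat `{role, content}` shape into a parts-based shape could look like the following. The `GenAIMessage` shape here is a simplified assumption, not the library's actual type:

```typescript
// Toy stand-in for rosetta-ai's translation step: flat {role, content}
// messages become parts-based messages. Shapes are illustrative only.
type FlatMessage = { role: string; content: string }
type GenAIMessage = { role: string; parts: { type: "text"; content: string }[] }

function toGenAI(messages: FlatMessage[]): GenAIMessage[] {
  return messages.map((m) => ({
    role: m.role,
    parts: [{ type: "text", content: m.content }],
  }))
}

// gen_ai.prompt arrives as a single JSON string; parse, then translate.
const prompt: FlatMessage[] = JSON.parse('[{"role":"user","content":"What is OTLP?"}]')
console.log(toGenAI(prompt)[0].parts[0].content) // "What is OTLP?"
```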
Lines changed: 66 additions & 0 deletions

import type { GenAIMessage } from "rosetta-ai"
import { stringAttr } from "../resolvers.ts"
import type { OtlpKeyValue } from "../types.ts"
import { parseGenAICurrent } from "./genai.ts"
import { parseGenAIDeprecated } from "./genai_deprecated.ts"
import { parseOpenInference } from "./openinference.ts"
import { parseVercel } from "./vercel.ts"

export interface ParsedContent {
  readonly inputMessages: readonly GenAIMessage[]
  readonly outputMessages: readonly GenAIMessage[]
  readonly systemInstructions: string
  readonly toolDefinitions: string
}

const EMPTY_CONTENT: ParsedContent = {
  inputMessages: [],
  outputMessages: [],
  systemInstructions: "",
  toolDefinitions: "",
}

interface ContentParser {
  canHandle(attrs: readonly OtlpKeyValue[]): boolean
  parse(attrs: readonly OtlpKeyValue[]): ParsedContent
}

function hasKey(attrs: readonly OtlpKeyValue[], key: string): boolean {
  return attrs.some((a) => a.key === key)
}

function hasKeyPrefix(attrs: readonly OtlpKeyValue[], prefix: string): boolean {
  return attrs.some((a) => a.key.startsWith(prefix))
}

const PARSERS: readonly ContentParser[] = [
  {
    canHandle: (attrs) => hasKey(attrs, "gen_ai.input.messages") || hasKey(attrs, "gen_ai.output.messages"),
    parse: parseGenAICurrent,
  },
  {
    canHandle: (attrs) =>
      hasKeyPrefix(attrs, "llm.input_messages.") ||
      hasKeyPrefix(attrs, "llm.output_messages.") ||
      (stringAttr(attrs, "openinference.span.kind") !== undefined && hasKeyPrefix(attrs, "llm.")),
    parse: parseOpenInference,
  },
  {
    canHandle: (attrs) => hasKey(attrs, "ai.prompt") || hasKey(attrs, "ai.prompt.messages"),
    parse: parseVercel,
  },
  // GenAI deprecated / OpenLLMetry is the broadest fallback (gen_ai.prompt is common)
  {
    canHandle: (attrs) => hasKey(attrs, "gen_ai.prompt") || hasKey(attrs, "gen_ai.completion"),
    parse: parseGenAIDeprecated,
  },
]

export function parseContent(attrs: readonly OtlpKeyValue[]): ParsedContent {
  for (const parser of PARSERS) {
    if (parser.canHandle(attrs)) {
      return parser.parse(attrs)
    }
  }
  return EMPTY_CONTENT
}
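The sentinel dispatch can be exercised with a stripped-down version. The parser names and the `Attr` type are illustrative; only the detection predicates and their ordering mirror the file above:

```typescript
// Mini version of the sentinel dispatch: first matching parser wins,
// ordered so the broad gen_ai.prompt fallback comes last.
type Attr = { key: string }
type Parser = { name: string; canHandle: (attrs: Attr[]) => boolean }

const has = (attrs: Attr[], key: string) => attrs.some((a) => a.key === key)
const hasPrefix = (attrs: Attr[], p: string) => attrs.some((a) => a.key.startsWith(p))

const parsers: Parser[] = [
  { name: "genai-current", canHandle: (a) => has(a, "gen_ai.input.messages") || has(a, "gen_ai.output.messages") },
  { name: "openinference", canHandle: (a) => hasPrefix(a, "llm.input_messages.") || hasPrefix(a, "llm.output_messages.") },
  { name: "vercel", canHandle: (a) => has(a, "ai.prompt") || has(a, "ai.prompt.messages") },
  { name: "genai-deprecated", canHandle: (a) => has(a, "gen_ai.prompt") || has(a, "gen_ai.completion") },
]

const detect = (attrs: Attr[]): string => parsers.find((p) => p.canHandle(attrs))?.name ?? "none"

console.log(detect([{ key: "llm.input_messages.0.message.role" }])) // "openinference"
console.log(detect([{ key: "gen_ai.prompt" }])) // "genai-deprecated"
console.log(detect([{ key: "http.method" }])) // "none"
```

Ordering matters: a span carrying both `gen_ai.input.messages` and `gen_ai.prompt` (e.g. an SDK emitting both conventions during migration) is parsed by the current-convention parser, not the fallback.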
Lines changed: 172 additions & 0 deletions

/**
 * Content parser for OpenInference (Arize Phoenix).
 *
 * OpenInference explodes messages into flattened indexed span attributes:
 *   llm.input_messages.{i}.message.role
 *   llm.input_messages.{i}.message.content
 *   llm.input_messages.{i}.message.tool_calls.{j}.tool_call.function.name
 *   llm.input_messages.{i}.message.tool_calls.{j}.tool_call.function.arguments
 *   llm.input_messages.{i}.message.contents.{j}.message_content.type
 *   llm.input_messages.{i}.message.contents.{j}.message_content.text
 *   llm.input_messages.{i}.message.contents.{j}.message_content.image.image.url
 *
 * Output messages follow the same pattern with llm.output_messages.{i}.
 *
 * Tool definitions use:
 *   llm.tools.{i}.tool.json_schema — JSON string of tool schema
 *
 * We reassemble these into message arrays, then translate via rosetta-ai.
 */
import { safeTranslate } from "rosetta-ai"
import type { GenAIMessage } from "rosetta-ai"
import type { OtlpKeyValue } from "../types.ts"
import type { ParsedContent } from "./index.ts"

interface ToolCallData {
  name: string
  arguments: string
}

interface ReassembledMessage {
  role: string
  content: string
  tool_calls?: { id: string; type: string; function: { name: string; arguments: string } }[]
}

const INPUT_PREFIX = "llm.input_messages."
const OUTPUT_PREFIX = "llm.output_messages."
const TOOLS_PREFIX = "llm.tools."

function reassembleMessages(attrs: readonly OtlpKeyValue[], prefix: string): ReassembledMessage[] {
  const byIndex = new Map<number, Map<string, string>>()
  const toolCalls = new Map<number, Map<number, ToolCallData>>()

  for (const attr of attrs) {
    if (!attr.key.startsWith(prefix)) continue
    const rest = attr.key.slice(prefix.length)

    const dotIdx = rest.indexOf(".")
    if (dotIdx === -1) continue

    const index = Number.parseInt(rest.slice(0, dotIdx), 10)
    if (Number.isNaN(index)) continue

    const field = rest.slice(dotIdx + 1)
    const value = attr.value?.stringValue ?? ""

    if (field.startsWith("message.tool_calls.")) {
      const tcRest = field.slice("message.tool_calls.".length)
      const tcDotIdx = tcRest.indexOf(".")
      if (tcDotIdx === -1) continue
      const tcIndex = Number.parseInt(tcRest.slice(0, tcDotIdx), 10)
      if (Number.isNaN(tcIndex)) continue
      const tcField = tcRest.slice(tcDotIdx + 1)

      let msgToolCalls = toolCalls.get(index)
      if (!msgToolCalls) {
        msgToolCalls = new Map()
        toolCalls.set(index, msgToolCalls)
      }
      let tc = msgToolCalls.get(tcIndex)
      if (!tc) {
        tc = { name: "", arguments: "" }
        msgToolCalls.set(tcIndex, tc)
      }
      if (tcField === "tool_call.function.name") tc.name = value
      else if (tcField === "tool_call.function.arguments") tc.arguments = value
    } else if (field.startsWith("message.")) {
      const msgField = field.slice("message.".length)
      let fields = byIndex.get(index)
      if (!fields) {
        fields = new Map()
        byIndex.set(index, fields)
      }
      fields.set(msgField, value)
    }
  }

  const maxIndex = Math.max(...byIndex.keys(), ...toolCalls.keys(), -1)
  if (maxIndex === -1) return []

  const messages: ReassembledMessage[] = []
  for (let i = 0; i <= maxIndex; i++) {
    const fields = byIndex.get(i)
    const role = fields?.get("role") ?? "user"
    const content = fields?.get("content") ?? ""

    const msg: ReassembledMessage = { role, content }

    const msgToolCalls = toolCalls.get(i)
    if (msgToolCalls && msgToolCalls.size > 0) {
      const sorted = [...msgToolCalls.entries()].sort(([a], [b]) => a - b)
      msg.tool_calls = sorted.map(([j, tc]) => ({
        id: `call_${i}_${j}`,
        type: "function" as const,
        function: { name: tc.name, arguments: tc.arguments },
      }))
    }

    messages.push(msg)
  }

  return messages
}

function reassembleToolDefinitions(attrs: readonly OtlpKeyValue[]): string {
  const tools = new Map<number, string>()

  for (const attr of attrs) {
    if (!attr.key.startsWith(TOOLS_PREFIX)) continue
    const rest = attr.key.slice(TOOLS_PREFIX.length)
    const dotIdx = rest.indexOf(".")
    if (dotIdx === -1) continue
    const index = Number.parseInt(rest.slice(0, dotIdx), 10)
    if (Number.isNaN(index)) continue
    const field = rest.slice(dotIdx + 1)
    if (field === "tool.json_schema" && attr.value?.stringValue) {
      tools.set(index, attr.value.stringValue)
    }
  }

  if (tools.size === 0) return ""

  const sorted = [...tools.entries()].sort(([a], [b]) => a - b)
  const parsed = sorted.map(([, json]) => {
    try {
      return JSON.parse(json)
    } catch {
      return json
    }
  })
  return JSON.stringify(parsed)
}

function translateReassembled(messages: ReassembledMessage[], direction: "input" | "output"): GenAIMessage[] {
  if (messages.length === 0) return []
  const result = safeTranslate(messages, { direction })
  if (result.error) return []
  return result.messages as GenAIMessage[]
}

export function parseOpenInference(attrs: readonly OtlpKeyValue[]): ParsedContent {
  const inputRaw = reassembleMessages(attrs, INPUT_PREFIX)
  const outputRaw = reassembleMessages(attrs, OUTPUT_PREFIX)

  let inputMessages: GenAIMessage[] = []
  let systemInstructions = ""

  if (inputRaw.length > 0) {
    const result = safeTranslate(inputRaw, { direction: "input" })
    if (!result.error) {
      inputMessages = result.messages as GenAIMessage[]
      if (result.system) {
        systemInstructions = JSON.stringify(result.system)
      }
    }
  }

  const outputMessages = translateReassembled(outputRaw, "output")
  const toolDefinitions = reassembleToolDefinitions(attrs)

  return { inputMessages, outputMessages, systemInstructions, toolDefinitions }
}
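A self-contained miniature of the grouping-by-index pass (without the tool-call handling or rosetta-ai translation; the `Attr` type is stubbed) shows how the flattened attributes round-trip back into ordered messages even when attribute order is scrambled:

```typescript
// Toy reassembly of OpenInference's flattened indexed attributes into
// {role, content} messages, mirroring the grouping-by-index pass above.
type Attr = { key: string; value: string }

function reassemble(attrs: Attr[], prefix: string): { role: string; content: string }[] {
  const byIndex = new Map<number, Map<string, string>>()
  for (const a of attrs) {
    if (!a.key.startsWith(prefix)) continue
    const rest = a.key.slice(prefix.length)
    const dot = rest.indexOf(".")
    if (dot === -1) continue
    const i = Number.parseInt(rest.slice(0, dot), 10)
    if (Number.isNaN(i)) continue
    const field = rest.slice(dot + 1)
    if (!byIndex.has(i)) byIndex.set(i, new Map())
    byIndex.get(i)!.set(field, a.value)
  }
  // Sort by message index so output order is deterministic.
  return [...byIndex.entries()]
    .sort(([a], [b]) => a - b)
    .map(([, f]) => ({
      role: f.get("message.role") ?? "user",
      content: f.get("message.content") ?? "",
    }))
}

// Attributes arrive in arbitrary order; index 1 appears before index 0 here.
const attrs: Attr[] = [
  { key: "llm.input_messages.1.message.role", value: "assistant" },
  { key: "llm.input_messages.1.message.content", value: "Hi!" },
  { key: "llm.input_messages.0.message.role", value: "user" },
  { key: "llm.input_messages.0.message.content", value: "Hello" },
]
console.log(reassemble(attrs, "llm.input_messages.")) // user message first, assistant second
```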
