@@ -1,5 +1,6 @@
-from datetime import datetime, timezone
+from datetime import date, datetime, timezone
 import json
+from annotated_types import Timezone
 from opentelemetry.sdk.trace import ReadableSpan
 from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
 from typing import Dict, List, Optional, Sequence, cast
@@ -10,6 +11,7 @@
 from literalai.helper import utc_now
 from literalai.observability.generation import GenerationType
 from literalai.observability.step import Step, StepDict
+from literalai.prompt_engineering.prompt import PromptDict
 
 
 class LoggingSpanExporter(SpanExporter):
@@ -56,14 +58,8 @@ def force_flush(self, timeout_millis: float = 30000) -> bool:
         """Force flush the exporter."""
         return True
 
-    # # TODO: Add generation promptid
-    # # TODO: Add generation variables
-    # # TODO: Check missing variables
-    # # TODO: ttFirstToken
-    # # TODO: duration
-    # # TODO: tokenThroughputInSeconds
-    # # TODO: Add tools
-    # # TODO: error check with gemini error
+    # TODO: error check with gemini error
+    # TODO: ttFirstToken
     def _create_step_from_span(self, span: ReadableSpan) -> Step:
         """Convert a span to a Step object"""
         attributes = span.attributes or {}
@@ -78,6 +74,11 @@ def _create_step_from_span(self, span: ReadableSpan) -> Step:
             if span.end_time
             else utc_now()
         )
+        duration, token_throughput = self._calculate_duration_and_throughput(
+            span.start_time,
+            span.end_time,
+            int(str(attributes.get("llm.usage.total_tokens", 0))),
+        )
 
         generation_type = attributes.get("llm.request.type")
         is_chat = generation_type == "chat"
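
Note on units: an OpenTelemetry `ReadableSpan` reports `start_time` and `end_time` as integer nanoseconds since the Unix epoch, and instrumentation may emit `llm.usage.total_tokens` as either an int or a string, hence the `int(str(...))` coercion. A minimal sketch of the arithmetic behind this call (timestamps and token count are illustrative):

    # Illustrative values: a 1.5 s span that reported 300 total tokens.
    start_time_ns = 1_700_000_000_000_000_000
    end_time_ns = start_time_ns + 1_500_000_000  # +1.5e9 ns

    duration_seconds = (end_time_ns - start_time_ns) / 1e9  # 1.5
    total_tokens = int(str(300))  # robust to str or int attribute values
    print(total_tokens / duration_seconds)  # 200.0 tokens/s
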
@@ -103,19 +104,35 @@ def _create_step_from_span(self, span: ReadableSpan) -> Step:
             k: str(v) for k, v in span_props.items() if v is not None and v != "None"
         }
 
+        serialized_prompt = attributes.get(
+            "traceloop.association.properties.literal.prompt"
+        )
+        prompt = cast(
+            Optional[PromptDict],
+            (
+                self._extract_json(str(serialized_prompt))
+                if serialized_prompt and serialized_prompt != "None"
+                else None
+            ),
+        )
+
         generation_content = {
+            "duration": duration,
             "messages": (
-                self.extract_messages(cast(Dict, attributes)) if is_chat else None
+                self._extract_messages(cast(Dict, attributes)) if is_chat else None
             ),
             "message_completion": (
-                self.extract_messages(cast(Dict, attributes), "gen_ai.completion.")[0]
+                self._extract_messages(cast(Dict, attributes), "gen_ai.completion.")[0]
                 if is_chat
                 else None
             ),
             "prompt": attributes.get("gen_ai.prompt.0.user"),
+            "promptId": prompt.get("id") if prompt else None,
             "completion": attributes.get("gen_ai.completion.0.content"),
             "model": attributes.get("gen_ai.request.model"),
             "provider": attributes.get("gen_ai.system"),
+            "tokenThroughputInSeconds": token_throughput,
+            "variables": prompt.get("variables") if prompt else None,
         }
         generation_settings = {
             "max_tokens": attributes.get("gen_ai.request.max_tokens"),
@@ -133,13 +150,13 @@ def _create_step_from_span(self, span: ReadableSpan) -> Step:
             "id": str(span.context.span_id) if span.context else None,
             "name": span_props.get("name", span.name),
             "type": "llm",
-            "metadata": self.extract_json(span_props.get("metadata", "{}")),
+            "metadata": self._extract_json(span_props.get("metadata", "{}")),
             "startTime": start_time,
             "endTime": end_time,
             "threadId": span_props.get("thread_id"),
             "parentId": span_props.get("parent_id"),
             "rootRunId": span_props.get("root_run_id"),
-            "tags": self.extract_json(span_props.get("tags", "[]")),
+            "tags": self._extract_json(span_props.get("tags", "[]")),
             "input": {
                 "content": (
                     generation_content["messages"]
@@ -176,7 +193,7 @@ def _create_step_from_span(self, span: ReadableSpan) -> Step:
 
         return step
 
-    def extract_messages(
+    def _extract_messages(
         self, data: Dict, prefix: str = "gen_ai.prompt."
     ) -> List[Dict]:
         messages = []
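
`_extract_messages` (renamed with a leading underscore to mark it private) walks the flattened `gen_ai.prompt.<index>.role` / `.content` attribute pairs until one is missing. A minimal sketch of the convention it expects (sample values are illustrative):

    attributes = {
        "gen_ai.prompt.0.role": "system",
        "gen_ai.prompt.0.content": "You are a helpful assistant.",
        "gen_ai.prompt.1.role": "user",
        "gen_ai.prompt.1.content": "Hello!",
    }
    # _extract_messages(attributes) yields:
    # [{"role": "system", "content": "You are a helpful assistant."},
    #  {"role": "user", "content": "Hello!"}]
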
@@ -188,22 +205,42 @@ def extract_messages(
 
             if role_key not in data or content_key not in data:
                 break
+            if data[role_key] == "placeholder":
+                break
 
             messages.append(
                 {
                     "role": data[role_key],
-                    "content": self.extract_json(data[content_key]),
+                    "content": self._extract_json(data[content_key]),
                 }
             )
 
             index += 1
 
         return messages
 
-    def extract_json(self, data: str) -> Dict | List | str:
+    def _extract_json(self, data: str) -> Dict | List | str:
         try:
             content = json.loads(data)
         except Exception:
             content = data
 
         return content
+
+    def _calculate_duration_and_throughput(
+        self,
+        start_time_ns: Optional[int],
+        end_time_ns: Optional[int],
+        total_tokens: Optional[int],
+    ) -> tuple[float, Optional[float]]:
+        """Calculate duration in seconds and token throughput per second."""
+        duration_ns = (
+            end_time_ns - start_time_ns if start_time_ns and end_time_ns else 0
+        )
+        duration_seconds = duration_ns / 1e9
+
+        token_throughput = None
+        if total_tokens is not None and duration_seconds > 0:
+            token_throughput = total_tokens / duration_seconds
+
+        return duration_seconds, token_throughput
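
A usage sketch for the new helper (the no-argument construction of the exporter is an assumption; its actual constructor is not shown in this diff):

    exporter = LoggingSpanExporter()  # assumes a no-argument constructor

    duration, throughput = exporter._calculate_duration_and_throughput(
        start_time_ns=1_700_000_000_000_000_000,
        end_time_ns=1_700_000_000_000_000_000 + 2_000_000_000,  # +2 s
        total_tokens=500,
    )
    # duration == 2.0, throughput == 250.0

    # Missing timestamps degrade gracefully: duration falls back to 0.0
    # and throughput stays None instead of raising ZeroDivisionError.
    duration, throughput = exporter._calculate_duration_and_throughput(None, None, 500)
    # duration == 0.0, throughput is None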