1717import logging
1818import re
1919import sys
20+ import time
2021import uuid
2122from io import BytesIO
2223
@@ -193,6 +194,7 @@ def create_chat_completion_message_event(
193194 request_id ,
194195 llm_metadata_dict ,
195196 response_id = None ,
197+ request_timestamp = None ,
196198):
197199 if not transaction :
198200 return
@@ -227,6 +229,8 @@ def create_chat_completion_message_event(
227229
228230 if settings .ai_monitoring .record_content .enabled :
229231 chat_completion_message_dict ["content" ] = content
232+ if request_timestamp :
233+ chat_completion_message_dict ["timestamp" ] = request_timestamp
230234
231235 chat_completion_message_dict .update (llm_metadata_dict )
232236
@@ -266,6 +270,8 @@ def create_chat_completion_message_event(
266270
267271 if settings .ai_monitoring .record_content .enabled :
268272 chat_completion_message_dict ["content" ] = content
273+ if request_timestamp :
274+ chat_completion_message_dict ["timestamp" ] = request_timestamp
269275
270276 chat_completion_message_dict .update (llm_metadata_dict )
271277
@@ -542,10 +548,22 @@ def extract_bedrock_cohere_model_streaming_response(response_body, bedrock_attrs
542548
543549
544550def handle_bedrock_exception (
545- exc , is_embedding , model , span_id , trace_id , request_extractor , request_body , ft , transaction , kwargs , is_converse
551+ exc ,
552+ is_embedding ,
553+ model ,
554+ span_id ,
555+ trace_id ,
556+ request_extractor ,
557+ request_body ,
558+ ft ,
559+ transaction ,
560+ kwargs ,
561+ is_converse ,
562+ request_timestamp = None ,
546563):
547564 try :
548565 bedrock_attrs = {"model" : model , "span_id" : span_id , "trace_id" : trace_id }
566+
549567 if is_converse :
550568 try :
551569 input_message_list = [
@@ -589,12 +607,14 @@ def handle_bedrock_exception(
589607 if is_embedding :
590608 handle_embedding_event (transaction , error_attributes )
591609 else :
592- handle_chat_completion_event (transaction , error_attributes )
610+ handle_chat_completion_event (transaction , error_attributes , request_timestamp )
593611 except Exception :
594612 _logger .warning (EXCEPTION_HANDLING_FAILURE_LOG_MESSAGE , exc_info = True )
595613
596614
597- def run_bedrock_response_extractor (response_extractor , response_body , bedrock_attrs , is_embedding , transaction ):
615+ def run_bedrock_response_extractor (
616+ response_extractor , response_body , bedrock_attrs , is_embedding , transaction , request_timestamp = None
617+ ):
598618 # Run response extractor for non-streaming responses
599619 try :
600620 response_extractor (response_body , bedrock_attrs )
@@ -604,7 +624,7 @@ def run_bedrock_response_extractor(response_extractor, response_body, bedrock_at
604624 if is_embedding :
605625 handle_embedding_event (transaction , bedrock_attrs )
606626 else :
607- handle_chat_completion_event (transaction , bedrock_attrs )
627+ handle_chat_completion_event (transaction , bedrock_attrs , request_timestamp )
608628
609629
610630def run_bedrock_request_extractor (request_extractor , request_body , bedrock_attrs ):
@@ -628,6 +648,8 @@ def _wrap_bedrock_runtime_invoke_model(wrapped, instance, args, kwargs):
628648 if not settings .ai_monitoring .enabled :
629649 return wrapped (* args , ** kwargs )
630650
651+ request_timestamp = int (1000.0 * time .time ())
652+
631653 transaction .add_ml_model_info ("Bedrock" , BOTOCORE_VERSION )
632654 transaction ._add_agent_attribute ("llm" , True )
633655
@@ -683,6 +705,7 @@ def _wrap_bedrock_runtime_invoke_model(wrapped, instance, args, kwargs):
683705 instance ._nr_ft = ft
684706 instance ._nr_response_streaming = response_streaming
685707 instance ._nr_settings = settings
708+ instance ._nr_request_timestamp = request_timestamp
686709
687710 # Add a bedrock flag to instance so we can determine when make_api_call instrumentation is hit from non-Bedrock paths and bypass it if so
688711 instance ._nr_is_bedrock = True
@@ -703,6 +726,7 @@ def _wrap_bedrock_runtime_invoke_model(wrapped, instance, args, kwargs):
703726 transaction ,
704727 kwargs ,
705728 is_converse = False ,
729+ request_timestamp = request_timestamp ,
706730 )
707731 raise
708732
@@ -733,6 +757,8 @@ def _wrap_bedrock_runtime_invoke_model(wrapped, instance, args, kwargs):
733757 run_bedrock_request_extractor (request_extractor , request_body , bedrock_attrs )
734758
735759 try :
760+ bedrock_attrs .pop ("timestamp" , None ) # The request timestamp is only needed for request extraction
761+
736762 if response_streaming :
737763 # Wrap EventStream object here to intercept __iter__ method instead of instrumenting class.
738764 # This class is used in numerous other services in botocore, and would cause conflicts.
@@ -748,7 +774,14 @@ def _wrap_bedrock_runtime_invoke_model(wrapped, instance, args, kwargs):
748774 bedrock_attrs ["duration" ] = ft .duration * 1000
749775 response ["body" ] = StreamingBody (BytesIO (response_body ), len (response_body ))
750776
751- run_bedrock_response_extractor (response_extractor , response_body , bedrock_attrs , is_embedding , transaction )
777+ run_bedrock_response_extractor (
778+ response_extractor ,
779+ response_body ,
780+ bedrock_attrs ,
781+ is_embedding ,
782+ transaction ,
783+ request_timestamp = request_timestamp ,
784+ )
752785
753786 except Exception :
754787 _logger .warning (RESPONSE_PROCESSING_FAILURE_LOG_MESSAGE , exc_info = True )
@@ -770,6 +803,8 @@ def _wrap_bedrock_runtime_converse(wrapped, instance, args, kwargs):
770803 if not settings .ai_monitoring .enabled :
771804 return wrapped (* args , ** kwargs )
772805
806+ request_timestamp = int (1000.0 * time .time ())
807+
773808 transaction .add_ml_model_info ("Bedrock" , BOTOCORE_VERSION )
774809 transaction ._add_agent_attribute ("llm" , True )
775810
@@ -800,6 +835,7 @@ def _wrap_bedrock_runtime_converse(wrapped, instance, args, kwargs):
800835 instance ._nr_ft = ft
801836 instance ._nr_response_streaming = response_streaming
802837 instance ._nr_settings = settings
838+ instance ._nr_request_timestamp = request_timestamp
803839 instance ._nr_is_converse = True
804840
805841 # Add a bedrock flag to instance so we can determine when make_api_call instrumentation is hit from non-Bedrock paths and bypass it if so
@@ -810,7 +846,18 @@ def _wrap_bedrock_runtime_converse(wrapped, instance, args, kwargs):
810846 response = wrapped (* args , ** kwargs )
811847 except Exception as exc :
812848 handle_bedrock_exception (
813- exc , False , model , span_id , trace_id , request_extractor , {}, ft , transaction , kwargs , is_converse = True
849+ exc ,
850+ False ,
851+ model ,
852+ span_id ,
853+ trace_id ,
854+ request_extractor ,
855+ {},
856+ ft ,
857+ transaction ,
858+ kwargs ,
859+ is_converse = True ,
860+ request_timestamp = request_timestamp ,
814861 )
815862 raise
816863
@@ -824,6 +871,7 @@ def _wrap_bedrock_runtime_converse(wrapped, instance, args, kwargs):
824871
825872 response_headers = response .get ("ResponseMetadata" , {}).get ("HTTPHeaders" ) or {}
826873 bedrock_attrs = extract_bedrock_converse_attrs (kwargs , response , response_headers , model , span_id , trace_id )
874+ bedrock_attrs ["timestamp" ] = request_timestamp
827875
828876 try :
829877 if response_streaming :
@@ -838,7 +886,9 @@ def _wrap_bedrock_runtime_converse(wrapped, instance, args, kwargs):
838886
839887 ft .__exit__ (None , None , None )
840888 bedrock_attrs ["duration" ] = ft .duration * 1000
841- run_bedrock_response_extractor (response_extractor , {}, bedrock_attrs , False , transaction )
889+ run_bedrock_response_extractor (
890+ response_extractor , {}, bedrock_attrs , False , transaction , request_timestamp = request_timestamp
891+ )
842892
843893 except Exception :
844894 _logger .warning (RESPONSE_PROCESSING_FAILURE_LOG_MESSAGE , exc_info = True )
@@ -888,7 +938,7 @@ def extract_bedrock_converse_attrs(kwargs, response, response_headers, model, sp
888938
889939
890940class BedrockRecordEventMixin :
891- def record_events_on_stop_iteration (self , transaction ):
941+ def record_events_on_stop_iteration (self , transaction , request_timestamp = None ):
892942 if hasattr (self , "_nr_ft" ):
893943 bedrock_attrs = getattr (self , "_nr_bedrock_attrs" , {})
894944 self ._nr_ft .__exit__ (None , None , None )
@@ -899,14 +949,14 @@ def record_events_on_stop_iteration(self, transaction):
899949
900950 try :
901951 bedrock_attrs ["duration" ] = self ._nr_ft .duration * 1000
902- handle_chat_completion_event (transaction , bedrock_attrs )
952+ handle_chat_completion_event (transaction , bedrock_attrs , request_timestamp )
903953 except Exception :
904954 _logger .warning (RESPONSE_PROCESSING_FAILURE_LOG_MESSAGE , exc_info = True )
905955
906956 # Clear cached data as this can be very large.
907957 self ._nr_bedrock_attrs .clear ()
908958
909- def record_error (self , transaction , exc ):
959+ def record_error (self , transaction , exc , request_timestamp = None ):
910960 if hasattr (self , "_nr_ft" ):
911961 try :
912962 ft = self ._nr_ft
@@ -929,32 +979,32 @@ def record_error(self, transaction, exc):
929979 ft .__exit__ (* sys .exc_info ())
930980 error_attributes ["duration" ] = ft .duration * 1000
931981
932- handle_chat_completion_event (transaction , error_attributes )
982+ handle_chat_completion_event (transaction , error_attributes , request_timestamp )
933983
934984 # Clear cached data as this can be very large.
935985 error_attributes .clear ()
936986 except Exception :
937987 _logger .warning (EXCEPTION_HANDLING_FAILURE_LOG_MESSAGE , exc_info = True )
938988
939- def record_stream_chunk (self , event , transaction ):
989+ def record_stream_chunk (self , event , transaction , request_timestamp = None ):
940990 if event :
941991 try :
942992 if getattr (self , "_nr_is_converse" , False ):
943993 return self .converse_record_stream_chunk (event , transaction )
944994 else :
945- return self .invoke_record_stream_chunk (event , transaction )
995+ return self .invoke_record_stream_chunk (event , transaction , request_timestamp )
946996 except Exception :
947997 _logger .warning (RESPONSE_EXTRACTOR_FAILURE_LOG_MESSAGE , exc_info = True )
948998
949- def invoke_record_stream_chunk (self , event , transaction ):
999+ def invoke_record_stream_chunk (self , event , transaction , request_timestamp = None ):
9501000 bedrock_attrs = getattr (self , "_nr_bedrock_attrs" , {})
9511001 chunk = json .loads (event ["chunk" ]["bytes" ].decode ("utf-8" ))
9521002 self ._nr_model_extractor (chunk , bedrock_attrs )
9531003 # In Langchain, the bedrock iterator exits early if type is "content_block_stop".
9541004 # So we need to call the record events here since stop iteration will not be raised.
9551005 _type = chunk .get ("type" )
9561006 if _type == "content_block_stop" :
957- self .record_events_on_stop_iteration (transaction )
1007+ self .record_events_on_stop_iteration (transaction , request_timestamp )
9581008
9591009 def converse_record_stream_chunk (self , event , transaction ):
9601010 bedrock_attrs = getattr (self , "_nr_bedrock_attrs" , {})
@@ -984,6 +1034,7 @@ def __iter__(self):
9841034class GeneratorProxy (BedrockRecordEventMixin , ObjectProxy ):
9851035 def __init__ (self , wrapped ):
9861036 super ().__init__ (wrapped )
1037+ self ._nr_request_timestamp = int (1000.0 * time .time ())
9871038
9881039 def __iter__ (self ):
9891040 return self
@@ -996,12 +1047,12 @@ def __next__(self):
9961047 return_val = None
9971048 try :
9981049 return_val = self .__wrapped__ .__next__ ()
999- self .record_stream_chunk (return_val , transaction )
1050+ self .record_stream_chunk (return_val , transaction , self . _nr_request_timestamp )
10001051 except StopIteration :
1001- self .record_events_on_stop_iteration (transaction )
1052+ self .record_events_on_stop_iteration (transaction , self . _nr_request_timestamp )
10021053 raise
10031054 except Exception as exc :
1004- self .record_error (transaction , exc )
1055+ self .record_error (transaction , exc , self . _nr_request_timestamp )
10051056 raise
10061057 return return_val
10071058
@@ -1020,6 +1071,10 @@ def __aiter__(self):
10201071
10211072
10221073class AsyncGeneratorProxy (BedrockRecordEventMixin , ObjectProxy ):
1074+ def __init__ (self , wrapped ):
1075+ super ().__init__ (wrapped )
1076+ self ._nr_request_timestamp = int (1000.0 * time .time ())
1077+
10231078 def __aiter__ (self ):
10241079 return self
10251080
@@ -1030,12 +1085,12 @@ async def __anext__(self):
10301085 return_val = None
10311086 try :
10321087 return_val = await self .__wrapped__ .__anext__ ()
1033- self .record_stream_chunk (return_val , transaction )
1088+ self .record_stream_chunk (return_val , transaction , self . _nr_request_timestamp )
10341089 except StopAsyncIteration :
1035- self .record_events_on_stop_iteration (transaction )
1090+ self .record_events_on_stop_iteration (transaction , self . _nr_request_timestamp )
10361091 raise
10371092 except Exception as exc :
1038- self .record_error (transaction , exc )
1093+ self .record_error (transaction , exc , self . _nr_request_timestamp )
10391094 raise
10401095 return return_val
10411096
@@ -1084,7 +1139,7 @@ def handle_embedding_event(transaction, bedrock_attrs):
10841139 transaction .record_custom_event ("LlmEmbedding" , embedding_dict )
10851140
10861141
1087- def handle_chat_completion_event (transaction , bedrock_attrs ):
1142+ def handle_chat_completion_event (transaction , bedrock_attrs , request_timestamp = None ):
10881143 chat_completion_id = str (uuid .uuid4 ())
10891144 # Grab LLM-related custom attributes off of the transaction to store as metadata on LLM events
10901145 custom_attrs_dict = transaction ._custom_params
@@ -1128,6 +1183,7 @@ def handle_chat_completion_event(transaction, bedrock_attrs):
11281183 "response.number_of_messages" : number_of_messages ,
11291184 "response.choices.finish_reason" : bedrock_attrs .get ("response.choices.finish_reason" , None ),
11301185 "error" : bedrock_attrs .get ("error" , None ),
1186+ "timestamp" : request_timestamp or None ,
11311187 }
11321188 chat_completion_summary_dict .update (llm_metadata_dict )
11331189 chat_completion_summary_dict = {k : v for k , v in chat_completion_summary_dict .items () if v is not None }
@@ -1144,6 +1200,7 @@ def handle_chat_completion_event(transaction, bedrock_attrs):
11441200 request_id = request_id ,
11451201 llm_metadata_dict = llm_metadata_dict ,
11461202 response_id = response_id ,
1203+ request_timestamp = request_timestamp ,
11471204 )
11481205
11491206
0 commit comments