33authors: Balaxxe, nbellochi, Bermont, Mark Kazakov, Christian Taillon (Consolidated & Enhanced by AI)
44author_url: https://github.com/christian-taillon
55funding_url: https://github.com/open-webui
6- version: 8.2
6+ version: 8.5
77license: MIT
88requirements: pydantic>=2.0.0, aiohttp>=3.8.0
99environment_variables:
1616This script is the definitive, all-in-one integration for Anthropic models in OpenWebUI. It
1717combines the best features from multiple community scripts into a single, robust, and
1818future-proof solution.
19+ Changelog v8.5:
20+ - Removed cost tracking functionality (ENABLE_COST_TRACKING valve, MODEL_PRICING, cost calculation methods)
21+ - Removed cost display from responses and event emissions
22+ - Simplified _get_cache_info to only show token counts and cache hit percentage
23+ - Removed conversation cost storage and tracking
24+
25+ Changelog v8.3:
26+ - Added comprehensive event emitter integration for user visibility
27+ - Implemented _emit_status() and _emit_message() helper methods
28+ - Added status updates for non-streaming requests (connecting, success, failure)
29+ - Added progress indicators for streaming responses (thinking, tool use, completion)
30+ - Made cache info visible at end of streaming responses (previously only logged)
31+ - Added event emissions for all error conditions with proper status updates
32+ - Enhanced user experience with real-time UI feedback throughout request lifecycle
33+
1934Changelog v8.2:
2035- Improved cache control to use default 5-minute ephemeral caching (per Anthropic spec)
2136- Improved content block normalization with dedicated helper methods
@@ -76,15 +91,7 @@ class Pipe:
7691 "claude-sonnet-4" : 64000 ,
7792 "claude-sonnet-4-5" : 64000 ,
7893 }
79- MODEL_PRICING = { # Per million tokens (Input, Cache Write, Cache Read, Output)
80- "claude-3-opus" : (15.0 , 18.75 , 1.5 , 75.0 ),
81- "claude-3-sonnet" : (3.0 , 3.75 , 0.3 , 15.0 ),
82- "claude-3-haiku" : (0.25 , 0.3125 , 0.025 , 1.25 ),
83- "claude-3-5-sonnet" : (3.0 , 3.75 , 0.3 , 15.0 ),
84- "claude-3-7-sonnet" : (3.0 , 3.75 , 0.3 , 15.0 ),
85- "claude-sonnet-4" : (4.0 , 5.0 , 0.4 , 20.0 ),
86- "claude-opus-4" : (20.0 , 25.0 , 2.0 , 100.0 ),
87- }
94+
8895 # File and Content Constants
8996 SUPPORTED_IMAGE_TYPES = ["image/jpeg" , "image/png" , "image/gif" , "image/webp" ]
9097 MAX_IMAGE_SIZE = 5 * 1024 * 1024
@@ -146,6 +153,8 @@ class Valves(BaseModel):
146153 default = 3600 ,
147154 description = "Model list cache duration in seconds (default: 1 hour, 0 to disable)"
148155 )
156+
157+
149158 def __init__ (self ):
150159 logging .basicConfig (level = os .getenv ("LOG_LEVEL" , "INFO" ))
151160 self .type = "manifold"
@@ -246,6 +255,31 @@ async def pipes(self) -> List[dict]:
246255 self ._models_list_cache = self ._get_fallback_models ()
247256 return self ._models_list_cache
248257
258+ async def _emit_status (
259+ self ,
260+ __event_emitter__ : Optional [Any ],
261+ description : str ,
262+ done : bool = False
263+ ) -> None :
264+ """
265+ Emit status event to UI.
266+
267+ Args:
268+ __event_emitter__: Event emitter from OpenWebUI
269+ description: Status description to display
270+ done: Whether the status is complete
271+ """
272+ if __event_emitter__ :
273+ await __event_emitter__ (
274+ {
275+ "type" : "status" ,
276+ "data" : {
277+ "description" : description ,
278+ "done" : done
279+ }
280+ }
281+ )
282+
249283 def _format_error (
250284 self ,
251285 message : str ,
@@ -278,28 +312,23 @@ def _format_error(
278312 return " | " .join (error_parts [:3 ]) + error_parts [- 1 ]
279313
280314 def _get_cache_info (self , usage_data : Dict , model_id : str ) -> str :
281- """Formats cache usage information and cost savings for display."""
315+ """Formats cache usage information for display."""
282316 if not self .valves .SHOW_CACHE_INFO or not usage_data :
283317 return ""
284318 input_tokens , output_tokens , cached_tokens = (
285319 usage_data .get ("input_tokens" , 0 ),
286320 usage_data .get ("output_tokens" , 0 ),
287321 usage_data .get ("cache_read_input_tokens" , 0 ),
288322 )
289- base_model = self ._get_model_base (model_id )
290- prices = self .MODEL_PRICING .get (
291- base_model , self .MODEL_PRICING ["claude-3-5-sonnet" ]
292- )
293323 if cached_tokens > 0 :
294324 cache_percentage = (
295325 (cached_tokens / input_tokens * 100 ) if input_tokens > 0 else 0
296326 )
297- savings = (
298- (input_tokens - cached_tokens ) * (prices [0 ] - prices [2 ])
299- ) / 1_000_000
300- return f"```\n ✅ CACHE HIT: { cache_percentage :.1f} % cached. Savings: ~${ savings :.6f} \n Tokens: { input_tokens :,} In / { output_tokens :,} Out\n ```\n \n "
327+ return f"```\n ✅ CACHE HIT: { cache_percentage :.1f} % cached.\n Tokens: { input_tokens :,} In / { output_tokens :,} Out\n ```\n \n "
301328 else :
302329 return f"```\n ❌ CACHE MISS: No cache used.\n Tokens: { input_tokens :,} In / { output_tokens :,} Out\n ```\n \n "
330+
331+
303332 def _normalize_content_blocks (
304333 self ,
305334 raw_content : Union [List , Dict , str ],
@@ -689,11 +718,33 @@ async def pipe(
689718 if beta_headers_needed :
690719 headers ["anthropic-beta" ] = "," .join (sorted (list (beta_headers_needed )))
691720 if payload ["stream" ]:
692- return self ._stream_response (headers , payload , __event_emitter__ )
721+ return self ._stream_response (headers , payload , __event_emitter__ , body )
722+
723+ # Emit start status for non-streaming
724+ await self ._emit_status (__event_emitter__ , "Sending request to Anthropic API..." )
725+
693726 response_data = await self ._send_request (headers , payload )
694727 if isinstance (response_data , str ):
728+ # Error response
729+ await self ._emit_status (__event_emitter__ , "Request failed" , done = True )
695730 return response_data
731+
732+ # Emit success status
733+ await self ._emit_status (__event_emitter__ , "Response received" , done = True )
734+
696735 cache_info = self ._get_cache_info (response_data .get ("usage" ), model_id )
736+
737+ # Emit cache info if enabled
738+ if self .valves .SHOW_CACHE_INFO and response_data .get ("usage" ):
739+ usage = response_data .get ("usage" )
740+ cached_tokens = usage .get ("cache_read_input_tokens" , 0 )
741+ if cached_tokens > 0 :
742+ await self ._emit_status (
743+ __event_emitter__ ,
744+ f"Cache hit: { cached_tokens :,} tokens served from cache" ,
745+ True
746+ )
747+
697748 content = response_data .get ("content" , [])
698749 if any (c .get ("type" ) == "tool_use" for c in content ):
699750 tool_calls = [
@@ -712,16 +763,27 @@ async def pipe(
712763 response_text = "" .join (
713764 c .get ("text" , "" ) for c in content if c .get ("type" ) == "text"
714765 )
715- return cache_info + response_text
766+
767+ return response_text
716768 except Exception as e :
717769 logging .error (f"Error in pipe method: { e } " , exc_info = True )
718770 return f"An unexpected error occurred: { e } "
719771 async def _stream_response (
720- self , headers : Dict , payload : Dict , __event_emitter__ : Optional [Any ] = None
772+ self , headers : Dict , payload : Dict , __event_emitter__ : Optional [Any ] = None ,
773+ body : Optional [Dict ] = None
721774 ) -> AsyncGenerator [str , None ]:
722775 is_thinking , is_tool_use = False , False
723776 tool_call_chunks = {}
777+ usage_data = None
778+
724779 try :
780+ # Emit connection status
781+ await self ._emit_status (
782+ __event_emitter__ ,
783+ "Connecting to Anthropic API..." ,
784+ done = False
785+ )
786+
725787 async with aiohttp .ClientSession () as session :
726788 timeout = aiohttp .ClientTimeout (total = self .valves .REQUEST_TIMEOUT )
727789 async with session .post (
@@ -736,8 +798,22 @@ async def _stream_response(
736798 http_status = response .status ,
737799 request_id = self .request_id
738800 )
801+ # Emit error event
802+ await self ._emit_status (
803+ __event_emitter__ ,
804+ "Request failed" ,
805+ done = True
806+ )
739807 yield error_msg
740808 return
809+
810+ # Emit streaming started
811+ await self ._emit_status (
812+ __event_emitter__ ,
813+ "Streaming response..." ,
814+ done = False
815+ )
816+
741817 async for line in response .content :
742818 if not line .startswith (b"data: " ):
743819 continue
@@ -748,16 +824,35 @@ async def _stream_response(
748824 block_type = block .get ("type" )
749825 if block_type == "thinking" :
750826 is_thinking = True
827+ # Emit thinking started event
828+ await self ._emit_status (
829+ __event_emitter__ ,
830+ "Claude is thinking..." ,
831+ done = False
832+ )
751833 if self .valves .ENABLE_THINKING and self .valves .DISPLAY_THINKING :
752834 yield "<thinking>"
753835 elif block_type == "redacted_thinking" :
754836 is_thinking = True
837+ # Emit redacted thinking event
838+ await self ._emit_status (
839+ __event_emitter__ ,
840+ "Claude is thinking (redacted)..." ,
841+ done = False
842+ )
755843 elif block_type == "tool_use" :
756844 is_tool_use = True
757845 tool_use = block .get ("tool_use" , {})
846+ tool_name = tool_use .get ("name" , "unknown" )
847+ # Emit tool use detected
848+ await self ._emit_status (
849+ __event_emitter__ ,
850+ f"Using tool: { tool_name } " ,
851+ done = False
852+ )
758853 tool_call_chunks [data ["index" ]] = {
759854 "id" : tool_use .get ("id" ),
760- "name" : tool_use . get ( "name" ) ,
855+ "name" : tool_name ,
761856 "input_chunks" : [],
762857 }
763858 else :
@@ -784,8 +879,15 @@ async def _stream_response(
784879 ):
785880 yield delta .get ("text" , "" )
786881 elif event_type == "content_block_stop" :
787- if is_thinking and self .valves .ENABLE_THINKING and self .valves .DISPLAY_THINKING :
788- yield "</thinking>"
882+ if is_thinking :
883+ # Emit thinking complete
884+ await self ._emit_status (
885+ __event_emitter__ ,
886+ "Thinking complete" ,
887+ done = False
888+ )
889+ if self .valves .ENABLE_THINKING and self .valves .DISPLAY_THINKING :
890+ yield "</thinking>"
789891 if is_tool_use :
790892 tool = tool_call_chunks .get (data ["index" ])
791893 if tool :
@@ -806,18 +908,40 @@ async def _stream_response(
806908 )
807909 is_thinking = is_tool_use = False
808910 elif event_type == "message_stop" :
809- usage_info = self ._get_cache_info (data .get ('usage' ), payload ['model' ])
911+ # Capture usage data
912+ usage_data = data .get ('usage' )
913+
914+ # Emit completion status
915+ await self ._emit_status (
916+ __event_emitter__ ,
917+ "Stream complete" ,
918+ done = True
919+ )
920+
921+ usage_info = self ._get_cache_info (usage_data , payload ['model' ])
810922 logging .info (
811923 f"Stream finished [Request ID: { self .request_id } ]. { usage_info } "
812924 )
813925 break
926+
927+ # Yield cache info at end of stream
928+ if self .valves .SHOW_CACHE_INFO and usage_data :
929+ cache_info = self ._get_cache_info (usage_data , payload ['model' ])
930+ if cache_info :
931+ yield cache_info
814932 except asyncio .TimeoutError as e :
815933 logging .error (f"Streaming timeout: { e } " , exc_info = True )
816934 error_msg = self ._format_error (
817935 message = f"Request timed out after { self .valves .REQUEST_TIMEOUT } s" ,
818936 error_code = "TIMEOUT" ,
819937 request_id = self .request_id
820938 )
939+ # Emit error event
940+ await self ._emit_status (
941+ __event_emitter__ ,
942+ "Request timed out" ,
943+ done = True
944+ )
821945 yield error_msg
822946 except aiohttp .ClientError as e :
823947 logging .error (f"Streaming network error: { e } " , exc_info = True )
@@ -826,6 +950,12 @@ async def _stream_response(
826950 error_code = "NETWORK_ERROR" ,
827951 request_id = self .request_id
828952 )
953+ # Emit error event
954+ await self ._emit_status (
955+ __event_emitter__ ,
956+ "Network error occurred" ,
957+ done = True
958+ )
829959 yield error_msg
830960 except Exception as e :
831961 logging .error (f"Streaming error: { e } " , exc_info = True )
@@ -834,6 +964,12 @@ async def _stream_response(
834964 error_code = "STREAM_ERROR" ,
835965 request_id = self .request_id
836966 )
967+ # Emit error event
968+ await self ._emit_status (
969+ __event_emitter__ ,
970+ "Streaming error occurred" ,
971+ done = True
972+ )
837973 yield error_msg
838974 async def _send_request (self , headers : Dict , payload : Dict ) -> Union [Dict , str ]:
839975 for attempt in range (5 ):
@@ -883,3 +1019,12 @@ async def _send_request(self, headers: Dict, payload: Dict) -> Union[Dict, str]:
8831019 error_code = "MAX_RETRIES" ,
8841020 request_id = self .request_id
8851021 )
1022+
1023+ async def outlet (
1024+ self , body : dict , __user__ : Optional [dict ] = None , __event_emitter__ = None
1025+ ) -> dict :
1026+ """
1027+ Process the response body after the pipe completes.
1028+ Note: Cost tracking is now handled by injecting into response text.
1029+ """
1030+ return body
0 commit comments