1
+ from typing import List , Dict , Any , Union
2
+
3
+
4
+ def find_items_with_images (items : List [Dict [str , Any ]]) -> List [int ]:
5
+ """
6
+ Finds all items in the conversation history that contain images
7
+
8
+ Args:
9
+ items: Array of conversation items to check
10
+
11
+ Returns:
12
+ Array of indices where images were found
13
+ """
14
+ items_with_images = []
15
+
16
+ for index , item in enumerate (items ):
17
+ has_image = False
18
+
19
+ if isinstance (item .get ("content" ), list ):
20
+ has_image = any (
21
+ content_item .get ("type" ) == "tool_result"
22
+ and "content" in content_item
23
+ and isinstance (content_item ["content" ], list )
24
+ and any (
25
+ nested_item .get ("type" ) == "image"
26
+ for nested_item in content_item ["content" ]
27
+ if isinstance (nested_item , dict )
28
+ )
29
+ for content_item in item ["content" ]
30
+ if isinstance (content_item , dict )
31
+ )
32
+
33
+ if has_image :
34
+ items_with_images .append (index )
35
+
36
+ return items_with_images
37
+
38
+
39
+ def compress_conversation_images (
40
+ items : List [Dict [str , Any ]],
41
+ keep_most_recent_count : int = 2
42
+ ) -> Dict [str , List [Dict [str , Any ]]]:
43
+ """
44
+ Compresses conversation history by removing images from older items
45
+ while keeping the most recent images intact
46
+
47
+ Args:
48
+ items: Array of conversation items to process
49
+ keep_most_recent_count: Number of most recent image-containing items to preserve (default: 2)
50
+
51
+ Returns:
52
+ Dictionary with processed items
53
+ """
54
+ items_with_images = find_items_with_images (items )
55
+
56
+ for index , item in enumerate (items ):
57
+ image_index = - 1
58
+ if index in items_with_images :
59
+ image_index = items_with_images .index (index )
60
+
61
+ should_compress = (
62
+ image_index >= 0
63
+ and image_index < len (items_with_images ) - keep_most_recent_count
64
+ )
65
+
66
+ if should_compress :
67
+ if isinstance (item .get ("content" ), list ):
68
+ new_content = []
69
+ for content_item in item ["content" ]:
70
+ if isinstance (content_item , dict ):
71
+ if (
72
+ content_item .get ("type" ) == "tool_result"
73
+ and "content" in content_item
74
+ and isinstance (content_item ["content" ], list )
75
+ and any (
76
+ nested_item .get ("type" ) == "image"
77
+ for nested_item in content_item ["content" ]
78
+ if isinstance (nested_item , dict )
79
+ )
80
+ ):
81
+ # Replace the content with a text placeholder
82
+ new_content .append ({
83
+ ** content_item ,
84
+ "content" : "screenshot taken"
85
+ })
86
+ else :
87
+ new_content .append (content_item )
88
+ else :
89
+ new_content .append (content_item )
90
+
91
+ item ["content" ] = new_content
92
+
93
+ return {"items" : items }
0 commit comments