|
| 1 | +from textwrap import dedent |
| 2 | +from typing import Any, Literal |
| 3 | + |
| 4 | +import structlog |
| 5 | +from pydantic import BaseModel, Field |
| 6 | + |
| 7 | +from posthog.schema import MaxRecordingUniversalFilters, RecordingsQuery |
| 8 | + |
| 9 | +from posthog.sync import database_sync_to_async |
| 10 | + |
| 11 | +from products.replay.backend.prompts import ( |
| 12 | + DATE_FIELDS_PROMPT, |
| 13 | + FILTER_FIELDS_TAXONOMY_PROMPT, |
| 14 | + PRODUCT_DESCRIPTION_PROMPT, |
| 15 | + SESSION_REPLAY_EXAMPLES_PROMPT, |
| 16 | +) |
| 17 | + |
| 18 | +from ee.hogai.tool import MaxTool, ToolMessagesArtifact |
| 19 | +from ee.hogai.tools.replay.summarize_sessions import SummarizeSessionsTool |
| 20 | + |
| 21 | +logger = structlog.get_logger(__name__) |
| 22 | + |
| 23 | + |
| 24 | +class FilterSessionRecordingsToolArgs(BaseModel): |
| 25 | + recordings_filters: MaxRecordingUniversalFilters = Field( |
| 26 | + description=dedent(f""" |
| 27 | + User's question converted into a recordings query. |
| 28 | +
|
| 29 | + **CRITICAL: You MUST use the read_taxonomy tool to discover and clarify ALL properties and events before creating filters.** |
| 30 | +
|
| 31 | + {PRODUCT_DESCRIPTION_PROMPT} |
| 32 | +
|
| 33 | + {SESSION_REPLAY_EXAMPLES_PROMPT} |
| 34 | +
|
| 35 | + {FILTER_FIELDS_TAXONOMY_PROMPT} |
| 36 | +
|
| 37 | + {DATE_FIELDS_PROMPT} |
| 38 | +
|
| 39 | + # Property Types and Discovery |
| 40 | +
|
| 41 | + A session recording contains events and entities. When filtering, you must understand which property type to use: |
| 42 | +
|
| 43 | + **ENTITY PROPERTIES** (person, session, group): |
| 44 | + - **Person properties**: User attributes (email, name, country, custom fields). **MUST use read_taxonomy to discover available person properties.** |
| 45 | + - **Session properties**: Session-level data (device type, browser, OS, screen size, start timestamp). **MUST use read_taxonomy to discover available session properties.** |
| 46 | + - **Group properties**: Organization/account attributes (plan tier, company name). **MUST use read_taxonomy to discover group properties for specific group types.** The defined group types are: {{{{#groups}}}} {{{{.}}}}s,{{{{/groups}}}}. |
| 47 | +
|
| 48 | + **EVENT PROPERTIES**: |
| 49 | + - Properties of specific events that occurred during the recording (e.g., URL visited, button clicked). **MUST use read_taxonomy to discover properties for specific event names.** |
| 50 | +
|
| 51 | + **RECORDING PROPERTIES**: |
| 52 | + - Recording-level metrics (console_error_count, click_count, activity_score). These are built-in and don't require discovery. |
| 53 | +
|
| 54 | + **CRITICAL**: ALWAYS use read_taxonomy to discover properties before creating filters. Never assume property names or values exist without verification. If you can't find an exact property match, try the next best match. Do not call the same tool twice for the same entity/event. |
| 55 | +
|
| 56 | + # Property Value Matching |
| 57 | +
|
| 58 | + When using discovered property values: |
| 59 | + - **Related but not synonyms**: Use the user's original value. Example: User asks for browser "Chrome", tool returns ["Firefox", "Safari"] -> use "Chrome" (related concept) |
| 60 | + - **Synonyms or variants**: Use the discovered value. Example: User asks for city "New York", tool returns ["New York City", "NYC"] -> use "New York City" (synonym) |
| 61 | +
|
| 62 | + # Common Properties |
| 63 | +
|
| 64 | + **Session**: `$device_type` (Mobile/Desktop/Tablet), `$browser`, `$os`, `$screen_width`, `$screen_height` |
| 65 | + **Person**: `$geoip_country_code` (US/UK/FR), `$geoip_city_name`, custom fields |
| 66 | + **Event**: `$current_url`, `$event_type` ($rageclick/$pageview), `$pathname` |
| 67 | + **Recording**: `console_error_count`, `click_count`, `keypress_count`, `mouse_activity_count`, `activity_score` |
| 68 | +
|
| 69 | + # Filter Completion Strategy |
| 70 | +
|
| 71 | + Always aim to complete filters as much as possible: |
| 72 | + - **FIRST**: Use read_taxonomy to discover ALL relevant properties and events |
| 73 | + - If you found most properties but are missing some, return what you have (user can refine later) |
| 74 | + - If you've found very few properties, use read_taxonomy again or ask for clarification |
| 75 | + - Don't get stuck on perfect matches - use reasonable approximations when appropriate |
| 76 | + - **Remember**: Property discovery with read_taxonomy is MANDATORY before filter creation |
| 77 | +
|
| 78 | + # Critical Reminders |
| 79 | +
|
| 80 | + 1. **Property discovery**: ALWAYS use read_taxonomy to discover ALL properties and events before creating filters - never assume they exist |
| 81 | + 2. **Don't repeat tool calls**: If a property isn't found, try the next best option |
| 82 | + 3. **Minimalism**: Only include essential filters |
| 83 | + 4. **Defaults**: date_from="-3d", duration=[], filter_test_accounts=true |
| 84 | + 5. **Duration placement**: Duration filters go in `duration` array, NOT filter_group |
| 85 | + 6. **Value types**: Arrays for "exact"/"is_not", single values for comparisons |
| 86 | + 7. **Output format**: Valid JSON object only, no markdown or explanatory text |
| 87 | + 8. **Silence**: Do not output when performing taxonomy exploration, just use the tools |
| 88 | + """).strip() |
| 89 | + ) |
| 90 | + |
| 91 | + |
| 92 | +class FilterSessionRecordingsTool(MaxTool): |
| 93 | + name: Literal["filter_session_recordings"] = "filter_session_recordings" |
| 94 | + args_schema: type[BaseModel] = FilterSessionRecordingsToolArgs |
| 95 | + description: str = dedent(""" |
| 96 | + Filters session recordings by creating a recordings query, and then running it to list the recordings. The list is AUTOMATICALLY shown to the user as a widget. |
| 97 | + - When to use the tool: |
| 98 | + * When the user asks to update session recordings filters |
| 99 | + - "update" synonyms: "change", "modify", "adjust", and similar |
| 100 | + - "session recordings" synonyms: "sessions", "recordings", "replays", "user sessions", and similar |
| 101 | + * When the user asks to search for session recordings |
| 102 | + - "search for" synonyms: "find", "look up", and similar |
| 103 | + * When the user asks to summarize session recordings |
| 104 | +
|
| 105 | + When on the replay page, the tool will update the filters in the page. |
| 106 | + """).strip() |
| 107 | + |
| 108 | + async def _arun_impl( |
| 109 | + self, recordings_filters: MaxRecordingUniversalFilters |
| 110 | + ) -> tuple[str, ToolMessagesArtifact | None]: |
| 111 | + # Convert filters to recordings query and execute |
| 112 | + recordings_query = SummarizeSessionsTool._convert_max_filters_to_recordings_query(recordings_filters) |
| 113 | + |
| 114 | + try: |
| 115 | + query_results = await database_sync_to_async(self._get_recordings_with_filters, thread_sensitive=False)( |
| 116 | + recordings_query |
| 117 | + ) |
| 118 | + except: |
| 119 | + query_results = None |
| 120 | + |
| 121 | + if query_results is None: |
| 122 | + content = "⚠️ Updated session recordings filters, but encountered an issue fetching results." |
| 123 | + else: |
| 124 | + total_count = len(query_results.results) |
| 125 | + if total_count == 0: |
| 126 | + content = "✅ Filtered session recordings. No recordings found matching these criteria." |
| 127 | + elif total_count == 1: |
| 128 | + content = "✅ Filtered session recordings. Found 1 recording matching these criteria:\n\n" |
| 129 | + content += self._format_recording_metadata(query_results.results[0]) |
| 130 | + else: |
| 131 | + content = f"✅ Filtered session recordings. Found {total_count} recordings matching these criteria:\n\n" |
| 132 | + # Include metadata for up to first 5 recordings |
| 133 | + for i, recording in enumerate(query_results.results[:5]): |
| 134 | + content += f"{i+1}. {self._format_recording_metadata(recording)}\n" |
| 135 | + if total_count > 5: |
| 136 | + content += f"\n...and {total_count - 5} more recordings" |
| 137 | + return content, None |
| 138 | + |
| 139 | + def _get_recordings_with_filters(self, recordings_query: RecordingsQuery, limit: int = 50) -> Any: |
| 140 | + """Get recordings from DB with filters""" |
| 141 | + from posthog.session_recordings.queries.session_recording_list_from_query import SessionRecordingListFromQuery |
| 142 | + |
| 143 | + recordings_query.limit = limit |
| 144 | + try: |
| 145 | + query_runner = SessionRecordingListFromQuery( |
| 146 | + team=self._team, query=recordings_query, hogql_query_modifiers=None, limit=limit |
| 147 | + ) |
| 148 | + results = query_runner.run() |
| 149 | + except Exception as e: |
| 150 | + logger.exception( |
| 151 | + f"Error getting recordings with filters query ({recordings_query.model_dump_json(exclude_none=True)}): {e}" |
| 152 | + ) |
| 153 | + return None |
| 154 | + return results |
| 155 | + |
| 156 | + def _format_recording_metadata(self, recording: dict[str, Any]) -> str: |
| 157 | + """Format recording metadata for display.""" |
| 158 | + from datetime import datetime |
| 159 | + |
| 160 | + parts = [] |
| 161 | + |
| 162 | + # Person/distinct_id |
| 163 | + distinct_id = recording.get("distinct_id", "Unknown") |
| 164 | + parts.append(f"User: {distinct_id}") |
| 165 | + |
| 166 | + # Start time |
| 167 | + start_time = recording.get("start_time") |
| 168 | + if start_time: |
| 169 | + try: |
| 170 | + # start_time can be either a datetime object (from ClickHouse) or a string |
| 171 | + if isinstance(start_time, datetime): |
| 172 | + dt = start_time |
| 173 | + else: |
| 174 | + dt = datetime.fromisoformat(start_time.replace("Z", "+00:00")) |
| 175 | + parts.append(f"Started: {dt.strftime('%Y-%m-%d %H:%M:%S UTC')}") |
| 176 | + except (ValueError, AttributeError): |
| 177 | + parts.append(f"Started: {start_time}") |
| 178 | + |
| 179 | + # Duration |
| 180 | + duration = recording.get("duration") |
| 181 | + if duration is not None: |
| 182 | + minutes, seconds = divmod(int(duration), 60) |
| 183 | + hours, minutes = divmod(minutes, 60) |
| 184 | + if hours > 0: |
| 185 | + parts.append(f"Duration: {hours}h {minutes}m {seconds}s") |
| 186 | + elif minutes > 0: |
| 187 | + parts.append(f"Duration: {minutes}m {seconds}s") |
| 188 | + else: |
| 189 | + parts.append(f"Duration: {seconds}s") |
| 190 | + |
| 191 | + # Activity metrics |
| 192 | + click_count = recording.get("click_count") |
| 193 | + keypress_count = recording.get("keypress_count") |
| 194 | + if click_count is not None or keypress_count is not None: |
| 195 | + activity_parts = [] |
| 196 | + if click_count is not None: |
| 197 | + activity_parts.append(f"{click_count} clicks") |
| 198 | + if keypress_count is not None: |
| 199 | + activity_parts.append(f"{keypress_count} keypresses") |
| 200 | + parts.append(f"Activity: {', '.join(activity_parts)}") |
| 201 | + |
| 202 | + # Console errors |
| 203 | + console_error_count = recording.get("console_error_count") |
| 204 | + if console_error_count is not None and console_error_count > 0: |
| 205 | + parts.append(f"Console errors: {console_error_count}") |
| 206 | + |
| 207 | + # Active/inactive seconds |
| 208 | + active_seconds = recording.get("active_seconds") |
| 209 | + inactive_seconds = recording.get("inactive_seconds") |
| 210 | + if active_seconds is not None: |
| 211 | + parts.append(f"Active: {int(active_seconds)}s") |
| 212 | + if inactive_seconds is not None: |
| 213 | + parts.append(f"Inactive: {int(inactive_seconds)}s") |
| 214 | + |
| 215 | + # First URL |
| 216 | + first_url = recording.get("first_url") |
| 217 | + if first_url: |
| 218 | + parts.append(f"First URL: {first_url}") |
| 219 | + |
| 220 | + # Ongoing status |
| 221 | + ongoing = recording.get("ongoing") |
| 222 | + if ongoing: |
| 223 | + parts.append("Status: Ongoing") |
| 224 | + |
| 225 | + return " | ".join(parts) |
0 commit comments