|
| 1 | +"""Create a compact, day-wise "top-by" index for user drill-down. |
| 2 | +
|
| 3 | +Source index: copilot_user_metrics (one doc per user/day) |
| 4 | +Dest index: copilot_user_metrics_top_by_day (one doc per user/day) |
| 5 | +
|
| 6 | +Each destination doc stores the TOP item within each nested totals_by_* array |
| 7 | +for that user/day. |
| 8 | +
|
| 9 | +We compute an internal activity score to select the top item: |
| 10 | +
|
| 11 | + score = code_generation_activity_count + user_initiated_interaction_count + code_acceptance_activity_count |
| 12 | +
|
| 13 | +But we do NOT persist the score in the destination document (only the labels). |
| 14 | +""" |
| 15 | + |
| 16 | +from __future__ import annotations |
| 17 | + |
| 18 | +import os |
| 19 | +import logging |
| 20 | +from typing import Any, Iterable |
| 21 | + |
| 22 | +from elasticsearch import Elasticsearch |
| 23 | +from elasticsearch.helpers import bulk |
| 24 | + |
| 25 | + |
| 26 | +logging.basicConfig(level=logging.INFO, format="%(asctime)s - [%(levelname)s] - %(message)s") |
| 27 | +logger = logging.getLogger(__name__) |
| 28 | + |
| 29 | + |
| 30 | +DEFAULT_SOURCE_INDEX = os.getenv("INDEX_USER_METRICS", "copilot_user_metrics") |
| 31 | +DEFAULT_DEST_INDEX = os.getenv("INDEX_USER_METRICS_TOP_BY_DAY", "copilot_user_metrics_top_by_day") |
| 32 | + |
| 33 | + |
def get_es_client() -> Elasticsearch:
    """Build an Elasticsearch client from ELASTICSEARCH_* environment variables.

    Uses basic auth only when both ELASTICSEARCH_USER and
    ELASTICSEARCH_PASSWORD are set; otherwise connects anonymously.
    """
    host = os.getenv("ELASTICSEARCH_HOST", "elasticsearch")
    port = int(os.getenv("ELASTICSEARCH_PORT", "9200"))
    url = f"http://{host}:{port}"

    user = os.getenv("ELASTICSEARCH_USER")
    password = os.getenv("ELASTICSEARCH_PASSWORD")

    if user and password:
        logger.info(f"Connecting to Elasticsearch at {url} with authentication")
        return Elasticsearch([url], basic_auth=(user, password))

    logger.info(f"Connecting to Elasticsearch at {url} without authentication")
    return Elasticsearch([url])
| 48 | + |
| 49 | + |
def ensure_dest_index(es: Elasticsearch, index_name: str) -> None:
    """Create the destination index with its mappings if it does not exist.

    No-op when the index is already present. `day` is a date field; all
    identifying and top-* fields are exact-match keywords.
    """
    if es.indices.exists(index=index_name):
        return

    properties = {"day": {"type": "date"}}
    for field in (
        "user_login",
        "organization_slug",
        "enterprise_id",
        "top_ide",
        "top_feature",
        "top_language_feature",
        "top_language_model",
        "top_model_feature",
    ):
        properties[field] = {"type": "keyword"}

    es.indices.create(index=index_name, body={"mappings": {"properties": properties}})
    logger.info(f"Created index: {index_name}")
| 73 | + |
| 74 | + |
| 75 | +def _safe_int(value: Any) -> int: |
| 76 | + try: |
| 77 | + return int(value or 0) |
| 78 | + except Exception: |
| 79 | + return 0 |
| 80 | + |
| 81 | + |
def activity_score(entry: dict[str, Any]) -> int:
    """Internal ranking score for a totals_by_* entry.

    Sum of generation, interaction, and acceptance counts; missing or
    unparseable counts contribute 0. The score is used only for picking
    the top item and is never persisted.
    """
    total = 0
    for field in (
        "code_generation_activity_count",
        "user_initiated_interaction_count",
        "code_acceptance_activity_count",
    ):
        raw = entry.get(field)
        if not raw:
            continue
        try:
            total += int(raw)
        except Exception:
            # Same tolerance as _safe_int: garbage counts as zero.
            pass
    return total
| 88 | + |
| 89 | + |
def _pick_top(entries: Iterable[dict[str, Any]], key_fn) -> str | None:
    """Return the label (via *key_fn*) of the highest-scoring entry.

    Entries with a falsy label are skipped. Ties keep the first entry
    seen. Returns None when no labeled entries exist.
    """
    top_label: str | None = None
    top_score = -1
    for item in entries or []:
        label = key_fn(item)
        if not label:
            continue
        current = activity_score(item)
        if current > top_score:
            top_label, top_score = label, current
    return top_label
| 102 | + |
| 103 | + |
def build_top_doc(source_doc: dict[str, Any]) -> dict[str, Any] | None:
    """Project a source user/day metrics doc onto its compact top-by form.

    Returns None when the doc lacks a `day` or `user_login` (not
    indexable). Each `top_*` field is the label of the highest-scoring
    entry in the corresponding totals_by_* array, falling back to
    "unknown" (or "unknown|unknown" for pair labels) when empty.
    """
    day = source_doc.get("day")
    login = source_doc.get("user_login")
    if not day or not login:
        return None

    raw_ent = source_doc.get("enterprise_id")
    doc: dict[str, Any] = {
        "day": day,
        "user_login": login,
        "organization_slug": source_doc.get("organization_slug"),
        "enterprise_id": None if raw_ent is None else str(raw_ent),
    }

    def pair_label(first: str, second: str):
        # Label factory for two-dimensional breakdowns, e.g. "python|chat".
        return lambda e: f"{e.get(first, 'unknown')}|{e.get(second, 'unknown')}"

    doc["top_ide"] = _pick_top(
        source_doc.get("totals_by_ide", []), lambda e: e.get("ide")
    ) or "unknown"
    doc["top_feature"] = _pick_top(
        source_doc.get("totals_by_feature", []), lambda e: e.get("feature")
    ) or "unknown"
    doc["top_language_feature"] = _pick_top(
        source_doc.get("totals_by_language_feature", []), pair_label("language", "feature")
    ) or "unknown|unknown"
    doc["top_language_model"] = _pick_top(
        source_doc.get("totals_by_language_model", []), pair_label("language", "model")
    ) or "unknown|unknown"
    doc["top_model_feature"] = _pick_top(
        source_doc.get("totals_by_model_feature", []), pair_label("model", "feature")
    ) or "unknown|unknown"
    return doc
| 140 | + |
| 141 | + |
def create_user_top_by_day(source_index: str = DEFAULT_SOURCE_INDEX, dest_index: str = DEFAULT_DEST_INDEX) -> int:
    """Scan every doc in *source_index* and upsert one top-by-day doc each
    into *dest_index*.

    Docs are written with a deterministic id ("user_login|day"), so
    re-running the job overwrites rather than duplicates.

    Returns the number of destination docs successfully written.
    """
    es = get_es_client()
    ensure_dest_index(es, dest_index)

    query = {
        "sort": [{"day": "asc"}],
        "query": {"match_all": {}},
    }

    # Use a scroll to handle larger datasets.
    # NOTE(review): `body=` search/scroll kwargs are deprecated in
    # elasticsearch-py 8.x — confirm the client version pinned for this
    # project still accepts them.
    page_size = 500
    resp = es.search(index=source_index, body={**query, "size": page_size}, scroll="2m")
    scroll_id = resp.get("_scroll_id")
    hits = resp.get("hits", {}).get("hits", [])

    total_written = 0

    def flush(actions: list[dict[str, Any]]) -> int:
        # Bulk-write one batch; returns the count of successful actions.
        # raise_on_error=False: item-level failures are tolerated and simply
        # not counted (they are not logged here).
        if not actions:
            return 0
        ok, _ = bulk(es, actions, raise_on_error=False, request_timeout=60)
        return int(ok)

    actions: list[dict[str, Any]] = []

    while hits:
        for hit in hits:
            source_doc = hit.get("_source", {})
            doc = build_top_doc(source_doc)
            if doc is None:
                # Skipped: source doc missing day or user_login.
                continue
            # Deterministic id makes the job idempotent across reruns.
            doc_id = f"{doc.get('user_login')}|{doc.get('day')}"
            actions.append({"_op_type": "index", "_index": dest_index, "_id": doc_id, "_source": doc})

            # Flush in batches of 2000 to bound memory and bulk-request size.
            if len(actions) >= 2000:
                total_written += flush(actions)
                actions = []

        # Advance the scroll; refresh scroll_id each page, as the server may
        # return a new one.
        resp = es.scroll(scroll_id=scroll_id, scroll="2m")
        scroll_id = resp.get("_scroll_id")
        hits = resp.get("hits", {}).get("hits", [])

    # Write any final partial batch.
    if actions:
        total_written += flush(actions)

    # Best-effort cleanup of the server-side scroll context; failure here is
    # harmless (contexts expire on their own after the scroll timeout).
    try:
        if scroll_id:
            es.clear_scroll(scroll_id=scroll_id)
    except Exception:
        pass

    logger.info(f"Created/updated {total_written} top-by-day docs in {dest_index}")
    return total_written
| 195 | + |
| 196 | + |
if __name__ == "__main__":
    # Script entry point: full rebuild of the top-by-day index using the
    # default (env-configured) source and destination indices.
    create_user_top_by_day()
0 commit comments