|
4 | 4 | from dataclasses import dataclass |
5 | 5 | from datetime import datetime |
6 | 6 | from enum import Enum |
7 | | -from typing import Any, List, Mapping, MutableMapping, Optional, Tuple |
| 7 | +from typing import Any, Mapping, MutableMapping, Optional, Tuple |
8 | 8 |
|
9 | | -from sentry_redis_tools.clients import RedisCluster, StrictRedis |
10 | | - |
11 | | -from sentry.models.project import Project |
12 | 9 | from sentry.utils.math import ExponentialMovingAverage |
13 | 10 |
|
14 | 11 | logger = logging.getLogger("sentry.tasks.statistical_detectors") |
@@ -79,68 +76,25 @@ class TrendPayload: |
79 | 76 | timestamp: datetime |
80 | 77 |
|
81 | 78 |
|
82 | | -def run_trend_detection( |
83 | | - client: RedisCluster | StrictRedis, |
84 | | - project: Project, |
85 | | - start: datetime, |
86 | | - payloads: List[TrendPayload], |
87 | | -) -> Tuple[List[TrendPayload], List[TrendPayload]]: |
88 | | - with client.pipeline() as pipeline: |
89 | | - for payload in payloads: |
90 | | - key = make_key(project.id, payload, VERSION) |
91 | | - pipeline.hgetall(key) |
92 | | - results = pipeline.execute() |
93 | | - |
94 | | - old_states = [TrendState.from_dict(result) for result in results] |
95 | | - new_states, regressed, improved = compute_new_trend_states(project.id, old_states, payloads) |
96 | | - |
97 | | - with client.pipeline() as pipeline: |
98 | | - for key, value in new_states: |
99 | | - pipeline.hmset(key, value.as_dict()) |
100 | | - pipeline.expire(key, KEY_TTL) |
101 | | - |
102 | | - pipeline.execute() |
103 | | - |
104 | | - return regressed, improved |
105 | | - |
106 | | - |
107 | 79 | def compute_new_trend_states( |
108 | | - project_id: int, |
109 | | - old_states: List[TrendState], |
110 | | - payloads: List[TrendPayload], |
111 | | -) -> Tuple[List[Tuple[str, TrendState]], List[TrendPayload], List[TrendPayload]]: |
112 | | - new_states: List[Tuple[str, TrendState]] = [] |
113 | | - regressed: List[TrendPayload] = [] |
114 | | - improved: List[TrendPayload] = [] |
115 | | - |
116 | | - for payload, old_state in zip(payloads, old_states): |
117 | | - if old_state.timestamp is not None and old_state.timestamp > payload.timestamp: |
118 | | - # In the event that the timestamp is before the payload's timestamps, |
119 | | - # we do not want to process this payload. |
120 | | - # |
121 | | - # This should not happen other than in some error state. |
122 | | - logger.warning( |
123 | | - "Trend detection out of order. Processing %s, but last processed was %s", |
124 | | - payload.timestamp.isoformat(), |
125 | | - old_state.timestamp.isoformat(), |
126 | | - ) |
127 | | - continue |
128 | | - |
129 | | - key = make_key(project_id, payload, VERSION) |
130 | | - trend, value = detect_trend(old_state, payload) |
131 | | - |
132 | | - if trend == TrendType.Regressed: |
133 | | - regressed.append(payload) |
134 | | - elif trend == TrendType.Improved: |
135 | | - improved.append(payload) |
136 | | - |
137 | | - new_states.append((key, value)) |
138 | | - |
139 | | - return new_states, regressed, improved |
140 | | - |
141 | | - |
142 | | -def make_key(project_id: int, payload: TrendPayload, version: int) -> str: |
143 | | - return f"statdtr:v:{version}:p:{project_id}:f:{payload.group}" |
| 80 | + cur_state: TrendState, |
| 81 | + payload: TrendPayload, |
| 82 | +) -> Optional[Tuple[TrendState, Tuple[TrendType, TrendPayload]]]: |
| 83 | + if cur_state.timestamp is not None and cur_state.timestamp > payload.timestamp: |
| 84 | + # In the event that the timestamp is before the payload's timestamps, |
| 85 | + # we do not want to process this payload. |
| 86 | + # |
| 87 | + # This should not happen other than in some error state. |
| 88 | + logger.warning( |
| 89 | + "Trend detection out of order. Processing %s, but last processed was %s", |
| 90 | + payload.timestamp.isoformat(), |
| 91 | + cur_state.timestamp.isoformat(), |
| 92 | + ) |
| 93 | + return None |
| 94 | + |
| 95 | + trend, new_state = detect_trend(cur_state, payload) |
| 96 | + |
| 97 | + return new_state, (trend, payload) |
144 | 98 |
|
145 | 99 |
|
146 | 100 | def detect_trend(state: TrendState, payload: TrendPayload) -> Tuple[TrendType, TrendState]: |
|
0 commit comments