Skip to content

Commit c8bfef0

Browse files
feat(events tracking): add abstract class and logging implementation (#80117)
[design doc](https://www.notion.so/sentry/Conversion-rate-of-ingest-transactions-to-save-trx-1298b10e4b5d801ab517c8e2218d13d5) We need to track the completion of each pipeline stage in order to 1) compute event conversion rates and 2) gain debugging visibility into where events are being dropped. Usage will be heavily sampled so as not to blow up traffic. This PR only adds the REDIS_PUT stage; in subsequent PRs I will add all the other stages listed in the EventStageStatus class. **!!!!!IMPORTANT!!!!!!** Hash-based sampling: here's a [blog post](https://www.rsyslog.com/doc/tutorials/hash_sampling.html) explaining hash-based sampling, which provides "all or nothing" logging for the events sampled across the entire pipeline. That is the idea I want to implement. The hashing algorithm used must be consistent and uniformly distributed for all-or-nothing sampling to work. I cannot find authoritative references stating that MD5 is consistent and evenly distributed, other than various [stackoverflow pages](https://crypto.stackexchange.com/questions/14967/distribution-for-a-subset-of-md5); the official sources are too academic and long for me to follow. ---------- For reviewers: please review with an eye toward how this can be generalized to other pipelines as well, such as errors.
1 parent d2c1f14 commit c8bfef0

File tree

4 files changed

+105
-0
lines changed

4 files changed

+105
-0
lines changed

src/sentry/ingest/consumer/processors.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from sentry.utils import metrics
2626
from sentry.utils.cache import cache_key_for_event
2727
from sentry.utils.dates import to_datetime
28+
from sentry.utils.event_tracker import TransactionStageStatus, track_sampled_event
2829
from sentry.utils.sdk import set_current_event_project
2930
from sentry.utils.snuba import RateLimitExceeded
3031

@@ -202,6 +203,11 @@ def process_event(
202203
else:
203204
with metrics.timer("ingest_consumer._store_event"):
204205
cache_key = processing_store.store(data)
206+
if data.get("type") == "transaction":
207+
track_sampled_event(
208+
data["event_id"], "transaction", TransactionStageStatus.REDIS_PUT
209+
)
210+
205211
save_attachments(attachments, cache_key)
206212

207213
try:

src/sentry/options/defaults.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2915,3 +2915,8 @@
29152915
default=[],
29162916
flags=FLAG_ALLOW_EMPTY | FLAG_AUTOMATOR_MODIFIABLE,
29172917
)
2918+
register(
    # Sample rate for EventTracker logging of transaction ingestion stages
    # (read by sentry.utils.event_tracker.track_sampled_event).
    # Default of 0.0 disables the tracking entirely.
    "performance.event-tracker.sample-rate.transaction",
    default=0.0,
    flags=FLAG_AUTOMATOR_MODIFIABLE,
)

src/sentry/utils/event_tracker.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import logging
2+
from enum import StrEnum
3+
4+
from sentry import options
5+
6+
7+
class EventType(StrEnum):
    """Kinds of events whose ingestion pipeline progress can be tracked."""

    TRANSACTION = "transaction"
    ERROR = "error"
10+
11+
12+
class TransactionStageStatus(StrEnum):
    """Stages a transaction passes through in the ingestion pipeline.

    Each member marks the completion of one stage; logging these per sampled
    event shows where in the pipeline events are being dropped.
    """

    # the transaction is stored to rc-transactions
    REDIS_PUT = "redis_put"

    # a save_transactions task is kicked off
    SAVE_TRX_STARTED = "save_trx_started"

    # a save_transactions task is finished
    SAVE_TRX_FINISHED = "save_trx_finished"

    # the transaction is published to the `events` topic for snuba/sbc consumers to consume
    SNUBA_TOPIC_PUT = "snuba_topic_put"

    # the transaction is published to the `snuba-commit-log` topic
    COMMIT_LOG_TOPIC_PUT = "commit_log_topic_put"

    # a post_process task is kicked off
    POST_PROCESS_STARTED = "post_process_started"

    # the transaction is deleted from rc-transactions
    REDIS_DELETED = "redis_deleted"
33+
34+
35+
logger = logging.getLogger("EventTracker")
36+
37+
38+
def track_sampled_event(event_id: str, event_type: str, status: TransactionStageStatus) -> None:
    """
    Records how far an event has made it through the ingestion pipeline.

    Sampling is hash-based and therefore "all or nothing" per event: the
    decision is derived deterministically from ``event_id``, so a sampled
    event is logged at every stage it reaches and an unsampled event is
    never logged at any stage.

    :param event_id: hex event id; hashed for the sampling decision.
    :param event_type: name of an ``EventType`` member (e.g. "transaction");
        each event type picks up its sampling rate from its registered option.
    :param status: the pipeline stage the event has just completed.
    """
    sample_rate = options.get(f"performance.event-tracker.sample-rate.{event_type}")
    # `not sample_rate` (rather than `== 0`) also bails out when the option
    # resolves to None, which would otherwise raise on the `<` comparison below.
    if not sample_rate:
        return

    # Map the event id deterministically onto [0, 1) with 1/10000 resolution;
    # the same id always yields the same value, giving consistent sampling
    # across all pipeline stages.
    event_float = (int(event_id, 16) % 10000) / 10000
    if event_float < sample_rate:
        extra = {
            "event_id": event_id,
            # Fall back to the raw string for unknown event types instead of
            # silently logging None and losing the information.
            "event_type": getattr(EventType, event_type.upper(), event_type),
            "status": status,
        }
        _do_record(extra)
56+
57+
58+
def _do_record(log_fields):
    """Emit one tracking record as a structured log line.

    All Python logs will be picked up by Google Cloud Logging.
    TODO: make a google Cloud Sink to filter for these EventTracker logs and
    put them into BigQuery and do data analysis downstream.
    """
    logger.info("EventTracker.recorded", extra=log_fields)
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import unittest
2+
from unittest.mock import patch
3+
4+
from sentry.testutils.cases import TestCase
5+
from sentry.testutils.helpers.options import override_options
6+
from sentry.utils.event_tracker import EventType, TransactionStageStatus, track_sampled_event
7+
8+
EVENT_ID = "9cdc4c32dff14fbbb012b0aa9e908126"
9+
EVENT_TYPE_STR = "transaction"
10+
STATUS = TransactionStageStatus.REDIS_PUT
11+
12+
EXPECTED_EVENT_TYPE = EventType.TRANSACTION
13+
14+
15+
class TestEventTracking(TestCase):
    """Checks that track_sampled_event honours the per-type sample-rate option."""

    @patch("sentry.utils.event_tracker._do_record")
    def test_track_sampled_event_logs_event(self, mock_do_record):
        # A rate of 1.0 means every event is sampled and must be recorded.
        with override_options({"performance.event-tracker.sample-rate.transaction": 1.0}):
            track_sampled_event(EVENT_ID, EVENT_TYPE_STR, STATUS)
        expected_extra = {
            "event_id": EVENT_ID,
            "event_type": EXPECTED_EVENT_TYPE,
            "status": STATUS,
        }
        mock_do_record.assert_called_once_with(expected_extra)

    @patch("sentry.utils.event_tracker._do_record")
    def test_track_sampled_event_does_not_log_event(self, mock_do_record):
        # A rate of 0.0 disables tracking entirely.
        with override_options({"performance.event-tracker.sample-rate.transaction": 0.0}):
            track_sampled_event(EVENT_ID, EVENT_TYPE_STR, STATUS)
        mock_do_record.assert_not_called()
30+
31+
32+
# Allow running this test module directly with `python <file>`.
if __name__ == "__main__":
    unittest.main()

0 commit comments

Comments
 (0)