
Commit fa17f3b

ref(arq): Use new scopes API (#2878)

1 parent 8110496 · commit fa17f3b

2 files changed, +18 -32 lines
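
The change follows the SDK-wide migration off the Hub API: each patched arq callable now opts out via the ensure_integration_enabled / ensure_integration_enabled_async decorators instead of checking Hub.current by hand, and spans, transactions, scopes, and events go through the top-level sentry_sdk functions. Below is a condensed before/after sketch of the recurring pattern, based on the enqueue_job wrapper in the diff (imports added here for context; in the actual module old_enqueue_job is bound inside patch_enqueue_job):

import sentry_sdk
from arq.connections import ArqRedis
from sentry_sdk.consts import OP
from sentry_sdk.integrations.arq import ArqIntegration
from sentry_sdk.utils import ensure_integration_enabled_async

old_enqueue_job = ArqRedis.enqueue_job

# Old Hub-based pattern (removed in this commit):
#
#     hub = Hub.current
#     if hub.get_integration(ArqIntegration) is None:
#         return await old_enqueue_job(self, function, *args, **kwargs)
#     with hub.start_span(op=OP.QUEUE_SUBMIT_ARQ, description=function):
#         return await old_enqueue_job(self, function, *args, **kwargs)

# New scopes-API pattern: the integration check moves into the decorator,
# and the span comes from the module-level sentry_sdk.start_span().
@ensure_integration_enabled_async(ArqIntegration, old_enqueue_job)
async def _sentry_enqueue_job(self, function, *args, **kwargs):
    with sentry_sdk.start_span(op=OP.QUEUE_SUBMIT_ARQ, description=function):
        return await old_enqueue_job(self, function, *args, **kwargs)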

sentry_sdk/integrations/arq.py

Lines changed: 16 additions & 29 deletions
@@ -1,15 +1,16 @@
 import sys
 
+import sentry_sdk
 from sentry_sdk._types import TYPE_CHECKING
-from sentry_sdk import Hub
 from sentry_sdk.consts import OP
-from sentry_sdk.hub import _should_send_default_pii
 from sentry_sdk.integrations import DidNotEnable, Integration
 from sentry_sdk.integrations.logging import ignore_logger
-from sentry_sdk.scope import Scope
+from sentry_sdk.scope import Scope, should_send_default_pii
 from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_TASK
 from sentry_sdk.utils import (
     capture_internal_exceptions,
+    ensure_integration_enabled,
+    ensure_integration_enabled_async,
     event_from_exception,
     SENSITIVE_DATA_SUBSTITUTE,
     parse_version,
@@ -70,14 +71,10 @@ def patch_enqueue_job():
     # type: () -> None
     old_enqueue_job = ArqRedis.enqueue_job
 
+    @ensure_integration_enabled_async(ArqIntegration, old_enqueue_job)
     async def _sentry_enqueue_job(self, function, *args, **kwargs):
         # type: (ArqRedis, str, *Any, **Any) -> Optional[Job]
-        hub = Hub.current
-
-        if hub.get_integration(ArqIntegration) is None:
-            return await old_enqueue_job(self, function, *args, **kwargs)
-
-        with hub.start_span(op=OP.QUEUE_SUBMIT_ARQ, description=function):
+        with sentry_sdk.start_span(op=OP.QUEUE_SUBMIT_ARQ, description=function):
             return await old_enqueue_job(self, function, *args, **kwargs)
 
     ArqRedis.enqueue_job = _sentry_enqueue_job
@@ -87,14 +84,10 @@ def patch_run_job():
     # type: () -> None
     old_run_job = Worker.run_job
 
+    @ensure_integration_enabled_async(ArqIntegration, old_run_job)
     async def _sentry_run_job(self, job_id, score):
         # type: (Worker, str, int) -> None
-        hub = Hub(Hub.current)
-
-        if hub.get_integration(ArqIntegration) is None:
-            return await old_run_job(self, job_id, score)
-
-        with hub.push_scope() as scope:
+        with sentry_sdk.isolation_scope() as scope:
             scope._name = "arq"
             scope.clear_breadcrumbs()
 
@@ -105,7 +98,7 @@ async def _sentry_run_job(self, job_id, score):
                 source=TRANSACTION_SOURCE_TASK,
             )
 
-            with hub.start_transaction(transaction):
+            with sentry_sdk.start_transaction(transaction):
                 return await old_run_job(self, job_id, score)
 
     Worker.run_job = _sentry_run_job
@@ -127,7 +120,7 @@ def _capture_exception(exc_info):
         client_options=Scope.get_client().options,
         mechanism={"type": ArqIntegration.identifier, "handled": False},
     )
-    scope.capture_event(event, hint=hint)
+    sentry_sdk.capture_event(event, hint=hint)
 
 
 def _make_event_processor(ctx, *args, **kwargs):
@@ -148,10 +141,10 @@ def event_processor(event, hint):
             extra["arq-job"] = {
                 "task": ctx["job_name"],
                 "args": (
-                    args if _should_send_default_pii() else SENSITIVE_DATA_SUBSTITUTE
+                    args if should_send_default_pii() else SENSITIVE_DATA_SUBSTITUTE
                 ),
                 "kwargs": (
-                    kwargs if _should_send_default_pii() else SENSITIVE_DATA_SUBSTITUTE
+                    kwargs if should_send_default_pii() else SENSITIVE_DATA_SUBSTITUTE
                 ),
                 "retry": ctx["job_try"],
             }
@@ -163,13 +156,11 @@ def event_processor(event, hint):
 
 def _wrap_coroutine(name, coroutine):
     # type: (str, WorkerCoroutine) -> WorkerCoroutine
+
+    @ensure_integration_enabled_async(ArqIntegration, coroutine)
     async def _sentry_coroutine(ctx, *args, **kwargs):
         # type: (Dict[Any, Any], *Any, **Any) -> Any
-        hub = Hub.current
-        if hub.get_integration(ArqIntegration) is None:
-            return await coroutine(ctx, *args, **kwargs)
-
-        hub.scope.add_event_processor(
+        Scope.get_isolation_scope().add_event_processor(
             _make_event_processor({**ctx, "job_name": name}, *args, **kwargs)
         )
 
@@ -189,13 +180,9 @@ def patch_create_worker():
     # type: () -> None
     old_create_worker = arq.worker.create_worker
 
+    @ensure_integration_enabled(ArqIntegration, old_create_worker)
     def _sentry_create_worker(*args, **kwargs):
         # type: (*Any, **Any) -> Worker
-        hub = Hub.current
-
-        if hub.get_integration(ArqIntegration) is None:
-            return old_create_worker(*args, **kwargs)
-
         settings_cls = args[0]
 
         if hasattr(settings_cls, "functions"):
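
For context on the decorators used above: they centralize the guard that every wrapper previously wrote out by hand. The sketch below is illustrative only, not the SDK's actual implementation of ensure_integration_enabled_async; it simply restates the removed "if hub.get_integration(ArqIntegration) is None: return await original(...)" blocks as a reusable async decorator, assuming the active client exposes get_integration() via sentry_sdk.get_client():

import functools

import sentry_sdk


def _ensure_integration_enabled_async_sketch(integration, original):
    # Illustrative stand-in for sentry_sdk.utils.ensure_integration_enabled_async:
    # if the integration is not installed on the active client, call the
    # unpatched coroutine; otherwise run the instrumented wrapper.
    def decorator(wrapper):
        @functools.wraps(wrapper)
        async def runner(*args, **kwargs):
            if sentry_sdk.get_client().get_integration(integration) is None:
                return await original(*args, **kwargs)
            return await wrapper(*args, **kwargs)

        return runner

    return decorator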

tests/integrations/arq/test_arq.py

Lines changed: 2 additions & 3 deletions
@@ -1,7 +1,7 @@
 import asyncio
 import pytest
 
-from sentry_sdk import start_transaction, Hub
+from sentry_sdk import get_client, start_transaction
 from sentry_sdk.integrations.arq import ArqIntegration
 
 import arq.worker
@@ -60,7 +60,6 @@ def inner(
             integrations=[ArqIntegration()],
             traces_sample_rate=1.0,
             send_default_pii=True,
-            debug=True,
         )
 
         server = FakeRedis()
@@ -245,7 +244,7 @@ async def dummy_job(_ctx):
 
     pool, worker = init_arq([dummy_job])
     # remove the integration to trigger the edge case
-    Hub.current.client.integrations.pop("arq")
+    get_client().integrations.pop("arq")
 
     job = await pool.enqueue_job("dummy_job")
 
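The test edge case above leans on that fallback: once the integration is popped off the client, the patched enqueue_job is expected to delegate straight to arq. A minimal usage sketch of the accessor that replaces Hub.current.client (assumes the SDK was initialized with ArqIntegration):

import sentry_sdk

# get_client() returns the currently active client, replacing the old
# Hub.current.client lookup used in the previous version of this test.
client = sentry_sdk.get_client()

# Installed integrations are keyed by identifier ("arq" here); popping
# the entry simulates running without the integration installed.
client.integrations.pop("arq", None)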