Skip to content

Commit cd844be

Browse files
committed
Fix(Arq): fix integration when Worker settings is a dict
1 parent e28dcf6 commit cd844be

File tree

2 files changed

+112
-14
lines changed

2 files changed

+112
-14
lines changed

sentry_sdk/integrations/arq.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,17 @@ def _sentry_create_worker(*args, **kwargs):
198198
# type: (*Any, **Any) -> Worker
199199
settings_cls = args[0]
200200

201+
if isinstance(settings_cls, dict):
202+
if "functions" in settings_cls:
203+
settings_cls["functions"] = [
204+
_get_arq_function(func) for func in settings_cls["functions"]
205+
]
206+
if "cron_jobs" in settings_cls:
207+
settings_cls["cron_jobs"] = [
208+
_get_arq_cron_job(cron_job)
209+
for cron_job in settings_cls["cron_jobs"]
210+
]
211+
201212
if hasattr(settings_cls, "functions"):
202213
settings_cls.functions = [
203214
_get_arq_function(func) for func in settings_cls.functions

tests/integrations/arq/test_arq.py

Lines changed: 101 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import asyncio
2+
from typing import Any
3+
24
import pytest
35

46
from sentry_sdk import get_client, start_transaction
@@ -83,14 +85,65 @@ class WorkerSettings:
8385
return inner
8486

8587

88+
@pytest.fixture
def init_arq_with_dict_settings(sentry_init):
    """Return a factory producing an (ArqRedis pool, Worker) pair whose
    worker is created from a plain ``dict`` of settings rather than a
    settings class, exercising the dict branch of the Arq integration.
    """

    def inner(
        cls_functions=None,
        cls_cron_jobs=None,
        kw_functions=None,
        kw_cron_jobs=None,
        allow_abort_jobs_=False,
    ):
        # Overrides handed to create_worker() as keyword arguments.
        worker_kwargs = {}
        if kw_functions is not None:
            worker_kwargs["functions"] = kw_functions
        if kw_cron_jobs is not None:
            worker_kwargs["cron_jobs"] = kw_cron_jobs

        sentry_init(
            integrations=[ArqIntegration()],
            traces_sample_rate=1.0,
            send_default_pii=True,
        )

        fake_server = FakeRedis()
        redis_pool = ArqRedis(pool_or_conn=fake_server.connection_pool)

        # Dict-based settings: "functions"/"cron_jobs" keys are present
        # only when a non-empty value was supplied, mirroring a settings
        # class that omits those attributes.
        settings_dict: dict[str, Any] = {}
        if cls_functions:
            settings_dict["functions"] = cls_functions
        if cls_cron_jobs:
            settings_dict["cron_jobs"] = cls_cron_jobs
        settings_dict["redis_pool"] = redis_pool
        settings_dict["allow_abort_jobs"] = allow_abort_jobs_

        worker = arq.worker.create_worker(settings_dict, **worker_kwargs)

        return redis_pool, worker

    return inner
132+
133+
86134
@pytest.mark.asyncio
87-
async def test_job_result(init_arq):
135+
@pytest.mark.parametrize(
136+
"init_arq_settings", ["init_arq", "init_arq_with_dict_settings"]
137+
)
138+
async def test_job_result(init_arq_settings: str, request):
88139
async def increase(ctx, num):
89140
return num + 1
90141

142+
init_fixture_method = request.getfixturevalue(init_arq_settings)
143+
91144
increase.__qualname__ = increase.__name__
92145

93-
pool, worker = init_arq([increase])
146+
pool, worker = init_fixture_method([increase])
94147

95148
job = await pool.enqueue_job("increase", 3)
96149

@@ -105,14 +158,19 @@ async def increase(ctx, num):
105158

106159

107160
@pytest.mark.asyncio
108-
async def test_job_retry(capture_events, init_arq):
161+
@pytest.mark.parametrize(
162+
"init_arq_settings", ["init_arq", "init_arq_with_dict_settings"]
163+
)
164+
async def test_job_retry(capture_events, init_arq_settings, request):
109165
async def retry_job(ctx):
110166
if ctx["job_try"] < 2:
111167
raise arq.worker.Retry
112168

169+
init_fixture_method = request.getfixturevalue(init_arq_settings)
170+
113171
retry_job.__qualname__ = retry_job.__name__
114172

115-
pool, worker = init_arq([retry_job])
173+
pool, worker = init_fixture_method([retry_job])
116174

117175
job = await pool.enqueue_job("retry_job")
118176

@@ -139,11 +197,18 @@ async def retry_job(ctx):
139197
"source", [("cls_functions", "cls_cron_jobs"), ("kw_functions", "kw_cron_jobs")]
140198
)
141199
@pytest.mark.parametrize("job_fails", [True, False], ids=["error", "success"])
200+
@pytest.mark.parametrize(
201+
"init_arq_settings", ["init_arq", "init_arq_with_dict_settings"]
202+
)
142203
@pytest.mark.asyncio
143-
async def test_job_transaction(capture_events, init_arq, source, job_fails):
204+
async def test_job_transaction(
205+
capture_events, init_arq_settings, source, job_fails, request
206+
):
144207
async def division(_, a, b=0):
145208
return a / b
146209

210+
init_fixture_method = request.getfixturevalue(init_arq_settings)
211+
147212
division.__qualname__ = division.__name__
148213

149214
cron_func = async_partial(division, a=1, b=int(not job_fails))
@@ -152,7 +217,9 @@ async def division(_, a, b=0):
152217
cron_job = cron(cron_func, minute=0, run_at_startup=True)
153218

154219
functions_key, cron_jobs_key = source
155-
pool, worker = init_arq(**{functions_key: [division], cron_jobs_key: [cron_job]})
220+
pool, worker = init_fixture_method(
221+
**{functions_key: [division], cron_jobs_key: [cron_job]}
222+
)
156223

157224
events = capture_events()
158225

@@ -213,12 +280,17 @@ async def division(_, a, b=0):
213280

214281

215282
@pytest.mark.parametrize("source", ["cls_functions", "kw_functions"])
283+
@pytest.mark.parametrize(
284+
"init_arq_settings", ["init_arq", "init_arq_with_dict_settings"]
285+
)
216286
@pytest.mark.asyncio
217-
async def test_enqueue_job(capture_events, init_arq, source):
287+
async def test_enqueue_job(capture_events, init_arq_settings, source, request):
218288
async def dummy_job(_):
219289
pass
220290

221-
pool, _ = init_arq(**{source: [dummy_job]})
291+
init_fixture_method = request.getfixturevalue(init_arq_settings)
292+
293+
pool, _ = init_fixture_method(**{source: [dummy_job]})
222294

223295
events = capture_events()
224296

@@ -236,13 +308,18 @@ async def dummy_job(_):
236308

237309

238310
@pytest.mark.asyncio
239-
async def test_execute_job_without_integration(init_arq):
311+
@pytest.mark.parametrize(
312+
"init_arq_settings", ["init_arq", "init_arq_with_dict_settings"]
313+
)
314+
async def test_execute_job_without_integration(init_arq_settings, request):
240315
async def dummy_job(_ctx):
241316
pass
242317

318+
init_fixture_method = request.getfixturevalue(init_arq_settings)
319+
243320
dummy_job.__qualname__ = dummy_job.__name__
244321

245-
pool, worker = init_arq([dummy_job])
322+
pool, worker = init_fixture_method([dummy_job])
246323
# remove the integration to trigger the edge case
247324
get_client().integrations.pop("arq")
248325

@@ -254,12 +331,17 @@ async def dummy_job(_ctx):
254331

255332

256333
@pytest.mark.parametrize("source", ["cls_functions", "kw_functions"])
334+
@pytest.mark.parametrize(
335+
"init_arq_settings", ["init_arq", "init_arq_with_dict_settings"]
336+
)
257337
@pytest.mark.asyncio
258-
async def test_span_origin_producer(capture_events, init_arq, source):
338+
async def test_span_origin_producer(capture_events, init_arq_settings, source, request):
259339
async def dummy_job(_):
260340
pass
261341

262-
pool, _ = init_arq(**{source: [dummy_job]})
342+
init_fixture_method = request.getfixturevalue(init_arq_settings)
343+
344+
pool, _ = init_fixture_method(**{source: [dummy_job]})
263345

264346
events = capture_events()
265347

@@ -272,13 +354,18 @@ async def dummy_job(_):
272354

273355

274356
@pytest.mark.asyncio
275-
async def test_span_origin_consumer(capture_events, init_arq):
357+
@pytest.mark.parametrize(
358+
"init_arq_settings", ["init_arq", "init_arq_with_dict_settings"]
359+
)
360+
async def test_span_origin_consumer(capture_events, init_arq_settings, request):
276361
async def job(ctx):
277362
pass
278363

364+
init_fixture_method = request.getfixturevalue(init_arq_settings)
365+
279366
job.__qualname__ = job.__name__
280367

281-
pool, worker = init_arq([job])
368+
pool, worker = init_fixture_method([job])
282369

283370
job = await pool.enqueue_job("retry_job")
284371

0 commit comments

Comments
 (0)