1
+ from unittest .mock import patch , MagicMock
2
+ from dramatiq .encoder import DecodeError
3
+ from dramatiq import get_broker , get_encoder
4
+ from dramatiq .brokers .stub import StubBroker
5
+
6
+ import pytest
7
+ import orjson
8
+
9
+ from common import AppConfig
10
+ from common .config import DramatiqConfig
11
+ from common .dramatiq import ORJSONEncoder , init_dramatiq
12
+
13
+
14
+ @patch ("common.dramatiq.orjson.dumps" , return_value = b"serialized" )
15
+ @patch ("common.dramatiq.orjson.loads" , return_value = "deserialized" )
16
+ def test_orjson_encoder (
17
+ mocked_loads : MagicMock ,
18
+ mocked_dumps : MagicMock ,
19
+ ):
20
+ encoder = ORJSONEncoder ()
21
+
22
+ serialized = encoder .encode ({})
23
+ assert serialized == b"serialized"
24
+ mocked_dumps .assert_called_once_with ({})
25
+ deserialized = encoder .decode (serialized )
26
+ assert deserialized == "deserialized"
27
+ mocked_loads .assert_called_once_with (b"serialized" )
28
+
29
+
30
+ @patch ("common.dramatiq.orjson.loads" , side_effect = orjson .JSONDecodeError ("msg" ,"doc" , 123 ))
31
+ def test_orjson_encoder_fails (
32
+ mocked_loads : MagicMock ,
33
+ ):
34
+ encoder = ORJSONEncoder ()
35
+
36
+ with pytest .raises (DecodeError ):
37
+ encoder .decode (b"serialized" )
38
+
39
+
40
+ def test_init_dramatiq_with_test_env ():
41
+ """Test if the StubBroker is set in the 'test' environment."""
42
+ config = AppConfig (ENVIRONMENT = "test" , DRAMATIQ = DramatiqConfig ()) # Mock config
43
+ init_dramatiq (config )
44
+ assert isinstance (get_broker (), StubBroker )
45
+ assert isinstance (get_encoder (), ORJSONEncoder )
46
+
47
+
48
+ def test_init_dramatiq_with_redis ():
49
+ """Test if the RedisBroker is set with a valid Redis URL."""
50
+ redis_url = "redis://localhost:6379/0"
51
+ config = AppConfig (
52
+ ENVIRONMENT = "production" , DRAMATIQ = DramatiqConfig (REDIS_URL = redis_url )
53
+ ) # Mock config
54
+ with patch ("common.dramatiq.RedisBroker" ) as mock_redis_broker :
55
+ init_dramatiq (config )
56
+ mock_redis_broker .assert_called_once_with (url = redis_url )
57
+ assert get_broker () == mock_redis_broker .return_value
58
+ assert isinstance (get_encoder (), ORJSONEncoder )
59
+
60
+
61
+ def test_init_dramatiq_without_redis_url ():
62
+ """Test if an exception is raised when in non-test environment without Redis URL."""
63
+ config = AppConfig (ENVIRONMENT = "production" , DRAMATIQ = DramatiqConfig (REDIS_URL = None )) # Mock config
64
+ with pytest .raises (RuntimeError , match = "Running a non-test environment without Redis URL set" ):
65
+ init_dramatiq (config )
0 commit comments