Skip to content

Commit 8e0fd5e

Browse files
committed
test:workers cleanup
1 parent 84b4502 commit 8e0fd5e

File tree

4 files changed

+25
-24
lines changed

4 files changed

+25
-24
lines changed

scheduler/redis_models/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def as_str(v: Union[bytes, str]) -> Optional[str]:
2424
return v
2525
if isinstance(v, bytes):
2626
return v.decode("utf-8")
27-
raise ValueError("Unknown type %r" % type(v))
27+
raise ValueError(f"Unknown type {type(v)} for `{v}`.")
2828

2929

3030
def decode_dict(d: Dict[bytes, bytes], exclude_keys: Set[str]) -> Dict[str, str]:

scheduler/redis_models/result.py

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
from enum import Enum
44
from typing import Optional, Any, Self, ClassVar, List
55

6-
from scheduler.types import ConnectionType
76
from scheduler.helpers.utils import utcnow
87
from scheduler.redis_models.base import StreamModel, decode_dict
8+
from scheduler.types import ConnectionType
99

1010

1111
class ResultType(Enum):
@@ -31,14 +31,14 @@ class Result(StreamModel):
3131

3232
@classmethod
3333
def create(
34-
cls,
35-
connection: ConnectionType,
36-
job_name: str,
37-
worker_name: str,
38-
_type: ResultType,
39-
ttl: int,
40-
return_value: Any = None,
41-
exc_string: Optional[str] = None,
34+
cls,
35+
connection: ConnectionType,
36+
job_name: str,
37+
worker_name: str,
38+
_type: ResultType,
39+
ttl: int,
40+
return_value: Any = None,
41+
exc_string: Optional[str] = None,
4242
) -> Self:
4343
result = cls(
4444
parent=job_name,
@@ -65,15 +65,3 @@ def fetch_latest(cls, connection: ConnectionType, job_name: str) -> Optional["Re
6565
result_id, payload = response[0]
6666
res = cls.deserialize(decode_dict(payload, set()))
6767
return res
68-
69-
def __repr__(self):
70-
return f"Result(name={self.name}, type={self.type.name})"
71-
72-
def __eq__(self, other: Self) -> bool:
73-
try:
74-
return self.name == other.name
75-
except AttributeError:
76-
return False
77-
78-
def __bool__(self) -> bool:
79-
return bool(self.name)

scheduler/redis_models/worker.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,9 @@ def cleanup(cls, connection: ConnectionType, queue_name: Optional[str] = None):
9898
pipeline.exists(worker_key)
9999
worker_exist = pipeline.execute()
100100
invalid_workers = list()
101-
for i, worker_key in enumerate(worker_keys):
101+
for i, worker_name in enumerate(worker_names):
102102
if not worker_exist[i]:
103-
invalid_workers.append(worker_key)
103+
invalid_workers.append(worker_name)
104104
if len(invalid_workers) == 0:
105105
return
106106
for invalid_subset in _split_list(invalid_workers, MAX_KEYS):

scheduler/tests/test_worker/test_worker_creation.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import uuid
33

44
from scheduler import settings
5+
from scheduler.redis_models import WorkerModel
56
from scheduler.worker import create_worker
67
from scheduler.tests import test_settings # noqa
78
from scheduler.tests.testtools import SchedulerBaseCase
@@ -41,3 +42,15 @@ def test_create_worker__scheduler_interval(self):
4142
self.assertEqual(worker.scheduler.interval, 1)
4243
settings.SCHEDULER_CONFIG.SCHEDULER_INTERVAL = prev
4344
worker.teardown()
45+
46+
def test_create_worker__cleanup(self):
47+
worker = create_worker("default", name="test", burst=True, with_scheduler=False)
48+
worker.bootstrap()
49+
worker.connection.delete(WorkerModel.key_for(worker.name))
50+
all_names = WorkerModel.all_names(worker.connection)
51+
self.assertIn(worker.name, all_names)
52+
# act
53+
WorkerModel.cleanup(worker.connection, "default")
54+
# assert
55+
all_names = WorkerModel.all_names(worker.connection)
56+
self.assertNotIn(worker.name, all_names)

0 commit comments

Comments
 (0)