4
4
from itertools import count
5
5
from time import time
6
6
7
+ from celery import states
7
8
from celery .events import Event as _Event
8
9
from celery .events .state import State , Worker , Task
9
10
from celery .utils import gen_unique_id
@@ -52,14 +53,14 @@ def test_get_heartbeat(self):
52
53
t2 = time ()
53
54
t3 = time ()
54
55
for t in t1 , t2 , t3 :
55
- worker .on_heartbeat ( t , t )
56
+ worker .event ( 'heartbeat' , t , t , {} )
56
57
self .state .workers [worker .hostname ] = worker
57
58
self .assertEqual (self .cam .get_heartbeat (worker ),
58
59
make_aware (datetime .fromtimestamp (t3 )))
59
60
60
61
def test_handle_worker (self ):
61
62
worker = Worker (hostname = 'fuzzie' )
62
- worker .on_online ( time (), time ())
63
+ worker .event ( 'online' , time (), time (), {} )
63
64
self .cam ._last_worker_write .clear ()
64
65
m = self .cam .handle_worker ((worker .hostname , worker ))
65
66
self .assertTrue (m )
@@ -71,11 +72,11 @@ def test_handle_worker(self):
71
72
72
73
def test_handle_task_received (self ):
73
74
worker = Worker (hostname = 'fuzzie' )
74
- worker .on_online ( time (), time ())
75
+ worker .event ( 'oneline' , time (), time (), {} )
75
76
self .cam .handle_worker ((worker .hostname , worker ))
76
77
77
78
task = create_task (worker )
78
- task .on_received ( time ())
79
+ task .event ( 'received' , time (), time (), {} )
79
80
self .assertEqual (task .state , 'RECEIVED' )
80
81
mt = self .cam .handle_task ((task .uuid , task ))
81
82
self .assertEqual (mt .name , task .name )
@@ -87,36 +88,38 @@ def test_handle_task_received(self):
87
88
88
89
def test_handle_task (self ):
89
90
worker1 = Worker (hostname = 'fuzzie' )
90
- worker1 .on_online ( time (), time ())
91
+ worker1 .event ( 'online' , time (), time (), {} )
91
92
mw = self .cam .handle_worker ((worker1 .hostname , worker1 ))
92
93
task1 = create_task (worker1 )
93
- task1 .on_received ( timestamp = time ())
94
+ task1 .event ( 'received' , time (), time (), {} )
94
95
mt = self .cam .handle_task ((task1 .uuid , task1 ))
95
96
self .assertEqual (mt .worker , mw )
96
97
97
98
worker2 = Worker (hostname = None )
98
99
task2 = create_task (worker2 )
99
- task2 .on_received ( timestamp = time ())
100
+ task2 .event ( 'received' , time (), time (), {} )
100
101
mt = self .cam .handle_task ((task2 .uuid , task2 ))
101
102
self .assertIsNone (mt .worker )
102
103
103
- task1 .on_succeeded (timestamp = time (), result = 42 )
104
+ task1 .event ('succeeded' , time (), time (), {'result' : 42 })
105
+ self .assertEqual (task1 .state , states .SUCCESS )
106
+ self .assertEqual (task1 .result , 42 )
104
107
mt = self .cam .handle_task ((task1 .uuid , task1 ))
105
108
self .assertEqual (mt .name , task1 .name )
106
109
self .assertEqual (mt .result , 42 )
107
110
108
111
task3 = create_task (worker1 , name = None )
109
- task3 .on_revoked ( timestamp = time ())
112
+ task3 .event ( 'revoked' , time (), time (), {} )
110
113
mt = self .cam .handle_task ((task3 .uuid , task3 ))
111
114
self .assertIsNone (mt )
112
115
113
116
def assertExpires (self , dec , expired , tasks = 10 ):
114
117
worker = Worker (hostname = 'fuzzie' )
115
- worker .on_online ( time (), time ())
118
+ worker .event ( 'online' , time (), time (), {} )
116
119
for total in xrange (tasks ):
117
120
task = create_task (worker )
118
- task .on_received ( timestamp = time () - dec )
119
- task .on_succeeded ( timestamp = time () - dec , result = 42 )
121
+ task .event ( 'received' , time () - dec , time () - dec , {} )
122
+ task .event ( 'succeeded' , time () - dec , time () - dec , { ' result' : 42 } )
120
123
self .assertTrue (task .name )
121
124
self .assertTrue (self .cam .handle_task ((task .uuid , task )))
122
125
self .assertEqual (self .cam .on_cleanup (), expired )
0 commit comments