1313# See the License for the specific language governing permissions and
1414# limitations under the License.
1515
16- from typing import Callable , Hashable , Tuple
16+ from typing import Hashable , Tuple
17+
18+ from typing_extensions import Protocol
1719
1820from twisted .internet import defer , reactor
1921from twisted .internet .base import ReactorBase
2527from tests import unittest
2628
2729
30+ class UnblockFunction (Protocol ):
31+ def __call__ (self , pump_reactor : bool = True ) -> None :
32+ ...
33+
34+
2835class LinearizerTestCase (unittest .TestCase ):
2936 def _start_task (
3037 self , linearizer : Linearizer , key : Hashable
31- ) -> Tuple ["Deferred[None]" , "Deferred[None]" , Callable [[], None ] ]:
38+ ) -> Tuple ["Deferred[None]" , "Deferred[None]" , UnblockFunction ]:
3239 """Starts a task which acquires the linearizer lock, blocks, then completes.
3340
3441 Args:
@@ -52,11 +59,12 @@ async def task() -> None:
5259
5360 d = defer .ensureDeferred (task ())
5461
55- def unblock () -> None :
62+ def unblock (pump_reactor : bool = True ) -> None :
5663 unblock_d .callback (None )
5764 # The next task, if it exists, will acquire the lock and require a kick of
5865 # the reactor to advance.
59- self ._pump ()
66+ if pump_reactor :
67+ self ._pump ()
6068
6169 return d , acquired_d , unblock
6270
@@ -212,3 +220,38 @@ def test_cancellation(self) -> None:
212220 )
213221 unblock3 ()
214222 self .successResultOf (d3 )
223+
224+ def test_cancellation_during_sleep (self ) -> None :
225+ """Tests cancellation during the sleep just after waiting for a `Linearizer`."""
226+ linearizer = Linearizer ()
227+
228+ key = object ()
229+
230+ d1 , acquired_d1 , unblock1 = self ._start_task (linearizer , key )
231+ self .assertTrue (acquired_d1 .called )
232+
233+ # Create a second task, waiting for the first task.
234+ d2 , acquired_d2 , _ = self ._start_task (linearizer , key )
235+ self .assertFalse (acquired_d2 .called )
236+
237+ # Create a third task, waiting for the second task.
238+ d3 , acquired_d3 , unblock3 = self ._start_task (linearizer , key )
239+ self .assertFalse (acquired_d3 .called )
240+
241+ # Once the first task completes, cancel the waiting second task while it is
242+ # sleeping just after acquiring the lock.
243+ unblock1 (pump_reactor = False )
244+ self .successResultOf (d1 )
245+ d2 .cancel ()
246+ self ._pump ()
247+
248+ self .assertTrue (d2 .called )
249+ self .failureResultOf (d2 , CancelledError )
250+
251+ # The third task should continue running.
252+ self .assertTrue (
253+ acquired_d3 .called ,
254+ "Third task did not get the lock after the second task was cancelled" ,
255+ )
256+ unblock3 ()
257+ self .successResultOf (d3 )
0 commit comments