1010
1111from collections import namedtuple
1212from enum import Enum
13- from multiprocessing import Process , Queue , Pipe
13+ from multiprocessing import Process , Pipe
1414from queue import Empty
1515
1616from .isolation import Isolation , DebugState
@@ -54,11 +54,12 @@ class StopSearch(Exception): pass # Exception class used to halt search
5454
5555
5656class TimedQueue :
57- """Modified Queue class to block .put() after a time limit expires,
57+ """Modified queue class to block .put() after a time limit expires,
5858 and to include both a context object & action choice in the queue.
5959 """
60- def __init__ (self , queue , time_limit ):
61- self .__queue = queue
60+ def __init__ (self , receiver , sender , time_limit ):
61+ self .__sender = sender
62+ self .__receiver = receiver
6263 self .__time_limit = time_limit / 1000
6364 self .__stop_time = None
6465 self .agent = None
@@ -69,24 +70,22 @@ def start_timer(self):
6970 def put (self , item , block = True , timeout = None ):
7071 if self .__stop_time and time .perf_counter () > self .__stop_time :
7172 raise StopSearch
72- try :
73- self .__queue .get_nowait ()
74- except Empty :
75- pass
76- self .__queue .put_nowait ((getattr (self .agent , "context" , None ), item ))
73+ if self .__receiver .poll ():
74+ self .__receiver .recv ()
75+ self .__sender .send ((getattr (self .agent , "context" , None ), item ))
7776
7877 def put_nowait (self , item ):
7978 self .put (item , block = False )
8079
8180 def get (self , block = True , timeout = None ):
82- return self .__queue . get ( block = block , timeout = timeout )
81+ return self .__receiver . recv ( )
8382
8483 def get_nowait (self ):
8584 return self .get (block = False )
8685
87- def qsize (self ): return self .__queue . qsize ( )
88- def empty (self ): return self .__queue . empty ()
89- def full (self ): return self .__queue . full ()
86+ def qsize (self ): return int ( self .__receiver . poll () )
87+ def empty (self ): return ~ self .__receiver . poll ()
88+ def full (self ): return self .__receiver . poll ()
9089
9190
9291def play (args ): return _play (* args ) # multithreading ThreadPool.map doesn't expand args
@@ -163,7 +162,8 @@ def _play(agents, game_state, time_limit, match_id, debug=False):
163162
164163
165164def fork_get_action (game_state , active_player , time_limit , debug = False ):
166- action_queue = TimedQueue (Queue (), time_limit )
165+ receiver , sender = Pipe ()
166+ action_queue = TimedQueue (receiver , sender , time_limit )
167167 if debug : # run the search in the main process and thread
168168 from copy import deepcopy
169169 active_player .queue = None
0 commit comments